function Notifications_Queue::process_queue in Notifications 6.4
Process subscriptions queue
The subscriptions queue has the following fields: sqid, uid, eid, sid, digest.
This function should be able to split the whole processing into several steps. It will be called multiple times for each send interval.
Messages will be processed for each send interval, send_method, user
Parameters
$send_interval: Send interval to process
$max_sqid: Max queue id to process
Return value
Number of rows processed
1 call to Notifications_Queue::process_queue()
- Notifications_Queue::process_run in includes/
notifications_queue.class.inc - Function to be called on cron by the main notifications_cron
File
- includes/
notifications_queue.class.inc, line 404
Class
- Notifications_Queue
- Queue management and processing
Code
/**
 * Process the subscriptions queue for a single send interval.
 *
 * Selects up to a limited number of (mdid, send_method, module) groups that
 * have pending queue rows for this interval, builds one batch per group and
 * hands it to that module's 'process_rows' callback.
 *
 * @param $send_interval
 *   Send interval to process.
 * @param $max_sqid
 *   Max queue id to process. When not set, the value returned by
 *   $this->process_prepare() is used instead.
 *
 * @return
 *   Number of rows processed. Always 0 when running with the 'test' or
 *   'keep' process options, so the caller does not loop over the same rows.
 */
function process_queue($send_interval, $max_sqid = NULL) {
  // process_prepare() is only invoked when the caller gave no explicit limit.
  $max_sqid = isset($max_sqid) ? $max_sqid : $this->process_prepare();
  $language = $this->process_language;
  notifications_log('Starting queue processing', array(
    'send interval' => $send_interval,
    'max sqid' => $max_sqid,
  ));

  // 'test' option: test run, messages are marked as test; no queue updates.
  $test = $this->process_option('test');
  // 'keep' option: normal run, but the queue records are left untouched.
  $keep = $this->process_option('keep');

  // Count of processed rows.
  $count = 0;
  // For scheduled notifications, only rows scheduled before this time are sent.
  $send_time = time();
  // Destinations last served before this time are due again for this interval.
  $timelimit = $send_time - $send_interval;

  // Shrink the per-call limits (groups and rows) to whatever is left of the
  // configured row limit, if there is one.
  $step_users = NOTIFICATIONS_STEP_USERS;
  $step_rows = NOTIFICATIONS_STEP_ROWS;
  if ($row_limit = $this->process_limit('row')) {
    $remaining_rows = $row_limit - $this->process_control('current', 'row');
    if ($remaining_rows > 0) {
      $step_users = min($remaining_rows, $step_users);
      $step_rows = min($remaining_rows, $step_rows);
    }
  }

  // Batch values shared by every group processed in this call.
  $default_batch = array(
    'cron' => 1,
    'max_sqid' => $max_sqid,
    'send_interval' => $send_interval,
    'send_time_after' => $send_time,
  );

  // Get the destinations to process for this send interval, ordered by when
  // they were last sent to (never-sent ones have su.sent NULL).
  // Note: fetching the destinations with more pending rows first might save
  // some time.
  $sql_select = "SELECT q.mdid, q.send_method, q.module, COUNT(q.sqid) AS count_rows FROM {notifications_queue} q ";
  $sql_select .= " LEFT JOIN {notifications_sent} su ON q.mdid = su.mdid AND q.send_interval = su.send_interval ";
  $sql_select .= " WHERE q.cron = 1 AND q.send_interval = '%d' AND q.send_time < %d AND q.sqid <= %d";
  $sql_select .= " AND (su.mdid IS NULL OR su.sent < %d) ";
  // Note: the group by su.sent seems to be needed by pgsql.
  $sql_group = " GROUP BY q.mdid, q.send_method, q.module, su.sent ORDER BY su.sent";

  // Placeholder values, in the same order as they appear in the query.
  $query_args = array($send_interval, $send_time, $max_sqid, $timelimit);

  // When processing by language, restrict both the query and the batch.
  if ($language) {
    $sql_select .= " AND q.language = '%s' ";
    $default_batch['language'] = $language->language;
    $query_args[] = $language->language;
  }

  // Drupal 6 db_query_range() accepts all query arguments as a single array.
  $result = db_query_range($sql_select . $sql_group, $query_args, 0, $step_users);

  // Build one batch per group and pass it to the 'process_rows' callback.
  // The process control check runs after each fetch and can stop the loop.
  while (($queue = db_fetch_object($result)) && $this->process_control('check')) {
    // Process all rows for this group, with a hard limit to prevent lock ups.
    if ($queue->count_rows > $step_rows) {
      // Too many rows: process one step and update the queue as we go,
      // unless we were asked to keep the queue data.
      $limit = $step_rows;
      $update = !$keep;
    }
    else {
      // Everything fits in one step: the queue is updated once, afterwards.
      $limit = $queue->count_rows;
      $update = FALSE;
    }

    // Prepare the batch conditions for the actual row processing.
    $batch = $default_batch + array(
      'mdid' => $queue->mdid,
      'send_method' => $queue->send_method,
      'module' => $queue->module,
    );
    notifications_log('Queue processing', $batch);

    // These rows may be processed by a different module.
    $processed = $this->process_callback($queue->module, 'process_rows', $batch, $limit, $update);
    $count += $processed;

    // Mark the whole batch as done only when rows were not already updated
    // step by step and this is neither a test run nor a keep run.
    if ($processed && !$test && !$update && !$keep) {
      $this->queue_done($batch);
    }
  }

  // If not doing a test run, update the event counter and return the count.
  // If doing a test run, return 0 so we don't go through this again.
  if (!$test && !$keep) {
    Notifications_Event::track_update();
    return $count;
  }
  else {
    return 0;
  }
}