function notifications_process_queue in Notifications 6
Same name and namespace in other branches
- 5 notifications.cron.inc \notifications_process_queue()
- 6.2 notifications.cron.inc \notifications_process_queue()
- 6.3 notifications.cron.inc \notifications_process_queue()
Process subscriptions queue
The subscriptions queue has the following fields sqid, uid, eid, sid, digest
This function should be able of splitting the whole processing in several steps. It will be called multiple times for each send interval
Messages will be processed for each send interval, send_method, user
@ TODO Review time conditions @ TODO Per module queue processing
Parameters
$send_interval: Send interval to process
$max_sqid: Max queue id to process
Return value
Number of rows processed
2 calls to notifications_process_queue()
- NotificationsContentTests::testNotificationsContent in tests/
notifications_content.test - Play with creating, retrieving, deleting a pair subscriptions
- notifications_process_run in ./
notifications.cron.inc - Function to be called on cron by the main notifications_cron
File
- ./
notifications.cron.inc, line 235
Code
function notifications_process_queue($send_interval, $max_sqid) {
notifications_log('Starting queue processing', array(
'send interval' => $send_interval,
'max squid' => $max_sqid,
));
$test = notifications_process('option', 'test');
$count = 0;
// This is the time from which stored rows will be sent
$timelimit = time() - $send_interval;
// Get users to process messages for, with this time interval and ordered by squid
// Order by last sent for this send interval
// Note: If we get the users with more messages pending first this may save some time
$sql = "SELECT q.uid, q.module, q.send_method, count(*) AS count FROM {notifications_queue} q ";
$sql .= " LEFT JOIN {notifications_sent} su ON q.uid = su.uid AND q.send_interval = su.send_interval AND q.send_method = su.send_method ";
$sql .= " WHERE q.cron = 1 AND q.send_interval = '%d' AND q.sqid <= %d";
$sql .= " AND (su.uid IS NULL OR su.sent < %d) ";
// Note: the group by su.sent seems to be needed by pgsql
$sql .= " GROUP BY q.uid, q.module, q.send_method, su.sent ORDER BY su.sent";
$result = db_query_range($sql, $send_interval, $max_sqid, $timelimit, 0, NOTIFICATIONS_STEP_USERS);
$sqid = 0;
// @ TODO Add time conditions
while (($user = db_fetch_object($result)) && notifications_process('check')) {
notifications_log('Queue processing', array(
'user' => $user->uid,
'rows' => $user->count,
'send method' => $user->send_method,
));
$module = $user->module;
$events = $subscriptions = $processed = array();
$send_method = $user->send_method;
// Users may be handled by a different module
$account = notifications_callback($module, 'load_user', $user->uid);
// Process all rows for this user. With some hard limit to prevent process lock ups.
$result_subs = db_query_range("SELECT * FROM {notifications_queue} WHERE cron = 1 AND send_interval = '%d' AND uid = %d AND sqid <= %d ORDER BY send_method, sqid", $send_interval, $account->uid, $max_sqid, 0, NOTIFICATIONS_STEP_ROWS);
while (($queue = db_fetch_object($result_subs)) && notifications_process('count', 'row')) {
$count++;
$processed[] = $sqid = $queue->sqid;
// Load event, check it exists and check the user has access to the event objects
if ($event = notifications_load_event($queue->eid, TRUE)) {
notifications_event_tracker('count', $event);
notifications_log('Processing queued', array(
'queue sid' => $queue->sid,
'event' => $queue->eid,
'type' => $event->type,
'action' => $event->action,
'send method' => $send_method,
));
if (notifications_callback($module, 'user_allowed', 'event', $account, $event)) {
// This will take care of duplicated events
$events[$queue->eid] = $event;
// We keep track also of subscriptions originating this event
$subscriptions[$queue->eid][] = $queue->sid;
}
else {
notifications_log('Access denied for event', array(
'account' => $user->uid,
'event' => $queue->eid,
'type' => $event->type,
'action' => $event->action,
));
}
}
else {
notifiations_log('Cannot load event', array(
'eid' => $queue->eid,
'queue sid' => $queue->sid,
));
}
}
if ($events) {
notifications_process_send($account, $events, $subscriptions, $send_method, $send_interval);
if (!$test) {
notifications_update_sent($user->uid, $send_method, $send_interval, time());
}
}
if ($processed && !$test) {
notifications_queue_done(array(
'uid' => $user->uid,
'send_interval' => $send_interval,
'send_method' => $send_method,
'max_sqid' => $sqid,
));
}
}
// Update event counter
if (!$test) {
notifications_event_tracker('update');
}
// If doing a test run, return 0 so we don't go through this again
return $test ? 0 : $count;
}