function notifications_process_rows in Notifications 6.2
Same name and namespace in other branches
- 5 notifications.cron.inc \notifications_process_rows()
- 6 notifications.cron.inc \notifications_process_rows()
- 6.3 notifications.cron.inc \notifications_process_rows()
Process rows given query conditions
This is used by the immediate sending feature
Parameters
$conditions: Array of query conditions
$limit: Optional, limit the number of rows to process
$update: Optional, update queue rows and event counter after processing
See also
2 calls to notifications_process_rows()
- NotificationsLiteTests::sendLiteMessages in tests/notifications_lite.test
- notifications_exit in ./notifications.module - Implementation of hook_exit()
1 string reference to 'notifications_process_rows'
- notifications_queue_operations in ./notifications.admin.inc - List of queue operations
File
- ./
notifications.cron.inc, line 192
Code
/**
 * Process rows given query conditions.
 *
 * This is used by the immediate sending feature.
 *
 * Rows are fetched ordered by module, uid, destination, send_method and
 * send_interval. Consecutive rows sharing those values form one group; when
 * the group changes (or the result set is exhausted) the accumulated events
 * are composed and sent through the owning module's callbacks.
 *
 * @param $conditions
 *   Array of query conditions, passed to notifications_queue_query().
 * @param $limit
 *   Optional, limit the number of rows to process. 0 means no limit.
 * @param $update
 *   Optional, update queue rows and event counter after processing.
 *
 * @return
 *   Number of queue rows processed.
 */
function notifications_process_rows($conditions, $limit = 0, $update = TRUE) {
  notifications_log('Processing queue rows', $conditions);
  $account = $destination = NULL;
  $subscriptions = $events = $processed = array();
  $send_method = $send_interval = $module = NULL;
  $test = notifications_process('option', 'test');
  $count = 0;

  // Build query and fetch rows from queue
  $query = notifications_queue_query($conditions);
  $sql = "SELECT * FROM {notifications_queue} ";
  $sql .= " WHERE " . implode(' AND ', $query['where']);
  $sql .= " ORDER BY module, uid, destination, send_method, send_interval";
  if ($limit) {
    $result = db_query_range($sql, $query['args'], 0, $limit);
  }
  else {
    $result = db_query($sql, $query['args']);
  }

  // Group rows by user, send_method, send_interval before composing and sending
  // This loop has to run a final time after all rows have been fetched: on that
  // last pass $queue is FALSE and $processed still holds the pending group.
  while (($queue = db_fetch_object($result)) || $processed) {
    // NOTE(review): this counter is also bumped on the final flush pass, when
    // there is no row -- confirm whether that extra count is intended.
    notifications_process('count', 'row');

    if (!$account || !$queue || $queue->module != $module || $queue->uid != $account->uid || $queue->destination != $destination || $queue->send_method != $send_method || $queue->send_interval != $send_interval) {
      // New user or sending method or destination, send if not the first row and reset
      if ($account && $events && $subscriptions) {
        $messages = notifications_callback($module, 'process_compose', $account, $events, $subscriptions, $send_method, $send_interval);
        notifications_log('Composed messages', array(
          'number' => count($messages),
          'send_method' => $send_method,
        ));
        // Note that we pass the testing parameter to notifications_process_send
        notifications_callback($module, 'process_send', $account, $messages, $send_method, $test);
        if (!$test) {
          notifications_update_sent($account, $send_method, $send_interval, time());
        }
      }
      if ($processed && $update) {
        notifications_queue_done(array(
          'sqid' => $processed,
        ));
      }
      // Emptying $processed here is what lets the while condition terminate
      // once the result set is exhausted.
      $subscriptions = $events = $processed = array();

      // Keep track of parameters that will trigger a sending when changing
      if ($queue) {
        $send_method = $queue->send_method;
        $send_interval = $queue->send_interval;
        $destination = $queue->destination;
        $module = $queue->module;
        // Users may be handled by a different module implementing the _load_user callback.
        // I.e. for anonymous users it may load the name from somewhere
        $account = notifications_callback($module, 'load_user', $queue->uid, $destination, $send_method);
      }
    }

    // For every row in queue, compile everything that will be available for sending
    if ($queue) {
      $count++;
      $processed[] = $queue->sqid;
      // Load event, check it exists and check the user has access to the event objects
      if ($event = notifications_load_event($queue->eid)) {
        notifications_event_tracker('count', $event);
        notifications_log('Processing queued', array(
          'queue sqid' => $queue->sqid,
          'event' => $queue->eid,
          'type' => $event->type,
          'action' => $event->action,
          'send method' => $send_method,
        ));
        if (notifications_user_allowed('event', $account, $event)) {
          // This will take care of duplicated events
          $events[$queue->eid] = $event;
          // We keep track also of subscriptions originating this event
          $subscriptions[$queue->eid][] = $queue->sid;
        }
        else {
          // Log the account the access check was run for. The original code
          // read an undefined $user variable here. Fall back to the queue
          // row's uid if the load_user callback returned nothing.
          notifications_log('Access denied for event', array(
            'account' => $account ? $account->uid : $queue->uid,
            'event' => $queue->eid,
          ));
        }
      }
      else {
        notifications_log('Cannot load event', array(
          'eid' => $queue->eid,
          'queue sid' => $queue->sid,
        ));
      }
    }
  }

  if ($update) {
    notifications_event_tracker('update');
  }
  // Return number of rows processed
  return $count;
}