You are here

function notifications_process_queue in Notifications 5

Same name and namespace in other branches
  1. 6 notifications.cron.inc \notifications_process_queue()
  2. 6.2 notifications.cron.inc \notifications_process_queue()
  3. 6.3 notifications.cron.inc \notifications_process_queue()

Process subscriptions queue

The subscriptions queue has the following fields sqid, uid, eid, sid, digest

This function should be able to split the whole processing into several steps. It will be called multiple times for each send interval.

Messages will be processed for each send interval, send_method, user

@ TODO Review time conditions

Parameters

$send_interval: Send interval to process

$max_sqid: Max queue id to process

$module: Module's rows to process. There may be other types of rows sitting on the table

Return value

Number of rows processed

2 calls to notifications_process_queue()
Notifications_Content_Tests::testNotificationsContent in tests/notifications_content.test
Play with creating, retrieving and deleting a pair of subscriptions
notifications_process_run in ./notifications.cron.inc
Function to be called on cron by the main notifications_cron

File

./notifications.cron.inc, line 265

Code

/**
 * Process queued notification rows for one send interval.
 *
 * Selects up to NOTIFICATIONS_STEP_USERS (user, send method) pairs that have
 * pending cron rows for this interval and module and that either have never
 * been sent to or were last sent to more than $send_interval seconds ago.
 * For each pair it loads up to NOTIFICATIONS_STEP_ROWS queued rows, collects
 * the events the account is allowed to see, hands them to the module's
 * 'process_send' callback and records the send time through 'update_sent'.
 * Processed rows are then removed from the queue unless the 'debug' process
 * option is set.
 *
 * @param $send_interval
 *   Send interval to process.
 * @param $max_sqid
 *   Max queue id to process.
 * @param $module
 *   Module's rows to process. There may be other types of rows sitting on the
 *   table.
 *
 * @return
 *   Number of rows processed.
 */
function notifications_process_queue($send_interval, $max_sqid, $module = 'notifications') {
  $count = 0;

  // This is the time from which stored rows will be sent
  $timelimit = time() - $send_interval;

  // Get the (user, send method) pairs to process for this send interval,
  // ordered by the time they were last sent to.
  // Note: If we get the users with more messages pending first this may save some time
  $sql = "SELECT q.uid, q.send_method, count(*) AS count FROM {notifications_queue} q ";
  $sql .= " LEFT JOIN {notifications_sent} su ON q.uid = su.uid AND q.send_interval = su.send_interval AND q.send_method = su.send_method ";
  $sql .= " WHERE q.cron = 1 AND q.send_interval = '%d' AND q.sqid <= %d AND q.module = '%s'";
  $sql .= " AND (su.uid IS NULL OR su.sent < %d) ";

  // Note: the group by su.sent seems to be needed by pgsql
  $sql .= " GROUP BY q.uid, q.send_method, su.sent ORDER BY su.sent";
  $result = db_query_range($sql, $send_interval, $max_sqid, $module, $timelimit, 0, NOTIFICATIONS_STEP_USERS);
  $sqid = 0;

  // @ TODO Add time conditions
  while (($user = db_fetch_object($result)) && notifications_process('check')) {
    notifications_log("Processing user {$user->uid}, rows {$user->count}, send_method {$user->send_method}");
    $events = $subscriptions = $processed = array();
    $send_method = $user->send_method;

    // Users may be handled by a different module
    $account = notifications_callback($module, 'load_user', $user->uid);

    // Process the rows for this user and send method only, with a hard limit
    // to prevent process lock ups. Each result of the outer query stands for
    // one (uid, send_method) pair, and the delete below is restricted to that
    // same pair, so rows queued for another send method must be left for
    // their own iteration; otherwise they would be sent through the wrong
    // method and then sent again later. The queue is keyed on $user->uid,
    // which is also what the delete uses.
    // NOTE(review): this query does not filter on module while the outer one
    // does - confirm whether notifications_queue_delete() can be restricted
    // by module before adding that condition here.
    $result_subs = db_query_range("SELECT * FROM {notifications_queue} WHERE cron = 1 AND send_interval = '%d' AND send_method = '%s' AND uid = %d AND sqid <= %d ORDER BY sqid", $send_interval, $send_method, $user->uid, $max_sqid, 0, NOTIFICATIONS_STEP_ROWS);
    while (($queue = db_fetch_object($result_subs)) && notifications_process('count', 'row')) {
      $count++;

      // Rows come ordered by sqid, so the last one seen is the highest
      // processed queue id and bounds the delete below.
      $processed[] = $sqid = $queue->sqid;

      // Load event and check access
      $event = notifications_load_event($queue->eid, TRUE);
      if (notifications_callback($module, 'user_allowed', 'event', $account, $event)) {

        // This will take care of duplicated events
        $events[$queue->eid] = $event;

        // We keep track also of subscriptions originating this event
        $subscriptions[$queue->eid][] = $queue->sid;
        notifications_log("Processing queued sid={$queue->sid} event={$queue->eid} ({$event->type}, {$event->action}) send_method={$send_method}");
      }
      else {
        notifications_log("Access denied for queued event sid={$queue->sid} event={$queue->eid} ({$event->type}, {$event->action})");
      }
    }
    if ($events) {
      notifications_callback($module, 'process_send', $account, $events, $subscriptions, $send_method, $send_interval);
      notifications_callback($module, 'update_sent', $user->uid, $send_method, $send_interval, time());
    }

    // Rows are removed even when access was denied, but kept in debug mode.
    if ($processed && !notifications_process('option', 'debug')) {
      notifications_queue_delete(array(
        'uid' => $user->uid,
        'send_interval' => $send_interval,
        'send_method' => $send_method,
        'max_sqid' => $sqid,
      ));
    }
  }
  return $count;
}