You are here

function notifications_process_queue in Notifications 6

Same name and namespace in other branches
  1. 5 notifications.cron.inc \notifications_process_queue()
  2. 6.2 notifications.cron.inc \notifications_process_queue()
  3. 6.3 notifications.cron.inc \notifications_process_queue()

Process subscriptions queue

The subscriptions queue has the following fields sqid, uid, eid, sid, digest

This function should be able of splitting the whole processing in several steps. It will be called multiple times for each send interval

Messages will be processed for each send interval, send_method, user

@ TODO Review time conditions @ TODO Per module queue processing

Parameters

$send_interval: Send interval to process

$max_sqid: Max queue id to process

Return value

Number of rows processed

2 calls to notifications_process_queue()
NotificationsContentTests::testNotificationsContent in tests/notifications_content.test
Play with creating, retrieving, deleting a pair subscriptions
notifications_process_run in ./notifications.cron.inc
Function to be called on cron by the main notifications_cron

File

./notifications.cron.inc, line 235

Code

function notifications_process_queue($send_interval, $max_sqid) {
  notifications_log('Starting queue processing', array(
    'send interval' => $send_interval,
    'max squid' => $max_sqid,
  ));
  $test = notifications_process('option', 'test');
  $count = 0;

  // This is the time from which stored rows will be sent
  $timelimit = time() - $send_interval;

  // Get users to process messages for, with this time interval and ordered by squid
  // Order by last sent for this send interval
  // Note: If we get the users with more messages pending first this may save some time
  $sql = "SELECT q.uid, q.module, q.send_method, count(*) AS count FROM {notifications_queue} q ";
  $sql .= " LEFT JOIN {notifications_sent} su ON q.uid = su.uid AND q.send_interval = su.send_interval AND q.send_method = su.send_method ";
  $sql .= " WHERE q.cron = 1 AND q.send_interval = '%d' AND q.sqid <= %d";
  $sql .= " AND (su.uid IS NULL OR su.sent < %d) ";

  // Note: the group by su.sent seems to be needed by pgsql
  $sql .= " GROUP BY q.uid, q.module, q.send_method, su.sent ORDER BY su.sent";
  $result = db_query_range($sql, $send_interval, $max_sqid, $timelimit, 0, NOTIFICATIONS_STEP_USERS);
  $sqid = 0;

  // @ TODO Add time conditions
  while (($user = db_fetch_object($result)) && notifications_process('check')) {
    notifications_log('Queue processing', array(
      'user' => $user->uid,
      'rows' => $user->count,
      'send method' => $user->send_method,
    ));
    $module = $user->module;
    $events = $subscriptions = $processed = array();
    $send_method = $user->send_method;

    // Users may be handled by a different module
    $account = notifications_callback($module, 'load_user', $user->uid);

    // Process all rows for this user. With some hard limit to prevent process lock ups.
    $result_subs = db_query_range("SELECT * FROM {notifications_queue} WHERE cron = 1 AND send_interval = '%d' AND uid = %d AND sqid <= %d ORDER BY send_method, sqid", $send_interval, $account->uid, $max_sqid, 0, NOTIFICATIONS_STEP_ROWS);
    while (($queue = db_fetch_object($result_subs)) && notifications_process('count', 'row')) {
      $count++;
      $processed[] = $sqid = $queue->sqid;

      // Load event, check it exists and check the user has access to the event objects
      if ($event = notifications_load_event($queue->eid, TRUE)) {
        notifications_event_tracker('count', $event);
        notifications_log('Processing queued', array(
          'queue sid' => $queue->sid,
          'event' => $queue->eid,
          'type' => $event->type,
          'action' => $event->action,
          'send method' => $send_method,
        ));
        if (notifications_callback($module, 'user_allowed', 'event', $account, $event)) {

          // This will take care of duplicated events
          $events[$queue->eid] = $event;

          // We keep track also of subscriptions originating this event
          $subscriptions[$queue->eid][] = $queue->sid;
        }
        else {
          notifications_log('Access denied for event', array(
            'account' => $user->uid,
            'event' => $queue->eid,
            'type' => $event->type,
            'action' => $event->action,
          ));
        }
      }
      else {
        notifiations_log('Cannot load event', array(
          'eid' => $queue->eid,
          'queue sid' => $queue->sid,
        ));
      }
    }
    if ($events) {
      notifications_process_send($account, $events, $subscriptions, $send_method, $send_interval);
      if (!$test) {
        notifications_update_sent($user->uid, $send_method, $send_interval, time());
      }
    }
    if ($processed && !$test) {
      notifications_queue_done(array(
        'uid' => $user->uid,
        'send_interval' => $send_interval,
        'send_method' => $send_method,
        'max_sqid' => $sqid,
      ));
    }
  }

  // Update event counter
  if (!$test) {
    notifications_event_tracker('update');
  }

  // If doing a test run, return 0 so we don't go through this again
  return $test ? 0 : $count;
}