You are here

function notifications_process_queue in Notifications 6.3

Same name and namespace in other branches
  1. 5 notifications.cron.inc \notifications_process_queue()
  2. 6 notifications.cron.inc \notifications_process_queue()
  3. 6.2 notifications.cron.inc \notifications_process_queue()

Process subscriptions queue

The subscriptions queue has the following fields sqid, uid, eid, sid, digest

This function should be able of splitting the whole processing in several steps. It will be called multiple times for each send interval

Messages will be processed for each send interval, send_method, user

@ TODO Review time conditions @ TODO Per module queue processing

Parameters

$send_interval: Send interval to process

$max_sqid: Max queue id to process

$language: Optional language to process only rows in this language

Return value

Number of rows processed

3 calls to notifications_process_queue()
NotificationsContentTests::testNotificationsContent in tests/notifications_content.test
Play with creating, retrieving, deleting a pair subscriptions
NotificationsLiteTests::testNotificationsLite in tests/notifications_lite.test
Test simple sending cases
notifications_process_run in ./notifications.cron.inc
Function to be called on cron by the main notifications_cron

File

./notifications.cron.inc, line 337

Code

function notifications_process_queue($send_interval, $max_sqid, $language = NULL) {
  notifications_log('Starting queue processing', array(
    'send interval' => $send_interval,
    'max sqid' => $max_sqid,
  ));

  // Option for test running, marking messages as test, nor updating not sending
  $test = notifications_process('option', 'test');

  // Option for normal running but without updating the queue records
  $keep = notifications_process('option', 'keep');

  // Count processed rows
  $count = 0;

  // This is the time from which stored rows will be sent
  $timelimit = time() - $send_interval;

  // Get users to process messages for, with this time interval and ordered by squid
  // Order by last sent for this send interval
  // Note: If we get the users with more messages pending first this may save some time
  $sql = "SELECT q.uid, q.destination, q.module, q.send_method, count(*) AS count_rows FROM {notifications_queue} q ";
  $sql .= " LEFT JOIN {notifications_sent} su ON q.uid = su.uid AND q.send_interval = su.send_interval AND q.send_method = su.send_method ";
  $sql .= " WHERE q.cron = 1 AND q.send_interval = '%d' AND q.sqid <= %d";
  if ($language) {
    $sql .= " AND q.language = '%s'";
  }
  $sql .= " AND (su.uid IS NULL OR su.sent < %d) ";

  // Note: the group by su.sent seems to be needed by pgsql
  $sql .= " GROUP BY q.uid, q.destination, q.module, q.send_method, su.sent ORDER BY su.sent";
  if ($language) {
    $result = db_query_range($sql, $send_interval, $max_sqid, $language->language, $timelimit, 0, NOTIFICATIONS_STEP_USERS);
  }
  else {
    $result = db_query_range($sql, $send_interval, $max_sqid, $timelimit, 0, NOTIFICATIONS_STEP_USERS);
  }

  // We create a bach for each user, destination, method and hand it over to notifications_process_rows()
  while (($queue = db_fetch_object($result)) && notifications_process('check')) {
    notifications_log('Queue processing', array(
      'user' => $queue->uid,
      'rows' => $queue->count_rows,
      'send method' => $queue->send_method,
    ));
    $module = $queue->module;
    $events = $subscriptions = $processed = array();

    // Process all rows for this user. With some hard limit to prevent process lock ups.
    // In case we have too many rows, we go updating step by step
    if ($queue->count_rows > NOTIFICATIONS_STEP_ROWS) {
      $limit = NOTIFICATIONS_STEP_ROWS;
      $update = TRUE;
    }
    else {
      $limit = $queue->count_rows;
      $update = FALSE;
    }
    $update = $update && !$keep;
    $batch = array(
      'uid' => $queue->uid,
      'destination' => $queue->destination,
      'module' => $queue->module,
      'send_method' => $queue->send_method,
      'send_interval' => $send_interval,
      'cron' => 1,
      'max_sqid' => $max_sqid,
    );
    if ($language) {
      $batch['language'] = $language->language;
    }

    // These rows may be processed by a different module. Defaults to notifications_process_rows()
    $processed = notifications_callback($queue->module, 'process_rows', $batch, $limit, $update);
    $count += $processed;

    // If we didn't update row by row ($update), we update all at once
    if ($processed && !$test && !$update && !$keep) {
      notifications_queue_done($batch);
    }
  }

  // If not doing a test run, update event counter and return count
  // If doing a test run, return 0 so we don't go through this again
  if (!$test && !$keep) {
    notifications_event_tracker('update');
    return $count;
  }
  else {
    return 0;
  }
}