You are here

function notifications_process_queue in Notifications 6.2

Same name and namespace in other branches
  1. 5 notifications.cron.inc \notifications_process_queue()
  2. 6 notifications.cron.inc \notifications_process_queue()
  3. 6.3 notifications.cron.inc \notifications_process_queue()

Process subscriptions queue

The subscriptions queue has the following fields sqid, uid, eid, sid, digest

This function should be able of splitting the whole processing in several steps. It will be called multiple times for each send interval

Messages will be processed for each send interval, send_method, user

@ TODO Review time conditions @ TODO Per module queue processing

Parameters

$send_interval: Send interval to process

$max_sqid: Max queue id to process

Return value

Number of rows processed

3 calls to notifications_process_queue()
NotificationsContentTests::testNotificationsContent in tests/notifications_content.test
Play with creating, retrieving, deleting a pair subscriptions
NotificationsLiteTests::testNotificationsLite in tests/notifications_lite.test
Test simple sending cases
notifications_process_run in ./notifications.cron.inc
Function to be called on cron by the main notifications_cron

File

./notifications.cron.inc, line 291

Code

function notifications_process_queue($send_interval, $max_sqid) {
  notifications_log('Starting queue processing', array(
    'send interval' => $send_interval,
    'max squid' => $max_sqid,
  ));
  $test = notifications_process('option', 'test');

  // Count processed rows
  $count = 0;

  // This is the time from which stored rows will be sent
  $timelimit = time() - $send_interval;

  // Check remaining rows to process to adjust query limits for both users and rows
  $step_users = NOTIFICATIONS_STEP_USERS;
  $step_rows = NOTIFICATIONS_STEP_ROWS;
  if ($row_limit = notifications_process('limit', 'row')) {
    $remaining_rows = $row_limit - notifications_process('current', 'row');
    if ($remaining_rows > 0) {
      $step_users = min($remaining_rows, $step_users);
      $step_rows = min($remaining_rows, $step_rows);
    }
  }

  // Get users to process messages for, with this time interval and ordered by squid
  // Order by last sent for this send interval
  // Note: If we get the users with more messages pending first this may save some time
  $sql = "SELECT q.uid, q.destination, q.module, q.send_method, count(*) AS count_rows FROM {notifications_queue} q ";
  $sql .= " LEFT JOIN {notifications_sent} su ON q.uid = su.uid AND q.send_interval = su.send_interval AND q.send_method = su.send_method ";
  $sql .= " WHERE q.cron = 1 AND q.send_interval = '%d' AND q.sqid <= %d";
  $sql .= " AND (su.uid IS NULL OR su.sent < %d) ";

  // Note: the group by su.sent seems to be needed by pgsql
  $sql .= " GROUP BY q.uid, q.destination, q.module, q.send_method, su.sent ORDER BY su.sent";
  $result = db_query_range($sql, $send_interval, $max_sqid, $timelimit, 0, $step_users);

  // We create a bach for each user, destination method and handle it to notifications_process_rows()
  while (($queue = db_fetch_object($result)) && notifications_process('check')) {
    notifications_log('Queue processing', array(
      'user' => $queue->uid,
      'rows' => $queue->count_rows,
      'send method' => $queue->send_method,
    ));
    $module = $queue->module;
    $events = $subscriptions = $processed = array();

    // Process all rows for this user. With some hard limit to prevent process lock ups.
    // In case we have too many rows, we go updating step by step
    if ($queue->count_rows > $step_rows) {
      $limit = $step_rows;
      $update = TRUE;
    }
    else {
      $limit = $queue->count_rows;
      $update = FALSE;
    }
    $batch = array(
      'uid' => $queue->uid,
      'destination' => $queue->destination,
      'module' => $queue->module,
      'send_method' => $queue->send_method,
      'send_interval' => $send_interval,
      'cron' => 1,
      'max_sqid' => $max_sqid,
    );

    // These rows may be processed by a different module. Defaults to notifications_process_rows()
    $processed = notifications_callback($queue->module, 'process_rows', $batch, $limit, $update);
    $count += $processed;
    if ($processed && !$test && !$update) {
      notifications_queue_done($batch);
    }
  }

  // If not doing a test run, update event counter and return count
  // If doing a test run, return 0 so we don't go through this again
  if (!$test) {
    notifications_event_tracker('update');
    return $count;
  }
  else {
    return 0;
  }
}