You are here

function Notifications_Queue::process_queue in Notifications 6.4

Process subscriptions queue

The subscriptions queue has the following fields: sqid, uid, eid, sid, digest

This function should be able to split the whole processing into several steps. It will be called multiple times for each send interval.

Messages will be processed for each send interval, send_method, user

Parameters

$send_interval: Send interval to process

$max_sqid: Max queue id to process

Return value

Number of rows processed

1 call to Notifications_Queue::process_queue()
Notifications_Queue::process_run in includes/notifications_queue.class.inc
Function to be called on cron by the main notifications_cron

File

includes/notifications_queue.class.inc, line 404

Class

Notifications_Queue
Queue management and processing

Code

/**
 * Process the subscriptions queue for one send interval.
 *
 * Selects groups of pending cron rows (grouped by mdid, send method and
 * module) for the given interval and hands each group, as a batch, to the
 * owning module's 'process_rows' callback.
 *
 * @param $send_interval
 *   Send interval to process.
 * @param $max_sqid
 *   Max queue id to process. When not set, the value returned by
 *   $this->process_prepare() is used.
 *
 * @return
 *   Number of rows processed, or 0 when running with the 'test' or 'keep'
 *   process option so the caller does not go through the same rows again.
 */
function process_queue($send_interval, $max_sqid = NULL) {
  $max_sqid = isset($max_sqid) ? $max_sqid : $this->process_prepare();
  $language = $this->process_language;
  notifications_log('Starting queue processing', array(
    'send interval' => $send_interval,
    'max sqid' => $max_sqid,
  ));

  // Test-run option. Within this function it prevents marking the queue as
  // done and updating the event tracking.
  $test = $this->process_option('test');

  // Option for normal running but without updating the queue records.
  $keep = $this->process_option('keep');

  // Count processed rows.
  $count = 0;

  // Only rows whose send_time is earlier than this moment are selected.
  $send_time = time();

  // Destinations are selected when they have no notifications_sent record
  // for this interval or when their last sent time is older than this.
  $timelimit = $send_time - $send_interval;

  // Check remaining rows to process to adjust query limits for both users and rows.
  $step_users = NOTIFICATIONS_STEP_USERS;
  $step_rows = NOTIFICATIONS_STEP_ROWS;
  if ($row_limit = $this->process_limit('row')) {
    $remaining_rows = $row_limit - $this->process_control('current', 'row');
    if ($remaining_rows > 0) {
      $step_users = min($remaining_rows, $step_users);
      $step_rows = min($remaining_rows, $step_rows);
    }
  }

  // Common batch parts for processing rows.
  $default_batch = array(
    'cron' => 1,
    'max_sqid' => $max_sqid,
    'send_interval' => $send_interval,
    'send_time_after' => $send_time,
  );

  // Get destinations to process messages for, with this send interval and up
  // to the max sqid, ordered by last sent time for this send interval.
  // Note: If we get the users with more messages pending first this may save some time
  $sql_select = "SELECT q.mdid, q.send_method, q.module, COUNT(q.sqid) AS count_rows FROM {notifications_queue} q ";
  $sql_select .= " LEFT JOIN {notifications_sent} su ON q.mdid = su.mdid AND q.send_interval = su.send_interval ";
  $sql_select .= " WHERE q.cron = 1 AND q.send_interval = '%d' AND q.send_time < %d AND q.sqid <= %d";
  $sql_select .= " AND (su.mdid IS NULL OR su.sent < %d) ";

  // Note: the group by su.sent seems to be needed by pgsql
  $sql_group = " GROUP BY q.mdid, q.send_method, q.module, su.sent ORDER BY su.sent";

  // If processing by language, add the language condition to both the query
  // and the batch. The extra placeholder comes last, so the language argument
  // goes right after $timelimit.
  if ($language) {
    $sql_select .= " AND q.language = '%s' ";
    $default_batch['language'] = $language->language;
    $result = db_query_range($sql_select . $sql_group, $send_interval, $send_time, $max_sqid, $timelimit, $language->language, 0, $step_users);
  }
  else {
    $result = db_query_range($sql_select . $sql_group, $send_interval, $send_time, $max_sqid, $timelimit, 0, $step_users);
  }

  // We create a batch for each mdid (destination, method, module) and hand it
  // to the 'process_rows' callback. The control check runs once per fetched
  // group and stops the loop when it returns a false value.
  while (($queue = db_fetch_object($result)) && $this->process_control('check')) {
    // Process all rows for this destination, with a hard limit to prevent
    // process lock ups. If there are too many rows, we go updating step by step.
    if ($queue->count_rows > $step_rows) {
      $limit = $step_rows;

      // Still if we want to keep data, we don't update as we go.
      $update = !$keep;
    }
    else {
      // Everything fits in one step: no incremental updates, the whole batch
      // is marked as done below instead.
      $limit = $queue->count_rows;
      $update = FALSE;
    }

    // Prepare batch query for actual row processing.
    $batch = $default_batch + array(
      'mdid' => $queue->mdid,
      'send_method' => $queue->send_method,
      'module' => $queue->module,
    );
    notifications_log('Queue processing', $batch);

    // These rows may be processed by a different module. Defaults to notifications_process_rows()
    $processed = $this->process_callback($queue->module, 'process_rows', $batch, $limit, $update);
    $count += $processed;

    // Mark the whole batch as done only when rows were processed, this is a
    // real run, and the callback was not asked to update rows as it went.
    if ($processed && !$test && !$update && !$keep) {
      $this->queue_done($batch);
    }
  }

  // If not doing a test or keep run, update event counter and return count.
  // Otherwise return 0 so we don't go through this again.
  if (!$test && !$keep) {
    Notifications_Event::track_update();
    return $count;
  }
  else {
    return 0;
  }
}