You are here

function Notifications_Queue::process_rows in Notifications 6.4

Process rows given query conditions

This is the main notifications queue processing function, fetching queued notifications, loading all related objects and sending out messages.

Parameters

$conditions: Array of query conditions

$limit: Optional, limit the number of rows to process

$update: Optional, update queue rows and event counter after processing

Return value

int Number of rows processed

See also

queue_query()

File

includes/notifications_queue.class.inc, line 280

Class

Notifications_Queue
Queue management and processing

Code

/**
 * Process queued rows matching the given query conditions.
 *
 * Reads the matching rows from {notifications_queue}, batches consecutive
 * rows that share module, destination (mdid), send method, send interval and
 * language, and hands every batch to process_group().
 *
 * @param $conditions
 *   Array of query conditions, translated into SQL by queue_query().
 * @param $limit
 *   Optional, maximum number of rows to fetch. A value of 0 fetches all rows.
 * @param $update
 *   Optional, forwarded to process_group(); when TRUE the event tracker is
 *   also updated once the whole result set has been consumed.
 *
 * @return
 *   Number of rows processed, accumulated from the process_group() results.
 *
 * @see queue_query()
 */
function process_rows($conditions, $limit = 0, $update = TRUE) {
  $log_context = $conditions + array(
    'limit' => $limit,
  );
  notifications_log('Processing queue rows', $log_context);

  // NOTE(review): the returned value is not read anywhere in this method.
  $test = $this->process_option('test');
  $processed = 0;

  // Assemble the SELECT statement from the translated conditions.
  $query = $this->queue_query($conditions);
  $sql = "SELECT * FROM {notifications_queue}"
    . " WHERE " . implode(' AND ', $query['where'])
    . " ORDER BY module, mdid, send_method, send_interval";
  $result = $limit
    ? db_query_range($sql, $query['args'], 0, $limit)
    : db_query($sql, $query['args']);

  // Walk the rows in order, collecting them into a batch for as long as
  // module, mdid, send_method, send_interval and language stay the same.
  $group = array();
  $previous = NULL;
  while ($row = db_fetch_object($result)) {
    if ($previous) {
      $same_group = $row->module == $previous->module
        && $row->mdid == $previous->mdid
        && $row->send_method == $previous->send_method
        && $row->send_interval == $previous->send_interval
        && $row->language == $previous->language;
      if (!$same_group) {
        // The row belongs to a new batch: send the collected one and start
        // over. The last row of the finished batch is passed along with it.
        $processed += $this->process_group($group, $update, $previous);
        $group = array();
      }
    }

    $this->process_control('count', 'row');

    // Rows are keyed by their queue id inside the batch.
    $group[$row->sqid] = $row;
    $previous = $row;
  }

  // The result set is exhausted; flush the batch that is still open, if any.
  if ($group) {
    $processed += $this->process_group($group, $update, $previous);
  }

  // Queue handled, refresh the event tracker when requested.
  if ($update) {
    Notifications_Event::track_update();
  }

  return $processed;
}