You are here

function notifications_process_rows in Notifications 6.2

Same name and namespace in other branches
  1. 5 notifications.cron.inc \notifications_process_rows()
  2. 6 notifications.cron.inc \notifications_process_rows()
  3. 6.3 notifications.cron.inc \notifications_process_rows()

Process rows given query conditions

This is used by the immediate sending feature

Parameters

$conditions: Array of query conditions

$limit: Optional, limit the number of rows to process

$update: Optional, update queue rows and event counter after processing

See also

notifications_queue_query()

2 calls to notifications_process_rows()
NotificationsLiteTests::sendLiteMessages in tests/notifications_lite.test
notifications_exit in ./notifications.module
Implementation of hook_exit()
1 string reference to 'notifications_process_rows'
notifications_queue_operations in ./notifications.admin.inc
List of queue operations

File

./notifications.cron.inc, line 192

Code

function notifications_process_rows($conditions, $limit = 0, $update = TRUE) {
  notifications_log('Processing queue rows', $conditions);
  $account = $destination = NULL;
  $subscriptions = $events = $processed = array();
  $send_method = $send_interval = $module = NULL;
  $test = notifications_process('option', 'test');
  $count = 0;

  // Build query and fetch rows from queue
  $query = notifications_queue_query($conditions);
  $sql = "SELECT * FROM {notifications_queue} ";
  $sql .= " WHERE " . implode(' AND ', $query['where']);
  $sql .= " ORDER BY module, uid, destination, send_method, send_interval";
  if ($limit) {
    $result = db_query_range($sql, $query['args'], 0, $limit);
  }
  else {
    $result = db_query($sql, $query['args']);
  }

  // Group rows by user, send_method, send_interval before composing and sending
  // This loop has to run a final time after all rows have been fetched
  while (($queue = db_fetch_object($result)) || $processed) {
    notifications_process('count', 'row');
    if (!$account || !$queue || $queue->module != $module || $queue->uid != $account->uid || $queue->destination != $destination || $queue->send_method != $send_method || $queue->send_interval != $send_interval) {

      // New user or sending method or destination, send if not the first row and reset
      if ($account && $events && $subscriptions) {
        $messages = notifications_callback($module, 'process_compose', $account, $events, $subscriptions, $send_method, $send_interval);
        notifications_log('Composed messages', array(
          'number' => count($messages),
          'send_method' => $send_method,
        ));

        // Note that we pass the testing parameter to notifications_process_send
        notifications_callback($module, 'process_send', $account, $messages, $send_method, $test);
        if (!$test) {
          notifications_update_sent($account, $send_method, $send_interval, time());
        }
      }
      if ($processed && $update) {
        notifications_queue_done(array(
          'sqid' => $processed,
        ));
      }
      $subscriptions = $events = $processed = array();

      // Keep track of parameters that will trigger a sending when changing
      if ($queue) {
        $send_method = $queue->send_method;
        $send_interval = $queue->send_interval;
        $destination = $queue->destination;
        $module = $queue->module;

        // Users may be handled by a different module implementing the _load_user callback.
        // I.e. for anonymous users it may load the name from somewhere
        $account = notifications_callback($module, 'load_user', $queue->uid, $destination, $send_method);
      }
    }

    // For every row in queue, compile everyting that will be available for sending
    if ($queue) {
      $count++;
      $processed[] = $queue->sqid;

      // Load event, check it exists and check the user has access to the event objects
      if ($event = notifications_load_event($queue->eid)) {
        notifications_event_tracker('count', $event);
        notifications_log('Processing queued', array(
          'queue sqid' => $queue->sqid,
          'event' => $queue->eid,
          'type' => $event->type,
          'action' => $event->action,
          'send method' => $send_method,
        ));
        if (notifications_user_allowed('event', $account, $event)) {

          // This will take care of duplicated events
          $events[$queue->eid] = $event;

          // We keep track also of subscriptions originating this event
          $subscriptions[$queue->eid][] = $queue->sid;
        }
        else {
          notifications_log('Access denied for event', array(
            'account' => $user->uid,
            'event' => $queue->eid,
          ));
        }
      }
      else {
        notifications_log('Cannot load event', array(
          'eid' => $queue->eid,
          'queue sid' => $queue->sid,
        ));
      }
    }
  }
  if ($update) {
    notifications_event_tracker('update');
  }

  // Return number of rows processed
  return $count;
}