You are here

protected static function Messaging_Store::queue_process_step in Messaging 6.4

Retrieve and send queued messages

Parameters

$limit: Maximum number of queued messages to process for this step

$timeout: Optional time limit for processing, given as a Unix timestamp; the function returns early once this time is passed during processing

Return value

Array of sending results indexed by message id

1 call to Messaging_Store::queue_process_step()
Messaging_Store::queue_process in includes/messaging_store.class.inc
Process and send messages in queue, to be called from cron

File

includes/messaging_store.class.inc, line 109
Database storage for the messaging framework

Class

Messaging_Store
Default storage and queueing system for Messaging

Code

/**
 * Retrieve and send one batch of queued messages.
 *
 * Selects rows flagged with both 'queue' and 'cron', ordered by the storage
 * key, attempts to process each one, and then records the outcome of the
 * whole batch through message_sent().
 *
 * @param $limit
 *   Maximum number of queued messages to process for this step.
 * @param $timeout
 *   Optional time limit. It is compared against time(), so it is an absolute
 *   Unix timestamp rather than a duration. A value of 0 disables the check.
 *   The check runs after each message, so a fetched message is always
 *   processed before the loop can stop.
 *
 * @return
 *   Array of processing results indexed by message id (mqid).
 */
protected static function queue_process_step($limit, $timeout = 0) {
  $sent = $unsent = $processed = array();
  $result = self::select_query('*', array(
    'queue' => 1,
    'cron' => 1,
  ), array(
    'order' => array(
      self::DB_KEY,
    ),
    'limit' => $limit,
  ));
  while ($object = db_fetch_object($result)) {
    $message = self::message_unpack($object, TRUE);
    $success = self::queue_process_message($message);
    $processed[$message->mqid] = $success;
    if ($success) {
      $sent[] = $message->mqid;
      messaging_debug('Processed message from queue', array(
        'message' => $message,
        'success' => $success,
      ));
    }
    else {
      $unsent[] = $message->mqid;
      watchdog('messaging', 'Failed queue processing for @message', array(
        '@message' => (string) $message,
      ), WATCHDOG_WARNING);
    }

    // Check timeout after each message
    if ($timeout && time() > $timeout) {
      break;
    }
  }

  // Outcomes are recorded in bulk after the loop, one call per group.
  if ($sent) {
    self::message_sent($sent);
  }
  if ($unsent) {
    // NOTE(review): the TRUE argument presumably marks these as failed;
    // confirm against message_sent().
    self::message_sent($unsent, TRUE);
  }
  return $processed;
}