You are here

messaging.store.inc in Messaging 6.3

Same filename and directory in other branches
  1. 5 messaging.store.inc
  2. 6 messaging.store.inc
  3. 6.2 messaging.store.inc

Database storage for the messaging framework

File

messaging.store.inc
View source
<?php

/**
 * @file
 *   Database storage for the messaging framework
 */

// Max number of rows to process for each step before clean up
define('MESSAGING_STEP_ROWS', 1000);

// Minimum amount of seconds the process will need for clean-up tasks
// Just to make sure that after exhausting cron assigned time we'll have a few spare seconds for some cleanup
define('MESSAGING_TIME_MARGIN', 5);

/**
 * Process and send messages in queue, to be called from cron
 * 
 * It will check for predefined limits and repeat the cycle
 *   [fetch] -> [send] -> [check]
 * until the queue is empty or any of the limits are met
 * 
 * @param $timeout
 *   Optional time out to use instead of cron, just for this api to be testable
 */
function messaging_store_queue_process($timeout = 0) {
  $limit = variable_get('messaging_process_limit', array(
    'message' => 0,
    'time' => 0,
    'percent' => 0,
  ));

  // Calculate time limit. We get the smaller of all these times in seconds
  if ($timeout) {
    $timelimit[] = time() + $timeout;
  }
  else {
    $timelimit[] = variable_get('cron_semaphore', 0) + ini_get('max_execution_time') - MESSAGING_TIME_MARGIN;
  }
  if ($limit['time']) {
    $timelimit[] = time() + $limit['time'];
  }
  if ($limit['percent']) {
    $timelimit[] = time() + ini_get('max_execution_time') * $limit['percent'] / 100;
    unset($limit['percent']);
  }
  $limit['time'] = min($timelimit);

  // Processing loop. Will stop when we run out of rows or reach time / messages limit
  $count = 0;
  $max = !empty($limit['message']) ? $limit['message'] : 0;
  do {
    $step = $max ? min(MESSAGING_STEP_ROWS, $max - $count) : MESSAGING_STEP_ROWS;
    $number = messaging_store_queue_process_step($step, $limit['time']);
    $count += $number;
  } while ($number == $step && time() <= $limit['time'] && (!$max || $max > $count));
}

/**
 * Retrieve and send queued messages
 * 
 * @param $limit
 *   Maximum number of queued messages to process for this step
 * @param $timeout
 *   Optional time limit for processing, will return when if reached during processing
 * @return
 *   Number of messages processed in this step
 */
function messaging_store_queue_process_step($limit, $timeout = 0) {
  $count = 0;
  $sent = $unsent = array();
  $result = db_query_range("SELECT * FROM {messaging_store} WHERE queue = 1 AND cron = 1 ORDER BY mqid", 0, $limit);
  while ($message = db_fetch_object($result)) {
    messaging_store_unpack($message, TRUE);

    // Actual sending functions function
    $message->process = TRUE;
    $message
      ->send();
    if ($message->success) {
      $sent[] = $message->mqid;
    }
    else {
      $unsent[] = $message->mqid;
    }
    $count++;

    // Check timeout after each message
    if ($timeout && time() > $timeout) {
      break;
    }
  }
  if ($sent) {
    messaging_store_sent($sent);
  }
  if ($unsent) {
    messaging_store_sent($unsent, TRUE);
  }
  return $count;
}

/**
 * Queue clean up
 * - Remove expired logs
 * - @ TODO Remove expired queued messages
 */
function messaging_store_queue_cleanup() {
  if ($expire = variable_get('messaging_log', 0)) {
    db_query('DELETE FROM {messaging_store} WHERE log = 1 AND queue = 0 AND sent < %d', time() - $expire);
  }
}

/**
 * Retrieve from messaging database storage
 * 
 * @param $params
 *   Array of field value pairs
 * @param $order
 *   Optional array of field names to order by
 * @param $limit
 *   Optional maximum number of rows to retrieve
 * @param $pager
 *   Optional pager element for pager queries
 * @param $unpack
 *   Optional fully load stored data
 */
function messaging_store_get($params, $order = NULL, $limit = NULL, $pager = NULL, $unpack = TRUE) {
  $messages = $where = $args = array();
  list($where, $args) = messaging_store_query($params);
  $sql = 'SELECT * FROM {messaging_store}';
  $sql .= $where ? ' WHERE ' . implode(' AND ', $where) : '';
  $sql .= $order ? ' ORDER BY ' . implode(', ', $order) : '';
  if (!is_null($pager)) {
    $result = pager_query($sql, $limit, $pager, NULL, $args);
  }
  elseif ($limit) {
    $result = db_query_range($sql, $args, 0, $limit);
  }
  else {
    $result = db_query($sql, $args);
  }
  while ($msg = db_fetch_object($result)) {
    if ($unpack) {
      $messages[$msg->mqid] = messaging_store_unpack($msg, 'Messaging_Message');
    }
    else {
      $messages[$msg->mqid] = $msg;
    }
  }
  return $messages;
}

/**
 * Load single message from store
 */
function messaging_store_load($mqid) {
  if ($message = db_fetch_object(db_query('SELECT * FROM {messaging_store} WHERE mqid = %d', $mqid))) {
    return messaging_store_unpack($message, 'Messaging_Message');
  }
}

/**
 * Build query with field conditions
 * 
 * This function supports IN() conditions when passing array field values
 * @param $query
 *   Array of field => value pars
 */
function messaging_store_query($fields) {
  $where = $args = array();
  foreach ($fields as $key => $value) {
    if (is_array($value)) {

      // Special processing for array parameters. Many ints are expected for 'mqid' field
      $type = $key == 'mqid' ? 'int' : 'varchar';
      $where[] = $key . ' IN(' . db_placeholders($value, $type) . ')';
      $args = array_merge($args, $value);
    }
    else {
      $where[] = $key . " = '%s'";
      $args[] = $value;
    }
  }
  return array(
    $where,
    $args,
  );
}

/**
 * Unpack stored messages
 * 
 * @param $message
 *   Array as retrieved from the db store
 * @param $full
 *   True for loading the account data if this message is intended for a user
 *   And loading the file objects associated too
 */
function messaging_store_unpack_message(&$message, $full = FALSE) {
  if ($message->uid && $full) {
    $message->account = messaging_load_user($message->uid);
  }
  if ($message->sender && $full) {
    $message->sender_account = messaging_load_user($message->sender);
  }

  // Check destinations array, in case it was not properly filled
  if (empty($message->destinations)) {
    if (!empty($message->account) && ($userdest = messaging_user_destination($message->account, $message->method, $message))) {
      $message->destinations = array(
        $userdest,
      );
    }
    elseif (!empty($message->destination)) {
      $message->destinations = array(
        $message->destination,
      );
    }
  }
}

/**
 * Mark messages as sent, either deleting them, or keeping logs
 * 
 * @param $mqid
 *   Single message id or array of message ids
 * @param $error
 *   Optional, just mark as error move queue messages to log, for messages on which sending failed 
 */
function messaging_store_sent($mqid, $error = FALSE) {
  $mqid = is_array($mqid) ? $mqid : array(
    $mqid,
  );
  list($where, $args) = messaging_store_query(array(
    'mqid' => $mqid,
  ));
  if ($error) {

    // Error, log them all, sent = 0
    $sent = 0;
  }
  else {

    // First delete the ones that are not for logging, then mark as sent
    db_query("DELETE FROM {messaging_store} WHERE log = 0 AND " . implode(' AND ', $where), $args);
    $sent = time();
  }

  // Now unmark the rest for queue processing, as logs
  $args = array_merge(array(
    $sent,
  ), $args);
  db_query("UPDATE {messaging_store} SET queue = 0, cron = 0, log = 1, sent = %d WHERE " . implode(' AND ', $where), $args);
}

/**
 * Delete messages from queue
 */
function messaging_store_del($params) {
  list($where, $args) = messaging_store_query($params);
  db_query("DELETE FROM {messaging_store} WHERE " . implode(' AND ', $where), $args);
}

/**
 * Put into database storage, create one line for each destination
 * 
 * If there are many destinations they will be stored as 'multiple'
 * 
 * @param $message
 *   Message object
 */
function messaging_store_save($message) {
  messaging_store_save_object($message, 'messaging_store', 'mqid', $message
    ->data_fields());
}

/**
 * Delete message object from store
 */
function messaging_store_delete($mqid) {
  db_query("DELETE FROM {messaging_store} WHERE mqid = %d", $mqid);
}

/**
 * Save object
 */
function messaging_store_save_object($object, $table, $key, $fields = NULL) {

  // Serialize fields
  if ($fields) {
    foreach ($fields as $field) {
      if (isset($object->{$field})) {
        $object->data[$field] = $object->{$field};
      }
    }
  }

  // Create / update depending on key field
  if (empty($object->{$key})) {
    $update = array();
    $object->created = $object->updated = time();
  }
  else {
    $update = $key;
    $object->updated = time();
  }
  return drupal_write_record($table, $object, $update);
}

/**
 * Load object/s
 */
function messaging_store_load_object($table, $field, $value, $class = NULL) {
  if ($stored = db_fetch_object(db_query("SELECT * FROM {$table} WHERE {$field} = %d", $value))) {

    // The class name may be in the database too
    $class = $class ? $class : $stored->class;
    return $this
      ->unpack($stored, $class);
  }
}

/**
 * Unpack data as specific class object
 */
function messaging_store_unpack($stored, $class) {
  drupal_unpack($stored);
  if (is_a($stored, $class)) {

    // If the object is already that class, just return
    return $stored;
  }
  else {
    $object = new $class($stored);
    return $object;
  }
}

Functions

Namesort descending Description
messaging_store_del Delete messages from queue
messaging_store_delete Delete message object from store
messaging_store_get Retrieve from messaging database storage
messaging_store_load Load single message from store
messaging_store_load_object Load object/s
messaging_store_query Build query with field conditions
messaging_store_queue_cleanup Queue clean up
messaging_store_queue_process Process and send messages in queue, to be called from cron
messaging_store_queue_process_step Retrieve and send queued messages
messaging_store_save Put into database storage, create one line for each destination
messaging_store_save_object Save object
messaging_store_sent Mark messages as sent, either deleting them, or keeping logs
messaging_store_unpack Unpack data as specific class object
messaging_store_unpack_message Unpack stored messages

Constants