You are here

function mongodb_queue_cron in MongoDB 7

Implements hook_cron().

File

mongodb_queue/mongodb_queue.module, line 11
MongoDB Queue.

Code

function mongodb_queue_cron() {

  // Reset expired items.
  $aliases = variable_get('mongodb_connections', array());
  $collections = variable_get('mongodb_collections', array());
  $aliases['default'] = TRUE;
  $update_query = array(
    'expire' => array(
      '$gt' => 0,
      '$lt' => REQUEST_TIME,
    ),
  );
  foreach (array_keys($aliases) as $alias) {

    // Call mongodb()->selectConnection() directly to avoid prefixing and DB
    // mapping tricks mongodb_collection() plays.
    $db = mongodb($alias);
    $collection = $db
      ->selectCollection('system.namespaces');
    $n = strlen($db) + 1;

    // We won't need to check for system collections because those start with
    // system. and we search for queue.
    $find = array(
      'name' => new MongoRegex('/^' . $db . '\\.queue\\./'),
    );
    $queues = $collection
      ->find($find);
    foreach ($queues as $collection_data) {
      $collection_name = $collection_data['name'];

      // According to the PECL mongodb source $ means an index. Also check for
      // a valid collection name.
      if (strpos($collection_name, '$') === FALSE && substr($collection_name, 0, $n) == "{$db}.") {
        $queue_name = substr($collection_data['name'], $n);

        // Make sure we are on the correct alias for a given queue. If no
        // connection is defined this is the default alias.
        if (isset($collections[$queue_name])) {
          if (is_array($collections[$queue_name]) && isset($collections[$queue_name]['db_connection']) && $collections[$queue_name]['db_connection'] != $alias) {
            continue;
          }
          elseif (!is_array($collections[$queue_name]) && $collections[$queue_name] != $alias) {
            continue;
          }
        }
        elseif ($alias != 'default') {
          continue;
        }
        $db
          ->selectCollection($queue_name)
          ->update($update_query, array(
          '$set' => array(
            'expire' => 0,
          ),
        ), array(
          'multiple' => TRUE,
        ) + mongodb_default_write_options());
      }
    }
  }
}