You are here

class MongoDBQueue in MongoDB 7

Hierarchy

Expanded class hierarchy of MongoDBQueue

File

mongodb_queue/mongodb_queue.inc, line 12
Contains \MongoDBQueue.

View source
class MongoDBQueue implements DrupalQueueInterface {

  /**
   * The collection name for the queue.
   *
   * Built by the constructor as 'queue.' followed by the queue name.
   *
   * @var string
   */
  protected $collection;

  /**
   * Start working with a queue.
   *
   * @param string $name
   *   The name of the queue. It is appended to the 'queue.' prefix to form
   *   the collection name; no validation is performed on it here.
   */
  public function __construct($name) {

    // @todo: make sure that $name is a valid collection name?
    $this->collection = 'queue.' . $name;
  }

  /**
   * Add a queue item and store it directly to the queue.
   *
   * The stored document holds the payload under 'data', the current time
   * under 'created', and 'expire' set to 0, which is the value claimItem()
   * queries for when looking for unclaimed items.
   *
   * @param object $data
   *   Arbitrary data to be associated with the new task in the queue.
   *
   * @return array|bool
   *   Whatever the collection's insert() call returns.
   *
   * @throws \MongoConnectionException
   * @throws \MongoCursorException
   * @throws \MongoCursorTimeoutException
   * @throws \MongoException
   */
  public function createItem($data) {
    $item = array(
      'data' => $data,
      'created' => time(),
      'expire' => 0,
    );
    return mongodb_collection($this->collection)
      ->insert($item, mongodb_default_write_options());
  }

  /**
   * Retrieve the number of items in the queue.
   *
   * The count is taken without any filter, so it covers every document in
   * the collection regardless of its 'expire' value.
   *
   * @return int
   *   An integer estimate of the number of items in the queue.
   *
   * @throws \MongoConnectionException
   */
  public function numberOfItems() {
    return mongodb_collection($this->collection)
      ->count();
  }

  /**
   * Claim an item in the queue for processing.
   *
   * Issues a single 'findandmodify' command that selects a document whose
   * 'expire' is 0, ordered by ascending 'created', and sets its 'expire' to
   * the current time plus the lease time. Documents with a non-zero 'expire'
   * are never selected by this method, even if that time has passed.
   *
   * @param int $lease_time
   *   How long the processing is expected to take in seconds.
   *
   * @return object|bool
   *   On success we return an item object. If the queue is unable to claim
   *   an item it returns false.
   *
   * @throws \MongoConnectionException
   */
  public function claimItem($lease_time = 30) {
    $query = array(
      'expire' => 0,
    );
    $newobj = array(
      'expire' => time() + $lease_time,
    );
    $cmd = array(
      'findandmodify' => mongodb_collection_name($this->collection),
      'query' => $query,
      'update' => array(
        '$set' => $newobj,
      ),
      'sort' => array(
        'created' => 1,
      ),
    );
    $result = mongodb_collection($this->collection)->db
      ->command($cmd);
    if ($result && $result['ok'] == 1 && !empty($result['value'])) {
      return (object) $result['value'];
    }

    // Nothing could be claimed: return FALSE explicitly, as documented,
    // instead of falling through with an implicit NULL.
    return FALSE;
  }

  /**
   * Release an item that the worker could not process.
   *
   * Resets the 'expire' field of the document matching the item's _id to 0
   * so that claimItem() can select it again.
   *
   * @param object $item
   *   The item to release. Its _id property identifies the document.
   *
   * @return bool
   *   Did the update succeed? This is the value returned by the collection's
   *   update() call.
   *
   * @throws \MongoConnectionException
   * @throws \MongoCursorException
   */
  public function releaseItem($item) {
    return mongodb_collection($this->collection)
      ->update(array(
      '_id' => $item->_id,
    ), array(
      '$set' => array(
        'expire' => 0,
      ),
    ), mongodb_default_write_options());
  }

  /**
   * Delete a finished item from the queue.
   *
   * Removes the document matching the item's _id. Nothing is returned.
   *
   * @param object $item
   *   The item to delete. Its _id property identifies the document.
   *
   * @throws \MongoConnectionException
   * @throws \MongoCursorException
   * @throws \MongoCursorTimeoutException
   */
  public function deleteItem($item) {
    mongodb_collection($this->collection)
      ->remove(array(
      '_id' => $item->_id,
    ), mongodb_default_write_options());
  }

  /**
   * Create a queue.
   *
   * Ensures an ascending index on ('expire', 'created'), the two fields
   * claimItem() filters and sorts on.
   */
  public function createQueue() {

    // Create the index.
    mongodb_collection($this->collection)
      ->ensureIndex(array(
      'expire' => 1,
      'created' => 1,
    ));
  }

  /**
   * Delete a queue and every item in the queue.
   *
   * Drops the whole backing collection.
   */
  public function deleteQueue() {
    mongodb_collection($this->collection)
      ->drop();
  }

}

Members

Name (sort descending) | Modifiers | Type | Description | Overrides
MongoDBQueue::$collection protected property The collection name for the queue.
MongoDBQueue::claimItem public function Claim an item in the queue for processing. Overrides DrupalQueueInterface::claimItem
MongoDBQueue::createItem public function Add a queue item and store it directly to the queue. Overrides DrupalQueueInterface::createItem
MongoDBQueue::createQueue public function Create a queue. Overrides DrupalQueueInterface::createQueue
MongoDBQueue::deleteItem public function Delete a finished item from the queue. Overrides DrupalQueueInterface::deleteItem
MongoDBQueue::deleteQueue public function Delete a queue and every item in the queue. Overrides DrupalQueueInterface::deleteQueue
MongoDBQueue::numberOfItems public function Retrieve the number of items in the queue. Overrides DrupalQueueInterface::numberOfItems
MongoDBQueue::releaseItem public function Release an item that the worker could not process. Overrides DrupalQueueInterface::releaseItem
MongoDBQueue::__construct public function Start working with a queue.