class MongoDBQueue in MongoDB 7
Hierarchy
- class \MongoDBQueue implements DrupalQueueInterface
Expanded class hierarchy of MongoDBQueue
File
- mongodb_queue/mongodb_queue.inc, line 12: Contains \MongoDBQueue.
View source
/**
 * Drupal queue implementation that stores its items in a MongoDB collection.
 *
 * Each queue lives in its own collection named 'queue.' followed by the queue
 * name. Items are stored as documents with three fields:
 * - data: the payload passed to createItem().
 * - created: UNIX timestamp taken when the item was added.
 * - expire: 0 while the item is available, otherwise the UNIX timestamp at
 *   which the current claim's lease ends.
 */
class MongoDBQueue implements DrupalQueueInterface {

  /**
   * The collection name for the queue.
   *
   * @var string
   */
  protected $collection;

  /**
   * Start working with a queue.
   *
   * @param string $name
   *   The name of the queue. It is prefixed with 'queue.' to build the
   *   collection name.
   */
  public function __construct($name) {
    // @todo: make sure that $name is a valid collection name?
    $this->collection = 'queue.' . $name;
  }

  /**
   * Add a queue item and store it directly to the queue.
   *
   * The item is stored unclaimed (expire = 0) and stamped with the current
   * time so that claimItem() can hand items out oldest first.
   *
   * @param mixed $data
   *   Arbitrary data to be associated with the new task in the queue.
   *
   * @return array|bool
   *   The result of the driver's insert() call, returned unchanged.
   *
   * @throws \MongoConnectionException
   * @throws \MongoCursorException
   * @throws \MongoCursorTimeoutException
   * @throws \MongoException
   */
  public function createItem($data) {
    $item = array(
      'data' => $data,
      'created' => time(),
      'expire' => 0,
    );
    return mongodb_collection($this->collection)
      ->insert($item, mongodb_default_write_options());
  }

  /**
   * Retrieve the number of items in the queue.
   *
   * The count is taken without any filter, so it includes items that are
   * currently claimed as well as unclaimed ones.
   *
   * @return int
   *   An integer estimate of the number of items in the queue.
   *
   * @throws \MongoConnectionException
   */
  public function numberOfItems() {
    return mongodb_collection($this->collection)
      ->count();
  }

  /**
   * Claim an item in the queue for processing.
   *
   * Picks the unclaimed item (expire = 0) with the smallest 'created' value
   * and sets its 'expire' field to now + $lease_time, using a single
   * findandmodify command.
   *
   * NOTE(review): only items with expire = 0 are matched, so an item whose
   * lease has lapsed is not re-claimed by this method alone; presumably
   * something else resets such items - confirm.
   *
   * @param int $lease_time
   *   How long the processing is expected to take in seconds.
   *
   * @return object|bool
   *   On success we return an item object built from the stored document
   *   (including its '_id', which releaseItem() and deleteItem() rely on).
   *   If the queue is unable to claim an item it returns FALSE.
   *
   * @throws \MongoConnectionException
   */
  public function claimItem($lease_time = 30) {
    $query = array(
      'expire' => 0,
    );
    $newobj = array(
      'expire' => time() + $lease_time,
    );
    // The command name must stay the first key of the command document.
    $cmd = array(
      'findandmodify' => mongodb_collection_name($this->collection),
      'query' => $query,
      'update' => array(
        '$set' => $newobj,
      ),
      // Ascending sort on creation time: oldest item first.
      'sort' => array(
        'created' => 1,
      ),
    );
    $result = mongodb_collection($this->collection)->db
      ->command($cmd);
    if ($result && $result['ok'] == 1 && !empty($result['value'])) {
      return (object) $result['value'];
    }
    // Nothing could be claimed: return FALSE explicitly, as documented,
    // instead of falling off the end of the method and returning NULL.
    return FALSE;
  }

  /**
   * Release an item that the worker could not process.
   *
   * Resets the item's 'expire' field to 0 so that claimItem() can hand it
   * out again.
   *
   * @param object $item
   *   The item to release, as returned by claimItem(). Only its '_id'
   *   property is used.
   *
   * @return array|bool
   *   The result of the driver's update() call, returned unchanged.
   *
   * @throws \MongoConnectionException
   * @throws \MongoCursorException
   */
  public function releaseItem($item) {
    return mongodb_collection($this->collection)
      ->update(array(
        '_id' => $item->_id,
      ), array(
        '$set' => array(
          'expire' => 0,
        ),
      ), mongodb_default_write_options());
  }

  /**
   * Delete a finished item from the queue.
   *
   * @param object $item
   *   The item to delete, as returned by claimItem(). Only its '_id'
   *   property is used.
   *
   * @throws \MongoConnectionException
   * @throws \MongoCursorException
   * @throws \MongoCursorTimeoutException
   */
  public function deleteItem($item) {
    mongodb_collection($this->collection)
      ->remove(array(
        '_id' => $item->_id,
      ), mongodb_default_write_options());
  }

  /**
   * Create a queue.
   *
   * No collection is created explicitly here; this only ensures a compound
   * index on the fields claimItem() filters ('expire') and sorts ('created')
   * by.
   */
  public function createQueue() {
    // Create the index.
    mongodb_collection($this->collection)
      ->ensureIndex(array(
        'expire' => 1,
        'created' => 1,
      ));
  }

  /**
   * Delete a queue and every item in the queue.
   *
   * Drops the whole backing collection.
   */
  public function deleteQueue() {
    mongodb_collection($this->collection)
      ->drop();
  }

}
Members
Name | Modifiers | Type | Description | Overrides |
---|---|---|---|---|
MongoDBQueue::$collection | protected | property | The collection name for the queue. | |
MongoDBQueue::claimItem | public | function | Claim an item in the queue for processing. | Overrides DrupalQueueInterface::claimItem |
MongoDBQueue::createItem | public | function | Add a queue item and store it directly to the queue. | Overrides DrupalQueueInterface::createItem |
MongoDBQueue::createQueue | public | function | Create a queue. | Overrides DrupalQueueInterface::createQueue |
MongoDBQueue::deleteItem | public | function | Delete a finished item from the queue. | Overrides DrupalQueueInterface::deleteItem |
MongoDBQueue::deleteQueue | public | function | Delete a queue and every item in the queue. | Overrides DrupalQueueInterface::deleteQueue |
MongoDBQueue::numberOfItems | public | function | Retrieve the number of items in the queue. | Overrides DrupalQueueInterface::numberOfItems |
MongoDBQueue::releaseItem | public | function | Release an item that the worker could not process. | Overrides DrupalQueueInterface::releaseItem |
MongoDBQueue::__construct | public | function | Start working with a queue. | |