class AcquiaPurgeQueueEfficient in Acquia Purge 7
Efficient query bundling database queue.
Enriches SystemQueue with methods defined in AcquiaPurgeQueueInterface which attempt to reduce database communication as much as possible. By bundling items into single queries, total queries and roundtrips reduce drastically!
Hierarchy
- class \SystemQueue implements DrupalReliableQueueInterface
- class \AcquiaPurgeQueueEfficient implements AcquiaPurgeQueueInterface
Expanded class hierarchy of AcquiaPurgeQueueEfficient
File
- lib/
queue/ AcquiaPurgeQueueEfficient.php, line 15 - Contains EfficientQueue.
View source
class AcquiaPurgeQueueEfficient extends SystemQueue implements AcquiaPurgeQueueInterface {
/**
* The queue item class to spawn queue items with.
*
* @var string
*/
protected $class_queue_item;
/**
* The state storage which holds the counter state items.
*
* @var AcquiaPurgeStateStorageInterface
*/
protected $state;
/**
* Construct a AcquiaPurgeQueueEfficient instance.
*
* @param AcquiaPurgeStateStorageInterface $state
* The state storage required for the queue counters.
*/
public function __construct(AcquiaPurgeStateStorageInterface $state) {
$this->state = $state;
$this->class_queue_item = _acquia_purge_load(array(
'_acquia_purge_queue_item_interface',
'_acquia_purge_queue_item',
));
parent::__construct('acquia_purge');
}
/**
* {@inheritdoc}
*/
public function bad() {
return $this->state
->getCounter('qbad');
}
/**
* {@inheritdoc}
*/
public function good() {
return $this->state
->getCounter('qgood');
}
/**
* {@inheritdoc}
*/
public function total() {
return $this->state
->getCounter('qtotal');
}
/**
* {@inheritdoc}
*
* SystemQueue::claimItem() doesn't included expired items in its query
* which means that it essentially breaks its own interface promise. Therefore
* we overload the implementation with one that does do this accurately. This
* should however flow back to core, which I'm doing as part of my D8 work.
*/
public function claimItem($lease_time = 30) {
// Claim an item by updating its expire fields. If claim is not successful
// another thread may have claimed the item in the meantime. Therefore loop
// until an item is successfully claimed or we are reasonably sure there
// are no unclaimed items left.
while (TRUE) {
$conditions = array(
':name' => $this->name,
':now' => time(),
);
$item = db_query_range('SELECT * FROM {queue} q
WHERE name = :name AND ((expire = 0) OR (:now > expire))
ORDER BY created, item_id
ASC', 0, 1, $conditions)
->fetchObject();
if ($item) {
$item = new $this->class_queue_item((int) $item->created, unserialize($item->data), (int) $item->expire, (int) $item->item_id);
// Try to update the item. Only one thread can succeed in UPDATEing the
// same row. We cannot rely on REQUEST_TIME because items might be
// claimed by a single consumer which runs longer than 1 second. If we
// continue to use REQUEST_TIME instead of the current time(), we steal
// time from the lease, and will tend to reset items before the lease
// should really expire.
$update = db_update('queue')
->fields(array(
'expire' => time() + $lease_time,
))
->condition('item_id', $item->item_id);
// If there are affected rows, this update succeeded.
if ($update
->execute()) {
return $item;
}
}
else {
// No items currently available to claim.
return FALSE;
}
}
}
/**
* {@inheritdoc}
*/
public function claimItemMultiple($claims = 10, $lease_time = 30) {
$returned_items = $item_ids = array();
// Retrieve all items in one query.
$conditions = array(
':name' => $this->name,
':now' => time(),
);
$items = db_query_range('SELECT * FROM {queue} q
WHERE name = :name AND ((expire = 0) OR (:now > expire))
ORDER BY created, item_id
ASC', 0, $claims, $conditions);
// Iterate all returned items and unpack them.
while ($item = $items
->fetchObject()) {
$item_ids[] = (int) $item->item_id;
$returned_items[] = new $this->class_queue_item((int) $item->created, unserialize($item->data), (int) $item->expire, (int) $item->item_id);
}
// Update the items (marking them claimed) in one query.
if (count($returned_items)) {
db_update('queue')
->fields(array(
'expire' => time() + $lease_time,
))
->condition('item_id', $item_ids, 'IN')
->execute();
}
// Return the generated items, whether its empty or not.
return $returned_items;
}
/**
* {@inheritdoc}
*/
public function createItem($data) {
if (parent::createItem($data)) {
$this
->total()
->increase();
return TRUE;
}
return FALSE;
}
/**
* {@inheritdoc}
*/
public function createItemMultiple(array $items) {
$records = array();
// Build a array with all exactly records as they should turn into rows.
$time = time();
foreach ($items as $data) {
$records[] = array(
'name' => $this->name,
'data' => serialize($data),
'created' => $time,
);
}
// Insert all of them using just one multi-row query.
$query = db_insert('queue')
->fields(array(
'name',
'data',
'created',
));
foreach ($records as $record) {
$query
->values($record);
}
// Execute the query and finish the call.
if ($query
->execute()) {
$this
->total()
->increase(count($records));
return TRUE;
}
else {
return FALSE;
}
}
/**
* {@inheritdoc}
*/
public function deleteItemMultiple(array $items) {
if (empty($items)) {
return;
}
$item_ids = array();
foreach ($items as $item) {
$item_ids[] = $item->item_id;
}
db_delete('queue')
->condition('item_id', $item_ids, 'IN')
->execute();
$this
->good()
->increase(count($item_ids));
}
/**
* {@inheritdoc}
*/
public function deleteQueue() {
parent::deleteQueue();
$this
->total()
->set(0);
$this
->bad()
->set(0);
$this
->good()
->set(0);
}
/**
* {@inheritdoc}
*/
public function numberOfItems() {
return (int) parent::numberOfItems();
}
/**
* {@inheritdoc}
*/
public function releaseItemMultiple(array $items) {
if (empty($items)) {
return array();
}
$item_ids = array();
foreach ($items as $item) {
$item_ids[] = $item->item_id;
}
$update = db_update('queue')
->fields(array(
'expire' => 0,
))
->condition('item_id', $item_ids, 'IN')
->execute();
if ($update) {
$this
->bad()
->increase(count($item_ids));
return array();
}
else {
return $items;
}
}
}
Members
Name | Modifiers | Type | Description | Overrides |
---|---|---|---|---|
AcquiaPurgeQueueEfficient:: |
protected | property | The queue item class to spawn queue items with. | |
AcquiaPurgeQueueEfficient:: |
protected | property | The state storage which holds the counter state items. | |
AcquiaPurgeQueueEfficient:: |
public | function |
Retrieve the failed purges counter. Overrides AcquiaPurgeQueueInterface:: |
|
AcquiaPurgeQueueEfficient:: |
public | function |
SystemQueue::claimItem() doesn't included expired items in its query
which means that it essentially breaks its own interface promise. Therefore
we overload the implementation with one that does do this accurately. This
should however flow back… Overrides SystemQueue:: |
1 |
AcquiaPurgeQueueEfficient:: |
public | function |
Claims multiple items from the queue for processing. Overrides AcquiaPurgeQueueInterface:: |
1 |
AcquiaPurgeQueueEfficient:: |
public | function |
Add a queue item and store it directly to the queue. Overrides SystemQueue:: |
|
AcquiaPurgeQueueEfficient:: |
public | function |
Add multiple items to the queue and store them efficiently. Overrides AcquiaPurgeQueueInterface:: |
|
AcquiaPurgeQueueEfficient:: |
public | function |
Delete multiple items from the queue at once. Overrides AcquiaPurgeQueueInterface:: |
|
AcquiaPurgeQueueEfficient:: |
public | function |
Delete a queue and every item in the queue. Overrides SystemQueue:: |
|
AcquiaPurgeQueueEfficient:: |
public | function |
Retrieve the successful purges counter. Overrides AcquiaPurgeQueueInterface:: |
|
AcquiaPurgeQueueEfficient:: |
public | function |
Retrieve the number of items in the queue. Overrides SystemQueue:: |
|
AcquiaPurgeQueueEfficient:: |
public | function |
Release multiple items that the worker could not process. Overrides AcquiaPurgeQueueInterface:: |
|
AcquiaPurgeQueueEfficient:: |
public | function |
Retrieve the total queue items counter. Overrides AcquiaPurgeQueueInterface:: |
|
AcquiaPurgeQueueEfficient:: |
public | function |
Construct a AcquiaPurgeQueueEfficient instance. Overrides SystemQueue:: |
1 |
SystemQueue:: |
protected | property | The name of the queue this instance is working with. | |
SystemQueue:: |
public | function |
Create a queue. Overrides DrupalQueueInterface:: |
|
SystemQueue:: |
public | function |
Delete a finished item from the queue. Overrides DrupalQueueInterface:: |
|
SystemQueue:: |
public | function |
Release an item that the worker could not process, so another
worker can come in and process it before the timeout expires. Overrides DrupalQueueInterface:: |