class DatabaseQueue in Purge 8.3
A QueueInterface compliant database backed queue.
Plugin annotation
@PurgeQueue(
id = "database",
label = @Translation("Database"),
description = @Translation("A scalable database backed queue."),
)
Hierarchy
- class \Drupal\Core\Queue\DatabaseQueue implements QueueGarbageCollectionInterface, ReliableQueueInterface uses DependencySerializationTrait
- class \Drupal\purge\Plugin\Purge\Queue\DatabaseQueue implements QueueInterface uses QueueBasePageTrait
Expanded class hierarchy of DatabaseQueue
File
- src/
Plugin/ Purge/ Queue/ DatabaseQueue.php, line 18
Namespace
Drupal\purge\Plugin\Purge\QueueView source
class DatabaseQueue extends CoreDatabaseQueue implements QueueInterface {
use QueueBasePageTrait;
/**
* The active Drupal database connection object.
*/
const TABLE_NAME = 'purge_queue';
/**
* Static boolean to determine if we've checked for table installation.
*
* @var bool
*/
protected $tableExists = FALSE;
/**
* Construct a DatabaseQueue object.
*
* @param \Drupal\Core\Database\Connection $connection
* The Connection object containing the key-value tables.
*/
public final function __construct(Connection $connection) {
parent::__construct('purge', $connection);
$this
->ensureTableExists();
}
/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
return new static($container
->get('database'));
}
/**
* {@inheritdoc}
*/
public function createItem($data) {
$query = $this->connection
->insert(static::TABLE_NAME)
->fields([
'data' => serialize($data),
'created' => time(),
]);
if ($id = $query
->execute()) {
return (int) $id;
}
return FALSE;
}
/**
* {@inheritdoc}
*/
public function createItemMultiple(array $items) {
$item_ids = $records = [];
// Build a array with all exactly records as they should turn into rows.
$time = time();
foreach ($items as $data) {
$records[] = [
'data' => serialize($data),
'created' => $time,
];
}
// Insert all of them using just one multi-row query.
$query = $this->connection
->insert(static::TABLE_NAME, [])
->fields([
'data',
'created',
]);
foreach ($records as $record) {
$query
->values($record);
}
// Execute the query and finish the call.
if ($id = $query
->execute()) {
$id = (int) $id;
// A multiple row-insert doesn't give back all the individual IDs, so
// calculate them back by applying subtraction.
for ($i = 1; $i <= count($records); $i++) {
$item_ids[] = $id;
$id++;
}
return $item_ids;
}
else {
return FALSE;
}
}
/**
* {@inheritdoc}
*/
public function numberOfItems() {
$query = $this->connection
->select(static::TABLE_NAME);
$query
->addExpression('COUNT(*)');
return (int) $query
->execute()
->fetchField();
}
/**
* {@inheritdoc}
*
* @todo
* \Drupal\Core\Queue\DatabaseQueue::claimItem() doesn't included expired
* items in its query which means that its essentially broken and makes our
* tests fail. Therefore we overload the implementation with one that does
* it accurately. However, this should flow back to core.
*/
public function claimItem($lease_time = 3600) {
// Claim an item by updating its expire fields. If claim is not successful
// another thread may have claimed the item in the meantime. Therefore loop
// until an item is successfully claimed or we are reasonably sure there
// are no unclaimed items left.
while (TRUE) {
$conditions = [
':now' => time(),
];
$item = $this->connection
->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE ((expire = 0) OR (:now > expire)) ORDER BY created, item_id ASC', 0, 1, $conditions)
->fetchObject();
if ($item) {
$item->item_id = (int) $item->item_id;
$item->expire = (int) $item->expire;
// Try to update the item. Only one thread can succeed in UPDATEing the
// same row. We cannot rely on REQUEST_TIME because items might be
// claimed by a single consumer which runs longer than 1 second. If we
// continue to use REQUEST_TIME instead of the current time(), we steal
// time from the lease, and will tend to reset items before the lease
// should really expire.
$update = $this->connection
->update(static::TABLE_NAME)
->fields([
'expire' => time() + $lease_time,
])
->condition('item_id', $item->item_id);
// If there are affected rows, this update succeeded.
if ($update
->execute()) {
$item->data = unserialize($item->data);
return $item;
}
}
else {
// No items currently available to claim.
return FALSE;
}
}
}
/**
* {@inheritdoc}
*/
public function claimItemMultiple($claims = 10, $lease_time = 3600) {
$returned_items = $item_ids = [];
// Retrieve all items in one query.
$conditions = [
':now' => time(),
];
$items = $this->connection
->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE ((expire = 0) OR (:now > expire)) ORDER BY created, item_id ASC', 0, $claims, $conditions);
// Iterate all returned items and unpack them.
foreach ($items as $item) {
if (!$item) {
continue;
}
$item_ids[] = $item->item_id;
$item->item_id = (int) $item->item_id;
$item->expire = (int) $item->expire;
$item->data = unserialize($item->data);
$returned_items[] = $item;
}
// Update the items (marking them claimed) in one query.
if (count($returned_items)) {
$this->connection
->update(static::TABLE_NAME)
->fields([
'expire' => time() + $lease_time,
])
->condition('item_id', $item_ids, 'IN')
->execute();
}
// Return the generated items, whether its empty or not.
return $returned_items;
}
/**
* Implements \Drupal\Core\Queue\QueueInterface::releaseItem().
*/
public function releaseItem($item) {
return (bool) $this->connection
->update(static::TABLE_NAME)
->fields([
'expire' => 0,
'data' => serialize($item->data),
])
->condition('item_id', $item->item_id)
->execute();
}
/**
* {@inheritdoc}
*/
public function releaseItemMultiple(array $items) {
// Extract item IDs and serialized data so comparing becomes easier.
$items_data = [];
foreach ($items as $item) {
$items_data[intval($item->item_id)] = serialize($item->data);
}
// Figure out which items have changed their data and update just those.
$originals = $this->connection
->select(static::TABLE_NAME, 'q')
->fields('q', [
'item_id',
'data',
])
->condition('item_id', array_keys($items_data), 'IN')
->execute();
foreach ($originals as $original) {
$item_id = intval($original->item_id);
if ($original->data !== $items_data[$item_id]) {
$this->connection
->update(static::TABLE_NAME)
->fields([
'data' => $items_data[$item_id],
])
->condition('item_id', $item_id)
->execute();
}
}
// Update the lease time in one single query and resolve what to return.
$update = $this->connection
->update(static::TABLE_NAME)
->fields([
'expire' => 0,
])
->condition('item_id', array_keys($items_data), 'IN')
->execute();
if ($update) {
return [];
}
else {
return $items;
}
}
/**
* {@inheritdoc}
*/
public function deleteItemMultiple(array $items) {
$item_ids = [];
foreach ($items as $item) {
$item_ids[] = $item->item_id;
}
$this->connection
->delete(static::TABLE_NAME)
->condition('item_id', $item_ids, 'IN')
->execute();
}
/**
* {@inheritdoc}
*/
public function createQueue() {
// All tasks are stored in a single database table (which is created when
// this class instantiates) so there is nothing we need to do to create
// a new queue.
}
/**
* {@inheritdoc}
*/
public function deleteQueue() {
$this->connection
->delete(static::TABLE_NAME)
->execute();
}
/**
* {@inheritdoc}
*/
protected function ensureTableExists() {
// Wrap ::ensureTableExists() to prevent expensive duplicate code paths.
if (!$this->tableExists) {
if (parent::ensureTableExists()) {
$this->tableExists = TRUE;
return TRUE;
}
}
return $this->tableExists;
}
/**
* {@inheritdoc}
*/
public function schemaDefinition() {
// Reuse core's schema as was around Drupal 8.1.7. However, we are in no way
// fully depending on core and can - when required - hardcode the full
// schema if core decided to change it significantly.
$schema = parent::schemaDefinition();
unset($schema['fields']['name']);
unset($schema['indexes']['name_created']);
$schema['description'] = "Queue items for the purge database queue plugin.";
$schema['indexes']['created'] = [
'created',
];
return $schema;
}
/**
* {@inheritdoc}
*/
public function selectPage($page = 1) {
if ($page < 1 || !is_int($page)) {
throw new \LogicException('Parameter $page has to be a positive integer.');
}
$items = [];
$limit = $this
->selectPageLimit();
$resultset = $this->connection
->select(static::TABLE_NAME, 'q')
->fields('q', [
'item_id',
'expire',
'data',
])
->orderBy('q.created', 'DESC')
->range(($page - 1) * $limit, $limit)
->execute();
foreach ($resultset as $item) {
if (!$item) {
continue;
}
$item->item_id = (int) $item->item_id;
$item->expire = (int) $item->expire;
$item->data = unserialize($item->data);
$items[] = $item;
}
return $items;
}
}
Members
Name | Modifiers | Type | Description | Overrides |
---|---|---|---|---|
DatabaseQueue:: |
protected | property | The database connection. | |
DatabaseQueue:: |
protected | property | The name of the queue this instance is working with. | |
DatabaseQueue:: |
protected | property | Static boolean to determine if we've checked for table installation. | |
DatabaseQueue:: |
protected | function | Act on an exception when queue might be stale. | |
DatabaseQueue:: |
public | function |
@todo
\Drupal\Core\Queue\DatabaseQueue::claimItem() doesn't included expired
items in its query which means that its essentially broken and makes our
tests fail. Therefore we overload the implementation with one that does
it accurately. However,… Overrides DatabaseQueue:: |
|
DatabaseQueue:: |
public | function |
Claims multiple items from the queue for processing. Overrides QueueInterface:: |
|
DatabaseQueue:: |
public static | function |
Creates an instance of the plugin. Overrides ContainerFactoryPluginInterface:: |
|
DatabaseQueue:: |
public | function |
Adds a queue item and store it directly to the queue. Overrides DatabaseQueue:: |
|
DatabaseQueue:: |
public | function |
Add multiple items to the queue and store them efficiently. Overrides QueueInterface:: |
|
DatabaseQueue:: |
public | function |
Creates a queue. Overrides DatabaseQueue:: |
|
DatabaseQueue:: |
public | function |
Deletes a finished item from the queue. Overrides QueueInterface:: |
|
DatabaseQueue:: |
public | function |
Delete multiple items from the queue at once. Overrides QueueInterface:: |
|
DatabaseQueue:: |
public | function |
Deletes a queue and every item in the queue. Overrides DatabaseQueue:: |
|
DatabaseQueue:: |
protected | function | Adds a queue item and store it directly to the queue. | |
DatabaseQueue:: |
protected | function |
Check if the table exists and create it if not. Overrides DatabaseQueue:: |
|
DatabaseQueue:: |
public | function |
Cleans queues of garbage. Overrides QueueGarbageCollectionInterface:: |
|
DatabaseQueue:: |
public | function |
Retrieves the number of items in the queue. Overrides DatabaseQueue:: |
|
DatabaseQueue:: |
public | function |
Implements \Drupal\Core\Queue\QueueInterface::releaseItem(). Overrides DatabaseQueue:: |
|
DatabaseQueue:: |
public | function |
Release multiple items that the worker could not process. Overrides QueueInterface:: |
|
DatabaseQueue:: |
public | function |
Defines the schema for the queue table. Overrides DatabaseQueue:: |
|
DatabaseQueue:: |
public | function |
Select a page of queue data with a limited number of items. Overrides QueueInterface:: |
|
DatabaseQueue:: |
constant |
The active Drupal database connection object. Overrides DatabaseQueue:: |
||
DatabaseQueue:: |
final public | function |
Construct a DatabaseQueue object. Overrides DatabaseQueue:: |
|
DependencySerializationTrait:: |
protected | property | An array of entity type IDs keyed by the property name of their storages. | |
DependencySerializationTrait:: |
protected | property | An array of service IDs keyed by property name used for serialization. | |
DependencySerializationTrait:: |
public | function | 1 | |
DependencySerializationTrait:: |
public | function | 2 | |
QueueBasePageTrait:: |
protected | property | The configured limit of items on selected data pages. | |
QueueBasePageTrait:: |
public | function | ||
QueueBasePageTrait:: |
public | function |