<?php
namespace Drupal\purge\Plugin\Purge\Queue;
use Drupal\Core\Database\Connection;
use Drupal\Core\Queue\DatabaseQueue as CoreDatabaseQueue;
use Symfony\Component\DependencyInjection\ContainerInterface;
class DatabaseQueue extends CoreDatabaseQueue implements QueueInterface {
use QueueBasePageTrait;
const TABLE_NAME = 'purge_queue';
protected $tableExists = FALSE;
/**
 * Constructs the queue on top of the given database connection.
 *
 * The parent constructor is invoked with the fixed queue name 'purge', after
 * which ensureTableExists() is called so the table check runs at
 * construction time.
 *
 * @param \Drupal\Core\Database\Connection $connection
 *   The database connection that is passed on to the parent queue class.
 */
final public function __construct(Connection $connection) {
  parent::__construct('purge', $connection);
  $this->ensureTableExists();
}
/**
 * Instantiates the queue using services from the container.
 *
 * Only the 'database' service is fetched; $configuration, $plugin_id and
 * $plugin_definition are accepted but not read by this method.
 *
 * @param \Symfony\Component\DependencyInjection\ContainerInterface $container
 *   The service container to pull the 'database' service from.
 * @param array $configuration
 *   Unused.
 * @param mixed $plugin_id
 *   Unused.
 * @param mixed $plugin_definition
 *   Unused.
 *
 * @return static
 *   A new instance of the called class.
 */
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
  $database = $container->get('database');
  return new static($database);
}
/**
 * Stores a single item in the queue table.
 *
 * @param mixed $data
 *   The payload; it is written to the 'data' column in serialized form.
 *
 * @return int|false
 *   The value returned by the insert query cast to an integer, or FALSE when
 *   the insert query returned a falsy value.
 */
public function createItem($data) {
  $row = [
    'data' => serialize($data),
    'created' => time(),
  ];
  $id = $this->connection
    ->insert(static::TABLE_NAME)
    ->fields($row)
    ->execute();
  return $id ? (int) $id : FALSE;
}
/**
 * Stores several items in the queue table with one insert query.
 *
 * All rows receive the same 'created' timestamp, taken once up front.
 *
 * NOTE(review): the returned IDs are calculated, not read back. This assumes
 * execute() returns the ID of the first inserted row and that the rows
 * received consecutive IDs - confirm for the database drivers in use.
 *
 * @param array $items
 *   The payloads; each one is serialized into its own row.
 *
 * @return int[]|false
 *   One integer ID per inserted row, counting up from the ID returned by the
 *   insert query, or FALSE when that query returned a falsy value.
 */
public function createItemMultiple(array $items) {
  $created = time();
  $insert = $this->connection
    ->insert(static::TABLE_NAME, [])
    ->fields(['data', 'created']);
  foreach ($items as $data) {
    $insert->values([
      'data' => serialize($data),
      'created' => $created,
    ]);
  }

  $first_id = $insert->execute();
  if (!$first_id) {
    return FALSE;
  }

  // Derive the remaining IDs by counting upwards from the returned one.
  $first_id = (int) $first_id;
  $total = count($items);
  $item_ids = [];
  for ($offset = 0; $offset < $total; $offset++) {
    $item_ids[] = $first_id + $offset;
  }
  return $item_ids;
}
/**
 * Counts the rows currently stored in the queue table.
 *
 * Every row is counted, regardless of its 'expire' value.
 *
 * @return int
 *   The result of COUNT(*) on the queue table.
 */
public function numberOfItems() {
  $select = $this->connection->select(static::TABLE_NAME);
  $select->addExpression('COUNT(*)');
  $count = $select->execute()->fetchField();
  return (int) $count;
}
/**
 * Claims the oldest available item and leases it for $lease_time seconds.
 *
 * A row is available when its 'expire' column is 0 or lies in the past.
 * Candidates are looked up one at a time in 'created, item_id' order.
 *
 * @param int $lease_time
 *   Number of seconds added to the current time to form the new 'expire'
 *   value of the claimed row.
 *
 * @return object|false
 *   The claimed row with integer 'item_id', integer 'expire' (the value read
 *   before the claim was written) and unserialized 'data', or FALSE when no
 *   row is available.
 */
public function claimItem($lease_time = 3600) {
  $sql = 'SELECT * FROM {' . static::TABLE_NAME . '} q WHERE ((expire = 0) OR (:now > expire)) ORDER BY created, item_id ASC';
  while (TRUE) {
    $item = $this->connection
      ->queryRange($sql, 0, 1, [':now' => time()])
      ->fetchObject();
    if (!$item) {
      return FALSE;
    }
    $item->item_id = (int) $item->item_id;
    $item->expire = (int) $item->expire;

    // Only write the lease when 'expire' still holds the value that was just
    // read. If another consumer claimed the row between the SELECT and this
    // UPDATE, no row matches and the loop moves on to the next candidate
    // instead of handing out the same item twice.
    $claimed = $this->connection
      ->update(static::TABLE_NAME)
      ->fields([
        'expire' => time() + $lease_time,
      ])
      ->condition('item_id', $item->item_id)
      ->condition('expire', $item->expire)
      ->execute();
    if ($claimed) {
      $item->data = unserialize($item->data);
      return $item;
    }
  }
}
/**
 * Claims up to $claims available items and leases them in one update.
 *
 * A row is available when its 'expire' column is 0 or lies in the past; rows
 * are selected in 'created, item_id' order. The lease is written with a
 * single UPDATE covering all selected IDs.
 *
 * @param int $claims
 *   Maximum number of rows to select.
 * @param int $lease_time
 *   Number of seconds added to the current time to form the new 'expire'
 *   value of the claimed rows.
 *
 * @return object[]
 *   The selected rows with integer 'item_id', integer 'expire' (the value
 *   read before the lease was written) and unserialized 'data'. Empty when
 *   nothing was selected.
 */
public function claimItemMultiple($claims = 10, $lease_time = 3600) {
  $claimed = $ids = [];
  $rows = $this->connection->queryRange(
    'SELECT * FROM {' . static::TABLE_NAME . '} q WHERE ((expire = 0) OR (:now > expire)) ORDER BY created, item_id ASC',
    0,
    $claims,
    [':now' => time()]
  );
  foreach ($rows as $row) {
    if ($row) {
      // The ID is collected as fetched, before it is cast on the object.
      $ids[] = $row->item_id;
      $row->item_id = (int) $row->item_id;
      $row->expire = (int) $row->expire;
      $row->data = unserialize($row->data);
      $claimed[] = $row;
    }
  }

  // Skip the update entirely when nothing was selected.
  if ($claimed !== []) {
    $lease = [
      'expire' => time() + $lease_time,
    ];
    $this->connection
      ->update(static::TABLE_NAME)
      ->fields($lease)
      ->condition('item_id', $ids, 'IN')
      ->execute();
  }
  return $claimed;
}
/**
 * Releases a claimed item so that it becomes available again.
 *
 * The row's 'expire' column is reset to 0 and its 'data' column is
 * overwritten with the serialized $item->data.
 *
 * @param object $item
 *   Object exposing 'item_id' and 'data' properties.
 *
 * @return bool
 *   The update query's return value cast to boolean.
 */
public function releaseItem($item) {
  $fields = [
    'expire' => 0,
    'data' => serialize($item->data),
  ];
  $affected = $this->connection
    ->update(static::TABLE_NAME)
    ->fields($fields)
    ->condition('item_id', $item->item_id)
    ->execute();
  return (bool) $affected;
}
/**
 * Releases several claimed items so that they become available again.
 *
 * For each item the stored 'data' is compared with the serialized
 * $item->data and rewritten only when it differs; afterwards 'expire' is
 * reset to 0 for all given IDs with a single update.
 *
 * @param object[] $items
 *   Objects exposing 'item_id' and 'data' properties.
 *
 * @return array
 *   An empty array when $items is empty or the final update reported
 *   affected rows; otherwise the unmodified $items.
 */
public function releaseItemMultiple(array $items) {
  // Nothing to release. This also avoids building an IN condition on an
  // empty list, which Drupal's query builder rejects with an exception.
  if (empty($items)) {
    return [];
  }

  // Map item ID => serialized payload as handed in by the caller.
  $serialized = [];
  foreach ($items as $item) {
    $serialized[intval($item->item_id)] = serialize($item->data);
  }
  $item_ids = array_keys($serialized);

  // Rewrite 'data' only for rows whose stored payload differs.
  $stored_rows = $this->connection
    ->select(static::TABLE_NAME, 'q')
    ->fields('q', [
      'item_id',
      'data',
    ])
    ->condition('item_id', $item_ids, 'IN')
    ->execute();
  foreach ($stored_rows as $stored) {
    $item_id = intval($stored->item_id);
    if ($stored->data !== $serialized[$item_id]) {
      $this->connection
        ->update(static::TABLE_NAME)
        ->fields([
          'data' => $serialized[$item_id],
        ])
        ->condition('item_id', $item_id)
        ->execute();
    }
  }

  // Drop the lease on all items at once.
  $released = $this->connection
    ->update(static::TABLE_NAME)
    ->fields([
      'expire' => 0,
    ])
    ->condition('item_id', $item_ids, 'IN')
    ->execute();
  return $released ? [] : $items;
}
/**
 * Deletes several items from the queue table with one query.
 *
 * @param object[] $items
 *   Objects exposing an 'item_id' property. An empty array is a no-op.
 */
public function deleteItemMultiple(array $items) {
  // Nothing to delete. This also avoids building an IN condition on an empty
  // list, which Drupal's query builder rejects with an exception.
  if (empty($items)) {
    return;
  }

  $item_ids = [];
  foreach ($items as $item) {
    $item_ids[] = $item->item_id;
  }
  $this->connection
    ->delete(static::TABLE_NAME)
    ->condition('item_id', $item_ids, 'IN')
    ->execute();
}
/**
 * Creates the queue; this implementation has an empty body.
 *
 * The table check is already triggered from the constructor through
 * ensureTableExists(), so no work is performed here.
 */
public function createQueue() {
  // No-op.
}
/**
 * Empties the queue by deleting every row from the queue table.
 *
 * The delete query carries no condition; the table itself is not dropped by
 * this method.
 */
public function deleteQueue() {
  $delete_all = $this->connection->delete(static::TABLE_NAME);
  $delete_all->execute();
}
/**
 * Runs the parent's table check until it has reported success once.
 *
 * The result is remembered in $this->tableExists: once the parent
 * implementation has returned a truthy value, later calls return TRUE
 * without calling the parent again.
 *
 * @return bool
 *   The current value of $this->tableExists.
 */
protected function ensureTableExists() {
  if (!$this->tableExists && parent::ensureTableExists()) {
    $this->tableExists = TRUE;
  }
  return $this->tableExists;
}
/**
 * Builds the schema of the queue table from the parent's definition.
 *
 * Compared to the parent schema, the 'name' field and the 'name_created'
 * index are removed, the description is replaced and an index on 'created'
 * is added.
 *
 * @return array
 *   The adjusted schema definition.
 */
public function schemaDefinition() {
  $definition = parent::schemaDefinition();
  unset($definition['fields']['name'], $definition['indexes']['name_created']);
  $definition['description'] = "Queue items for the purge database queue plugin.";
  $definition['indexes']['created'] = ['created'];
  return $definition;
}
/**
 * Selects one page of queue items, newest first.
 *
 * The page size comes from $this->selectPageLimit(), which is defined
 * outside this method (presumably by QueueBasePageTrait - confirm).
 *
 * @param int $page
 *   The 1-based page number; must be an integer of at least 1.
 *
 * @return object[]
 *   The rows of the requested page with integer 'item_id', integer 'expire'
 *   and unserialized 'data'.
 *
 * @throws \LogicException
 *   Thrown when $page is not an integer or is smaller than 1.
 */
public function selectPage($page = 1) {
  if ($page < 1 || !is_int($page)) {
    throw new \LogicException('Parameter $page has to be a positive integer.');
  }
  $items = [];
  $limit = $this->selectPageLimit();
  $resultset = $this->connection
    ->select(static::TABLE_NAME, 'q')
    ->fields('q', [
      'item_id',
      'expire',
      'data',
    ])
    ->orderBy('q.created', 'DESC')
    // 'created' is not unique (createItemMultiple() stamps a whole batch with
    // one timestamp), so item_id is added as a tie-breaker to keep the page
    // boundaries stable between requests.
    ->orderBy('q.item_id', 'DESC')
    ->range(($page - 1) * $limit, $limit)
    ->execute();
  foreach ($resultset as $item) {
    if (!$item) {
      continue;
    }
    $item->item_id = (int) $item->item_id;
    $item->expire = (int) $item->expire;
    $item->data = unserialize($item->data);
    $items[] = $item;
  }
  return $items;
}
}