<?php
namespace Drupal\purge\Plugin\Purge\Queue;
use Drupal\Component\Plugin\PluginManagerInterface;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\DestructableInterface;
use Drupal\purge\Logger\LoggerServiceInterface;
use Drupal\purge\ModifiableServiceBaseTrait;
use Drupal\purge\Plugin\Purge\Invalidation\InvalidationInterface;
use Drupal\purge\Plugin\Purge\Invalidation\InvalidationsServiceInterface;
use Drupal\purge\Plugin\Purge\Purger\PurgersServiceInterface;
use Drupal\purge\Plugin\Purge\Queue\Exception\UnexpectedServiceConditionException;
use Drupal\purge\Plugin\Purge\Queuer\QueuerInterface;
use Drupal\purge\ServiceBase;
/**
 * Queue service that buffers invalidation changes before writing them out.
 *
 * Every change requested through this service (adding, releasing, deleting)
 * is first recorded as a state on the transaction buffer. The queue backend
 * plugin is only instantiated on first use and only touched when the buffer
 * gets committed: explicitly through ::commit(), implicitly by the methods
 * that need up to date queue contents, and finally by ::destruct().
 */
class QueueService extends ServiceBase implements QueueServiceInterface, DestructableInterface {
  use ModifiableServiceBaseTrait;

  /**
   * Plugin ID that is enabled when the configuration holds no queue plugin.
   */
  const DEFAULT_PLUGIN = 'database';

  /**
   * Plugin ID that is enabled when the configured plugin is not available.
   *
   * This plugin is deliberately hidden from ::getPlugins().
   */
  const FALLBACK_PLUGIN = 'null';

  /**
   * The transaction buffer holding invalidation objects and their states.
   *
   * @var \Drupal\purge\Plugin\Purge\Queue\TxBufferInterface
   */
  protected $buffer;

  /**
   * The configuration factory used to read and write 'purge.plugins'.
   *
   * @var \Drupal\Core\Config\ConfigFactoryInterface
   */
  protected $configFactory;

  /**
   * The 'queue' logger channel part obtained from the logger service.
   *
   * @var mixed
   */
  protected $logger;

  /**
   * The service that turns queue item data into invalidation objects.
   *
   * @var \Drupal\purge\Plugin\Purge\Invalidation\InvalidationsServiceInterface
   */
  protected $purgeInvalidationFactory;

  /**
   * The logger service from which the 'queue' channel part is retrieved.
   *
   * @var \Drupal\purge\Logger\LoggerServiceInterface
   */
  protected $purgeLogger;

  /**
   * The purgers service, consulted for its capacity tracker when claiming.
   *
   * @var \Drupal\purge\Plugin\Purge\Purger\PurgersServiceInterface
   */
  protected $purgePurgers;

  /**
   * The queue statistics tracker.
   *
   * @var \Drupal\purge\Plugin\Purge\Queue\StatsTrackerInterface
   */
  protected $purgeQueueStats;

  /**
   * The lazily instantiated queue plugin, NULL until ::initializeQueue() runs.
   *
   * @var object|null
   */
  protected $queue;

  /**
   * Constructs the queue service.
   *
   * @param \Drupal\Component\Plugin\PluginManagerInterface $plugin_manager
   *   The plugin manager providing the queue plugins.
   * @param \Drupal\purge\Logger\LoggerServiceInterface $purge_logger
   *   The logger service, from which the 'queue' channel part is taken.
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
   *   The configuration factory.
   * @param \Drupal\purge\Plugin\Purge\Queue\TxBufferInterface $purge_queue_txbuffer
   *   The transaction buffer.
   * @param \Drupal\purge\Plugin\Purge\Queue\StatsTrackerInterface $purge_queue_stats
   *   The queue statistics tracker.
   * @param \Drupal\purge\Plugin\Purge\Invalidation\InvalidationsServiceInterface $purge_invalidation_factory
   *   The invalidation object factory.
   * @param \Drupal\purge\Plugin\Purge\Purger\PurgersServiceInterface $purge_purgers
   *   The purgers service.
   */
  public function __construct(PluginManagerInterface $plugin_manager, LoggerServiceInterface $purge_logger, ConfigFactoryInterface $config_factory, TxBufferInterface $purge_queue_txbuffer, StatsTrackerInterface $purge_queue_stats, InvalidationsServiceInterface $purge_invalidation_factory, PurgersServiceInterface $purge_purgers) {
    $this->pluginManager = $plugin_manager;
    $this->purgeLogger = $purge_logger;
    $this->configFactory = $config_factory;
    $this->purgeInvalidationFactory = $purge_invalidation_factory;
    $this->purgePurgers = $purge_purgers;
    $this->purgeQueueStats = $purge_queue_stats;
    $this->buffer = $purge_queue_txbuffer;
    $this->logger = $this->purgeLogger->get('queue');
  }

  /**
   * Schedules invalidation objects for addition to the queue.
   *
   * Objects already present in the buffer are left untouched, nothing is
   * written to the queue backend until the buffer is committed.
   *
   * @param \Drupal\purge\Plugin\Purge\Queuer\QueuerInterface $queuer
   *   The queuer adding the items, its plugin ID is used in the debug log.
   * @param array $invalidations
   *   The invalidation objects to add.
   */
  public function add(QueuerInterface $queuer, array $invalidations) {
    foreach ($invalidations as $invalidation) {
      if (!$this->buffer->has($invalidation)) {
        $this->buffer->set($invalidation, TxBuffer::ADDING);
      }
    }
    $this->logger->debug("@queuer: added @no items.", [
      '@queuer' => $queuer->getPluginId(),
      '@no' => count($invalidations),
    ]);
  }

  /**
   * Claims items from the queue and returns them as invalidation objects.
   *
   * Pending buffer changes are committed first. Claimed objects are put in
   * the buffer in the CLAIMED state together with their 'item_id' and
   * 'created' properties.
   *
   * @param int|null $claims
   *   The number of items to claim. When NULL, the remaining invalidations
   *   limit reported by the purgers capacity tracker is used.
   * @param int|null $lease_time
   *   The lease time per item, which gets multiplied by $claims. When NULL,
   *   the capacity tracker's lease time hint for $claims is used.
   *
   * @return array
   *   The claimed invalidation objects, or an empty array when the capacity
   *   limit is exhausted or the queue backend returned nothing.
   */
  public function claim($claims = NULL, $lease_time = NULL) {
    // Flush everything pending so the backend is up to date before claiming.
    $this->commitAdding();
    $this->commitReleasing();
    $this->commitDeleting();

    $tracker = $this->purgePurgers->capacityTracker();
    if (is_null($claims)) {
      if (!($claims = $tracker->getRemainingInvalidationsLimit())) {
        $this->logger->debug("no more items can be claimed within this request.");
        return [];
      }
    }
    if (is_null($lease_time)) {
      $lease_time = $tracker->getLeaseTimeHint($claims);
    }
    else {
      $lease_time = $claims * $lease_time;
    }

    // Claim the raw queue items, single and multiple claims use different
    // backend calls.
    $this->initializeQueue();
    if ($claims === 1) {
      if (!($item = $this->queue->claimItem($lease_time))) {
        $this->logger->debug("attempt to claim 1 item failed.");
        return [];
      }
      $items = [$item];
    }
    elseif (!($items = $this->queue->claimItemMultiple($claims, $lease_time))) {
      $this->logger->debug("attempt to claim @no items failed.", [
        '@no' => $claims,
      ]);
      return [];
    }

    // Replace each raw item by its invalidation object, reusing the buffered
    // object when one with the same item_id is already known.
    foreach ($items as $i => $item) {
      if (!($inv = $this->buffer->getByProperty('item_id', $item->item_id))) {
        $inv = $this->purgeInvalidationFactory->getFromQueueData($item->data);
      }
      $this->buffer->set($inv, TxBuffer::CLAIMED);
      $this->buffer->setProperty($inv, 'item_id', $item->item_id);
      $this->buffer->setProperty($inv, 'created', $item->created);
      $items[$i] = $inv;
    }

    $this->logger->debug("claimed @no items.", [
      '@no' => count($items),
    ]);
    return $items;
  }

  /**
   * Writes all buffered additions, releases and deletions to the queue.
   *
   * @param bool $sync_stat
   *   Whether to store the resulting number of queue items in the statistics
   *   tracker. ::numberOfItems() passes FALSE, as it calls this method itself.
   */
  public function commit($sync_stat = TRUE) {
    if (!count($this->buffer)) {
      return;
    }
    $this->commitAdding();
    $this->commitReleasing();
    $this->commitDeleting();
    if ($sync_stat) {
      $this->purgeQueueStats->numberOfItems()->set($this->numberOfItems());
    }
  }

  /**
   * Creates queue items for all buffered objects in the ADDING state.
   *
   * Created objects move to the ADDED state and receive their 'item_id' and
   * 'created' properties. More than one object is written in chunks of 1000.
   *
   * @throws \Drupal\purge\Plugin\Purge\Queue\Exception\UnexpectedServiceConditionException
   *   Thrown when the queue backend returns a falsy value on item creation.
   */
  private function commitAdding() {
    $items = $this->buffer->getFiltered(TxBuffer::ADDING);
    if (!($items_count = count($items))) {
      return;
    }
    $this->initializeQueue();

    // The queue stores the data exposed by the proxy item, not the object.
    $getProxiedData = function ($invalidation) {
      $proxy = new ProxyItem($invalidation, $this->buffer);
      return $proxy->data;
    };

    if ($items_count === 1) {
      $invalidation = current($items);
      if (!($id = $this->queue->createItem($getProxiedData($invalidation)))) {
        throw new UnexpectedServiceConditionException("The queue returned FALSE on createItem().");
      }
      // Mark as ADDED, identical to the multiple items path below. This used
      // to be RELEASED, which made the resulting state depend on batch size.
      $this->buffer->set($invalidation, TxBuffer::ADDED);
      $this->buffer->setProperty($invalidation, 'item_id', $id);
      $this->buffer->setProperty($invalidation, 'created', time());
    }
    else {
      foreach (array_chunk($items, 1000) as $chunk) {
        $data_items = [];
        foreach ($chunk as $invalidation) {
          $data_items[] = $getProxiedData($invalidation);
        }
        if (!($ids = $this->queue->createItemMultiple($data_items))) {
          throw new UnexpectedServiceConditionException("The queue returned FALSE on createItemMultiple().");
        }
        // array_chunk() reindexes every chunk from zero, so $key lines up
        // with the position of the item in $data_items and thus in $ids.
        foreach ($chunk as $key => $invalidation) {
          $this->buffer->set($invalidation, TxBuffer::ADDED);
          $this->buffer->setProperty($invalidation, 'item_id', $ids[$key]);
          $this->buffer->setProperty($invalidation, 'created', time());
        }
      }
    }
    $this->purgeQueueStats->updateTotals($items);
  }

  /**
   * Releases all buffered objects in the RELEASING state back to the queue.
   *
   * Released objects stay in the buffer and move to the RELEASED state.
   */
  private function commitReleasing() {
    $items = $this->buffer->getFiltered(TxBuffer::RELEASING);
    if (!($items_count = count($items))) {
      return;
    }
    $this->initializeQueue();
    if ($items_count === 1) {
      $invalidation = current($items);
      $this->queue->releaseItem(new ProxyItem($invalidation, $this->buffer));
      $this->buffer->set($invalidation, TxBuffer::RELEASED);
    }
    else {
      $this->queue->releaseItemMultiple($this->buildProxyItems($items));
      $this->buffer->set($items, TxBuffer::RELEASED);
    }
    $this->purgeQueueStats->updateTotals($items);
  }

  /**
   * Deletes all buffered objects in the DELETING state from the queue.
   *
   * Deleted objects are removed from the buffer as well.
   */
  private function commitDeleting() {
    $items = $this->buffer->getFiltered(TxBuffer::DELETING);
    if (!($items_count = count($items))) {
      return;
    }
    $this->initializeQueue();
    if ($items_count === 1) {
      $invalidation = current($items);
      $this->queue->deleteItem(new ProxyItem($invalidation, $this->buffer));
      $this->buffer->delete($invalidation);
    }
    else {
      $this->queue->deleteItemMultiple($this->buildProxyItems($items));
      $this->buffer->delete($items);
    }
    $this->purgeQueueStats->updateTotals($items);
  }

  /**
   * Wraps invalidation objects into proxy items bound to the buffer.
   *
   * @param array $invalidations
   *   The invalidation objects to wrap.
   *
   * @return \Drupal\purge\Plugin\Purge\Queue\ProxyItem[]
   *   Sequentially indexed list with one proxy item per given object.
   */
  private function buildProxyItems(array $invalidations) {
    $proxy_items = [];
    foreach ($invalidations as $invalidation) {
      $proxy_items[] = new ProxyItem($invalidation, $this->buffer);
    }
    return $proxy_items;
  }

  /**
   * Schedules invalidation objects for deletion from the queue.
   *
   * @param array $invalidations
   *   The invalidation objects to mark as DELETING in the buffer.
   */
  public function delete(array $invalidations) {
    $this->buffer->set($invalidations, TxBuffer::DELETING);
    $this->logger->debug("deleting @no items.", [
      '@no' => count($invalidations),
    ]);
  }

  /**
   * Commits the buffer when the service gets destructed.
   */
  public function destruct() {
    $this->commit();
  }

  /**
   * Empties the buffer and the queue, and resets the item count statistic.
   */
  public function emptyQueue() {
    $this->initializeQueue();
    $this->buffer->deleteEverything();
    $this->queue->deleteQueue();
    $this->purgeQueueStats->numberOfItems()->set(0);
    $this->logger->debug("emptied the queue.");
  }

  /**
   * Retrieves the description of the enabled queue plugin.
   *
   * @return mixed
   *   The 'description' value from the plugin definition, or NULL when the
   *   definition does not provide one.
   */
  public function getDescription() {
    return $this->getEnabledPluginDefinitionValue('description');
  }

  /**
   * Retrieves the label of the enabled queue plugin.
   *
   * @return mixed
   *   The 'label' value from the plugin definition, or NULL when the
   *   definition does not provide one.
   */
  public function getLabel() {
    return $this->getEnabledPluginDefinitionValue('label');
  }

  /**
   * Looks up a value in the definition of the enabled queue plugin.
   *
   * ::getPlugins() hides the fallback plugin, yet ::getPluginsEnabled() can
   * return it. In that case the definition is taken from the plugin manager
   * directly, instead of dereferencing a non-existing array key.
   *
   * @param string $key
   *   The key within the plugin definition.
   *
   * @return mixed
   *   The value found, or NULL when the definition or the key is missing.
   */
  private function getEnabledPluginDefinitionValue($key) {
    $plugin_id = current($this->getPluginsEnabled());
    $definitions = $this->getPlugins();
    if (!isset($definitions[$plugin_id])) {
      $definitions = $this->pluginManager->getDefinitions();
    }
    return isset($definitions[$plugin_id][$key]) ? $definitions[$plugin_id][$key] : NULL;
  }

  /**
   * Retrieves the definitions of all queue plugins, except the fallback.
   *
   * @return array
   *   The plugin definitions as returned by the plugin manager, with the
   *   FALLBACK_PLUGIN entry removed. The result is kept on the object.
   */
  public function getPlugins() {
    if (is_null($this->plugins)) {
      $this->plugins = $this->pluginManager->getDefinitions();
      unset($this->plugins[self::FALLBACK_PLUGIN]);
    }
    return $this->plugins;
  }

  /**
   * Retrieves the ID of the enabled queue plugin.
   *
   * @return string[]
   *   Array holding exactly one plugin ID: DEFAULT_PLUGIN when nothing is
   *   configured, FALLBACK_PLUGIN when the configured plugin is not one of
   *   the plugins from ::getPlugins(), the configured plugin ID otherwise.
   */
  public function getPluginsEnabled() {
    if (is_null($this->pluginsEnabled)) {
      $plugin_ids = array_keys($this->getPlugins());
      $this->pluginsEnabled = [];
      $plugin_id = $this->configFactory->get('purge.plugins')->get('queue');
      if (is_null($plugin_id)) {
        $this->pluginsEnabled[] = self::DEFAULT_PLUGIN;
      }
      elseif (!in_array($plugin_id, $plugin_ids)) {
        $this->pluginsEnabled[] = self::FALLBACK_PLUGIN;
      }
      else {
        $this->pluginsEnabled[] = $plugin_id;
      }
    }
    return $this->pluginsEnabled;
  }

  /**
   * Instantiates the enabled queue plugin, unless this already happened.
   */
  protected function initializeQueue() {
    if (!is_null($this->queue)) {
      return;
    }
    $plugin_id = current($this->getPluginsEnabled());
    $this->logger->debug("backend: @plugin_id.", [
      '@plugin_id' => $plugin_id,
    ]);
    $this->queue = $this->pluginManager->createInstance($plugin_id);
  }

  /**
   * Schedules queue changes based on the state of processed invalidations.
   *
   * Succeeded objects are scheduled for deletion. Any other object is
   * scheduled for release when the buffer knows it and for addition when it
   * does not.
   *
   * @param array $invalidations
   *   The invalidation objects to handle, each has its state context reset
   *   to NULL before its state is read.
   */
  public function handleResults(array $invalidations) {
    $counters = [
      'succeeded' => 0,
      'failed' => 0,
      'new' => 0,
    ];
    foreach ($invalidations as $invalidation) {
      $invalidation->setStateContext(NULL);
      if ($invalidation->getState() === InvalidationInterface::SUCCEEDED) {
        $this->buffer->set($invalidation, TxBuffer::DELETING);
        $counters['succeeded']++;
      }
      elseif (!$this->buffer->has($invalidation)) {
        $this->buffer->set($invalidation, TxBuffer::ADDING);
        $counters['new']++;
      }
      else {
        $this->buffer->set($invalidation, TxBuffer::RELEASING);
        $counters['failed']++;
      }
    }

    // Only encode the counters when the message is actually going anywhere.
    if ($this->logger->isDebuggingEnabled()) {
      $this->logger->debug("handled @no returned items: @results", [
        '@no' => count($invalidations),
        '@results' => json_encode($counters),
      ]);
    }
  }

  /**
   * Retrieves the number of items in the queue, after committing the buffer.
   *
   * @return mixed
   *   The value returned by the queue plugin's numberOfItems() method.
   */
  public function numberOfItems() {
    // Commit without syncing the statistic, as that calls this method again.
    $this->commit(FALSE);
    $this->initializeQueue();
    return $this->queue->numberOfItems();
  }

  /**
   * Schedules invalidation objects for release back to the queue.
   *
   * @param array $invalidations
   *   The invalidation objects to mark as RELEASING in the buffer.
   */
  public function release(array $invalidations) {
    $this->buffer->set($invalidations, TxBuffer::RELEASING);
    // This message used to read "deleting", copied from ::delete().
    $this->logger->debug("releasing @no items.", [
      '@no' => count($invalidations),
    ]);
  }

  /**
   * Reloads the service so that the queue plugin is instantiated anew.
   *
   * An already initialized queue gets the pending buffer committed first,
   * after which the buffer is emptied, the configuration factory is fetched
   * again and the queue plugin instance is dropped.
   */
  public function reload() {
    parent::reload();
    if (!is_null($this->queue)) {
      $this->commit(FALSE);
    }
    $this->buffer->deleteEverything();
    $this->configFactory = \Drupal::configFactory();
    $this->queue = NULL;
  }

  /**
   * Retrieves a page of queue items as immutable invalidation objects.
   *
   * @param int $page
   *   The page number passed on to the queue plugin.
   *
   * @return array
   *   The immutable invalidation objects for the items on the page.
   */
  public function selectPage($page = 1) {
    $this->initializeQueue();
    $this->commit();
    $immutables = [];
    foreach ($this->queue->selectPage($page) as $item) {
      $immutables[] = $this->purgeInvalidationFactory->getImmutableFromQueueData($item->data);
    }
    return $immutables;
  }

  /**
   * Passes a page limit request on to the queue plugin.
   *
   * @param int|null $set_limit_to
   *   The value handed to the queue plugin's selectPageLimit() method.
   *
   * @return mixed
   *   The value returned by the queue plugin's selectPageLimit() method.
   */
  public function selectPageLimit($set_limit_to = NULL) {
    $this->initializeQueue();
    return $this->queue->selectPageLimit($set_limit_to);
  }

  /**
   * Retrieves the value of the queue plugin's selectPageMax() method.
   *
   * @return mixed
   *   The value returned by the queue plugin's selectPageMax() method.
   */
  public function selectPageMax() {
    $this->initializeQueue();
    return $this->queue->selectPageMax();
  }

  /**
   * Switches to another queue plugin and empties the queue.
   *
   * The plugin ID is saved to 'purge.plugins', after which the service is
   * reloaded and the queue of the newly enabled plugin is emptied.
   *
   * @param string[] $plugin_ids
   *   Array holding exactly one plugin ID known to the plugin manager.
   *
   * @throws \LogicException
   *   Thrown when not exactly one plugin ID is given, or when the plugin ID
   *   has no plugin definition.
   */
  public function setPluginsEnabled(array $plugin_ids) {
    if (count($plugin_ids) !== 1) {
      throw new \LogicException('Incorrect number of arguments.');
    }
    $plugin_id = current($plugin_ids);
    if (!isset($this->pluginManager->getDefinitions()[$plugin_id])) {
      throw new \LogicException('Invalid plugin_id.');
    }
    $this->configFactory
      ->getEditable('purge.plugins')
      ->set('queue', $plugin_id)
      ->save();
    $this->logger->debug("switched backend to @id.", [
      '@id' => $plugin_id,
    ]);
    $this->reload();
    $this->emptyQueue();
  }

}