<?php
namespace Drupal\queue_example\Forms;
use Drupal\Component\Utility\Html;
use Drupal\Core\Queue\QueueGarbageCollectionInterface;
use Drupal\Core\CronInterface;
use Drupal\Core\Database\Connection;
use Drupal\Core\Form\FormBase;
use Drupal\Core\Form\FormStateInterface;
use Drupal\Core\Queue\DatabaseQueue;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Site\Settings;
use Symfony\Component\DependencyInjection\ContainerInterface;
class QueueExampleForm extends FormBase {
/**
 * Queue factory, used to look up queue objects by name.
 *
 * @var \Drupal\Core\Queue\QueueFactory
 */
protected $queueFactory;
/**
 * Database connection, used to read the stored queue items for display.
 *
 * @var \Drupal\Core\Database\Connection
 */
protected $database;
/**
 * Cron service, run on demand from the "run cron" button.
 *
 * @var \Drupal\Core\CronInterface
 */
protected $cron;
/**
 * Value of the 'queue_default' setting ('queue.database' when unset).
 *
 * @var string
 */
protected $queueType;
/**
 * Constructs the queue example form.
 *
 * @param \Drupal\Core\Queue\QueueFactory $queue_factory
 *   Factory used to obtain queue objects by name.
 * @param \Drupal\Core\Database\Connection $database
 *   Database connection used to read queue rows.
 * @param \Drupal\Core\CronInterface $cron
 *   Cron service that the form can trigger manually.
 * @param \Drupal\Core\Site\Settings $settings
 *   Site settings; only the 'queue_default' entry is read.
 */
public function __construct(QueueFactory $queue_factory, Connection $database, CronInterface $cron, Settings $settings) {
  // Injected services.
  $this->cron = $cron;
  $this->database = $database;
  $this->queueFactory = $queue_factory;

  // Remember the configured default queue service so the form can tell
  // whether the database-backed implementation is in use.
  $this->queueType = $settings->get('queue_default', 'queue.database');
}
/**
 * Instantiates the form from the service container.
 *
 * Pulls the 'queue', 'database', 'cron' and 'settings' services for the
 * constructor and hands the 'messenger' service to the new instance.
 *
 * @param \Symfony\Component\DependencyInjection\ContainerInterface $container
 *   The service container.
 *
 * @return static
 *   A fully wired form instance.
 */
public static function create(ContainerInterface $container) {
  $instance = new static(
    $container->get('queue'),
    $container->get('database'),
    $container->get('cron'),
    $container->get('settings')
  );
  $instance->setMessenger($container->get('messenger'));

  return $instance;
}
/**
 * Returns the unique string identifying this form.
 *
 * @return string
 *   Always 'queue_example'.
 */
public function getFormId() {
  return 'queue_example';
}
/**
 * Builds the queue demonstration form.
 *
 * The form shows the items currently stored for the selected queue and
 * offers buttons to insert, claim, claim-and-delete, run cron, and delete
 * the queue. Each button declares its own '#submit' handler.
 *
 * @param array $form
 *   The form array to extend.
 * @param \Drupal\Core\Form\FormStateInterface $form_state
 *   The current form state. Reads the 'insert_counter' storage entry and the
 *   submitted 'queue_name' and 'claim_time' values.
 *
 * @return array
 *   The form render array.
 */
public function buildForm(array $form, FormStateInterface $form_state) {
  // Seed the counter used for the suggested item text on the first build.
  if (empty($form_state->get('insert_counter'))) {
    $form_state->set('insert_counter', 1);
  }

  // Until the user picks a queue, examine the first example queue.
  $queue_name = $form_state->getValue('queue_name') ?: 'queue_example_first_queue';
  $items = $this->retrieveQueue($queue_name);

  // "0" ("none") is a legitimate claim time, so fall back to the default of
  // 5 only when nothing was submitted. A truthiness test (?:) would silently
  // turn a submitted 0 back into 5 every time the form is rebuilt.
  $claim_time = $form_state->getValue('claim_time');
  if ($claim_time === NULL || $claim_time === '') {
    $claim_time = 5;
  }

  $form['help'] = [
    '#type' => 'markup',
    '#markup' => '<div>' . $this->t('This page is an interface on the Drupal queue API. You can add new items to the queue, "claim" one (retrieve the next item and keep a lock on it), and delete one (remove it from the queue). Note that claims are not expired until cron runs, so there is a special button to run cron to perform any necessary expirations.') . '</div>',
  ];

  // Only shown when the site's default queue is not the database queue.
  $form['wrong_queue_warning'] = [
    '#type' => 'markup',
    '#markup' => '<div>' . $this->t('Note: the example works only with the default queue implementation, which is not currently configured!!') . '</div>',
    '#access' => !$this->doesQueueUseDb(),
  ];

  $queue_names = [
    'queue_example_first_queue',
    'queue_example_second_queue',
  ];
  $form['queue_name'] = [
    '#type' => 'select',
    '#title' => $this->t('Choose queue to examine'),
    '#options' => array_combine($queue_names, $queue_names),
    '#default_value' => $queue_name,
  ];
  $form['queue_show'] = [
    '#type' => 'submit',
    '#value' => $this->t('Show queue'),
    '#submit' => ['::submitShowQueue'],
  ];

  // Current contents of the selected queue.
  $form['status_fieldset'] = [
    '#type' => 'fieldset',
    '#title' => $this->t('Queue status for @name', ['@name' => $queue_name]),
    '#collapsible' => TRUE,
  ];
  if (count($items) > 0) {
    $form['status_fieldset']['status'] = [
      '#theme' => 'table',
      '#header' => [
        $this->t('Item ID'),
        $this->t('Claimed/Expiration'),
        $this->t('Created'),
        $this->t('Content/Data'),
      ],
      '#rows' => array_map([$this, 'processQueueItemForTable'], $items),
    ];
  }
  else {
    $form['status_fieldset']['status'] = [
      '#type' => 'markup',
      '#markup' => $this->t('There are no items in the queue.'),
    ];
  }

  // Insert a new item.
  $form['insert_fieldset'] = [
    '#type' => 'fieldset',
    '#title' => $this->t('Insert into @name', ['@name' => $queue_name]),
  ];
  $form['insert_fieldset']['string_to_add'] = [
    '#type' => 'textfield',
    '#size' => 10,
    '#default_value' => $this->t('item @counter', [
      '@counter' => $form_state->get('insert_counter'),
    ]),
  ];
  $form['insert_fieldset']['add_item'] = [
    '#type' => 'submit',
    '#value' => $this->t('Insert into queue'),
    '#submit' => ['::submitAddQueueItem'],
  ];

  // Claim, claim-and-delete, and cron controls.
  $form['claim_fieldset'] = [
    '#type' => 'fieldset',
    '#title' => $this->t('Claim from queue'),
    '#collapsible' => TRUE,
  ];
  $form['claim_fieldset']['claim_time'] = [
    '#type' => 'radios',
    '#title' => $this->t('Claim time, in seconds'),
    '#options' => [
      0 => $this->t('none'),
      5 => $this->t('5 seconds'),
      60 => $this->t('60 seconds'),
    ],
    '#description' => $this->t('This time is only valid if cron runs during this time period. You can run cron manually below.'),
    '#default_value' => $claim_time,
  ];
  $form['claim_fieldset']['claim_item'] = [
    '#type' => 'submit',
    '#value' => $this->t('Claim the next item from the queue'),
    '#submit' => ['::submitClaimItem'],
  ];
  $form['claim_fieldset']['claim_and_delete_item'] = [
    '#type' => 'submit',
    '#value' => $this->t('Claim the next item and delete it'),
    '#submit' => ['::submitClaimDeleteItem'],
  ];
  $form['claim_fieldset']['run_cron'] = [
    '#type' => 'submit',
    '#value' => $this->t('Run cron manually to expire claims'),
    '#submit' => ['::submitRunCron'],
  ];

  $form['delete_queue'] = [
    '#type' => 'submit',
    '#value' => $this->t('Delete the queue and items in it'),
    '#submit' => ['::submitDeleteQueue'],
  ];

  return $form;
}
/**
 * Default submit handler; intentionally does nothing.
 *
 * Every submit button built by buildForm() declares its own '#submit'
 * callback, so no work is left for this handler.
 *
 * @param array $form
 *   The form array.
 * @param \Drupal\Core\Form\FormStateInterface $form_state
 *   The current form state.
 */
public function submitForm(array &$form, FormStateInterface $form_state) {
}
/**
 * Loads the stored rows of a queue straight from the queue table.
 *
 * @param string $queue_name
 *   Name of the queue whose items should be listed.
 *
 * @return array
 *   A list of associative arrays with the keys 'item_id', 'data', 'expire'
 *   and 'created', ordered by item ID. Empty when the database queue is not
 *   the configured default or when the queue reports no items.
 */
public function retrieveQueue($queue_name) {
  // The raw table can only be read when the database backend is in use.
  if (!$this->doesQueueUseDb()) {
    return [];
  }

  // Skip the query entirely when the queue reports that it is empty.
  $queue = $this->queueFactory->get($queue_name);
  if ($queue->numberOfItems() < 1) {
    return [];
  }

  $sql = 'SELECT item_id, data, expire, created FROM {' . DatabaseQueue::TABLE_NAME . '} WHERE name = :name ORDER BY item_id';
  $rows = $this->database->query($sql, [':name' => $queue_name], ['fetch' => \PDO::FETCH_ASSOC]);

  $items = [];
  foreach ($rows as $row) {
    $items[] = $row;
  }

  return $items;
}
/**
 * Tells whether the configured default queue is the database queue.
 *
 * @return bool
 *   TRUE when the stored 'queue_default' setting is exactly the string
 *   'queue.database', FALSE otherwise.
 */
protected function doesQueueUseDb() {
  // Strict comparison: with a loose "==" a non-string setting such as TRUE
  // would compare equal to the service name and be reported as the database
  // queue.
  return $this->queueType === 'queue.database';
}
/**
 * Submit handler for the "Show queue" button.
 *
 * Makes sure the selected queue exists, resets the suggested item counter
 * to one more than the current item count, and rebuilds the form.
 *
 * @param array $form
 *   The form array.
 * @param \Drupal\Core\Form\FormStateInterface $form_state
 *   The current form state.
 */
public function submitShowQueue(array &$form, FormStateInterface $form_state) {
  $queue_name = $form_state->getValue('queue_name');
  $queue = $this->queueFactory->get($queue_name);
  $queue->createQueue();

  $next_counter = $queue->numberOfItems() + 1;
  $form_state->set('insert_counter', $next_counter);

  // Drop the submitted text so the rebuilt field shows the new default.
  $form_state->unsetValue('string_to_add');
  $form_state->setRebuild();
}
/**
 * Submit handler for the "Insert into queue" button.
 *
 * Adds the submitted string to the selected queue, reports the new item
 * count, and rebuilds the form with the next suggested item text.
 *
 * @param array $form
 *   The form array.
 * @param \Drupal\Core\Form\FormStateInterface $form_state
 *   The current form state.
 */
public function submitAddQueueItem(array &$form, FormStateInterface $form_state) {
  $queue_name = $form_state->getValue('queue_name');
  $string_to_add = $form_state->getValue('string_to_add');

  $queue = $this->queueFactory->get($queue_name);
  $queue->createQueue();
  $queue->createItem($string_to_add);
  $total = $queue->numberOfItems();

  $message = $this->t('Queued your string (@string_to_add). There are now @count items in the queue.', [
    '@count' => $total,
    '@string_to_add' => $string_to_add,
  ]);
  $this->messenger()->addMessage($message);

  $form_state->setRebuild();
  // Drop the submitted text so the rebuilt field shows the new default.
  $form_state->unsetValue('string_to_add');
  $form_state->set('insert_counter', $total + 1);
}
/**
 * Submit handler for the "Claim the next item from the queue" button.
 *
 * Claims one item for the submitted claim time and reports the outcome
 * together with the current item count.
 *
 * @param array $form
 *   The form array.
 * @param \Drupal\Core\Form\FormStateInterface $form_state
 *   The current form state.
 */
public function submitClaimItem(array &$form, FormStateInterface $form_state) {
  $queue_name = $form_state->getValue('queue_name');
  $claim_time = $form_state->getValue('claim_time');

  $queue = $this->queueFactory->get($queue_name);
  $queue->createQueue();
  $claimed = $queue->claimItem($claim_time);
  $total = $queue->numberOfItems();

  if (empty($claimed)) {
    $message = $this->t('There were no items in the queue available to claim. There are @count items in the queue.', [
      '@count' => $total,
    ]);
  }
  else {
    $message = $this->t('Claimed item id=@item_id string=@string for @seconds seconds. There are @count items in the queue.', [
      '@count' => $total,
      '@item_id' => $claimed->item_id,
      '@string' => $claimed->data,
      '@seconds' => $claim_time,
    ]);
  }
  $this->messenger()->addMessage($message);

  $form_state->setRebuild();
}
/**
 * Submit handler for the "Claim the next item and delete it" button.
 *
 * Claims one item with a fixed 60-second lease, deletes it, and reports the
 * item count before and after the deletion.
 *
 * @param array $form
 *   The form array.
 * @param \Drupal\Core\Form\FormStateInterface $form_state
 *   The current form state.
 */
public function submitClaimDeleteItem(array &$form, FormStateInterface $form_state) {
  // The lease used for this operation is fixed and independent of the
  // "claim time" radios. Keep it in one place so the message below reports
  // the lease that was actually requested instead of the unrelated form
  // value.
  $lease_time = 60;

  $queue = $this->queueFactory->get($form_state->getValue('queue_name'));
  $queue->createQueue();

  // Counted before claiming, so the first message includes the claimed item.
  $count = $queue->numberOfItems();
  $item = $queue->claimItem($lease_time);

  if (!empty($item)) {
    $this->messenger()->addMessage($this->t('Claimed and deleted item id=@item_id string=@string for @seconds seconds. There are @count items in the queue.', [
      '@count' => $count,
      '@item_id' => $item->item_id,
      '@string' => $item->data,
      '@seconds' => $lease_time,
    ]));

    $queue->deleteItem($item);
    $count = $queue->numberOfItems();
    $this->messenger()->addMessage($this->t('There are now @count items in the queue.', [
      '@count' => $count,
    ]));
  }
  else {
    $count = $queue->numberOfItems();
    $this->messenger()->addMessage($this->t('There were no items in the queue available to claim/delete. There are currently @count items in the queue.', [
      '@count' => $count,
    ]));
  }

  $form_state->setRebuild();
}
/**
 * Submit handler for the "Run cron manually to expire claims" button.
 *
 * Runs cron, additionally triggers garbage collection on the selected queue
 * when it supports it, and reports the resulting item count.
 *
 * @param array $form
 *   The form array.
 * @param \Drupal\Core\Form\FormStateInterface $form_state
 *   The current form state.
 */
public function submitRunCron(array &$form, FormStateInterface $form_state) {
  $this->cron->run();

  $queue_name = $form_state->getValue('queue_name');
  $queue = $this->queueFactory->get($queue_name);

  // Not every queue backend offers garbage collection.
  if ($queue instanceof QueueGarbageCollectionInterface) {
    $queue->garbageCollection();
  }

  $queue->createQueue();
  $total = $queue->numberOfItems();

  $message = $this->t('Ran cron. If claimed items expired, they should be expired now. There are now @count items in the queue', [
    '@count' => $total,
  ]);
  $this->messenger()->addMessage($message);

  $form_state->setRebuild();
}
/**
 * Submit handler for the "Delete the queue and items in it" button.
 *
 * Deletes the selected queue and reports which queue was removed. Unlike
 * the other handlers, this one does not request a form rebuild.
 *
 * @param array $form
 *   The form array.
 * @param \Drupal\Core\Form\FormStateInterface $form_state
 *   The current form state.
 */
public function submitDeleteQueue(array &$form, FormStateInterface $form_state) {
  $queue_name = $form_state->getValue('queue_name');
  $this->queueFactory->get($queue_name)->deleteQueue();

  $message = $this->t('Deleted the @queue_name queue and all items in it', [
    '@queue_name' => $queue_name,
  ]);
  $this->messenger()->addMessage($message);
}
/**
 * Converts one raw queue row into a row for the status table.
 *
 * @param array $item
 *   A queue row containing at least the keys 'expire', 'created' and 'data'.
 *
 * @return array
 *   The same row with 'expire' replaced by a claimed/unclaimed label,
 *   'created' formatted with date('r'), 'data' removed, and a 'content' key
 *   holding the HTML-escaped unserialized data.
 */
private function processQueueItemForTable(array $item) {
  // A positive expiry timestamp marks a claimed item.
  $item['expire'] = $item['expire'] > 0
    ? $this->t('Claimed: expires %expire', ['%expire' => date('r', $item['expire'])])
    : $this->t('Unclaimed');

  $item['created'] = date('r', $item['created']);

  // Swap the serialized payload for a displayable 'content' column.
  $payload = unserialize($item['data']);
  unset($item['data']);
  $item['content'] = Html::escape($payload);

  return $item;
}
}