public function QueueWorker::queueCallback in Ultimate Cron 8.2
Cron callback for queue worker cron jobs.
File
- src/
QueueWorker.php, line 53
Class
- QueueWorker
- Defines the queue worker.
Namespace
Drupal\ultimate_cronCode
public function queueCallback(CronJobInterface $job) {
$queue_name = str_replace(CronJobInterface::QUEUE_ID_PREFIX, '', $job
->id());
$queue_manager = $this->pluginManagerQueueWorker;
$queue_factory = $this->queue;
$config = $this->configFactory
->get('ultimate_cron.settings');
$info = $queue_manager
->getDefinition($queue_name);
// Make sure every queue exists. There is no harm in trying to recreate
// an existing queue.
$queue_factory
->get($queue_name)
->createQueue();
/** @var \Drupal\Core\Queue\QueueWorkerInterface $queue_worker */
$queue_worker = $queue_manager
->createInstance($queue_name);
$end = microtime(TRUE) + (isset($info['cron']['time']) ? $info['cron']['time'] : $config
->get('queue.timeouts.time'));
/** @var \Drupal\Core\Queue\QueueInterface $queue */
$queue = $queue_factory
->get($queue_name);
$items = 0;
while (microtime(TRUE) < $end) {
// Check kill signal.
if ($job
->getSignal('kill')) {
\Drupal::logger('ultimate_cron')
->warning('Kill signal received for job @job_id', [
'@job_id' => $job
->id(),
]);
break;
}
$item = $queue
->claimItem($config
->get('queue.timeouts.lease_time'));
// If there is no item, check the empty delay setting and wait if
// configured.
if (!$item) {
if ($config
->get('queue.delays.empty_delay')) {
usleep($config
->get('queue.delays.empty_delay') * 1000000);
continue;
}
else {
break;
}
}
try {
// We have an item, check if we need to wait.
if ($config
->get('queue.delays.item_delay')) {
if ($items == 0) {
// Move the boundary if using a throttle,
// to avoid waiting for nothing.
$end -= $config
->get('queue.delays.item_delay');
}
else {
// Sleep before retrieving.
usleep($config
->get('queue.delays.item_delay') * 1000000);
}
}
$queue_worker
->processItem($item->data);
$queue
->deleteItem($item);
$items++;
} catch (RequeueException $e) {
// The worker requested the task be immediately requeued.
$queue
->releaseItem($item);
} catch (SuspendQueueException $e) {
// If the worker indicates there is a problem with the whole queue,
// release the item and skip to the next queue.
$queue
->releaseItem($item);
watchdog_exception('cron', $e);
} catch (\Exception $e) {
// In case of any other kind of exception, log it and leave the item
// in the queue to be processed again later.
watchdog_exception('ultimate_cron_queue', $e);
}
}
}