ContentHubImportQueue.php in Acquia Content Hub 8.2
File
modules/acquia_contenthub_subscriber/src/ContentHubImportQueue.php
View source
<?php
namespace Drupal\acquia_contenthub_subscriber;
use Drupal\Core\DependencyInjection\DependencySerializationTrait;
use Drupal\Core\Entity\EntityStorageException;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueWorkerManager;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\StringTranslation\StringTranslationTrait;
class ContentHubImportQueue {
use StringTranslationTrait;
use DependencySerializationTrait;
protected $queue;
protected $queueManager;
public function __construct(QueueFactory $queue_factory, QueueWorkerManager $queue_manager) {
$this->queue = $queue_factory
->get('acquia_contenthub_subscriber_import');
$this->queueManager = $queue_manager;
}
public function getQueueCount() {
return $this->queue
->numberOfItems();
}
public function process() {
$batch = [
'title' => $this
->t('Process all entities to be imported'),
'operations' => [],
'finished' => [
[
$this,
'batchFinished',
],
[],
],
];
for ($i = 0; $i < $this
->getQueueCount(); $i++) {
$batch['operations'][] = [
[
$this,
'batchProcess',
],
[],
];
}
batch_set($batch);
}
public function batchProcess(&$context) {
$queueWorker = $this->queueManager
->createInstance('acquia_contenthub_subscriber_import');
if ($item = $this->queue
->claimItem()) {
try {
$queueWorker
->processItem($item->data);
$this->queue
->deleteItem($item);
} catch (SuspendQueueException $exception) {
$context['errors'][] = $exception
->getMessage();
$context['success'] = FALSE;
$this->queue
->releaseItem($item);
} catch (EntityStorageException $exception) {
$context['errors'][] = $exception
->getMessage();
$context['success'] = FALSE;
$this->queue
->releaseItem($item);
}
}
}
public static function batchFinished($success, array $result, array $operations) {
if ($success) {
\Drupal::messenger()
->addMessage(t("Processed all Content Hub entities."));
return;
}
$error_operation = reset($operations);
\Drupal::messenger()
->addMessage(t('An error occurred while processing @operation with arguments : @args', [
'@operation' => $error_operation[0],
'@args' => print_r($error_operation[0], TRUE),
]));
}
}