ContentHubImportQueue.php in Acquia Content Hub 8
File
src/Controller/ContentHubImportQueue.php
View source
<?php
namespace Drupal\acquia_contenthub\Controller;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Config\ImmutableConfig;
use Drupal\Core\Controller\ControllerBase;
use Drupal\Core\Entity\EntityStorageException;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\SuspendQueueException;
use Symfony\Component\DependencyInjection\ContainerInterface;
class ContentHubImportQueue extends ControllerBase {
protected $queueFactory;
protected $config;
public function __construct(ConfigFactoryInterface $config_factory, QueueFactory $queue_factory) {
$this->config = $config_factory;
$this->queueFactory = $queue_factory;
}
public static function create(ContainerInterface $container) {
return new static($container
->get('config.factory'), $container
->get('queue'));
}
public function process() {
$config = $this
->config('acquia_contenthub.entity_config');
$queue = $this->queueFactory
->get('acquia_contenthub_import_queue');
if ($queue
->numberOfItems() < 1) {
$this
->messenger()
->addStatus('No items to process.');
$this
->redirect('acquia_contenthub.import_queue');
}
$batch = [
'title' => $this
->t('Process all remaining entities'),
'operations' => [],
'finished' => [
self::class,
'batchFinished',
],
];
$batch_size = $config
->get('import_queue_batch_size');
$batch_size = !empty($batch_size) && is_numeric($batch_size) ? $batch_size : 1;
for ($i = 0; ceil($i < $queue
->numberOfItems() / $batch_size); $i++) {
$batch['operations'][] = [
[
self::class,
'batchProcess',
],
[
$config,
],
];
}
batch_set($batch);
return batch_process('/admin/config/services/acquia-contenthub/import-queue');
}
public static function batchProcess(ImmutableConfig $config, &$context) {
$queue_factory = \Drupal::service('queue');
$queue_manager = \Drupal::service('plugin.manager.queue_worker');
$queue = $queue_factory
->get('acquia_contenthub_import_queue');
$worker = $queue_manager
->createInstance('acquia_contenthub_import_queue');
$queue_batch_size = $config
->get('import_queue_batch_size');
$batch_size = $queue_batch_size === 'all' || $queue
->numberOfItems() < $queue_batch_size ? $queue
->numberOfItems() : $queue_batch_size;
for ($i = 0; $i < $batch_size; $i++) {
if ($item = $queue
->claimItem()) {
try {
$worker
->processItem($item->data);
$queue
->deleteItem($item);
} catch (SuspendQueueException $exception) {
$context['errors'][] = $exception
->getMessage();
$context['success'] = FALSE;
$queue
->releaseItem($item);
break;
} catch (EntityStorageException $exception) {
$context['errors'][] = $exception
->getMessage();
$context['success'] = FALSE;
$queue
->releaseItem($item);
break;
}
}
}
}
public static function batchFinished($success, array $result, array $operations) {
if ($success) {
\Drupal::messenger()
->addStatus(t("Processed all Content Hub entities."));
return;
}
$error_operation = reset($operations);
\Drupal::messenger()
->addStatus(t('An error occurred while processing @operation with arguments : @args', [
'@operation' => $error_operation[0],
'@args' => print_r($error_operation[0], TRUE),
]));
}
}