<?php
namespace Drupal\acquia_contenthub\Controller;
use Drupal\acquia_contenthub\ContentHubEntityDependency;
use Drupal\Component\Utility\Html;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Controller\ControllerBase;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\StringTranslation\TranslatableMarkup;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
 * Controller for the Content Hub export queue.
 *
 * Enqueues entities for export, reports on and purges the queue, and drives
 * queue processing through the Batch API.
 */
class ContentHubExportQueueController extends ControllerBase {

  /**
   * The queue factory, used to obtain the export queue.
   *
   * @var \Drupal\Core\Queue\QueueFactory
   */
  protected $queueFactory;

  /**
   * The queue worker plugin manager.
   *
   * @var \Drupal\Core\Queue\QueueWorkerManagerInterface
   */
  protected $queueManager;

  /**
   * The 'acquia_contenthub.entity_config' configuration object.
   *
   * @var \Drupal\Core\Config\ImmutableConfig
   */
  protected $config;

  /**
   * The 'acquia_contenthub' logger channel.
   *
   * @var \Drupal\Core\Logger\LoggerChannelInterface
   */
  protected $logger;

  /**
   * Constructs the export queue controller.
   *
   * @param \Drupal\Core\Queue\QueueFactory $queue_factory
   *   The queue factory.
   * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $queue_manager
   *   The queue worker plugin manager.
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
   *   The config factory; only 'acquia_contenthub.entity_config' is kept.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
   *   The logger factory; only the 'acquia_contenthub' channel is kept.
   */
  public function __construct(QueueFactory $queue_factory, QueueWorkerManagerInterface $queue_manager, ConfigFactoryInterface $config_factory, LoggerChannelFactoryInterface $logger_factory) {
    $this->queueFactory = $queue_factory;
    $this->queueManager = $queue_manager;
    $this->config = $config_factory->get('acquia_contenthub.entity_config');
    $this->logger = $logger_factory->get('acquia_contenthub');
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get('queue'),
      $container->get('plugin.manager.queue_worker'),
      $container->get('config.factory'),
      $container->get('logger.factory')
    );
  }

  /**
   * Adds entities to the export queue, grouped into queue items.
   *
   * Entities whose type is listed by
   * ContentHubEntityDependency::getPostDependencyEntityTypes() are skipped.
   * The remaining entities are split into chunks of
   * 'export_queue_entities_per_item' entries (at least 1) and each chunk is
   * stored as one queue item.
   *
   * @param array $candidate_entities
   *   Entity objects to consider; each must provide getEntityTypeId(), id()
   *   and uuid().
   *
   * @return array
   *   The entities that were enqueued, keyed by UUID. Entities belonging to a
   *   chunk that could not be enqueued are left out (and logged).
   */
  public function enqueueExportEntities(array $candidate_entities) {
    // The list does not depend on the entity being inspected: fetch it once.
    $post_dependency_types = ContentHubEntityDependency::getPostDependencyEntityTypes();

    $queued_entities = [];
    $exported_entities = [];
    foreach ($candidate_entities as $candidate_entity) {
      $entity_type = $candidate_entity->getEntityTypeId();
      if (in_array($entity_type, $post_dependency_types, TRUE)) {
        continue;
      }
      $entity_uuid = $candidate_entity->uuid();
      $exported_entities[] = [
        'entity_type' => $entity_type,
        'entity_id' => $candidate_entity->id(),
        'entity_uuid' => $entity_uuid,
      ];
      $queued_entities[$entity_uuid] = $candidate_entity;
    }
    unset($candidate_entities);

    $queue = $this->queueFactory->get('acquia_contenthub_export_queue');

    // array_chunk() rejects sizes below 1, so clamp whatever is configured.
    $entities_per_item = max(1, (int) $this->config->get('export_queue_entities_per_item'));

    foreach (array_chunk($exported_entities, $entities_per_item) as $entities_chunk) {
      $item = new \stdClass();
      // The chunk is stored unchanged, 'entity_uuid' included. An earlier
      // unset() on a by-value loop copy never removed that key, so the stored
      // payload is identical to what has always been enqueued.
      $item->data = $entities_chunk;
      if ($queue->createItem($item) !== FALSE) {
        continue;
      }

      // Enqueueing failed: report every entity of the chunk and drop it from
      // the returned list. The message is built from the chunk row so that a
      // UUID occurring twice cannot hit an already removed entry.
      $messages = [];
      foreach ($entities_chunk as $entity_info) {
        $uuid = $entity_info['entity_uuid'];
        $message = new TranslatableMarkup('(Type = @type, ID = @id, UUID = @uuid)', [
          '@type' => $entity_info['entity_type'],
          '@id' => $entity_info['entity_id'],
          '@uuid' => $uuid,
        ]);
        $messages[] = (string) $message;
        unset($queued_entities[$uuid]);
      }
      $this->logger->debug('There was an error trying to enqueue the following entities for export: @entities.', [
        '@entities' => implode(', ', $messages),
      ]);
    }

    return $queued_entities;
  }

  /**
   * Returns the number of items reported by the export queue.
   *
   * @return int
   *   The value of numberOfItems() for the export queue.
   */
  public function getQueueCount() {
    return $this->queueFactory
      ->get('acquia_contenthub_export_queue')
      ->numberOfItems();
  }

  /**
   * Deletes the export queue and everything it contains.
   */
  public function purgeQueue() {
    $this->queueFactory
      ->get('acquia_contenthub_export_queue')
      ->deleteQueue();
  }

  /**
   * Returns the configured export queue waiting time.
   *
   * @return mixed
   *   The 'export_queue_waiting_time' setting, or 3 when it is empty.
   */
  public function getWaitingTime() {
    $waiting_time = $this->config->get('export_queue_waiting_time');
    return $waiting_time ?: 3;
  }

  /**
   * Sets up a batch that processes items of the export queue.
   *
   * One batch operation is created per 'export_queue_batch_size' items. Each
   * operation is told how many items it is responsible for, so the batch as a
   * whole never claims more than the requested number of items.
   *
   * @param int|string $number_of_items
   *   How many queue items to process. Any non-numeric value (the default is
   *   'all') means every item currently reported by the queue.
   */
  public function processQueueItems($number_of_items = 'all') {
    $batch = [
      'title' => $this->t("Process Content Hub Export Queue"),
      'operations' => [],
      'finished' => '\\Drupal\\acquia_contenthub\\Controller\\ContentHubExportQueueController::batchFinished',
    ];

    $queue = $this->queueFactory->get('acquia_contenthub_export_queue');
    $remaining = is_numeric($number_of_items) ? (int) $number_of_items : (int) $queue->numberOfItems();

    // Same validation as batchProcess(): anything unusable falls back to 1.
    $batch_size = $this->config->get('export_queue_batch_size');
    $batch_size = is_numeric($batch_size) ? max(1, (int) $batch_size) : 1;

    while ($remaining > 0) {
      // Hand each operation only its own share; passing the grand total made
      // the last operation overshoot when the total was not a multiple of the
      // batch size.
      $operation_items = min($batch_size, $remaining);
      $batch['operations'][] = [
        '\\Drupal\\acquia_contenthub\\Controller\\ContentHubExportQueueController::batchProcess',
        [
          $operation_items,
        ],
      ];
      $remaining -= $operation_items;
    }

    batch_set($batch);
  }

  /**
   * Batch operation callback: processes a slice of the export queue.
   *
   * Claims up to min($number_of_items, 'export_queue_batch_size') items and
   * passes each one to the 'acquia_contenthub_export_queue' queue worker.
   * Successfully processed items are deleted from the queue. A per-item
   * report line is appended to $context['results'].
   *
   * @param int|string $number_of_items
   *   Upper bound of items to process in this operation. A non-numeric value
   *   means "one full batch".
   * @param array|\ArrayAccess $context
   *   The batch context; 'message' and 'results' are written.
   */
  public static function batchProcess($number_of_items, &$context) {
    $queue = \Drupal::service('queue')->get('acquia_contenthub_export_queue');
    $queue_worker = \Drupal::service('plugin.manager.queue_worker')
      ->createInstance('acquia_contenthub_export_queue');

    $batch_size = \Drupal::service('config.factory')
      ->get('acquia_contenthub.entity_config')
      ->get('export_queue_batch_size');
    $batch_size = is_numeric($batch_size) ? max(1, (int) $batch_size) : 1;

    $items_to_process = is_numeric($number_of_items) ? min((int) $number_of_items, $batch_size) : $batch_size;

    for ($i = 0; $i < $items_to_process; $i++) {
      $item = $queue->claimItem();
      if (!$item) {
        continue;
      }

      try {
        // Human readable "(type, id)" label for every entity of the item.
        $entities_list = [];
        foreach ($item->data->data as $entity) {
          $entities_list[] = new TranslatableMarkup('(@entity_type, @entity_id)', [
            '@entity_type' => $entity['entity_type'],
            '@entity_id' => $entity['entity_id'],
          ]);
        }

        try {
          $entities_processed = $queue_worker->processItem($item->data);
        }
        catch (RequeueException $ex) {
          // Treated exactly like a worker that reported failure.
          $entities_processed = FALSE;
        }

        // NOTE(review): in the two failure branches the item is neither
        // deleted nor released, so it stays claimed — presumably until its
        // lease expires. Confirm that this is the intended meaning of "sent
        // back to the queue".
        if ($entities_processed === FALSE) {
          $message = new TranslatableMarkup('There was an error processing entities: @entities and their dependencies. The item has been sent back to the queue to be processed again later. Check your logs for more info.', [
            '@entities' => implode(',', $entities_list),
          ]);
        }
        elseif (!$entities_processed) {
          // Falsy but not FALSE (e.g. 0 or NULL): nothing was exported.
          $message = new TranslatableMarkup('No processing was done for entities: @entities and their dependencies. The item has been sent back to the queue to be processed again later. Check your logs for more info.', [
            '@entities' => implode(',', $entities_list),
          ]);
        }
        else {
          $queue->deleteItem($item);
          $message = new TranslatableMarkup('Processed entities: @entities and their dependencies (@count @label sent).', [
            '@entities' => implode(',', $entities_list),
            '@count' => $entities_processed,
            '@label' => $entities_processed == 1 ? new TranslatableMarkup('entity') : new TranslatableMarkup('entities'),
          ]);
        }

        $report = Html::escape((string) $message);
        $context['message'] = $report;
        $context['results'][] = $report;
      }
      catch (SuspendQueueException $e) {
        // The worker asked to stop: give the item back and end this operation.
        $queue->releaseItem($item);
        break;
      }
    }
  }

  /**
   * Batch 'finished' callback: reports the outcome to the user.
   *
   * @param bool $success
   *   TRUE when the batch completed without errors.
   * @param array $results
   *   The report lines collected by batchProcess().
   * @param array $operations
   *   The operations that remained unprocessed; the first one is reported as
   *   the failing operation.
   */
  public static function batchFinished($success, array $results, array $operations) {
    $messenger = \Drupal::messenger();

    if ($success) {
      $messenger->addStatus(t("The contents are successfully exported."));
    }
    else {
      // reset() yields FALSE for an empty list; fall back to empty values
      // instead of indexing into a boolean.
      $error_operation = reset($operations);
      $has_operation = is_array($error_operation);
      $callback = $has_operation && isset($error_operation[0]) ? $error_operation[0] : '';
      $arguments = $has_operation && isset($error_operation[1]) ? $error_operation[1] : [];

      // Index 1 holds the arguments (index 0 is the callback), and a failure
      // belongs in an error message rather than a status message.
      $messenger->addError(t('An error occurred while processing @operation with arguments : @args', [
        '@operation' => $callback,
        '@args' => print_r($arguments, TRUE),
      ]));
    }

    // Show the per-item report lines as a list.
    $elements = [
      '#theme' => 'item_list',
      '#type' => 'ul',
      '#items' => $results,
    ];
    $queue_report = \Drupal::service('renderer')->render($elements);
    $messenger->addStatus($queue_report);
  }

}