class ContentHubExportQueueController in Acquia Content Hub 8
Implements an Export Queue Controller for Content Hub.
Hierarchy
- class \Drupal\Core\Controller\ControllerBase implements ContainerInjectionInterface uses LoggerChannelTrait, MessengerTrait, LinkGeneratorTrait, RedirectDestinationTrait, UrlGeneratorTrait, StringTranslationTrait
- class \Drupal\acquia_contenthub\Controller\ContentHubExportQueueController
Expanded class hierarchy of ContentHubExportQueueController
4 files declare their use of ContentHubExportQueueController
- ContentHubExportQueueBase.php in src/Plugin/QueueWorker/ContentHubExportQueueBase.php
- ContentHubExportQueueControllerTest.php in tests/src/Unit/Controller/ContentHubExportQueueControllerTest.php
- ContentHubExportQueueForm.php in src/Form/ContentHubExportQueueForm.php
- ContentHubExportQueueMessageSubscriber.php in src/EventSubscriber/ContentHubExportQueueMessageSubscriber.php
1 string reference to 'ContentHubExportQueueController'
1 service uses ContentHubExportQueueController
File
- src/Controller/ContentHubExportQueueController.php, line 20
Namespace
Drupal\acquia_contenthub\Controller
View source
class ContentHubExportQueueController extends ControllerBase {

  /**
   * The Queue Factory.
   *
   * @var \Drupal\Core\Queue\QueueFactory
   */
  protected $queueFactory;

  /**
   * The Queue Manager.
   *
   * @var \Drupal\Core\Queue\QueueWorkerManagerInterface
   */
  protected $queueManager;

  /**
   * The 'acquia_contenthub.entity_config' configuration object.
   *
   * @var \Drupal\Core\Config\ImmutableConfig
   */
  protected $config;

  /**
   * Logger Channel.
   *
   * @var \Drupal\Core\Logger\LoggerChannelInterface
   */
  protected $logger;

  /**
   * Constructs a ContentHubExportQueueController.
   *
   * @param \Drupal\Core\Queue\QueueFactory $queue_factory
   *   The queue factory, used to obtain the export queue.
   * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $queue_manager
   *   The queue worker plugin manager.
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
   *   The config factory; only 'acquia_contenthub.entity_config' is read.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
   *   The logger factory; only the 'acquia_contenthub' channel is used.
   */
  public function __construct(QueueFactory $queue_factory, QueueWorkerManagerInterface $queue_manager, ConfigFactoryInterface $config_factory, LoggerChannelFactoryInterface $logger_factory) {
    $this->queueFactory = $queue_factory;
    $this->queueManager = $queue_manager;
    $this->config = $config_factory->get('acquia_contenthub.entity_config');
    $this->logger = $logger_factory->get('acquia_contenthub');
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get('queue'),
      $container->get('plugin.manager.queue_worker'),
      $container->get('config.factory'),
      $container->get('logger.factory')
    );
  }

  /**
   * Adds entities to the export queue.
   *
   * Entities whose type is listed by
   * ContentHubEntityDependency::getPostDependencyEntityTypes() are skipped.
   * The remaining entities are grouped into chunks of
   * 'export_queue_entities_per_item' entities and each chunk becomes one
   * queue item.
   *
   * @param \Drupal\Core\Entity\ContentEntityInterface[] $candidate_entities
   *   An array of entities (uuid, entity object) to be exported to Content Hub.
   *
   * @return \Drupal\Core\Entity\ContentEntityInterface[]
   *   An array of successfully queued entities (uuid, entity object). Entities
   *   belonging to a chunk that could not be queued are not included.
   */
  public function enqueueExportEntities(array $candidate_entities) {
    $queued_entities = [];
    $exported_entities = [];

    // The list of post-dependent types does not depend on the entity being
    // inspected, so look it up once instead of once per entity.
    $post_dependency_types = ContentHubEntityDependency::getPostDependencyEntityTypes();

    foreach ($candidate_entities as $candidate_entity) {
      $entity_type = $candidate_entity->getEntityTypeId();

      // Do not enqueue post-dependent entities like paragraphs.
      if (in_array($entity_type, $post_dependency_types, TRUE)) {
        continue;
      }

      $entity_uuid = $candidate_entity->uuid();
      $exported_entities[] = [
        'entity_type' => $entity_type,
        'entity_id' => $candidate_entity->id(),
        'entity_uuid' => $entity_uuid,
      ];
      $queued_entities[$entity_uuid] = $candidate_entity;
    }
    unset($candidate_entities);

    // Obtain the export queue.
    $queue = $this->queueFactory->get('acquia_contenthub_export_queue');

    // Divide the list of entities into chunks to be processed in groups.
    // array_chunk() rejects sizes below 1, so clamp unset or invalid values.
    $entities_per_item = max(1, (int) $this->config->get('export_queue_entities_per_item'));

    foreach (array_chunk($exported_entities, $entities_per_item) as $entities_chunk) {
      // Remember which entities are in this chunk so they can be reported and
      // dropped from the result if the chunk cannot be queued. The queued
      // payload itself keeps its 'entity_uuid' keys, exactly as before: the
      // previous per-row unset() worked on a loop copy and never removed them.
      $uuids = array_column($entities_chunk, 'entity_uuid');

      $item = new \stdClass();
      $item->data = $entities_chunk;
      if ($queue->createItem($item) !== FALSE) {
        continue;
      }

      // The chunk was rejected: collect a description of every entity in it
      // and remove those entities from the list of queued entities.
      $messages = [];
      foreach ($uuids as $uuid) {
        $message = new TranslatableMarkup('(Type = @type, ID = @id, UUID = @uuid)', [
          '@type' => $queued_entities[$uuid]->getEntityTypeId(),
          '@id' => $queued_entities[$uuid]->id(),
          '@uuid' => $uuid,
        ]);
        $messages[] = $message->jsonSerialize();
        unset($queued_entities[$uuid]);
      }
      $this->logger->debug('There was an error trying to enqueue the following entities for export: @entities.', [
        '@entities' => implode(', ', $messages),
      ]);
    }

    return $queued_entities;
  }

  /**
   * Obtains the number of items in the export queue.
   *
   * @return int
   *   The number of items in the export queue, as reported by the queue's
   *   numberOfItems() method.
   */
  public function getQueueCount() {
    return $this->queueFactory
      ->get('acquia_contenthub_export_queue')
      ->numberOfItems();
  }

  /**
   * Execute the delete function for the ACH Export Queue.
   */
  public function purgeQueue() {
    $this->queueFactory
      ->get('acquia_contenthub_export_queue')
      ->deleteQueue();
  }

  /**
   * Obtains the Queue waiting time in seconds.
   *
   * @return int
   *   Amount in seconds of time to wait before processing an item in the queue.
   *   Falls back to 3 when 'export_queue_waiting_time' is unset or empty.
   */
  public function getWaitingTime() {
    $waiting_time = $this->config->get('export_queue_waiting_time');
    return $waiting_time ?: 3;
  }

  /**
   * Process all queue items with batch API.
   *
   * Builds one batch operation per group of 'export_queue_batch_size' queue
   * items and registers the batch with batch_set().
   *
   * @param string|int $number_of_items
   *   The number of items to process. Any non-numeric value (e.g. 'all')
   *   means every item currently in the queue.
   */
  public function processQueueItems($number_of_items = 'all') {
    // Create batch which collects all the specified queue items and process
    // them one after another.
    $batch = [
      'title' => $this->t("Process Content Hub Export Queue"),
      'operations' => [],
      'finished' => '\\Drupal\\acquia_contenthub\\Controller\\ContentHubExportQueueController::batchFinished',
    ];

    // Calculate the number of items to process.
    $queue = $this->queueFactory->get('acquia_contenthub_export_queue');
    $remaining = is_numeric($number_of_items) ? (int) $number_of_items : (int) $queue->numberOfItems();

    // Normalise the batch size the same way batchProcess() does, and never
    // let it drop below 1 so the loop below always terminates.
    $batch_size = $this->config->get('export_queue_batch_size');
    $batch_size = !empty($batch_size) && is_numeric($batch_size) ? max(1, (int) $batch_size) : 1;

    // Create enough batch operations to cover the requested items. Each
    // operation is told how many items it may claim, so the last (partial)
    // group does not claim more than what was requested.
    while ($remaining > 0) {
      $batch['operations'][] = [
        '\\Drupal\\acquia_contenthub\\Controller\\ContentHubExportQueueController::batchProcess',
        [
          min($remaining, $batch_size),
        ],
      ];
      $remaining -= $batch_size;
    }

    // Adds the batch sets.
    batch_set($batch);
  }

  /**
   * Common batch processing callback for all operations.
   *
   * Claims up to min($number_of_items, 'export_queue_batch_size') items from
   * the export queue and hands each one to the export queue worker, recording
   * a human-readable outcome in the batch context.
   *
   * @param int|string $number_of_items
   *   The number of items to process.
   * @param mixed $context
   *   The context array.
   */
  public static function batchProcess($number_of_items, &$context) {
    // Get the queue implementation for acquia_contenthub_export_queue.
    $queue = \Drupal::service('queue')->get('acquia_contenthub_export_queue');
    $queue_worker = \Drupal::service('plugin.manager.queue_worker')
      ->createInstance('acquia_contenthub_export_queue');

    // Get the number of items.
    $batch_size = \Drupal::service('config.factory')
      ->get('acquia_contenthub.entity_config')
      ->get('export_queue_batch_size');
    $batch_size = !empty($batch_size) && is_numeric($batch_size) ? $batch_size : 1;
    $number_of_queue = $number_of_items < $batch_size ? $number_of_items : $batch_size;

    // Repeat $number_of_queue times.
    for ($i = 0; $i < $number_of_queue; $i++) {
      // Get a queued item; nothing to do in this round if none was claimed.
      $item = $queue->claimItem();
      if (!$item) {
        continue;
      }

      try {
        // Generating a list of entities for the report message.
        $entities_list = [];
        foreach ($item->data->data as $entity) {
          $entities_list[] = new TranslatableMarkup('(@entity_type, @entity_id)', [
            '@entity_type' => $entity['entity_type'],
            '@entity_id' => $entity['entity_id'],
          ]);
        }

        // Process item. A requeue request is treated like a failed run.
        try {
          $entities_processed = $queue_worker->processItem($item->data);
        }
        catch (RequeueException $ex) {
          $entities_processed = FALSE;
        }

        $arguments = [
          '@entities' => implode(',', $entities_list),
        ];
        // NOTE(review): in the two non-success branches the item is neither
        // deleted nor explicitly released here, although the message says it
        // was sent back to the queue. Presumably it becomes claimable again
        // once its lease expires; confirm against the queue backend in use.
        if ($entities_processed === FALSE) {
          // Indicate that the item could not be processed.
          $message = new TranslatableMarkup('There was an error processing entities: @entities and their dependencies. The item has been sent back to the queue to be processed again later. Check your logs for more info.', $arguments);
        }
        elseif ($entities_processed == FALSE) {
          // The worker returned another empty value (e.g. 0 or NULL).
          $message = new TranslatableMarkup('No processing was done for entities: @entities and their dependencies. The item has been sent back to the queue to be processed again later. Check your logs for more info.', $arguments);
        }
        else {
          // If everything was correct, delete processed item from the queue.
          $queue->deleteItem($item);
          // Creating a text message to present to the user.
          $message = new TranslatableMarkup('Processed entities: @entities and their dependencies (@count @label sent).', $arguments + [
            '@count' => $entities_processed,
            '@label' => $entities_processed == 1 ? new TranslatableMarkup('entity') : new TranslatableMarkup('entities'),
          ]);
        }

        // Show the outcome as the current batch message and keep it for the
        // final report built in batchFinished().
        $report_line = Html::escape($message->jsonSerialize());
        $context['message'] = $report_line;
        $context['results'][] = $report_line;
      }
      catch (SuspendQueueException $e) {
        // The worker asked to suspend the queue: release the item so another
        // worker can process it, and stop claiming items in this operation.
        $queue->releaseItem($item);
        break;
      }
    }
  }

  /**
   * Batch finished callback.
   *
   * Shows a success or error message, followed by a list with one entry per
   * result collected by batchProcess().
   *
   * @param bool $success
   *   Whether the batch process succeeded or not.
   * @param array $results
   *   The results array.
   * @param array $operations
   *   An array of operations. On failure, the first entry is reported as the
   *   operation that caused the error.
   */
  public static function batchFinished($success, array $results, array $operations) {
    $messenger = \Drupal::messenger();

    if ($success) {
      $messenger->addStatus(t("The contents are successfully exported."));
    }
    else {
      // Each operation is a [callback, arguments] pair. Guard against an
      // empty operations list, for which reset() returns FALSE.
      $error_operation = reset($operations);
      $callback = is_array($error_operation) && isset($error_operation[0]) ? $error_operation[0] : '';
      $arguments = is_array($error_operation) && isset($error_operation[1]) ? $error_operation[1] : [];
      // Report the failure as an error, printing the operation's arguments
      // (index 1) rather than repeating the callback name (index 0).
      $messenger->addError(t('An error occurred while processing @operation with arguments : @args', [
        '@operation' => $callback,
        '@args' => print_r($arguments, TRUE),
      ]));
    }

    // Providing a report on the items processed by the queue.
    $elements = [
      '#theme' => 'item_list',
      '#type' => 'ul',
      '#items' => $results,
    ];
    $queue_report = \Drupal::service('renderer')->render($elements);
    $messenger->addStatus($queue_report);
  }

}
Members
Name | Modifiers | Type | Description | Overrides |
---|---|---|---|---|
ContentHubExportQueueController::$config | protected | property | Drupal\Core\Config\ImmutableConfig. | |
ContentHubExportQueueController::$logger | protected | property | Logger Channel. | |
ContentHubExportQueueController::$queueFactory | protected | property | The Queue Factory. | |
ContentHubExportQueueController::$queueManager | protected | property | The Queue Manager. | |
ContentHubExportQueueController::batchFinished | public static | function | Batch finished callback. | |
ContentHubExportQueueController::batchProcess | public static | function | Common batch processing callback for all operations. | |
ContentHubExportQueueController::create | public static | function | Instantiates a new instance of this class. Overrides ControllerBase::create | |
ContentHubExportQueueController::enqueueExportEntities | public | function | Adds entities to the export queue. | |
ContentHubExportQueueController::getQueueCount | public | function | Obtains the number of items in the export queue. | |
ContentHubExportQueueController::getWaitingTime | public | function | Obtains the Queue waiting time in seconds. | |
ContentHubExportQueueController::processQueueItems | public | function | Process all queue items with batch API. | |
ContentHubExportQueueController::purgeQueue | public | function | Execute the delete function for the ACH Export Queue. | |
ContentHubExportQueueController::__construct | public | function | | |
ControllerBase:: |
protected | property | The configuration factory. | |
ControllerBase:: |
protected | property | The current user service. | 1 |
ControllerBase:: |
protected | property | The entity form builder. | |
ControllerBase:: |
protected | property | The entity manager. | |
ControllerBase:: |
protected | property | The entity type manager. | |
ControllerBase:: |
protected | property | The form builder. | 2 |
ControllerBase:: |
protected | property | The key-value storage. | 1 |
ControllerBase:: |
protected | property | The language manager. | 1 |
ControllerBase:: |
protected | property | The module handler. | 2 |
ControllerBase:: |
protected | property | The state service. | |
ControllerBase:: |
protected | function | Returns the requested cache bin. | |
ControllerBase:: |
protected | function | Retrieves a configuration object. | |
ControllerBase:: |
private | function | Returns the service container. | |
ControllerBase:: |
protected | function | Returns the current user. | 1 |
ControllerBase:: |
protected | function | Retrieves the entity form builder. | |
ControllerBase:: |
protected | function | Retrieves the entity manager service. | |
ControllerBase:: |
protected | function | Retrieves the entity type manager. | |
ControllerBase:: |
protected | function | Returns the form builder service. | 2 |
ControllerBase:: |
protected | function | Returns a key/value storage collection. | 1 |
ControllerBase:: |
protected | function | Returns the language manager service. | 1 |
ControllerBase:: |
protected | function | Returns the module handler. | 2 |
ControllerBase:: |
protected | function |
Returns a redirect response object for the specified route. Overrides UrlGeneratorTrait:: |
|
ControllerBase:: |
protected | function | Returns the state storage service. | |
LinkGeneratorTrait:: |
protected | property | The link generator. | 1 |
LinkGeneratorTrait:: |
protected | function | Returns the link generator. | |
LinkGeneratorTrait:: |
protected | function | Renders a link to a route given a route name and its parameters. | |
LinkGeneratorTrait:: |
public | function | Sets the link generator service. | |
LoggerChannelTrait:: |
protected | property | The logger channel factory service. | |
LoggerChannelTrait:: |
protected | function | Gets the logger for a specific channel. | |
LoggerChannelTrait:: |
public | function | Injects the logger channel factory. | |
MessengerTrait:: |
protected | property | The messenger. | 29 |
MessengerTrait:: |
public | function | Gets the messenger. | 29 |
MessengerTrait:: |
public | function | Sets the messenger. | |
RedirectDestinationTrait:: |
protected | property | The redirect destination service. | 1 |
RedirectDestinationTrait:: |
protected | function | Prepares a 'destination' URL query parameter for use with \Drupal\Core\Url. | |
RedirectDestinationTrait:: |
protected | function | Returns the redirect destination service. | |
RedirectDestinationTrait:: |
public | function | Sets the redirect destination service. | |
StringTranslationTrait:: |
protected | property | The string translation service. | 1 |
StringTranslationTrait:: |
protected | function | Formats a string containing a count of items. | |
StringTranslationTrait:: |
protected | function | Returns the number of plurals supported by a given language. | |
StringTranslationTrait:: |
protected | function | Gets the string translation service. | |
StringTranslationTrait:: |
public | function | Sets the string translation service to use. | 2 |
StringTranslationTrait:: |
protected | function | Translates a string to the current language or to a given language. | |
UrlGeneratorTrait:: |
protected | property | The url generator. | |
UrlGeneratorTrait:: |
protected | function | Returns the URL generator service. | |
UrlGeneratorTrait:: |
public | function | Sets the URL generator service. | |
UrlGeneratorTrait:: |
protected | function | Generates a URL or path for a specific route based on the given parameters. |