public static function ContentHubExportQueueController::batchProcess in Acquia Content Hub 8
Common batch processing callback for all operations.
Parameters
int|string $number_of_items: The number of items to process.
mixed $context: The context array.
File
- src/
Controller/ ContentHubExportQueueController.php, line 217
Class
- ContentHubExportQueueController
- Implements an Export Queue Controller for Content Hub.
Namespace
Drupal\acquia_contenthub\ControllerCode
public static function batchProcess($number_of_items, &$context) {
// Get the queue implementation for acquia_contenthub_export_queue.
$queue_factory = \Drupal::service('queue');
$queue = $queue_factory
->get('acquia_contenthub_export_queue');
$queue_manager = \Drupal::service('plugin.manager.queue_worker');
$queue_worker = $queue_manager
->createInstance('acquia_contenthub_export_queue');
// Get the number of items.
$config_factory = \Drupal::service('config.factory');
$config = $config_factory
->get('acquia_contenthub.entity_config');
$batch_size = $config
->get('export_queue_batch_size');
$batch_size = !empty($batch_size) && is_numeric($batch_size) ? $batch_size : 1;
$number_of_queue = $number_of_items < $batch_size ? $number_of_items : $batch_size;
// Repeat $number_of_queue times.
for ($i = 0; $i < $number_of_queue; $i++) {
// Get a queued item.
if ($item = $queue
->claimItem()) {
try {
// Generating a list of entitites.
$entities = $item->data->data;
$entities_list = [];
foreach ($entities as $entity) {
$entities_list[] = new TranslatableMarkup('(@entity_type, @entity_id)', [
'@entity_type' => $entity['entity_type'],
'@entity_id' => $entity['entity_id'],
]);
}
// Process item.
try {
$entities_processed = $queue_worker
->processItem($item->data);
} catch (RequeueException $ex) {
$entities_processed = FALSE;
}
if ($entities_processed == FALSE) {
// Indicate that the item could not be processed.
if ($entities_processed === FALSE) {
$message = new TranslatableMarkup('There was an error processing entities: @entities and their dependencies. The item has been sent back to the queue to be processed again later. Check your logs for more info.', [
'@entities' => implode(',', $entities_list),
]);
}
else {
$message = new TranslatableMarkup('No processing was done for entities: @entities and their dependencies. The item has been sent back to the queue to be processed again later. Check your logs for more info.', [
'@entities' => implode(',', $entities_list),
]);
}
$context['message'] = Html::escape($message
->jsonSerialize());
$context['results'][] = Html::escape($message
->jsonSerialize());
}
else {
// If everything was correct, delete processed item from the queue.
$queue
->deleteItem($item);
// Creating a text message to present to the user.
$message = new TranslatableMarkup('Processed entities: @entities and their dependencies (@count @label sent).', [
'@entities' => implode(',', $entities_list),
'@count' => $entities_processed,
'@label' => $entities_processed == 1 ? new TranslatableMarkup('entity') : new TranslatableMarkup('entities'),
]);
$context['message'] = Html::escape($message
->jsonSerialize());
$context['results'][] = Html::escape($message
->jsonSerialize());
}
} catch (SuspendQueueException $e) {
// If there was an Exception thrown because of an error
// Releases the item that the worker could not process.
// Another worker can come and process it.
$queue
->releaseItem($item);
break;
}
}
}
}