public function ContentHubExportQueueBase::processItem in Acquia Content Hub 8
Works on a single queue item.
Parameters
mixed $data: The data that was passed to \Drupal\Core\Queue\QueueInterface::createItem() when the item was queued.
Throws
\Drupal\Core\Queue\RequeueException Processing is not yet finished. This will allow another process to claim the item immediately.
\Exception A QueueWorker plugin may throw an exception to indicate there was a problem. The cron process will log the exception, and leave the item in the queue to be processed again later.
\Drupal\Core\Queue\SuspendQueueException More specifically, a SuspendQueueException should be thrown when a QueueWorker plugin is aware that the problem will affect all subsequent workers of its queue. For example, a callback that makes HTTP requests may find that the remote server is not responding. The cron process will behave as with a normal Exception, and in addition will not attempt to process further items from the current item's queue during the current cron run.
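The three exception cases described above can be modelled in a small standalone sketch. This is not Drupal's cron implementation. The exception classes, `run_queue_sketch()`, and `demo_worker_sketch()` are hypothetical stand-ins, declared locally so the example runs without Drupal, and the outcome labels simply restate the behavior documented in the Throws list.

```php
<?php

// Hypothetical stand-ins for \Drupal\Core\Queue\RequeueException and
// \Drupal\Core\Queue\SuspendQueueException, so the sketch runs without Drupal.
class RequeueExceptionSketch extends \RuntimeException {}
class SuspendQueueExceptionSketch extends \RuntimeException {}

/**
 * Simplified model of one queue run (an assumption, not Drupal's cron code).
 *
 * Returns one outcome label per item, in order.
 */
function run_queue_sketch(array $items, callable $worker): array {
  $outcomes = [];
  $suspended = FALSE;
  foreach ($items as $item) {
    if ($suspended) {
      // A suspended queue is not processed further during this run.
      $outcomes[] = 'skipped';
      continue;
    }
    try {
      $worker($item);
      $outcomes[] = 'processed';
    }
    catch (RequeueExceptionSketch $e) {
      // Not finished yet: another process may claim the item immediately.
      $outcomes[] = 'requeued';
    }
    catch (SuspendQueueExceptionSketch $e) {
      // Treated like a normal exception, and the rest of the queue is skipped.
      $outcomes[] = 'suspended';
      $suspended = TRUE;
    }
    catch (\Exception $e) {
      // Logged, and the item is left in the queue for a later run.
      $outcomes[] = 'left_in_queue';
    }
  }
  return $outcomes;
}

// Hypothetical worker that fails in a different way for each item value.
function demo_worker_sketch(string $item): void {
  switch ($item) {
    case 'retry':
      throw new RequeueExceptionSketch('Processing is not yet finished.');

    case 'down':
      throw new SuspendQueueExceptionSketch('Remote server is not responding.');

    case 'bad':
      throw new \Exception('Something went wrong.');
  }
}

$outcomes = run_queue_sketch(['ok', 'retry', 'bad', 'down', 'never'], 'demo_worker_sketch');
echo implode(', ', $outcomes), PHP_EOL;
// prints "processed, requeued, left_in_queue, suspended, skipped"
```

The specific exception types are caught before the generic `\Exception`, because both stand-ins are subclasses of it and would otherwise be swallowed by the general handler.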
Overrides QueueWorkerInterface::processItem
See also
\Drupal\Core\Cron::processQueues()
File
- src/Plugin/QueueWorker/ContentHubExportQueueBase.php, line 107
Class
- ContentHubExportQueueBase
- Provides base functionality for the Content Hub Export Queue.
Namespace
Drupal\acquia_contenthub\Plugin\QueueWorker
Code
public function processItem($item) {
  // An item contains an array of entities to process in a single batch.
  $entities = $item->data;
  $this->entityManager->bulkExport();
  $exported_entities = [];
  $bulk_url_array = [];
  foreach ($entities as $entity) {
    $entity_type = $entity['entity_type'];
    $entity_id = $entity['entity_id'];
    $bulk_url_array[$entity_type][$entity_id] = $entity_id;
    // Obtaining the CDF from the normalizer service.
    $context['query_params']['include_references'] = 'true';
    $drupal_entity = \Drupal::entityTypeManager()
      ->getStorage($entity_type)
      ->load($entity_id);
    $exported_entity = $this->cdfNormalizer
      ->normalize($drupal_entity, 'acquia_contenthub_cdf', $context);
    $exported_entity['entities'] = is_array($exported_entity['entities']) ? $exported_entity['entities'] : [];
    foreach ($exported_entity['entities'] as $key => $ch_entity) {
      $exported_entity['entities'][$key] = Json::decode($ch_entity->json());
    }
    $related_entity = $this->entityTypeManager
      ->getStorage($entity_type)
      ->load($entity_id);
    if (!$this->entityManager->isEligibleEntity($related_entity)) {
      $this->loggerFactory
        ->get('acquia_contenthub')
        ->warning('Entity cannot be processed because it is not eligible for export anymore. UUID: @uuid, @backtrack', [
          '@uuid' => $related_entity->uuid(),
          '@backtrack' => __FUNCTION__,
        ]);
      continue;
    }
    $exported_entity['entities'] = is_array($exported_entity['entities']) ? $exported_entity['entities'] : [];
    $exported_entities = array_merge($exported_entities, $exported_entity['entities']);
  }
  // Eliminate duplicates.
  $exported_cdfs = [];
  foreach ($exported_entities as $cdf) {
    if (!empty($cdf)) {
      $exported_cdfs[$cdf['uuid']] = $cdf;
    }
  }
  $uuids = array_keys($exported_cdfs);
  // Log list of UUIDs being exported.
  $logger = \Drupal::getContainer()->get('logger.factory');
  $logger
    ->get('acquia_contenthub')
    ->debug('Queue sending export request to Content Hub for UUIDs %uuids.', [
      '%uuids' => implode(", ", array_keys($exported_cdfs)),
    ]);
  // Publish entities.
  if (!empty($exported_cdfs)) {
    if ($this->entityManager->putRemoteEntities(array_values($exported_cdfs))) {
      foreach ($exported_cdfs as $exported_entity) {
        // Obtaining the entity ID from the entity.
        $this->exportController->trackExportedEntity($exported_entity, TRUE);
      }
      return count($exported_cdfs);
    }
    else {
      // Error, cannot put entities to Content Hub.
      $message = $this->t('PUT request to Content Hub failed for these UUIDs: @uuids. Putting it back in the queue for later re-processing.', [
        '@uuids' => implode(', ', $uuids),
      ]);
      \Drupal::logger('acquia_contenthub')->debug($message->render());
      throw new RequeueException($message->render());
    }
  }
  else {
    // Nothing to export.
    $message = $this->t('Could not get the CDF for UUIDs: @uuids. Putting it back in the queue for later re-processing.', [
      '@uuids' => implode(', ', $uuids),
    ]);
    \Drupal::logger('acquia_contenthub')->debug($message->render());
    throw new RequeueException($message->render());
  }
}
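The "Eliminate duplicates" step in the method above keys each CDF array by its UUID, so an entity that appears several times in the batch is sent only once. A minimal standalone sketch of that step follows. The function name `dedupe_cdfs_by_uuid()` and the sample data are hypothetical, chosen only for illustration.

```php
<?php

/**
 * Keys CDF arrays by UUID so repeated UUIDs collapse into a single entry.
 *
 * Hypothetical helper; it mirrors the loop in processItem() but is not part
 * of the module.
 */
function dedupe_cdfs_by_uuid(array $cdfs): array {
  $unique = [];
  foreach ($cdfs as $cdf) {
    // Empty entries are skipped, as in the original loop.
    if (!empty($cdf)) {
      // A later entry with the same UUID overwrites the earlier one.
      $unique[$cdf['uuid']] = $cdf;
    }
  }
  return $unique;
}

// Made-up sample data: one duplicate UUID and one empty entry.
$cdfs = [
  ['uuid' => 'aaa', 'type' => 'node'],
  ['uuid' => 'bbb', 'type' => 'taxonomy_term'],
  [],
  ['uuid' => 'aaa', 'type' => 'node'],
];
$unique = dedupe_cdfs_by_uuid($cdfs);
echo implode(', ', array_keys($unique)), PHP_EOL;
// prints "aaa, bbb"
```

Because the result is keyed by UUID, `array_keys()` yields the UUID list used in the log messages and `array_values()` yields the plain list that is passed on for publishing.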