You are here

public function ContentHubExportQueueBase::processItem in Acquia Content Hub 8

Works on a single queue item.

Parameters

mixed $data: The data that was passed to \Drupal\Core\Queue\QueueInterface::createItem() when the item was queued.

Throws

\Drupal\Core\Queue\RequeueException Processing is not yet finished. This will allow another process to claim the item immediately.

\Exception A QueueWorker plugin may throw an exception to indicate there was a problem. The cron process will log the exception, and leave the item in the queue to be processed again later.

\Drupal\Core\Queue\SuspendQueueException More specifically, a SuspendQueueException should be thrown when a QueueWorker plugin is aware that the problem will affect all subsequent workers of its queue. For example, a callback that makes HTTP requests may find that the remote server is not responding. The cron process will behave as with a normal Exception, and in addition will not attempt to process further items from the current item's queue during the current cron run.

Overrides QueueWorkerInterface::processItem

See also

\Drupal\Core\Cron::processQueues()

File

src/Plugin/QueueWorker/ContentHubExportQueueBase.php, line 107

Class

ContentHubExportQueueBase
Provides base functionality for the Content Hub Export Queue.

Namespace

Drupal\acquia_contenthub\Plugin\QueueWorker

Code

public function processItem($item) {

  // An item contains an array of entities to process in a single batch.
  $entities = $item->data;
  $this->entityManager
    ->bulkExport();
  $exported_entities = [];
  $bulk_url_array = [];
  foreach ($entities as $entity) {
    $entity_type = $entity['entity_type'];
    $entity_id = $entity['entity_id'];
    $bulk_url_array[$entity_type][$entity_id] = $entity_id;

    // Obtaining the CDF from the normalizer service.
    $context['query_params']['include_references'] = 'true';
    $drupal_entity = \Drupal::entityTypeManager()
      ->getStorage($entity_type)
      ->load($entity_id);
    $exported_entity = $this->cdfNormalizer
      ->normalize($drupal_entity, 'acquia_contenthub_cdf', $context);
    $exported_entity['entities'] = is_array($exported_entity['entities']) ? $exported_entity['entities'] : [];
    foreach ($exported_entity['entities'] as $key => $ch_entity) {
      $exported_entity['entities'][$key] = Json::decode($ch_entity
        ->json());
    }
    $related_entity = $this->entityTypeManager
      ->getStorage($entity_type)
      ->load($entity_id);
    if (!$this->entityManager
      ->isEligibleEntity($related_entity)) {
      $this->loggerFactory
        ->get('acquia_contenthub')
        ->warning('Entity cannot be processed because it is not eligible for export anymore. UUID: @uuid, @backtrack', [
        '@uuid' => $related_entity
          ->uuid(),
        '@backtrack' => __FUNCTION__,
      ]);
      continue;
    }
    $exported_entity['entities'] = is_array($exported_entity['entities']) ? $exported_entity['entities'] : [];
    $exported_entities = array_merge($exported_entities, $exported_entity['entities']);
  }

  // Eliminate duplicates.
  $exported_cdfs = [];
  foreach ($exported_entities as $cdf) {
    if (!empty($cdf)) {
      $exported_cdfs[$cdf['uuid']] = $cdf;
    }
  }
  $uuids = array_keys($exported_cdfs);

  // Log list of UUIDs being exported.
  $logger = \Drupal::getContainer()
    ->get('logger.factory');
  $logger
    ->get('acquia_contenthub')
    ->debug('Queue sending export request to Content Hub for UUIDs %uuids.', [
    '%uuids' => implode(", ", array_keys($exported_cdfs)),
  ]);

  // Publish entities.
  if (!empty($exported_cdfs)) {
    if ($this->entityManager
      ->putRemoteEntities(array_values($exported_cdfs))) {
      foreach ($exported_cdfs as $exported_entity) {

        // Obtaining the entity ID from the entity.
        $this->exportController
          ->trackExportedEntity($exported_entity, TRUE);
      }
      return count($exported_cdfs);
    }
    else {

      // Error, cannot put entities to Content Hub.
      $message = $this
        ->t('PUT request to Content Hub failed for these UUIDs: @uuids. Putting it back in the queue for later re-processing.', [
        '@uuids' => implode(', ', $uuids),
      ]);
      \Drupal::logger('acquia_contenthub')
        ->debug($message
        ->render());
      throw new RequeueException($message
        ->render());
    }
  }
  else {

    // Nothing to export.
    $message = $this
      ->t('Could not get the CDF for UUIDs: @uuids. Putting it back in the queue for later re-processing.', [
      '@uuids' => implode(', ', $uuids),
    ]);
    \Drupal::logger('acquia_contenthub')
      ->debug($message
      ->render());
    throw new RequeueException($message
      ->render());
  }
}