You are here

public function ContentHubExportQueueWorker::processItem in Acquia Content Hub 8.2

This method's return values are used within ContentHubExportQueue. Different return values log different messages and indicate different behaviours: `return FALSE;` => error processing entities, queue item is not deleted; `return 0;` => no processing done, queue item is not deleted; `return TRUE;` or returning an int which is not 0 => entities processed and queue item will be deleted.

Overrides QueueWorkerInterface::processItem

File

modules/acquia_contenthub_publisher/src/Plugin/QueueWorker/ContentHubExportQueueWorker.php, line 166

Class

ContentHubExportQueueWorker
Acquia ContentHub queue worker.

Namespace

Drupal\acquia_contenthub_publisher\Plugin\QueueWorker

Code

/**
 * Exports one queued entity (and the CDF built for it) to Content Hub.
 *
 * The return value tells the calling queue what happened:
 * - FALSE: an error occurred (no client, non-202 response from the
 *   "/entities" endpoint, or no valid webhook); the item is not deleted.
 * - 0: the pruned CDF was empty, nothing was sent; the item is not deleted.
 * - TRUE: the entity no longer exists locally; the item will be deleted.
 * - int > 0: number of CDF entities sent; the item will be deleted.
 *
 * @param object $data
 *   Queue item. This method reads its "type", "uuid" and, optionally,
 *   "calculate_dependencies" properties.
 *
 * @return bool|int
 *   See the list above.
 */
public function processItem($data) {
  $this->initializeClient();
  if (!$this->client) {
    $this->achLoggerChannel->error('Acquia Content Hub client cannot be initialized because connection settings are empty.');
    return FALSE;
  }

  $matches = $this->entityTypeManager
    ->getStorage($data->type)
    ->loadByProperties(['uuid' => $data->uuid]);

  // Nothing to export any more: drop the tracking record and report the item
  // as handled so that it leaves the queue.
  if (!$matches) {
    $this->tracker->delete($data->uuid);
    $missing_message = sprintf('Entity ("%s", "%s") being exported no longer exists on the publisher. Deleting item from the publisher queue.', $data->type, $data->uuid);
    $this->achLoggerChannel->warning($missing_message);
    return TRUE;
  }
  $entity = reset($matches);

  // $wrappers is handed to getEntityCdf() and read back below, keyed by UUID.
  // NOTE(review): presumably filled by reference by getEntityCdf() — confirm.
  $wrappers = [];
  $with_dependencies = $data->calculate_dependencies ?? TRUE;
  $cdf_objects = $this->common->getEntityCdf($entity, $wrappers, TRUE, $with_dependencies);
  $config = $this->configFactory->get('acquia_contenthub.admin_settings');
  $document = new CDFDocument(...$cdf_objects);

  // The CDF holds every entity that supports the one being exported. Some of
  // those may have been imported initially, so an event is dispatched to let
  // subscribers prune the document before it is sent.
  $origin = $config->get('origin');
  $prune_event = new PrunePublishCdfEntitiesEvent($this->client, $document, $origin);
  $this->dispatcher->dispatch(AcquiaContentHubEvents::PRUNE_PUBLISH_CDF_ENTITIES, $prune_event);
  $pruned_document = $prune_event->getDocument();
  $output = array_values($pruned_document->getEntities());

  if (empty($output)) {
    $empty_message = sprintf('You are trying to export an empty CDF. Triggering entity: %s, %s', $data->type, $data->uuid);
    $this->achLoggerChannel->warning($empty_message);
    return 0;
  }

  // ContentHub backend determines new or update on the PUT endpoint.
  $response = $this->client->putEntities(...$output);
  $status_code = $response->getStatusCode();
  if ($status_code !== 202) {
    $failure_message = sprintf('Request to Content Hub "/entities" endpoint returned with status code = %s. Triggering entity: %s.', $status_code, $data->uuid);
    $this->achLoggerChannel->error($failure_message);
    return FALSE;
  }

  // Record every sent entity; those with a known wrapper are also tracked
  // and have their queue id cleared.
  $entity_uuids = [];
  foreach ($output as $cdf_object) {
    $uuid = $cdf_object->getUuid();
    $wrapper = $wrappers[$uuid] ?? NULL;
    if ($wrapper) {
      $this->tracker->track($wrapper->getEntity(), $wrapper->getHash());
      $this->tracker->nullifyQueueId($uuid);
    }
    $entity_uuids[] = $uuid;
  }

  // Reinitialize client cdf metrics data.
  $this->client = $this->factory->getClient();

  $webhook = $config->get('webhook.uuid');
  $exported_entities = implode(', ', $entity_uuids);
  if (!Uuid::isValid($webhook)) {
    $webhook_message = sprintf('Site does not have a valid registered webhook and it is required to add entities (%s) to the site\'s interest list in Content Hub.', $exported_entities);
    $this->achLoggerChannel->warning($webhook_message);
    return FALSE;
  }

  // The interest list is only updated when the setting is enabled; a missing
  // setting counts as enabled.
  $send_updates = $config->get('send_contenthub_updates') ?? TRUE;
  if (!$send_updates) {
    return count($output);
  }

  try {
    $this->client->addEntitiesToInterestList($webhook, $entity_uuids);
    $success_message = sprintf('The following exported entities have been added to the interest list on Content Hub for webhook "%s": [%s].', $webhook, $exported_entities);
    $this->achLoggerChannel->info($success_message);
  }
  catch (\Exception $e) {
    // A failure here is only logged; the export itself is still reported as
    // done through the return value below.
    $interest_error = sprintf('Error adding the following entities to the interest list for webhook "%s": [%s]. Error message: "%s".', $webhook, $exported_entities, $e->getMessage());
    $this->achLoggerChannel->error($interest_error);
  }

  return count($output);
}