class ContentHubExportQueueWorker in Acquia Content Hub 8.2
Acquia ContentHub queue worker.
Plugin annotation
@QueueWorker(
id = "acquia_contenthub_publish_export",
title = "Queue Worker to export entities to contenthub."
)
Hierarchy
- class \Drupal\Component\Plugin\PluginBase implements DerivativeInspectionInterface, PluginInspectionInterface
- class \Drupal\Core\Queue\QueueWorkerBase implements QueueWorkerInterface
- class \Drupal\acquia_contenthub_publisher\Plugin\QueueWorker\ContentHubExportQueueWorker implements ContainerFactoryPluginInterface
- class \Drupal\Core\Queue\QueueWorkerBase implements QueueWorkerInterface
Expanded class hierarchy of ContentHubExportQueueWorker
File
- modules/
acquia_contenthub_publisher/ src/ Plugin/ QueueWorker/ ContentHubExportQueueWorker.php, line 28
Namespace
Drupal\acquia_contenthub_publisher\Plugin\QueueWorkerView source
class ContentHubExportQueueWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {
/**
* The event dispatcher.
*
* @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
*/
protected $dispatcher;
/**
* The entity type manager.
*
* @var \Drupal\Core\Entity\EntityTypeManagerInterface
*/
protected $entityTypeManager;
/**
* The common contenthub actions object.
*
* @var \Drupal\acquia_contenthub\ContentHubCommonActions
*/
protected $common;
/**
* The client factory.
*
* @var \Drupal\acquia_contenthub\Client\ClientFactory
*/
protected $factory;
/**
* The published entity tracker.
*
* @var \Drupal\acquia_contenthub_publisher\PublisherTracker
*/
protected $tracker;
/**
* The config factory.
*
* @var \Drupal\Core\Config\ConfigFactoryInterface
*/
protected $configFactory;
/**
* The logger channel factory.
*
* @var \Drupal\Core\Logger\LoggerChannelFactoryInterface
*/
protected $achLoggerChannel;
/**
* The Content Hub Client.
*
* @var \Acquia\ContentHubClient\ContentHubClient
*/
protected $client;
/**
* ContentHubExportQueueWorker constructor.
*
* @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $dispatcher
* The event dispatcher.
* @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
* The entity type manager.
* @param \Drupal\acquia_contenthub\ContentHubCommonActions $common
* The common contenthub actions object.
* @param \Drupal\acquia_contenthub\Client\ClientFactory $factory
* The client factory.
* @param \Drupal\acquia_contenthub_publisher\PublisherTracker $tracker
* The published entity tracker.
* The event dispatcher.
* @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
* The config factory.
* @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
* The logger factory.
* @param array $configuration
* The plugin configuration.
* @param string $plugin_id
* The plugin id.
* @param mixed $plugin_definition
* The plugin definition.
*
* @throws \Exception
*/
public function __construct(EventDispatcherInterface $dispatcher, EntityTypeManagerInterface $entity_type_manager, ContentHubCommonActions $common, ClientFactory $factory, PublisherTracker $tracker, ConfigFactoryInterface $config_factory, LoggerChannelFactoryInterface $logger_factory, array $configuration, $plugin_id, $plugin_definition) {
$this->dispatcher = $dispatcher;
$this->common = $common;
if (!empty($this->common
->getUpdateDbStatus())) {
throw new \Exception("Site has pending database updates. Apply these updates before exporting content.");
}
$this->entityTypeManager = $entity_type_manager;
$this->factory = $factory;
$this->tracker = $tracker;
$this->configFactory = $config_factory;
$this->achLoggerChannel = $logger_factory
->get('acquia_contenthub_publisher');
parent::__construct($configuration, $plugin_id, $plugin_definition);
}
/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
return new static($container
->get('event_dispatcher'), $container
->get('entity_type.manager'), $container
->get('acquia_contenthub_common_actions'), $container
->get('acquia_contenthub.client.factory'), $container
->get('acquia_contenthub_publisher.tracker'), $container
->get('config.factory'), $container
->get('logger.factory'), $configuration, $plugin_id, $plugin_definition);
}
/**
* Initializes the Connection Manager.
*/
public function initializeClient() : void {
if (empty($this->client)) {
$this->client = $this->factory
->getClient();
}
}
/**
* {@inheritdoc}
*
* This method return values will be used within ContentHubExportQueue.
* Different return values will log different messages and will indicate
* different behaviours:
* return FALSE; => Error processing entities, queue item not deleted.
* return 0; => No processing done, queue item is not deleted
* return TRUE or return int which is not 0 =>
* Entities processed and queue item will be deleted.
*/
public function processItem($data) {
$this
->initializeClient();
if (!$this->client) {
$this->achLoggerChannel
->error('Acquia Content Hub client cannot be initialized because connection settings are empty.');
return FALSE;
}
$storage = $this->entityTypeManager
->getStorage($data->type);
$entity = $storage
->loadByProperties([
'uuid' => $data->uuid,
]);
// Entity missing so remove it from the tracker and stop processing.
if (!$entity) {
$this->tracker
->delete($data->uuid);
$this->achLoggerChannel
->warning(sprintf('Entity ("%s", "%s") being exported no longer exists on the publisher. Deleting item from the publisher queue.', $data->type, $data->uuid));
return TRUE;
}
$entity = reset($entity);
$entities = [];
$calculate_dependencies = $data->calculate_dependencies ?? TRUE;
$output = $this->common
->getEntityCdf($entity, $entities, TRUE, $calculate_dependencies);
$config = $this->configFactory
->get('acquia_contenthub.admin_settings');
$document = new CDFDocument(...$output);
// $output is a cdf of ALLLLLL entities that support the entity we wanted
// to export. What happens if some of the entities which support the entity
// we want to export were imported initially? We should dispatch an event
// to look at the output and see if there are entries in our subscriber
// table and then compare the rest against plexus data.
$event = new PrunePublishCdfEntitiesEvent($this->client, $document, $config
->get('origin'));
$this->dispatcher
->dispatch(AcquiaContentHubEvents::PRUNE_PUBLISH_CDF_ENTITIES, $event);
$output = array_values($event
->getDocument()
->getEntities());
if (empty($output)) {
$this->achLoggerChannel
->warning(sprintf('You are trying to export an empty CDF. Triggering entity: %s, %s', $data->type, $data->uuid));
return 0;
}
// ContentHub backend determines new or update on the PUT endpoint.
$response = $this->client
->putEntities(...$output);
if ($response
->getStatusCode() !== 202) {
$this->achLoggerChannel
->error(sprintf('Request to Content Hub "/entities" endpoint returned with status code = %s. Triggering entity: %s.', $response
->getStatusCode(), $data->uuid));
return FALSE;
}
$entity_uuids = [];
foreach ($output as $item) {
$wrapper = !empty($entities[$item
->getUuid()]) ? $entities[$item
->getUuid()] : NULL;
if ($wrapper) {
$this->tracker
->track($wrapper
->getEntity(), $wrapper
->getHash());
$this->tracker
->nullifyQueueId($item
->getUuid());
}
$entity_uuids[] = $item
->getUuid();
}
// Reinitialize client cdf metrics data.
$this->client = $this->factory
->getClient();
$webhook = $config
->get('webhook.uuid');
$exported_entities = implode(', ', $entity_uuids);
if (!Uuid::isValid($webhook)) {
$this->achLoggerChannel
->warning(sprintf('Site does not have a valid registered webhook and it is required to add entities (%s) to the site\'s interest list in Content Hub.', $exported_entities));
return FALSE;
}
if ($config
->get('send_contenthub_updates') ?? TRUE) {
try {
$this->client
->addEntitiesToInterestList($webhook, $entity_uuids);
$this->achLoggerChannel
->info(sprintf('The following exported entities have been added to the interest list on Content Hub for webhook "%s": [%s].', $webhook, $exported_entities));
} catch (\Exception $e) {
$this->achLoggerChannel
->error(sprintf('Error adding the following entities to the interest list for webhook "%s": [%s]. Error message: "%s".', $webhook, $exported_entities, $e
->getMessage()));
}
}
return count($output);
}
}
Members
Name | Modifiers | Type | Description | Overrides |
---|---|---|---|---|
ContentHubExportQueueWorker:: |
protected | property | The logger channel factory. | |
ContentHubExportQueueWorker:: |
protected | property | The Content Hub Client. | |
ContentHubExportQueueWorker:: |
protected | property | The common contenthub actions object. | |
ContentHubExportQueueWorker:: |
protected | property | The config factory. | |
ContentHubExportQueueWorker:: |
protected | property | The event dispatcher. | |
ContentHubExportQueueWorker:: |
protected | property | The entity type manager. | |
ContentHubExportQueueWorker:: |
protected | property | The client factory. | |
ContentHubExportQueueWorker:: |
protected | property | The published entity tracker. | |
ContentHubExportQueueWorker:: |
public static | function |
Creates an instance of the plugin. Overrides ContainerFactoryPluginInterface:: |
|
ContentHubExportQueueWorker:: |
public | function | Initializes the Connection Manager. | |
ContentHubExportQueueWorker:: |
public | function |
This method return values will be used within ContentHubExportQueue.
Different return values will log different messages and will indicate
different behaviours:
return FALSE; => Error processing entities, queue item not deleted.
return 0; => No… Overrides QueueWorkerInterface:: |
|
ContentHubExportQueueWorker:: |
public | function |
ContentHubExportQueueWorker constructor. Overrides PluginBase:: |
|
PluginBase:: |
protected | property | Configuration information passed into the plugin. | 1 |
PluginBase:: |
protected | property | The plugin implementation definition. | 1 |
PluginBase:: |
protected | property | The plugin_id. | |
PluginBase:: |
constant | A string which is used to separate base plugin IDs from the derivative ID. | ||
PluginBase:: |
public | function |
Gets the base_plugin_id of the plugin instance. Overrides DerivativeInspectionInterface:: |
|
PluginBase:: |
public | function |
Gets the derivative_id of the plugin instance. Overrides DerivativeInspectionInterface:: |
|
PluginBase:: |
public | function |
Gets the definition of the plugin implementation. Overrides PluginInspectionInterface:: |
3 |
PluginBase:: |
public | function |
Gets the plugin_id of the plugin instance. Overrides PluginInspectionInterface:: |
|
PluginBase:: |
public | function | Determines if the plugin is configurable. |