class ContentHubImportQueueWorker in Acquia Content Hub 8.2
Queue worker for importing entities.
Plugin annotation
@QueueWorker(
id = "acquia_contenthub_subscriber_import",
title = "Queue Worker to import entities from contenthub."
)
Hierarchy
- class \Drupal\Component\Plugin\PluginBase implements DerivativeInspectionInterface, PluginInspectionInterface
- class \Drupal\Core\Queue\QueueWorkerBase implements QueueWorkerInterface
- class \Drupal\acquia_contenthub_subscriber\Plugin\QueueWorker\ContentHubImportQueueWorker implements ContainerFactoryPluginInterface
- class \Drupal\Core\Queue\QueueWorkerBase implements QueueWorkerInterface
Expanded class hierarchy of ContentHubImportQueueWorker
1 file declares its use of ContentHubImportQueueWorker
- UnserializationTest.php in tests/
src/ Kernel/ UnserializationTest.php
File
- modules/
acquia_contenthub_subscriber/ src/ Plugin/ QueueWorker/ ContentHubImportQueueWorker.php, line 24
Namespace
Drupal\acquia_contenthub_subscriber\Plugin\QueueWorkerView source
class ContentHubImportQueueWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {
/**
* The event dispatcher.
*
* @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
*/
protected $dispatcher;
/**
* The common actions object.
*
* @var \Drupal\acquia_contenthub\ContentHubCommonActions
*/
protected $common;
/**
* The client factory.
*
* @var \Drupal\acquia_contenthub\Client\ClientFactory
*/
protected $factory;
/**
* The Content Hub Client.
*
* @var \Acquia\ContentHubClient\ContentHubClient
*/
protected $client;
/**
* The Subscriber Tracker.
*
* @var \Drupal\acquia_contenthub_subscriber\SubscriberTracker
*/
protected $tracker;
/**
* The logger channel factory.
*
* @var \Drupal\Core\Logger\LoggerChannelFactoryInterface
*/
protected $achLoggerChannel;
/**
* The config factory.
*
* @var \Drupal\Core\Config\ConfigFactoryInterface
*/
protected $configFactory;
/**
* ContentHubExportQueueWorker constructor.
*
* @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $dispatcher
* Dispatcher.
* @param \Drupal\acquia_contenthub\ContentHubCommonActions $common
* The common actions object.
* @param \Drupal\acquia_contenthub\Client\ClientFactory $factory
* The client factory.
* @param \Drupal\acquia_contenthub_subscriber\SubscriberTracker $tracker
* The Subscriber Tracker.
* @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
* The logger factory.
* @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
* The config factory.
* @param array $configuration
* The plugin configuration.
* @param string $plugin_id
* The plugin id.
* @param mixed $plugin_definition
* The plugin definition.
*
* @throws \Exception
*/
public function __construct(EventDispatcherInterface $dispatcher, ContentHubCommonActions $common, ClientFactory $factory, SubscriberTracker $tracker, LoggerChannelFactoryInterface $logger_factory, ConfigFactoryInterface $config_factory, array $configuration, $plugin_id, $plugin_definition) {
$this->common = $common;
if (!empty($this->common
->getUpdateDbStatus())) {
throw new \Exception("Site has pending database updates. Apply these updates before importing content.");
}
$this->dispatcher = $dispatcher;
$this->factory = $factory;
$this->tracker = $tracker;
$this->achLoggerChannel = $logger_factory
->get('acquia_contenthub_subscriber');
$this->configFactory = $config_factory;
parent::__construct($configuration, $plugin_id, $plugin_definition);
}
/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
return new static($container
->get('event_dispatcher'), $container
->get('acquia_contenthub_common_actions'), $container
->get('acquia_contenthub.client.factory'), $container
->get('acquia_contenthub_subscriber.tracker'), $container
->get('logger.factory'), $container
->get('config.factory'), $configuration, $plugin_id, $plugin_definition);
}
/**
* Initializes the Connection Manager.
*/
public function initializeClient() : void {
if (empty($this->client)) {
$this->client = $this->factory
->getClient();
}
}
/**
* Processes acquia_contenthub_subscriber_import queue items.
*
* @param mixed $data
* The data in the queue.
*
* @throws \Exception
*/
public function processItem($data) : void {
$this
->initializeClient();
if (!$this->client) {
$this->achLoggerChannel
->error('Acquia Content Hub client cannot be initialized because connection settings are empty.');
return;
}
$settings = $this->factory
->getSettings();
$webhook = $settings
->getWebhook('uuid');
try {
$interests = $this->client
->getInterestsByWebhook($webhook);
} catch (\Exception $exception) {
$this->achLoggerChannel
->error(sprintf('Following error occurred while we were trying to get the interest list: %s', $exception
->getMessage()));
return;
}
$config = $this->configFactory
->get('acquia_contenthub.admin_settings');
$send_update = $config
->get('send_contenthub_updates') ?? TRUE;
$process_items = explode(', ', $data->uuids);
// Get rid of items potentially deleted from the interest list.
$uuids = array_intersect($process_items, $interests);
if (count($uuids) !== count($process_items)) {
// Log the uuids no longer on the interest list for this webhook.
$missing_uuids = array_diff($process_items, $uuids);
$this->achLoggerChannel
->info(sprintf('Skipped importing the following missing entities: %s. This occurs when entities are deleted at the Publisher before importing.', implode(', ', $missing_uuids)));
}
if (!$uuids) {
$this->achLoggerChannel
->info('There are no matching entities in the queues and the site interest list.');
return;
}
try {
$stack = $this->common
->importEntities(...$uuids);
// Reinitialize the client to refresh the client CDF metrics.
$this->client = $this->factory
->getClient();
} catch (ContentHubImportException $e) {
// Get UUIDs.
$e_uuids = $e
->getUuids();
if (array_diff($uuids, $e_uuids) == array_diff($e_uuids, $uuids) && $e
->isEntitiesMissing()) {
// The UUIDs can't be imported since they aren't in the Service.
// The missing UUIDs are the same as the ones that were sent for import.
if ($webhook) {
foreach ($uuids as $uuid) {
try {
if (!$this->tracker
->getEntityByRemoteIdAndHash($uuid)) {
// If we cannot load, delete interest and tracking record.
if ($send_update) {
$this->client
->deleteInterest($uuid, $webhook);
}
$this->tracker
->delete($uuid);
$this->achLoggerChannel
->info(sprintf('The following entity was deleted from interest list and tracking table: %s', $uuid));
}
} catch (\Exception $ex) {
$this->achLoggerChannel
->error(sprintf('Entity deletion from tracking table and interest list failed. Entity: %s. Message: %s', $uuid, $ex
->getMessage()));
}
return;
}
}
}
else {
// There are import problems but probably on dependent entities.
$this->achLoggerChannel
->error(sprintf('Import failed: %s.', $e
->getMessage()));
throw $e;
}
}
if ($webhook && $send_update) {
try {
$this->client
->addEntitiesToInterestList($webhook, array_keys($stack
->getDependencies()));
$this->achLoggerChannel
->info(sprintf('The following imported entities have been added to the interest list on Content Hub for webhook "%s": [%s].', $webhook, implode(', ', $uuids)));
} catch (\Exception $e) {
$this->achLoggerChannel
->error(sprintf('Error adding the following entities to the interest list for webhook "%s": [%s]. Error message: "%s".', $webhook, implode(', ', $uuids), $e
->getMessage()));
}
}
}
}
Members
Name | Modifiers | Type | Description | Overrides |
---|---|---|---|---|
ContentHubImportQueueWorker:: |
protected | property | The logger channel factory. | |
ContentHubImportQueueWorker:: |
protected | property | The Content Hub Client. | |
ContentHubImportQueueWorker:: |
protected | property | The common actions object. | |
ContentHubImportQueueWorker:: |
protected | property | The config factory. | |
ContentHubImportQueueWorker:: |
protected | property | The event dispatcher. | |
ContentHubImportQueueWorker:: |
protected | property | The client factory. | |
ContentHubImportQueueWorker:: |
protected | property | The Subscriber Tracker. | |
ContentHubImportQueueWorker:: |
public static | function |
Creates an instance of the plugin. Overrides ContainerFactoryPluginInterface:: |
|
ContentHubImportQueueWorker:: |
public | function | Initializes the Connection Manager. | |
ContentHubImportQueueWorker:: |
public | function |
Processes acquia_contenthub_subscriber_import queue items. Overrides QueueWorkerInterface:: |
|
ContentHubImportQueueWorker:: |
public | function |
ContentHubExportQueueWorker constructor. Overrides PluginBase:: |
|
PluginBase:: |
protected | property | Configuration information passed into the plugin. | 1 |
PluginBase:: |
protected | property | The plugin implementation definition. | 1 |
PluginBase:: |
protected | property | The plugin_id. | |
PluginBase:: |
constant | A string which is used to separate base plugin IDs from the derivative ID. | ||
PluginBase:: |
public | function |
Gets the base_plugin_id of the plugin instance. Overrides DerivativeInspectionInterface:: |
|
PluginBase:: |
public | function |
Gets the derivative_id of the plugin instance. Overrides DerivativeInspectionInterface:: |
|
PluginBase:: |
public | function |
Gets the definition of the plugin implementation. Overrides PluginInspectionInterface:: |
3 |
PluginBase:: |
public | function |
Gets the plugin_id of the plugin instance. Overrides PluginInspectionInterface:: |
|
PluginBase:: |
public | function | Determines if the plugin is configurable. |