View source
<?php
namespace Drupal\acquia_contenthub\Plugin\QueueWorker;
use Drupal\acquia_contenthub\Controller\ContentHubEntityExportController;
use Drupal\acquia_contenthub\Controller\ContentHubExportQueueController;
use Drupal\acquia_contenthub\EntityManager;
use Drupal\acquia_contenthub\Normalizer\ContentEntityCdfNormalizer;
use Drupal\Component\Serialization\Json;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
 * Base class for queue workers that export entities to Content Hub.
 *
 * Each queue item lists entity type / entity ID pairs. The worker normalizes
 * the matching entities to CDF, sends the result to Content Hub in one PUT
 * request and tracks every entity that was exported.
 */
abstract class ContentHubExportQueueBase extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  use StringTranslationTrait;

  /**
   * The Content Hub entity manager.
   *
   * @var \Drupal\acquia_contenthub\EntityManager
   */
  protected $entityManager;

  /**
   * The controller used to track exported entities.
   *
   * @var \Drupal\acquia_contenthub\Controller\ContentHubEntityExportController
   */
  protected $exportController;

  /**
   * The export queue controller.
   *
   * @var \Drupal\acquia_contenthub\Controller\ContentHubExportQueueController
   */
  protected $exportQueueController;

  /**
   * The normalizer that turns content entities into CDF.
   *
   * @var \Drupal\acquia_contenthub\Normalizer\ContentEntityCdfNormalizer
   */
  protected $cdfNormalizer;

  /**
   * The entity type manager.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * The logger channel factory.
   *
   * @var \Drupal\Core\Logger\LoggerChannelFactoryInterface
   */
  protected $loggerFactory;

  /**
   * Constructs a ContentHubExportQueueBase object.
   *
   * NOTE(review): the plugin configuration, ID and definition received by
   * create() are not forwarded to the parent constructor — confirm this is
   * intentional.
   *
   * @param \Drupal\acquia_contenthub\EntityManager $entity_manager
   *   The Content Hub entity manager.
   * @param \Drupal\acquia_contenthub\Controller\ContentHubEntityExportController $acquia_contenthub_export_controller
   *   The controller used to track exported entities.
   * @param \Drupal\acquia_contenthub\Controller\ContentHubExportQueueController $export_queue_controller
   *   The export queue controller.
   * @param \Drupal\acquia_contenthub\Normalizer\ContentEntityCdfNormalizer $cdf_normalizer
   *   The CDF normalizer for content entities.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_channel_factory
   *   The logger channel factory.
   */
  public function __construct(EntityManager $entity_manager, ContentHubEntityExportController $acquia_contenthub_export_controller, ContentHubExportQueueController $export_queue_controller, ContentEntityCdfNormalizer $cdf_normalizer, EntityTypeManagerInterface $entity_type_manager, LoggerChannelFactoryInterface $logger_channel_factory) {
    $this->entityManager = $entity_manager;
    $this->exportController = $acquia_contenthub_export_controller;
    $this->exportQueueController = $export_queue_controller;
    $this->cdfNormalizer = $cdf_normalizer;
    $this->entityTypeManager = $entity_type_manager;
    $this->loggerFactory = $logger_channel_factory;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    // Every dependency is read from the container that was handed in, so the
    // plugin can be built against a test container as well.
    return new static(
      $container->get('acquia_contenthub.entity_manager'),
      $container->get('acquia_contenthub.acquia_contenthub_export_entities'),
      $container->get('acquia_contenthub.acquia_contenthub_export_queue'),
      $container->get('acquia_contenthub.normalizer.entity.acquia_contenthub_cdf'),
      $container->get('entity_type.manager'),
      $container->get('logger.factory')
    );
  }

  /**
   * Exports the entities listed in a queue item to Content Hub.
   *
   * @param object $item
   *   The queue item. Its "data" property is iterated as a list of arrays,
   *   each providing an "entity_type" and an "entity_id" key.
   *
   * @return int
   *   The number of CDF entities sent to Content Hub, or 0 when none of the
   *   listed entities can be exported (deleted or no longer eligible).
   *
   * @throws \Drupal\Core\Queue\RequeueException
   *   When no CDF could be produced for the exportable entities, or when the
   *   PUT request to Content Hub fails.
   */
  public function processItem($item) {
    $entities = $item->data;

    // Presumably flags the entity manager that a bulk export is in progress.
    $this->entityManager->bulkExport();

    $logger = $this->loggerFactory->get('acquia_contenthub');
    $context = ['query_params' => ['include_references' => 'true']];
    $exported_entities = [];
    // UUIDs of the entities that passed all checks; used for error reporting.
    $candidate_uuids = [];

    foreach ($entities as $entity) {
      $entity_type = $entity['entity_type'];
      $entity_id = $entity['entity_id'];

      $drupal_entity = $this->entityTypeManager
        ->getStorage($entity_type)
        ->load($entity_id);

      // The entity may have been deleted after it was queued.
      if (!$drupal_entity) {
        $logger->warning('Entity cannot be processed because it could not be loaded. Type: @type, ID: @id, @backtrack', [
          '@type' => $entity_type,
          '@id' => $entity_id,
          '@backtrack' => __FUNCTION__,
        ]);
        continue;
      }

      // Check eligibility before normalizing, so that no work is spent on an
      // entity that is going to be skipped anyway.
      if (!$this->entityManager->isEligibleEntity($drupal_entity)) {
        $logger->warning('Entity cannot be processed because it is not eligible for export anymore. UUID: @uuid, @backtrack', [
          '@uuid' => $drupal_entity->uuid(),
          '@backtrack' => __FUNCTION__,
        ]);
        continue;
      }

      $candidate_uuids[] = $drupal_entity->uuid();

      $exported_entity = $this->cdfNormalizer
        ->normalize($drupal_entity, 'acquia_contenthub_cdf', $context);
      if (!is_array($exported_entity) || !isset($exported_entity['entities']) || !is_array($exported_entity['entities'])) {
        // Nothing usable came back from the normalizer for this entity.
        continue;
      }

      // Convert every Content Hub entity object into its decoded JSON array.
      foreach ($exported_entity['entities'] as $key => $ch_entity) {
        $exported_entity['entities'][$key] = Json::decode($ch_entity->json());
      }
      $exported_entities = array_merge($exported_entities, $exported_entity['entities']);
    }

    // Key the CDFs by UUID so that an entity referenced more than once is
    // only sent a single time.
    $exported_cdfs = [];
    foreach ($exported_entities as $cdf) {
      if (!empty($cdf)) {
        $exported_cdfs[$cdf['uuid']] = $cdf;
      }
    }

    if (empty($exported_cdfs)) {
      if (empty($candidate_uuids)) {
        // Every listed entity was deleted or is no longer eligible. Requeueing
        // could never succeed, so finish the item instead of looping on it.
        $logger->debug('Queue item skipped because none of its entities can be exported.');
        return 0;
      }

      $message = $this->t('Could not get the CDF for UUIDs: @uuids. Putting it back in the queue for later re-processing.', [
        '@uuids' => implode(', ', $candidate_uuids),
      ]);
      $logger->debug($message->render());
      throw new RequeueException($message->render());
    }

    $uuids = array_keys($exported_cdfs);
    $logger->debug('Queue sending export request to Content Hub for UUIDs %uuids.', [
      '%uuids' => implode(', ', $uuids),
    ]);

    if (!$this->entityManager->putRemoteEntities(array_values($exported_cdfs))) {
      $message = $this->t('PUT request to Content Hub failed for these UUIDs: @uuids. Putting it back in the queue for later re-processing.', [
        '@uuids' => implode(', ', $uuids),
      ]);
      $logger->debug($message->render());
      throw new RequeueException($message->render());
    }

    foreach ($exported_cdfs as $exported_entity) {
      $this->exportController->trackExportedEntity($exported_entity, TRUE);
    }

    return count($exported_cdfs);
  }

}