You are here

class ContentHubExportQueueController in Acquia Content Hub 8

Implements an Export Queue Controller for Content Hub.

Hierarchy

Expanded class hierarchy of ContentHubExportQueueController

4 files declare their use of ContentHubExportQueueController
ContentHubExportQueueBase.php in src/Plugin/QueueWorker/ContentHubExportQueueBase.php
ContentHubExportQueueControllerTest.php in tests/src/Unit/Controller/ContentHubExportQueueControllerTest.php
ContentHubExportQueueForm.php in src/Form/ContentHubExportQueueForm.php
ContentHubExportQueueMessageSubscriber.php in src/EventSubscriber/ContentHubExportQueueMessageSubscriber.php
1 string reference to 'ContentHubExportQueueController'
acquia_contenthub.services.yml in ./acquia_contenthub.services.yml
acquia_contenthub.services.yml
1 service uses ContentHubExportQueueController
acquia_contenthub.acquia_contenthub_export_queue in ./acquia_contenthub.services.yml
Drupal\acquia_contenthub\Controller\ContentHubExportQueueController

File

src/Controller/ContentHubExportQueueController.php, line 20

Namespace

Drupal\acquia_contenthub\Controller
View source
class ContentHubExportQueueController extends ControllerBase {

  /**
   * The Queue Factory.
   *
   * @var \Drupal\Core\Queue\QueueFactory
   */
  protected $queueFactory;

  /**
   * The Queue Worker plugin manager.
   *
   * @var \Drupal\Core\Queue\QueueWorkerManagerInterface
   */
  protected $queueManager;

  /**
   * The 'acquia_contenthub.entity_config' configuration object.
   *
   * @var \Drupal\Core\Config\ImmutableConfig
   */
  protected $config;

  /**
   * Logger Channel ('acquia_contenthub').
   *
   * @var \Drupal\Core\Logger\LoggerChannelInterface
   */
  protected $logger;

  /**
   * Constructs a ContentHubExportQueueController object.
   *
   * @param \Drupal\Core\Queue\QueueFactory $queue_factory
   *   The queue factory.
   * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $queue_manager
   *   The queue worker plugin manager.
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
   *   The config factory; 'acquia_contenthub.entity_config' is loaded from it.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
   *   The logger factory; the 'acquia_contenthub' channel is taken from it.
   */
  public function __construct(QueueFactory $queue_factory, QueueWorkerManagerInterface $queue_manager, ConfigFactoryInterface $config_factory, LoggerChannelFactoryInterface $logger_factory) {
    $this->queueFactory = $queue_factory;
    $this->queueManager = $queue_manager;
    $this->config = $config_factory->get('acquia_contenthub.entity_config');
    $this->logger = $logger_factory->get('acquia_contenthub');
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    /** @var \Drupal\Core\Queue\QueueFactory $queue_factory */
    $queue_factory = $container->get('queue');

    /** @var \Drupal\Core\Queue\QueueWorkerManagerInterface $queue_manager */
    $queue_manager = $container->get('plugin.manager.queue_worker');

    /** @var \Drupal\Core\Config\ConfigFactoryInterface $config_factory */
    $config_factory = $container->get('config.factory');

    /** @var \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory */
    $logger_factory = $container->get('logger.factory');

    return new static($queue_factory, $queue_manager, $config_factory, $logger_factory);
  }

  /**
   * Adds entities to the export queue.
   *
   * Entities whose type is listed by
   * ContentHubEntityDependency::getPostDependencyEntityTypes() are skipped.
   * The remaining entities are grouped into chunks of
   * 'export_queue_entities_per_item' entries (1 when the setting is missing or
   * invalid) and each chunk is stored as a single queue item.
   *
   * @param \Drupal\Core\Entity\ContentEntityInterface[] $candidate_entities
   *   An array of entities (uuid, entity object) to be exported to Content Hub.
   *
   * @return \Drupal\Core\Entity\ContentEntityInterface[]
   *   An array of successfully queued entities (uuid, entity object).
   */
  public function enqueueExportEntities(array $candidate_entities) {
    $queued_entities = [];
    $exported_entities = [];

    // The list of post-dependent entity types (like paragraphs) does not
    // change between iterations, so it is looked up once, and only when there
    // is at least one candidate.
    $post_dependency_types = NULL;
    foreach ($candidate_entities as $candidate_entity) {
      if ($post_dependency_types === NULL) {
        $post_dependency_types = ContentHubEntityDependency::getPostDependencyEntityTypes();
      }

      // Do not enqueue post-dependent entities like paragraphs.
      $entity_type = $candidate_entity->getEntityTypeId();
      if (in_array($entity_type, $post_dependency_types)) {
        continue;
      }

      $entity_uuid = $candidate_entity->uuid();
      $exported_entities[] = [
        'entity_type' => $entity_type,
        'entity_id' => $candidate_entity->id(),
        'entity_uuid' => $entity_uuid,
      ];
      $queued_entities[$entity_uuid] = $candidate_entity;
    }
    unset($candidate_entities);

    // Obtain the export queue.
    $queue = $this->queueFactory->get('acquia_contenthub_export_queue');

    // Divide the list of entities into chunks to be processed in groups.
    // array_chunk() requires a size of at least 1, so anything else falls back
    // to one entity per queue item.
    $entities_per_item = $this->config->get('export_queue_entities_per_item');
    $entities_per_item = is_numeric($entities_per_item) && $entities_per_item >= 1 ? (int) $entities_per_item : 1;

    foreach (array_chunk($exported_entities, $entities_per_item) as $entities_chunk) {
      // The queue item keeps the full entity descriptors (type, id, uuid).
      $item = new \stdClass();
      $item->data = $entities_chunk;
      if ($queue->createItem($item) !== FALSE) {
        continue;
      }

      // The chunk could not be stored: drop its entities from the returned
      // list and log which ones were affected.
      $messages = [];
      foreach (array_column($entities_chunk, 'entity_uuid') as $uuid) {
        $message = new TranslatableMarkup('(Type = @type, ID = @id, UUID = @uuid)', [
          '@type' => $queued_entities[$uuid]->getEntityTypeId(),
          '@id' => $queued_entities[$uuid]->id(),
          '@uuid' => $uuid,
        ]);
        $messages[] = $message->jsonSerialize();
        unset($queued_entities[$uuid]);
      }
      $this->logger->debug('There was an error trying to enqueue the following entities for export: @entities.', [
        '@entities' => implode(', ', $messages),
      ]);
    }

    return $queued_entities;
  }

  /**
   * Obtains the number of items in the export queue.
   *
   * @return mixed
   *   The number of items in the export queue.
   */
  public function getQueueCount() {
    return $this->queueFactory
      ->get('acquia_contenthub_export_queue')
      ->numberOfItems();
  }

  /**
   * Execute the delete function for the ACH Export Queue.
   */
  public function purgeQueue() {
    $this->queueFactory
      ->get('acquia_contenthub_export_queue')
      ->deleteQueue();
  }

  /**
   * Obtains the Queue waiting time in seconds.
   *
   * @return int
   *   Amount in seconds of time to wait before processing an item in the queue.
   *   Falls back to 3 when 'export_queue_waiting_time' is empty.
   */
  public function getWaitingTime() {
    $waiting_time = $this->config->get('export_queue_waiting_time');
    return $waiting_time ?: 3;
  }

  /**
   * Process all queue items with batch API.
   *
   * Builds one batch operation per group of 'export_queue_batch_size' items.
   * Each operation is told how many items it is responsible for, so the last
   * operation only handles the remainder.
   *
   * @param string|int $number_of_items
   *   The number of items to process. Any non-numeric value (e.g. 'all')
   *   means every item currently in the queue.
   */
  public function processQueueItems($number_of_items = 'all') {

    // Create batch which collects all the specified queue items and process
    // them one after another.
    $batch = [
      'title' => $this->t("Process Content Hub Export Queue"),
      'operations' => [],
      'finished' => '\\Drupal\\acquia_contenthub\\Controller\\ContentHubExportQueueController::batchFinished',
    ];

    // Calculate the number of items to process.
    $queue = $this->queueFactory->get('acquia_contenthub_export_queue');
    $number_of_items = is_numeric($number_of_items) ? (int) $number_of_items : (int) $queue->numberOfItems();

    // A missing, non-numeric or non-positive batch size falls back to 1 so
    // the loop below always advances.
    $batch_size = $this->config->get('export_queue_batch_size');
    $batch_size = is_numeric($batch_size) && $batch_size >= 1 ? (int) $batch_size : 1;

    // Create enough batch operations to cover the requested number of items.
    for ($remaining = $number_of_items; $remaining > 0; $remaining -= $batch_size) {
      $batch['operations'][] = [
        '\\Drupal\\acquia_contenthub\\Controller\\ContentHubExportQueueController::batchProcess',
        [
          // Only the items still pending for this operation, so the final
          // operation does not claim more than was requested.
          min($remaining, $batch_size),
        ],
      ];
    }

    // Adds the batch sets.
    batch_set($batch);
  }

  /**
   * Common batch processing callback for all operations.
   *
   * Claims up to min($number_of_items, 'export_queue_batch_size') items from
   * the export queue and hands each one to the queue worker. Successfully
   * processed items are deleted; a message for every item is appended to
   * $context['results'].
   *
   * @param int|string $number_of_items
   *   The number of items to process. A non-numeric value means "up to one
   *   full batch".
   * @param mixed $context
   *   The context array.
   */
  public static function batchProcess($number_of_items, &$context) {

    // Get the queue implementation for acquia_contenthub_export_queue.
    $queue = \Drupal::service('queue')->get('acquia_contenthub_export_queue');
    $queue_worker = \Drupal::service('plugin.manager.queue_worker')
      ->createInstance('acquia_contenthub_export_queue');

    // Work out how many items this operation may claim.
    $batch_size = \Drupal::service('config.factory')
      ->get('acquia_contenthub.entity_config')
      ->get('export_queue_batch_size');
    $batch_size = !empty($batch_size) && is_numeric($batch_size) ? $batch_size : 1;
    $number_of_queue = is_numeric($number_of_items) ? min($number_of_items, $batch_size) : $batch_size;

    for ($i = 0; $i < $number_of_queue; $i++) {

      // Get a queued item; stop as soon as the queue has nothing to claim.
      $item = $queue->claimItem();
      if (!$item) {
        break;
      }

      try {

        // Generating a list of entities for the user-facing messages.
        $entities_list = [];
        foreach ($item->data->data as $entity) {
          $entities_list[] = new TranslatableMarkup('(@entity_type, @entity_id)', [
            '@entity_type' => $entity['entity_type'],
            '@entity_id' => $entity['entity_id'],
          ]);
        }
        $entities_text = implode(',', $entities_list);

        // Process item. A requeue request is treated like a failed run.
        try {
          $entities_processed = $queue_worker->processItem($item->data);
        }
        catch (RequeueException $ex) {
          $entities_processed = FALSE;
        }

        // NOTE(review): in the two failure branches below the item is neither
        // deleted nor released, although the messages say it was "sent back to
        // the queue" — confirm this is the intended behavior.
        if ($entities_processed === FALSE) {
          // Indicate that the item could not be processed.
          $message = new TranslatableMarkup('There was an error processing entities: @entities and their dependencies. The item has been sent back to the queue to be processed again later. Check your logs for more info.', [
            '@entities' => $entities_text,
          ]);
        }
        elseif (!$entities_processed) {
          // The worker ran but reported that nothing was processed.
          $message = new TranslatableMarkup('No processing was done for entities: @entities and their dependencies. The item has been sent back to the queue to be processed again later. Check your logs for more info.', [
            '@entities' => $entities_text,
          ]);
        }
        else {
          // If everything was correct, delete processed item from the queue.
          $queue->deleteItem($item);

          // Creating a text message to present to the user.
          $message = new TranslatableMarkup('Processed entities: @entities and their dependencies (@count @label sent).', [
            '@entities' => $entities_text,
            '@count' => $entities_processed,
            '@label' => $entities_processed == 1 ? new TranslatableMarkup('entity') : new TranslatableMarkup('entities'),
          ]);
        }

        $escaped_message = Html::escape($message->jsonSerialize());
        $context['message'] = $escaped_message;
        $context['results'][] = $escaped_message;
      }
      catch (SuspendQueueException $e) {

        // The worker asked to suspend the queue: release the item it could
        // not process so another worker can pick it up, and stop this run.
        $queue->releaseItem($item);
        break;
      }
    }
  }

  /**
   * Batch finished callback.
   *
   * @param bool $success
   *   Whether the batch process succeeded or not.
   * @param array $results
   *   The results array.
   * @param array $operations
   *   An array of operations; on failure its first entry is reported as the
   *   operation that errored, as [callback, arguments].
   */
  public static function batchFinished($success, array $results, array $operations) {
    $messenger = \Drupal::messenger();
    if ($success) {
      $messenger->addStatus(t("The contents are successfully exported."));
    }
    else {
      // reset() returns FALSE for an empty list; isset() keeps that case from
      // raising notices.
      $error_operation = reset($operations);
      $callback = isset($error_operation[0]) ? $error_operation[0] : '';
      $arguments = isset($error_operation[1]) ? $error_operation[1] : [];
      $messenger->addError(t('An error occurred while processing @operation with arguments : @args', [
        '@operation' => $callback,
        '@args' => print_r($arguments, TRUE),
      ]));
    }

    // Providing a report on the items processed by the queue.
    $elements = [
      '#theme' => 'item_list',
      '#type' => 'ul',
      '#items' => $results,
    ];
    $queue_report = \Drupal::service('renderer')->render($elements);
    $messenger->addStatus($queue_report);
  }

}

Members

Name (sort descending) | Modifiers | Type | Description | Overrides
ContentHubExportQueueController::$config protected property Drupal\Core\Config\ImmutableConfig.
ContentHubExportQueueController::$logger protected property Logger Channel.
ContentHubExportQueueController::$queueFactory protected property The Queue Factory.
ContentHubExportQueueController::$queueManager protected property The Queue Manager.
ContentHubExportQueueController::batchFinished public static function Batch finished callback.
ContentHubExportQueueController::batchProcess public static function Common batch processing callback for all operations.
ContentHubExportQueueController::create public static function Instantiates a new instance of this class. Overrides ControllerBase::create
ContentHubExportQueueController::enqueueExportEntities public function Adds entities to the export queue.
ContentHubExportQueueController::getQueueCount public function Obtains the number of items in the export queue.
ContentHubExportQueueController::getWaitingTime public function Obtains the Queue waiting time in seconds.
ContentHubExportQueueController::processQueueItems public function Process all queue items with batch API.
ContentHubExportQueueController::purgeQueue public function Execute the delete function for the ACH Export Queue.
ContentHubExportQueueController::__construct public function
ControllerBase::$configFactory protected property The configuration factory.
ControllerBase::$currentUser protected property The current user service. 1
ControllerBase::$entityFormBuilder protected property The entity form builder.
ControllerBase::$entityManager protected property The entity manager.
ControllerBase::$entityTypeManager protected property The entity type manager.
ControllerBase::$formBuilder protected property The form builder. 2
ControllerBase::$keyValue protected property The key-value storage. 1
ControllerBase::$languageManager protected property The language manager. 1
ControllerBase::$moduleHandler protected property The module handler. 2
ControllerBase::$stateService protected property The state service.
ControllerBase::cache protected function Returns the requested cache bin.
ControllerBase::config protected function Retrieves a configuration object.
ControllerBase::container private function Returns the service container.
ControllerBase::currentUser protected function Returns the current user. 1
ControllerBase::entityFormBuilder protected function Retrieves the entity form builder.
ControllerBase::entityManager Deprecated protected function Retrieves the entity manager service.
ControllerBase::entityTypeManager protected function Retrieves the entity type manager.
ControllerBase::formBuilder protected function Returns the form builder service. 2
ControllerBase::keyValue protected function Returns a key/value storage collection. 1
ControllerBase::languageManager protected function Returns the language manager service. 1
ControllerBase::moduleHandler protected function Returns the module handler. 2
ControllerBase::redirect protected function Returns a redirect response object for the specified route. Overrides UrlGeneratorTrait::redirect
ControllerBase::state protected function Returns the state storage service.
LinkGeneratorTrait::$linkGenerator protected property The link generator. 1
LinkGeneratorTrait::getLinkGenerator Deprecated protected function Returns the link generator.
LinkGeneratorTrait::l Deprecated protected function Renders a link to a route given a route name and its parameters.
LinkGeneratorTrait::setLinkGenerator Deprecated public function Sets the link generator service.
LoggerChannelTrait::$loggerFactory protected property The logger channel factory service.
LoggerChannelTrait::getLogger protected function Gets the logger for a specific channel.
LoggerChannelTrait::setLoggerFactory public function Injects the logger channel factory.
MessengerTrait::$messenger protected property The messenger. 29
MessengerTrait::messenger public function Gets the messenger. 29
MessengerTrait::setMessenger public function Sets the messenger.
RedirectDestinationTrait::$redirectDestination protected property The redirect destination service. 1
RedirectDestinationTrait::getDestinationArray protected function Prepares a 'destination' URL query parameter for use with \Drupal\Core\Url.
RedirectDestinationTrait::getRedirectDestination protected function Returns the redirect destination service.
RedirectDestinationTrait::setRedirectDestination public function Sets the redirect destination service.
StringTranslationTrait::$stringTranslation protected property The string translation service. 1
StringTranslationTrait::formatPlural protected function Formats a string containing a count of items.
StringTranslationTrait::getNumberOfPlurals protected function Returns the number of plurals supported by a given language.
StringTranslationTrait::getStringTranslation protected function Gets the string translation service.
StringTranslationTrait::setStringTranslation public function Sets the string translation service to use. 2
StringTranslationTrait::t protected function Translates a string to the current language or to a given language.
UrlGeneratorTrait::$urlGenerator protected property The url generator.
UrlGeneratorTrait::getUrlGenerator Deprecated protected function Returns the URL generator service.
UrlGeneratorTrait::setUrlGenerator Deprecated public function Sets the URL generator service.
UrlGeneratorTrait::url Deprecated protected function Generates a URL or path for a specific route based on the given parameters.