You are here

public static function ContentHubExportQueueController::batchProcess in Acquia Content Hub 8

Common batch processing callback for all operations.

Parameters

int|string $number_of_items: The number of items to process.

mixed $context: The context array.

File

src/Controller/ContentHubExportQueueController.php, line 217

Class

ContentHubExportQueueController
Implements an Export Queue Controller for Content Hub.

Namespace

Drupal\acquia_contenthub\Controller

Code

/**
 * Common batch processing callback for all operations.
 *
 * Claims items from the 'acquia_contenthub_export_queue' queue, hands each
 * one to the queue worker plugin of the same name and records a status
 * message for every handled item in the batch context. The number of items
 * claimed per call is the smaller of $number_of_items and the configured
 * 'export_queue_batch_size' (1 when that setting is empty or not numeric).
 *
 * @param int|string $number_of_items
 *   The number of items to process.
 * @param mixed $context
 *   The context array. For every handled item 'message' is overwritten and
 *   the same text is appended to 'results'.
 */
public static function batchProcess($number_of_items, &$context) {

  // Get the queue implementation and worker for acquia_contenthub_export_queue.
  $queue = \Drupal::service('queue')
    ->get('acquia_contenthub_export_queue');
  $queue_worker = \Drupal::service('plugin.manager.queue_worker')
    ->createInstance('acquia_contenthub_export_queue');

  // Work out how many items this call may claim.
  $batch_size = \Drupal::service('config.factory')
    ->get('acquia_contenthub.entity_config')
    ->get('export_queue_batch_size');
  $batch_size = !empty($batch_size) && is_numeric($batch_size) ? $batch_size : 1;
  $number_of_queue = $number_of_items < $batch_size ? $number_of_items : $batch_size;

  // Repeat $number_of_queue times at most.
  for ($i = 0; $i < $number_of_queue; $i++) {

    // Get a queued item. When nothing can be claimed there is no point in
    // polling the queue again for the remaining iterations.
    $item = $queue->claimItem();
    if (!$item) {
      break;
    }

    try {

      // Generating a list of entities, rendered once for all messages below.
      $entities_list = [];
      foreach ($item->data->data as $entity) {
        $entities_list[] = new TranslatableMarkup('(@entity_type, @entity_id)', [
          '@entity_type' => $entity['entity_type'],
          '@entity_id' => $entity['entity_id'],
        ]);
      }
      $entities_text = implode(',', $entities_list);

      // Process item. A requeue request is treated like a failed run.
      try {
        $entities_processed = $queue_worker->processItem($item->data);
      }
      catch (RequeueException $ex) {
        $entities_processed = FALSE;
      }

      if ($entities_processed == FALSE) {
        // The item is intentionally not deleted so it can be retried.
        // NOTE(review): it is not explicitly released here either, so it
        // presumably becomes claimable again only after its lease expires;
        // confirm this matches the "sent back to the queue" wording.
        if ($entities_processed === FALSE) {
          // Strict FALSE: the worker reported (or threw) an error.
          $message = new TranslatableMarkup('There was an error processing entities: @entities and their dependencies. The item has been sent back to the queue to be processed again later. Check your logs for more info.', [
            '@entities' => $entities_text,
          ]);
        }
        else {
          // Any other falsy value (e.g. 0): nothing was processed.
          $message = new TranslatableMarkup('No processing was done for entities: @entities and their dependencies. The item has been sent back to the queue to be processed again later. Check your logs for more info.', [
            '@entities' => $entities_text,
          ]);
        }
      }
      else {
        // If everything was correct, delete processed item from the queue.
        $queue->deleteItem($item);

        // Creating a text message to present to the user.
        $message = new TranslatableMarkup('Processed entities: @entities and their dependencies (@count @label sent).', [
          '@entities' => $entities_text,
          '@count' => $entities_processed,
          '@label' => $entities_processed == 1 ? new TranslatableMarkup('entity') : new TranslatableMarkup('entities'),
        ]);
      }

      // Render and escape the message once, then report it both as the
      // current progress message and as a result entry.
      $status = Html::escape($message->jsonSerialize());
      $context['message'] = $status;
      $context['results'][] = $status;
    }
    catch (SuspendQueueException $e) {

      // The worker asked to suspend the queue: release the item so that
      // another worker can come and process it, and stop this run.
      $queue->releaseItem($item);
      break;
    }
  }
}