You are here

class TaskManager in Search API 8

Provides a service for managing pending tasks.

Tasks are executed by this service by dispatching an event with the class \Drupal\search_api\Task\TaskEvent and the name "search_api.task.TYPE", where TYPE is the type of task. Any module wishing to employ the Search API task system can therefore just create events of any type they want as long as they have a subscriber listening to events with the corresponding name.

Contrib modules should, however, always prefix TYPE with their module short name, followed by a period, to avoid collisions.

The system is used by the Search API module itself in the following ways:

Hierarchy

Expanded class hierarchy of TaskManager

See also

\Drupal\search_api\Task\TaskEvent

1 string reference to 'TaskManager'
search_api.services.yml in ./search_api.services.yml
search_api.services.yml
1 service uses TaskManager
search_api.task_manager in ./search_api.services.yml
Drupal\search_api\Task\TaskManager

File

src/Task/TaskManager.php, line 43

Namespace

Drupal\search_api\Task
View source
class TaskManager implements TaskManagerInterface {
  use DependencySerializationTrait;
  use StringTranslationTrait;

  /**
   * The entity type manager.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * The event dispatcher.
   *
   * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
   */
  protected $eventDispatcher;

  /**
   * The messenger service.
   *
   * @var \Drupal\Core\Messenger\MessengerInterface
   */
  protected $messenger;

  /**
   * Constructs a TaskManager object.
   *
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
   *   The event dispatcher.
   * @param \Drupal\Core\StringTranslation\TranslationInterface $translation
   *   The string translation service.
   * @param \Drupal\Core\Messenger\MessengerInterface $messenger
   *   The messenger.
   */
  public function __construct(EntityTypeManagerInterface $entity_type_manager, EventDispatcherInterface $event_dispatcher, TranslationInterface $translation, MessengerInterface $messenger) {
    $this->entityTypeManager = $entity_type_manager;
    $this->eventDispatcher = $event_dispatcher;
    $this
      ->setStringTranslation($translation);
    $this->messenger = $messenger;
  }

  /**
   * Returns the entity storage for search tasks.
   *
   * @return \Drupal\Core\Entity\EntityStorageInterface
   *   The storage handler.
   */
  protected function getTaskStorage() {
    return $this->entityTypeManager
      ->getStorage('search_api_task');
  }

  /**
   * Creates an entity query matching the given search tasks.
   *
   * @param array $conditions
   *   (optional) An array of conditions to be matched for the tasks, with
   *   property names keyed to the value (or values, for multiple possibilities)
   *   that the property should have.
   *
   * @return \Drupal\Core\Entity\Query\QueryInterface
   *   An entity query for search tasks.
   */
  protected function getTasksQuery(array $conditions = []) {
    $query = $this
      ->getTaskStorage()
      ->getQuery()
      ->accessCheck(FALSE);
    foreach ($conditions as $property => $values) {
      if ($values === NULL) {
        $query
          ->notExists($property);
      }
      else {
        $query
          ->condition($property, $values, is_array($values) ? 'IN' : '=');
      }
    }
    $query
      ->sort('id');
    return $query;
  }

  /**
   * {@inheritdoc}
   */
  public function getTasksCount(array $conditions = []) {
    return $this
      ->getTasksQuery($conditions)
      ->count()
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function addTask($type, ServerInterface $server = NULL, IndexInterface $index = NULL, $data = NULL) {
    $server_id = $server ? $server
      ->id() : NULL;
    $index_id = $index ? $index
      ->id() : NULL;
    if (isset($data)) {
      if ($data instanceof EntityInterface) {
        $data = [
          '#entity_type' => $data
            ->getEntityTypeId(),
          '#values' => $data
            ->toArray(),
        ];
      }
      $data = serialize($data);
    }
    $result = $this
      ->getTasksQuery([
      'type' => $type,
      'server_id' => $server_id,
      'index_id' => $index_id,
      'data' => $data,
    ])
      ->execute();
    if ($result) {
      return $this
        ->getTaskStorage()
        ->load(reset($result));
    }
    $task = $this
      ->getTaskStorage()
      ->create([
      'type' => $type,
      'server_id' => $server_id,
      'index_id' => $index_id,
      'data' => $data,
    ]);
    $task
      ->save();
    return $task;
  }

  /**
   * {@inheritdoc}
   */
  public function loadTasks(array $conditions = []) {
    $task_ids = $this
      ->getTasksQuery($conditions)
      ->execute();
    if ($task_ids) {
      return $this
        ->getTaskStorage()
        ->loadMultiple($task_ids);
    }
    return [];
  }

  /**
   * {@inheritdoc}
   */
  public function deleteTask($task_id) {
    $task = $this
      ->getTaskStorage()
      ->load($task_id);
    if ($task) {
      $task
        ->delete();
    }
  }

  /**
   * {@inheritdoc}
   */
  public function deleteTasks(array $conditions = []) {
    $storage = $this
      ->getTaskStorage();
    while (TRUE) {
      $task_ids = $this
        ->getTasksQuery($conditions)
        ->range(0, 100)
        ->execute();
      if (!$task_ids) {
        break;
      }
      $tasks = $storage
        ->loadMultiple($task_ids);
      $storage
        ->delete($tasks);
      if (count($task_ids) < 100) {
        break;
      }
    }
  }

  /**
   * {@inheritdoc}
   */
  public function executeSpecificTask(TaskInterface $task) {
    $event = new TaskEvent($task);
    $this->eventDispatcher
      ->dispatch('search_api.task.' . $task
      ->getType(), $event);
    if (!$event
      ->isPropagationStopped()) {
      $id = $task
        ->id();
      $type = $task
        ->getType();
      throw new SearchApiException("Could not execute task #{$id} of type '{$type}'. Type seems to be unknown.");
    }
    if ($exception = $event
      ->getException()) {
      throw $exception;
    }
    $task
      ->delete();
  }

  /**
   * {@inheritdoc}
   */
  public function executeSingleTask(array $conditions = []) {
    $task_id = $this
      ->getTasksQuery($conditions)
      ->range(0, 1)
      ->execute();
    if ($task_id) {
      $task_id = reset($task_id);

      /** @var \Drupal\search_api\Task\TaskInterface $task */
      $task = $this
        ->getTaskStorage()
        ->load($task_id);
      $this
        ->executeSpecificTask($task);
      return TRUE;
    }
    return FALSE;
  }

  /**
   * {@inheritdoc}
   */
  public function executeAllTasks(array $conditions = [], $limit = NULL) {

    // We have to use this roundabout way because tasks, during their execution,
    // might create additional tasks. (For example, see
    // \Drupal\search_api\Task\IndexTaskManager::trackItems().)
    $executed = 0;
    while (TRUE) {
      $query = $this
        ->getTasksQuery($conditions);
      if (isset($limit)) {
        $query
          ->range(0, $limit - $executed);
      }
      $task_ids = $query
        ->execute();
      if (!$task_ids) {
        break;
      }

      // We can't use multi-load here as a task might delete other tasks, so we
      // have to make sure each tasks still exists right before it is executed.
      foreach ($task_ids as $task_id) {

        /** @var \Drupal\search_api\Task\TaskInterface $task */
        $task = $this
          ->getTaskStorage()
          ->load($task_id);
        if ($task) {
          $this
            ->executeSpecificTask($task);
        }
        else {
          --$executed;
        }
      }
      $executed += count($task_ids);
      if (isset($limit) && $executed >= $limit) {
        break;
      }
    }
    return !$this
      ->getTasksCount($conditions);
  }

  /**
   * {@inheritdoc}
   */
  public function setTasksBatch(array $conditions = []) {
    $task_ids = $this
      ->getTasksQuery($conditions)
      ->range(0, 100)
      ->execute();
    if (!$task_ids) {
      return;
    }
    $batch_definition = [
      'operations' => [
        [
          [
            $this,
            'processBatch',
          ],
          [
            $task_ids,
            $conditions,
          ],
        ],
      ],
      'finished' => [
        $this,
        'finishBatch',
      ],
    ];

    // If called inside of Drush, we want to start the batch immediately.
    // However, we first need to determine whether there already is one running,
    // since we don't want to start a second one – our new batch will
    // automatically be appended to the currently running batch operation.
    $batch = batch_get();
    $run_drush_batch = function_exists('drush_backend_batch_process') && empty($batch['running']);

    // Schedule the batch.
    batch_set($batch_definition);

    // Now run the Drush batch, if applicable.
    if ($run_drush_batch) {
      $result = drush_backend_batch_process();

      // Drush performs batch processing in a separate PHP request. When the
      // last batch is processed the batch list is cleared, but this only takes
      // effect in the other request. Take the same action here to ensure that
      // we are not requeueing stale batches when there are multiple tasks being
      // handled in a single request.
      // (Drush 9.6 changed the structure of $result, so check for both variants
      // as long as we support earlier Drush versions, too.)
      if (!empty($result['context']['drush_batch_process_finished']) || !empty($result['drush_batch_process_finished'])) {
        $batch =& batch_get();
        $batch = NULL;
        unset($batch);
      }
    }
  }

  /**
   * Processes a single pending task as part of a batch operation.
   *
   * @param int[] $task_ids
   *   An array of task IDs to execute. Might not contain all task IDs.
   * @param array $conditions
   *   An array of conditions defining the tasks to be executed. Should be used
   *   to retrieve more task IDs if necessary.
   * @param array|\ArrayAccess $context
   *   The context of the current batch, as defined in the @link batch Batch
   *   operations @endlink documentation.
   *
   * @throws \Drupal\search_api\SearchApiException
   *   Thrown if any error occurred while processing the task.
   */
  public function processBatch(array $task_ids, array $conditions, &$context) {

    // Initialize context information.
    if (!isset($context['sandbox']['task_ids'])) {
      $context['sandbox']['task_ids'] = $task_ids;
    }
    if (!isset($context['results']['total'])) {
      $context['results']['total'] = $this
        ->getTasksCount($conditions);
    }
    $task_id = array_shift($context['sandbox']['task_ids']);

    /** @var \Drupal\search_api\Task\TaskInterface $task */
    $task = $this
      ->getTaskStorage()
      ->load($task_id);
    if ($task) {
      $this
        ->executeSpecificTask($task);
    }
    if (!$context['sandbox']['task_ids']) {
      $context['sandbox']['task_ids'] = $this
        ->getTasksQuery($conditions)
        ->range(0, 100)
        ->execute();
      if (!$context['sandbox']['task_ids']) {
        $context['finished'] = 1;
        return;
      }
    }
    $pending = $this
      ->getTasksCount($conditions);
    $context['finished'] = 1 - $pending / $context['results']['total'];
    $executed = $context['results']['total'] - $pending;
    if ($executed > 0) {
      $context['message'] = $this
        ->formatPlural($executed, 'Successfully executed @count pending task.', 'Successfully executed @count pending tasks.');
    }
  }

  /**
   * Finishes an "execute tasks" batch.
   *
   * @param bool $success
   *   Indicates whether the batch process was successful.
   * @param array $results
   *   Results information passed from the processing callback.
   */
  public function finishBatch($success, array $results) {

    // Check if the batch job was successful.
    if ($success) {
      $message = $this
        ->formatPlural($results['total'], 'Successfully executed @count pending task.', 'Successfully executed @count pending tasks.');
      $this->messenger
        ->addStatus($message);
    }
    else {

      // Notify the user about the batch job failure.
      $this->messenger
        ->addError($this
        ->t('An error occurred while trying to execute tasks. Check the logs for details.'));
    }
  }

}

Members

Namesort descending Modifiers Type Description Overrides
DependencySerializationTrait::$_entityStorages protected property An array of entity type IDs keyed by the property name of their storages.
DependencySerializationTrait::$_serviceIds protected property An array of service IDs keyed by property name used for serialization.
DependencySerializationTrait::__sleep public function 1
DependencySerializationTrait::__wakeup public function 2
StringTranslationTrait::$stringTranslation protected property The string translation service. 1
StringTranslationTrait::formatPlural protected function Formats a string containing a count of items.
StringTranslationTrait::getNumberOfPlurals protected function Returns the number of plurals supported by a given language.
StringTranslationTrait::getStringTranslation protected function Gets the string translation service.
StringTranslationTrait::setStringTranslation public function Sets the string translation service to use. 2
StringTranslationTrait::t protected function Translates a string to the current language or to a given language.
TaskManager::$entityTypeManager protected property The entity type manager.
TaskManager::$eventDispatcher protected property The event dispatcher.
TaskManager::$messenger protected property The messenger service.
TaskManager::addTask public function Adds a new pending task. Overrides TaskManagerInterface::addTask
TaskManager::deleteTask public function Deletes the task with the given ID. Overrides TaskManagerInterface::deleteTask
TaskManager::deleteTasks public function Deletes all tasks that fulfil a certain set of conditions. Overrides TaskManagerInterface::deleteTasks
TaskManager::executeAllTasks public function Executes all (or some) pending tasks. Overrides TaskManagerInterface::executeAllTasks
TaskManager::executeSingleTask public function Retrieves and executes a single task. Overrides TaskManagerInterface::executeSingleTask
TaskManager::executeSpecificTask public function Executes and deletes the given task. Overrides TaskManagerInterface::executeSpecificTask
TaskManager::finishBatch public function Finishes an "execute tasks" batch.
TaskManager::getTasksCount public function Retrieves the number of pending tasks for the given conditions. Overrides TaskManagerInterface::getTasksCount
TaskManager::getTasksQuery protected function Creates an entity query matching the given search tasks.
TaskManager::getTaskStorage protected function Returns the entity storage for search tasks.
TaskManager::loadTasks public function Load all tasks matching the given conditions. Overrides TaskManagerInterface::loadTasks
TaskManager::processBatch public function Processes a single pending task as part of a batch operation.
TaskManager::setTasksBatch public function Sets a batch for executing all pending tasks. Overrides TaskManagerInterface::setTasksBatch
TaskManager::__construct public function Constructs a TaskManager object.