You are here

public function MaestroEngine::cleanQueue in Maestro 3.x

Same name and namespace in other branches
  1. 8.2 src/Engine/MaestroEngine.php \Drupal\maestro\Engine\MaestroEngine::cleanQueue()

CleanQueue method. This is the core method used by the orchestrator to move the process forward and to determine assignments and next steps.

File

src/Engine/MaestroEngine.php, line 1370

Class

MaestroEngine
Class MaestroEngine.

Namespace

Drupal\maestro\Engine

Code

/**
 * Runs one orchestrator pass over the Maestro queue.
 *
 * The pass has two phases:
 *  1. Task processing: every open, non-archived queue item belonging to an
 *     incomplete process is either executed (non-interactive tasks) or, for
 *     interactive tasks that already carry a completion status, moved on to
 *     the next step and archived.
 *  2. Notifications: when the 'maestro_send_notifications' setting is on,
 *     reminder and escalation notifications are sent for open interactive
 *     tasks whose intervals have elapsed.
 *
 * @throws MaestroGeneralException
 *   When no task plugin can be obtained for a queue item. As the inline
 *   comment below notes, this stops the engine at that item.
 */
public function cleanQueue() {
  $config = \Drupal::config('maestro.settings');
  if ($this->debug) {
    kint_require();
    \Kint::$maxLevels = 0;
  }

  // The storage handlers are reused for every query and load below.
  $entityTypeManager = \Drupal::entityTypeManager();
  $queueStorage = $entityTypeManager->getStorage('maestro_queue');
  $processStorage = $entityTypeManager->getStorage('maestro_process');

  // First pass: non-interactive tasks that are not archived, not completed
  // and have not run once, in processes that are still open.
  // Access checking is explicitly disabled: the engine has to see every queue
  // item regardless of who triggers the run, and entity queries without an
  // explicit accessCheck() are rejected by newer Drupal core versions.
  $entity_ids = $queueStorage->getQuery()
    ->accessCheck(FALSE)
    ->condition('archived', '0')
    ->condition('status', '0')
    ->condition('is_interactive', '0')
    ->condition('run_once', '0')
    ->condition('process_id.entity.complete', '0')
    ->execute();

  // Second pass: interactive tasks that already have a completion status.
  $entity_ids += $queueStorage->getQuery()
    ->accessCheck(FALSE)
    ->condition('archived', '0')
    ->condition('is_interactive', '1')
    ->condition('status', '0', '<>')
    ->condition('run_once', '1')
    ->condition('process_id.entity.complete', '0')
    ->execute();

  // Process the combined result set in ascending key order.
  ksort($entity_ids);
  foreach ($entity_ids as $queueID) {
    // In development mode, drop any cached copy so we load fresh data.
    if ($this->developmentMode) {
      $queueStorage->resetCache([$queueID]);
    }
    $queueRecord = $queueStorage->load($queueID);
    if (!$queueRecord) {
      // The record vanished between the query and the load; nothing to do.
      continue;
    }

    $processID = $queueRecord->process_id->getString();
    if ($this->developmentMode) {
      $processStorage->resetCache([$processID]);
    }
    $processRecord = $processStorage->load($processID);
    if (!$processRecord) {
      // No process to advance; skip rather than fatally stalling the engine.
      continue;
    }

    $taskClassName = $queueRecord->task_class_name->getString();
    $taskID = $queueRecord->task_id->getString();
    $templateMachineName = $processRecord->template_id->getString();

    // Only tasks of processes that are not complete are handled.
    if ($processRecord->complete->getString() != '0') {
      continue;
    }

    // Execute it!
    $task = $this->getPluginTask($taskClassName, $processID, $queueID);

    if ($task && !$task->isInteractive()) {
      // Non-interactive task: run it, and only on a TRUE result record the
      // outcome, spawn the next step and archive this queue item.
      $result = $task->execute();
      if ($result === TRUE) {
        $queueRecord->set('status', $task->getExecutionStatus());
        $queueRecord->set('completed', time());
        $queueRecord->save();
        $this->nextStep($templateMachineName, $taskID, $processID, $task->getCompletionStatus());
        $this->archiveTask($queueID);
      }
    }
    elseif ($task && $task->isInteractive()) {
      // Interactive task that already carries a completion status: just move
      // the process forward and archive the queue item.
      $this->nextStep($templateMachineName, $taskID, $processID, $task->getCompletionStatus());
      $this->archiveTask($queueID);
    }
    else {
      // This plugin task definition doesn't exist and its not interactive, however, throwing an exception will lock up the engine.
      // we will throw the exception here knowing that this exception will lock the engine at this point.
      // This process is doomed for failure anyway, too bad it's causing the engine to stall...
      // suggestion here is to create a new status of "error", apply it to the task, which the engine will skip over and we can report on easily with views.
      throw new MaestroGeneralException('Task definition doesn\'t exist. TaskID:' . $taskID . ' in ProcessID:' . $processID . ' is not flagged as interactive or non-interactive.');
    }
  }

  // Retaining this section for now to determine if we should be closing any open tasks that have a process that is complete.
  // this has no effect on the engine either way.

  /* //Now we check for queue items that need to be archived
     //These are tasks that have a status of 1 (complete) and are not yet archived
     $query = \Drupal::entityTypeManager()->getStorage('maestro_queue')->getQuery();
     $query->condition('archived', '0')
     ->condition('status', '0', '<>')
     ->condition('process_id.entity.complete', '0');
     $entity_ids = $query->execute();
     foreach($entity_ids as $queueID) {
     //TODO: pre-archive hook?
     $this->archiveTask($queueID);
     //TODO: post archive hook?
     } */

  // TODO: pull notifications out like this to its own cron hook?
  if (!$config->get('maestro_send_notifications')) {
    return;
  }

  // One timestamp is used for the whole notification phase.
  $currentTime = time();

  // Reminders: open interactive tasks whose next reminder time has passed.
  $entity_ids = $queueStorage->getQuery()
    ->accessCheck(FALSE)
    ->condition('archived', '0')
    ->condition('is_interactive', '1')
    ->condition('status', '0')
    ->condition('run_once', '1')
    ->condition('next_reminder_time', $currentTime, '<')
    ->condition('next_reminder_time', '0', '<>')
    ->condition('reminder_interval', '0', '>')
    ->execute();

  foreach ($entity_ids as $queueID) {
    if ($this->developmentMode) {
      $queueStorage->resetCache([$queueID]);
    }
    $queueRecord = $queueStorage->load($queueID);
    if (!$queueRecord) {
      continue;
    }

    $taskMachineName = $queueRecord->task_id->getString();
    $templateMachineName = MaestroEngine::getTemplateIdFromProcessId($queueRecord->process_id->getString());

    // The interval is stored in days; convert it to seconds.
    $reminderInterval = intval($queueRecord->reminder_interval->getString()) * 86400;

    // Send the reminder, then schedule the next one and bump the counter.
    $this->doProductionAssignmentNotifications($templateMachineName, $taskMachineName, $queueID, 'reminder');
    $queueRecord->set('next_reminder_time', $currentTime + $reminderInterval);
    $queueRecord->set('num_reminders_sent', intval($queueRecord->num_reminders_sent->getString()) + 1);
    $queueRecord->save();
  }

  // Escalations: open interactive tasks that have an escalation interval.
  $entity_ids = $queueStorage->getQuery()
    ->accessCheck(FALSE)
    ->condition('archived', '0')
    ->condition('is_interactive', '1')
    ->condition('status', '0')
    ->condition('run_once', '1')
    ->condition('escalation_interval', 0, '>')
    ->execute();

  foreach ($entity_ids as $queueID) {
    if ($this->developmentMode) {
      $queueStorage->resetCache([$queueID]);
    }
    $queueRecord = $queueStorage->load($queueID);
    if (!$queueRecord) {
      continue;
    }

    $taskMachineName = $queueRecord->task_id->getString();
    $templateMachineName = MaestroEngine::getTemplateIdFromProcessId($queueRecord->process_id->getString());

    // Cast explicitly: field values come back as strings, and arithmetic on
    // a non-numeric string is a TypeError on PHP 8.
    $createdTime = intval($queueRecord->created->getString());
    $numberOfEscalationsSent = intval($queueRecord->num_escalations_sent->getString());

    // First time through, numberOfEscalations is 0... second time it's 1 etc.
    // so the threshold is (numberOfEscalations + 1) * the interval in days,
    // measured from the task's creation time.
    $escalationInterval = (1 + $numberOfEscalationsSent) * (intval($queueRecord->escalation_interval->getString()) * 86400);
    if ($currentTime > $createdTime + $escalationInterval) {
      // The task has aged past the next escalation threshold; notify.
      $this->doProductionAssignmentNotifications($templateMachineName, $taskMachineName, $queueID, 'escalation');
      $queueRecord->set('last_escalation_time', $currentTime);
      $queueRecord->set('num_escalations_sent', intval($queueRecord->num_escalations_sent->getString()) + 1);
      $queueRecord->save();
    }
  }
}