public function MaestroEngine::cleanQueue in Maestro 3.x
Same name and namespace in other branches
- 8.2 src/Engine/MaestroEngine.php \Drupal\maestro\Engine\MaestroEngine::cleanQueue()
CleanQueue method. This is the core method used by the orchestrator to move the process forward and to determine assignments and next steps.
File
- src/Engine/MaestroEngine.php, line 1370
Class
- MaestroEngine
- Class MaestroEngine.
Namespace
Drupal\maestro\Engine
Code
/**
 * Runs one orchestrator pass over the open Maestro queue.
 *
 * The pass has three phases:
 *  1. Process every open queue item: non-interactive tasks that have not run
 *     yet are executed; interactive tasks that already carry a completion
 *     status are advanced. In both cases the next step is created and the
 *     queue item is archived.
 *  2. If the 'maestro_send_notifications' setting is enabled, send reminders
 *     for open interactive tasks whose reminder time has elapsed.
 *  3. Under the same setting, send escalations for open interactive tasks
 *     that have aged past their escalation interval.
 *
 * Queue or process records that can no longer be loaded (for example because
 * they were removed between the query and the load) are skipped instead of
 * aborting the whole pass.
 *
 * @throws MaestroGeneralException
 *   When no task plugin can be resolved for a queue item. Note that this
 *   stops the current pass at that queue item.
 */
public function cleanQueue() {
  $config = \Drupal::config('maestro.settings');
  if ($this->debug) {
    kint_require();
    \Kint::$maxLevels = 0;
  }

  // Storage handlers are reused for every query and load below.
  $entityTypeManager = \Drupal::entityTypeManager();
  $queueStorage = $entityTypeManager->getStorage('maestro_queue');
  $processStorage = $entityTypeManager->getStorage('maestro_process');

  // Non-interactive tasks that need processing: not archived, not completed
  // and not yet run once, belonging to a process that is still open.
  // Access checking is explicitly disabled on every query in this method: the
  // engine must see all queue items regardless of the current user, and
  // Drupal requires the access check choice to be stated explicitly.
  $entity_ids = $queueStorage->getQuery()
    ->accessCheck(FALSE)
    ->condition('archived', '0')
    ->condition('status', '0')
    ->condition('is_interactive', '0')
    ->condition('run_once', '0')
    ->condition('process_id.entity.complete', '0')
    ->execute();

  // Interactive tasks that already have a completion status.
  $entity_ids += $queueStorage->getQuery()
    ->accessCheck(FALSE)
    ->condition('archived', '0')
    ->condition('is_interactive', '1')
    ->condition('status', '0', '<>')
    ->condition('run_once', '1')
    ->condition('process_id.entity.complete', '0')
    ->execute();

  // Merge both result sets into a single key-ordered list.
  ksort($entity_ids);

  foreach ($entity_ids as $queueID) {
    // queueID is the numeric ID of the queue entity. Clear any cached copy
    // first when development mode is on.
    if ($this->developmentMode) {
      $queueStorage->resetCache([$queueID]);
    }
    $queueRecord = $queueStorage->load($queueID);
    if ($queueRecord === NULL) {
      // The queue item disappeared after the query ran; nothing to do.
      continue;
    }

    $processID = $queueRecord->process_id->getString();
    if ($this->developmentMode) {
      $processStorage->resetCache([$processID]);
    }
    $processRecord = $processStorage->load($processID);
    if ($processRecord === NULL) {
      // Without its process the task cannot be advanced; skip it rather than
      // halting the engine with a fatal error.
      continue;
    }

    // Only act on tasks whose process is still open.
    if ($processRecord->complete->getString() !== '0') {
      continue;
    }

    $taskClassName = $queueRecord->task_class_name->getString();
    $taskID = $queueRecord->task_id->getString();
    $templateMachineName = $processRecord->template_id->getString();

    $task = $this->getPluginTask($taskClassName, $processID, $queueID);
    if (!$task) {
      // This plugin task definition doesn't exist. Throwing here will lock
      // the engine at this point; a possible improvement is a dedicated
      // "error" status that the engine skips over and that can be reported
      // on with views.
      throw new MaestroGeneralException('Task definition doesn\'t exist. TaskID:' . $taskID . ' in ProcessID:' . $processID . ' is not flagged as interactive or non-interactive.');
    }

    if ($task->isInteractive()) {
      // Interactive task that has been completed: just move the flow forward.
      $this->nextStep($templateMachineName, $taskID, $processID, $task->getCompletionStatus());
      $this->archiveTask($queueID);
      continue;
    }

    // Non-interactive task: execute it!
    $result = $task->execute();
    if ($result === TRUE) {
      // Record the task's status, then create the next task instance.
      $queueRecord->set('status', $task->getExecutionStatus());
      $queueRecord->set('completed', time());
      $queueRecord->save();
      $this->nextStep($templateMachineName, $taskID, $processID, $task->getCompletionStatus());
      $this->archiveTask($queueID);
    }
  }

  // Retaining this section for now to determine if we should be closing any
  // open tasks that have a process that is complete. This has no effect on
  // the engine either way.
  //
  // Now we check for queue items that need to be archived. These are tasks
  // that have a status of 1 (complete) and are not yet archived.
  //
  // $query = \Drupal::entityTypeManager()->getStorage('maestro_queue')->getQuery();
  // $query->condition('archived', '0')
  //   ->condition('status', '0', '<>')
  //   ->condition('process_id.entity.complete', '0');
  // $entity_ids = $query->execute();
  // foreach ($entity_ids as $queueID) {
  //   // TODO: pre-archive hook?
  //   $this->archiveTask($queueID);
  //   // TODO: post archive hook?
  // }
  //
  // TODO: pull notifications out like this to its own cron hook?
  if (!$config->get('maestro_send_notifications')) {
    return;
  }

  $currentTime = time();

  // Reminders: interactive tasks that are not complete and have aged beyond
  // their reminder time.
  $entity_ids = $queueStorage->getQuery()
    ->accessCheck(FALSE)
    ->condition('archived', '0')
    ->condition('is_interactive', '1')
    ->condition('status', '0')
    ->condition('run_once', '1')
    ->condition('next_reminder_time', $currentTime, '<')
    ->condition('next_reminder_time', '0', '<>')
    ->condition('reminder_interval', '0', '>')
    ->execute();

  foreach ($entity_ids as $queueID) {
    // Every item in this loop matched the reminder query, so it is due.
    if ($this->developmentMode) {
      $queueStorage->resetCache([$queueID]);
    }
    $queueRecord = $queueStorage->load($queueID);
    if ($queueRecord === NULL) {
      continue;
    }

    $taskMachineName = $queueRecord->task_id->getString();
    $templateMachineName = MaestroEngine::getTemplateIdFromProcessId($queueRecord->process_id->getString());
    // The interval is stored in days; convert to a seconds offset.
    $reminderInterval = intval($queueRecord->reminder_interval->getString()) * 86400;

    $this->doProductionAssignmentNotifications($templateMachineName, $taskMachineName, $queueID, 'reminder');

    $queueRecord->set('next_reminder_time', $currentTime + $reminderInterval);
    $queueRecord->set('num_reminders_sent', intval($queueRecord->num_reminders_sent->getString()) + 1);
    $queueRecord->save();
  }

  // Escalations: open interactive tasks that define an escalation interval.
  $entity_ids = $queueStorage->getQuery()
    ->accessCheck(FALSE)
    ->condition('archived', '0')
    ->condition('is_interactive', '1')
    ->condition('status', '0')
    ->condition('run_once', '1')
    ->condition('escalation_interval', 0, '>')
    ->execute();

  foreach ($entity_ids as $queueID) {
    if ($this->developmentMode) {
      $queueStorage->resetCache([$queueID]);
    }
    $queueRecord = $queueStorage->load($queueID);
    if ($queueRecord === NULL) {
      continue;
    }

    $taskMachineName = $queueRecord->task_id->getString();
    $templateMachineName = MaestroEngine::getTemplateIdFromProcessId($queueRecord->process_id->getString());
    // Cast to int so an empty field value cannot break the arithmetic below.
    $createdTime = intval($queueRecord->created->getString());
    $numberOfEscalationsSent = intval($queueRecord->num_escalations_sent->getString());

    // First time through, the number of escalations sent is 0, the second
    // time it is 1, and so on. The threshold is therefore
    // (escalations sent + 1) * the escalation interval (days, in seconds)
    // counted from the task's creation time.
    $escalationInterval = (1 + $numberOfEscalationsSent) * (intval($queueRecord->escalation_interval->getString()) * 86400);

    if ($currentTime > $createdTime + $escalationInterval) {
      // The task has aged past the threshold: send out an escalation.
      $this->doProductionAssignmentNotifications($templateMachineName, $taskMachineName, $queueID, 'escalation');
      $queueRecord->set('last_escalation_time', $currentTime);
      $queueRecord->set('num_escalations_sent', $numberOfEscalationsSent + 1);
      $queueRecord->save();
    }
  }
}