You are here

class Processor in Advanced Queue 8

Provides the default queue processor.

@todo Throw events.

Hierarchy

Expanded class hierarchy of Processor

2 string references to 'Processor'
advancedqueue.schema.yml in config/schema/advancedqueue.schema.yml
config/schema/advancedqueue.schema.yml
advancedqueue.services.yml in ./advancedqueue.services.yml
advancedqueue.services.yml
1 service uses Processor
advancedqueue.processor in ./advancedqueue.services.yml
Drupal\advancedqueue\Processor

File

src/Processor.php, line 16

Namespace

Drupal\advancedqueue
View source
class Processor implements ProcessorInterface {

  /**
   * The event dispatcher.
   *
   * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
   */
  protected $eventDispatcher;

  /**
   * The current time.
   *
   * @var \Drupal\Component\Datetime\TimeInterface
   */
  protected $time;

  /**
   * The job type manager.
   *
   * @var \Drupal\advancedqueue\JobTypeManager
   */
  protected $jobTypeManager;

  /**
   * Constructs a new Processor object.
   *
   * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
   *   The event dispatcher.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   The current time.
   * @param \Drupal\advancedqueue\JobTypeManager $job_type_manager
   *   The queue job type manager.
   */
  public function __construct(EventDispatcherInterface $event_dispatcher, TimeInterface $time, JobTypeManager $job_type_manager) {
    $this->eventDispatcher = $event_dispatcher;
    $this->time = $time;
    $this->jobTypeManager = $job_type_manager;
  }

  /**
   * {@inheritdoc}
   */
  public function processQueue(QueueInterface $queue) {

    // Start from a clean slate.
    $queue
      ->getBackend()
      ->cleanupQueue();

    // Allow unlimited processing time only on the CLI.
    $processing_time = $queue
      ->getProcessingTime();
    if ($processing_time == 0 && PHP_SAPI != 'cli') {
      $processing_time = 90;
    }
    $expected_end = $this->time
      ->getCurrentTime() + $processing_time;
    $num_processed = 0;
    while (TRUE) {
      $job = $queue
        ->getBackend()
        ->claimJob();
      if (!$job) {

        // The queue is empty. Stop here.
        break;
      }
      $this
        ->processJob($job, $queue);
      $num_processed++;
      if ($processing_time && $this->time
        ->getCurrentTime() >= $expected_end) {

        // Time limit reached. Stop here.
        break;
      }
    }
    return $num_processed;
  }

  /**
   * {@inheritdoc}
   */
  public function processJob(Job $job, QueueInterface $queue) {
    $this->eventDispatcher
      ->dispatch(AdvancedQueueEvents::PRE_PROCESS, new JobEvent($job));
    try {
      $job_type = $this->jobTypeManager
        ->createInstance($job
        ->getType());
      $result = $job_type
        ->process($job);
    } catch (\Exception $e) {
      $job_type = NULL;
      $result = JobResult::failure($e
        ->getMessage());
      watchdog_exception('cron', $e);
    }

    // Update the job with the result.
    $job
      ->setState($result
      ->getState());
    $job
      ->setMessage($result
      ->getMessage());
    $this->eventDispatcher
      ->dispatch(AdvancedQueueEvents::POST_PROCESS, new JobEvent($job));

    // Pass the job back to the backend.
    $queue_backend = $queue
      ->getBackend();
    if ($job
      ->getState() == Job::STATE_SUCCESS) {
      $queue_backend
        ->onSuccess($job);
    }
    elseif ($job
      ->getState() == Job::STATE_FAILURE && !$job_type) {

      // The job failed because of an exception, no need to retry.
      $queue_backend
        ->onFailure($job);
    }
    elseif ($job
      ->getState() == Job::STATE_FAILURE && $job_type) {
      $max_retries = !is_null($result
        ->getMaxRetries()) ? $result
        ->getMaxRetries() : $job_type
        ->getMaxRetries();
      $retry_delay = !is_null($result
        ->getRetryDelay()) ? $result
        ->getRetryDelay() : $job_type
        ->getRetryDelay();
      if ($job
        ->getNumRetries() < $max_retries) {
        $queue_backend
          ->retryJob($job, $retry_delay);
      }
      else {
        $queue_backend
          ->onFailure($job);
      }
    }
    return $result;
  }

}

Members

Namesort descending Modifiers Type Description Overrides
Processor::$eventDispatcher protected property The event dispatcher.
Processor::$jobTypeManager protected property The job type manager.
Processor::$time protected property The current time.
Processor::processJob public function Processes the given job. Overrides ProcessorInterface::processJob
Processor::processQueue public function Processes the given queue. Overrides ProcessorInterface::processQueue
Processor::__construct public function Constructs a new Processor object.