You are here

class JobExecutor in Apigee Edge 8

Job executor service.

Hierarchy

Expanded class hierarchy of JobExecutor

2 files declare their use of JobExecutor
apigee_edge.module in ./apigee_edge.module
Copyright 2018 Google Inc.
JobCreatorTrait.php in src/Job/JobCreatorTrait.php
1 string reference to 'JobExecutor'
apigee_edge.services.yml in ./apigee_edge.services.yml
apigee_edge.services.yml
1 service uses JobExecutor
apigee_edge.job_executor in ./apigee_edge.services.yml
Drupal\apigee_edge\JobExecutor

File

src/JobExecutor.php, line 30

Namespace

Drupal\apigee_edge
View source
/**
 * Job executor service.
 *
 * Persists jobs in the 'apigee_edge_job' database table and runs them either
 * synchronously (call()) or asynchronously through the 'apigee_edge_job'
 * queue (cast()).
 */
class JobExecutor implements JobExecutorInterface {

  /**
   * Database connection.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $connection;

  /**
   * Time service.
   *
   * @var \Drupal\Component\Datetime\TimeInterface
   */
  protected $time;

  /**
   * The 'apigee_edge_job' queue.
   *
   * @var \Drupal\Core\Queue\QueueInterface
   */
  protected $queue;

  /**
   * JobExecutor constructor.
   *
   * @param \Drupal\Core\Database\Connection $connection
   *   Database connection.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   Time interface.
   * @param \Drupal\Core\Queue\QueueFactory $queue_factory
   *   Queue factory.
   */
  public function __construct(Connection $connection, TimeInterface $time, QueueFactory $queue_factory) {
    $this->connection = $connection;
    $this->time = $time;
    $this->queue = $queue_factory->get('apigee_edge_job');
  }

  /**
   * Ensures that a job exists with a given status.
   *
   * The job is only written to the database when its current status differs
   * from the requested one.
   *
   * @param \Drupal\apigee_edge\Job\Job $job
   *   Job object.
   * @param int $status
   *   Job status.
   */
  protected function ensure(Job $job, int $status) {
    if ($job->getStatus() === $status) {
      return;
    }

    $job->setStatus($status);
    $this->save($job);
  }

  /**
   * {@inheritdoc}
   */
  public function save(Job $job) {
    $now = $this->time->getCurrentTime();

    // Columns written on both insert and update.
    $fields = [
      'status' => $job->getStatus(),
      'job' => serialize($job),
      'updated' => $now,
      'tag' => $job->getTag(),
    ];

    // Upsert keyed by the job id; 'id' and 'created' are only set when the
    // row is inserted for the first time.
    $this->connection
      ->merge('apigee_edge_job')
      ->key('id', $job->getId())
      ->insertFields([
        'id' => $job->getId(),
        'created' => $now,
      ] + $fields)
      ->updateFields($fields)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function load(string $id) : ?Job {
    $jobdata = $this->connection
      ->select('apigee_edge_job', 'j')
      ->fields('j', ['job'])
      ->condition('id', $id)
      ->execute()
      ->fetchField();

    if (!$jobdata) {
      return NULL;
    }

    // unserialize() returns FALSE for a payload it cannot decode. Returning
    // that value as-is would violate the ?Job return type and raise a
    // TypeError, so anything that is not a Job is reported as "not found".
    $job = unserialize($jobdata);
    return $job instanceof Job ? $job : NULL;
  }

  /**
   * {@inheritdoc}
   */
  public function select(?string $tag = NULL) : ?Job {

    // TODO handle race conditions.
    // The row is read here and only marked as selected by the separate write
    // in ensure() below, so two concurrent callers can claim the same job.
    $query = $this->connection
      ->select('apigee_edge_job', 'j')
      ->fields('j', ['job'])
      ->orderBy('updated')
      ->range(0, 1);
    $query->condition('status', [
      Job::IDLE,
      Job::RESCHEDULED,
    ], 'IN');
    if ($tag !== NULL) {
      $query->condition('tag', $tag);
    }

    $jobdata = $query
      ->execute()
      ->fetchField();
    if (!$jobdata) {
      return NULL;
    }

    /** @var \Drupal\apigee_edge\Job\Job $job */
    $job = unserialize($jobdata);
    $this->ensure($job, Job::SELECTED);

    return $job;
  }

  /**
   * {@inheritdoc}
   */
  public function call(Job $job, bool $update = TRUE) {
    $this->ensure($job, Job::RUNNING);

    try {
      // A truthy result puts the job back to idle, a falsy one finishes it.
      $result = $job->execute();
      $job->setStatus($result ? Job::IDLE : Job::FINISHED);
    }
    catch (\Exception $ex) {
      watchdog_exception('apigee_edge_job', $ex);
      $job->recordException($ex);

      // consumeRetry() is only invoked when the job asks for a retry.
      $retry = $job->shouldRetry($ex) && $job->consumeRetry();
      $job->setStatus($retry ? Job::RESCHEDULED : Job::FAILED);
    }
    finally {
      // NOTE(review): only \Exception is caught above. If execute() throws an
      // \Error the status is still Job::RUNNING when it is saved here.
      if ($update) {
        $this->save($job);
      }
    }
  }

  /**
   * {@inheritdoc}
   */
  public function cast(Job $job) {
    // Persist the job first, then enqueue an item that carries only the tag.
    $this->save($job);
    $this->queue->createItem([
      'tag' => $job->getTag(),
    ]);
  }

  /**
   * {@inheritdoc}
   */
  public function countJobs(?string $tag = NULL, ?array $statuses = NULL) : int {
    // An explicitly empty status list can not match any job. Bail out early
    // instead of building an "IN ()" condition, which the query builder
    // rejects.
    if ($statuses === []) {
      return 0;
    }

    $query = $this->connection->select('apigee_edge_job', 'j');
    if ($tag !== NULL) {
      $query->condition('tag', $tag);
    }
    if ($statuses !== NULL) {
      $query->condition('status', $statuses, 'IN');
    }

    return (int) $query
      ->countQuery()
      ->execute()
      ->fetchField();
  }

}

Members

Name | Modifiers | Type | Description | Overrides
JobExecutor::$connection protected property Database connection.
JobExecutor::$queue protected property The 'apigee_edge_job' queue.
JobExecutor::$time protected property Time service.
JobExecutor::call public function Executes a job synchronously. Overrides JobExecutorInterface::call
JobExecutor::cast public function Executes a job asynchronously. Overrides JobExecutorInterface::cast
JobExecutor::countJobs public function Counts jobs in the queue. Overrides JobExecutorInterface::countJobs
JobExecutor::ensure protected function Ensures that a job exists with a given status.
JobExecutor::load public function Loads a job from the database. Overrides JobExecutorInterface::load
JobExecutor::save public function Saves a job. Overrides JobExecutorInterface::save
JobExecutor::select public function Claims a job if one is available. Overrides JobExecutorInterface::select
JobExecutor::__construct public function JobExecutor constructor.