<?php
namespace Drupal\apigee_edge;
use Drupal\apigee_edge\Job\Job;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Database\Connection;
use Drupal\Core\Queue\QueueFactory;
/**
 * Persists, selects and runs Job objects.
 *
 * Jobs are stored serialized in the 'apigee_edge_job' database table, keyed by
 * their id, together with their status, tag and created/updated timestamps.
 * Jobs handed to cast() are additionally announced on the 'apigee_edge_job'
 * queue.
 */
class JobExecutor implements JobExecutorInterface {

  /**
   * Database connection used to read and write the 'apigee_edge_job' table.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $connection;

  /**
   * Time service that supplies the 'created' and 'updated' timestamps.
   *
   * @var \Drupal\Component\Datetime\TimeInterface
   */
  protected $time;

  /**
   * The 'apigee_edge_job' queue, as returned by the queue factory.
   *
   * @var \Drupal\Core\Queue\QueueInterface
   */
  protected $queue;

  /**
   * Constructs a JobExecutor.
   *
   * @param \Drupal\Core\Database\Connection $connection
   *   Database connection holding the 'apigee_edge_job' table.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   Time service used for timestamps.
   * @param \Drupal\Core\Queue\QueueFactory $queue_factory
   *   Queue factory; only the 'apigee_edge_job' queue is retrieved from it.
   */
  public function __construct(Connection $connection, TimeInterface $time, QueueFactory $queue_factory) {
    $this->connection = $connection;
    $this->time = $time;
    $this->queue = $queue_factory->get('apigee_edge_job');
  }

  /**
   * Makes sure a job has the given status, persisting it when it changes.
   *
   * Nothing is written when the job already has the requested status.
   *
   * @param \Drupal\apigee_edge\Job\Job $job
   *   The job to update.
   * @param int $status
   *   The status the job must have afterwards.
   */
  protected function ensure(Job $job, int $status) {
    if ($job->getStatus() === $status) {
      return;
    }

    $job->setStatus($status);
    $this->save($job);
  }

  /**
   * Inserts or updates the database row of a job.
   *
   * The row is matched on the job id. 'created' is only written when the row
   * is inserted; 'status', 'job' (the serialized object), 'updated' and 'tag'
   * are written on both insert and update.
   *
   * @param \Drupal\apigee_edge\Job\Job $job
   *   The job to persist.
   */
  public function save(Job $job) {
    $now = $this->time->getCurrentTime();
    $id = $job->getId();

    // Columns that are refreshed on every save.
    $fields = [
      'status' => $job->getStatus(),
      'job' => serialize($job),
      'updated' => $now,
      'tag' => $job->getTag(),
    ];

    $this->connection
      ->merge('apigee_edge_job')
      ->key('id', $id)
      ->insertFields(['id' => $id, 'created' => $now] + $fields)
      ->updateFields($fields)
      ->execute();
  }

  /**
   * Loads a job by its id.
   *
   * @param string $id
   *   The id of the job.
   *
   * @return \Drupal\apigee_edge\Job\Job|null
   *   The unserialized job, or NULL when no (non-empty) row exists for the id.
   */
  public function load(string $id) : ?Job {
    $query = $this->connection
      ->select('apigee_edge_job', 'j')
      ->fields('j', ['job']);
    $query->condition('id', $id);

    $jobdata = $query->execute()->fetchField();

    return $jobdata ? unserialize($jobdata) : NULL;
  }

  /**
   * Picks the least recently updated runnable job and marks it as selected.
   *
   * Only jobs whose status is Job::IDLE or Job::RESCHEDULED are considered.
   * The chosen job is switched to Job::SELECTED and saved before it is
   * returned.
   *
   * NOTE(review): reading the row and marking it as selected are two separate
   * statements with no visible locking here, so concurrent callers might pick
   * the same job - confirm whether callers serialize access.
   *
   * @param string|null $tag
   *   When given, only jobs with this tag are considered.
   *
   * @return \Drupal\apigee_edge\Job\Job|null
   *   The selected job, or NULL when there is nothing to run.
   */
  public function select(?string $tag = NULL) : ?Job {
    $query = $this->connection
      ->select('apigee_edge_job', 'j')
      ->fields('j', ['job'])
      ->orderBy('updated')
      ->range(0, 1);
    $query->condition('status', [Job::IDLE, Job::RESCHEDULED], 'IN');
    if ($tag !== NULL) {
      $query->condition('tag', $tag);
    }

    $jobdata = $query->execute()->fetchField();
    if (!$jobdata) {
      return NULL;
    }

    $job = unserialize($jobdata);
    $this->ensure($job, Job::SELECTED);

    return $job;
  }

  /**
   * Executes a job and records the outcome in its status.
   *
   * The job is first put into Job::RUNNING (and saved if that is a change).
   * A truthy result from execute() sets the status to Job::IDLE, a falsy one
   * to Job::FINISHED. If execute() throws an exception, it is logged, recorded
   * on the job, and the status becomes Job::RESCHEDULED when the job both
   * allows a retry for that exception and still has a retry to consume;
   * otherwise it becomes Job::FAILED.
   *
   * @param \Drupal\apigee_edge\Job\Job $job
   *   The job to run.
   * @param bool $update
   *   Whether to save the job after the run, whatever its outcome.
   */
  public function call(Job $job, bool $update = TRUE) {
    $this->ensure($job, Job::RUNNING);

    try {
      $result = $job->execute();
      $job->setStatus($result ? Job::IDLE : Job::FINISHED);
    }
    catch (\Exception $ex) {
      watchdog_exception('apigee_edge_job', $ex);
      $job->recordException($ex);
      // Short-circuit on purpose: a retry is only consumed when the job
      // reports that this exception is worth retrying.
      $retry = $job->shouldRetry($ex) && $job->consumeRetry();
      $job->setStatus($retry ? Job::RESCHEDULED : Job::FAILED);
    }
    finally {
      if ($update) {
        $this->save($job);
      }
    }
  }

  /**
   * Saves a job and adds an item carrying its tag to the queue.
   *
   * @param \Drupal\apigee_edge\Job\Job $job
   *   The job to schedule.
   */
  public function cast(Job $job) {
    $this->save($job);
    $this->queue->createItem(['tag' => $job->getTag()]);
  }

  /**
   * Counts stored jobs, optionally filtered by tag and/or status.
   *
   * @param string|null $tag
   *   When given, only jobs with this tag are counted.
   * @param array|null $statuses
   *   When given, only jobs whose status is in this list are counted. NULL
   *   means "any status"; an empty array matches no job.
   *
   * @return int
   *   The number of matching jobs.
   */
  public function countJobs(?string $tag = NULL, ?array $statuses = NULL) : int {
    // An empty status list can match nothing, and an empty IN () list is not
    // accepted by the query builder, so answer without querying.
    if ($statuses === []) {
      return 0;
    }

    $query = $this->connection->select('apigee_edge_job', 'j');
    if ($tag !== NULL) {
      $query->condition('tag', $tag);
    }
    if ($statuses !== NULL) {
      $query->condition('status', $statuses, 'IN');
    }

    return (int) $query
      ->countQuery()
      ->execute()
      ->fetchField();
  }

}