public function Database::claimJob in Advanced Queue 8
Claims the next available job for processing.
Return value
\Drupal\advancedqueue\Job|null The job, or NULL if none available.
Overrides BackendInterface::claimJob
File
- src/
Plugin/ AdvancedQueue/ Backend/ Database.php, line 165
Class
- Database
- Provides the database queue backend.
Namespace
Drupal\advancedqueue\Plugin\AdvancedQueue\BackendCode
public function claimJob() {
// Claim a job by updating its expire fields. If the claim is not successful
// another thread may have claimed the job in the meantime. Therefore loop
// until a job is successfully claimed or we are reasonably sure there
// are no unclaimed jobs left.
while (TRUE) {
$query = 'SELECT * FROM {advancedqueue}
WHERE queue_id = :queue_id AND state = :state AND available <= :now AND expires = 0
ORDER BY available, job_id ASC';
$params = [
':queue_id' => $this->queueId,
':state' => Job::STATE_QUEUED,
':now' => $this->time
->getCurrentTime(),
];
$job_definition = $this->connection
->queryRange($query, 0, 1, $params)
->fetchAssoc();
if (!$job_definition) {
// No jobs left to claim.
return NULL;
}
// Try to update the item. Only one thread can succeed in updating the
// same row. We cannot rely on the request time because items might be
// claimed by a single consumer which runs longer than 1 second. If we
// continue to use request time instead of current time, we steal
// time from the lease, and will tend to reset items before the lease
// should really expire.
$state = Job::STATE_PROCESSING;
$expires = $this->time
->getCurrentTime() + $this->configuration['lease_time'];
$update = $this->connection
->update('advancedqueue')
->fields([
'state' => $state,
'expires' => $expires,
])
->condition('job_id', $job_definition['job_id'])
->condition('expires', 0);
// If there are affected rows, the claim succeeded.
if ($update
->execute()) {
$job_definition['id'] = $job_definition['job_id'];
unset($job_definition['job_id']);
$job_definition['payload'] = json_decode($job_definition['payload'], TRUE);
$job_definition['state'] = $state;
$job_definition['expires'] = $expires;
return new Job($job_definition);
}
}
}