You are here

public function Database::claimJob in Advanced Queue 8

Claims the next available job for processing.

Return value

\Drupal\advancedqueue\Job|null: The claimed job, or NULL if no job is available.

Overrides BackendInterface::claimJob

File

src/Plugin/AdvancedQueue/Backend/Database.php, line 165

Class

Database
Provides the database queue backend.

Namespace

Drupal\advancedqueue\Plugin\AdvancedQueue\Backend

Code

/**
 * Claims the next available job for processing.
 *
 * Selects the oldest queued job of this queue that is currently available
 * and not leased (expires = 0), then tries to mark it as processing with a
 * lease that ends "lease_time" seconds from the current time.
 *
 * @return \Drupal\advancedqueue\Job|null
 *   The job, or NULL if none available.
 */
public function claimJob() {
  // Neither the lookup statement nor the target state changes between
  // attempts, so both are prepared once up front.
  $lookup_sql = 'SELECT * FROM {advancedqueue}
        WHERE queue_id = :queue_id AND state = :state AND available <= :now AND expires = 0
        ORDER BY available, job_id ASC';
  $claimed_state = Job::STATE_PROCESSING;

  // A claim is made by writing the row's "expires" field. When the write
  // affects no rows, another thread may have claimed the same job after it
  // was selected here, so the next candidate is tried. The loop ends once a
  // claim succeeds or the lookup finds nothing left to claim.
  do {
    $lookup_args = [
      ':queue_id' => $this->queueId,
      ':state' => Job::STATE_QUEUED,
      ':now' => $this->time->getCurrentTime(),
    ];
    $statement = $this->connection->queryRange($lookup_sql, 0, 1, $lookup_args);
    $row = $statement->fetchAssoc();
    if (!$row) {
      // No jobs left to claim.
      return NULL;
    }

    // The lease end is computed from a fresh reading of the current time
    // rather than a time fixed at the start of the request: a consumer that
    // keeps claiming jobs for longer than a second would otherwise hand out
    // leases that are already partly used up, and jobs would tend to be
    // reset before their lease should really expire.
    $lease_end = $this->time->getCurrentTime() + $this->configuration['lease_time'];

    // The "expires = 0" condition ensures that only one thread can succeed
    // in updating the same row.
    $claim = $this->connection->update('advancedqueue');
    $claim->fields([
      'state' => $claimed_state,
      'expires' => $lease_end,
    ]);
    $claim->condition('job_id', $row['job_id']);
    $claim->condition('expires', 0);

    // No affected rows means the claim failed; look for another job.
    if (!$claim->execute()) {
      continue;
    }

    // The claim succeeded. Reshape the row into the definition passed to the
    // Job constructor: the identifier moves from "job_id" to "id", the
    // payload is decoded from JSON into an array, and the state and lease
    // end reflect the values that were just written.
    $row['id'] = $row['job_id'];
    unset($row['job_id']);
    $row['payload'] = json_decode($row['payload'], TRUE);
    $row['state'] = $claimed_state;
    $row['expires'] = $lease_end;

    return new Job($row);
  } while (TRUE);
}