public function SystemSetQueue::claimItem in Queue Unique 7

Same name and namespace in other branches
  1. 7.2 src/Queue/SystemSetQueue.php \Drupal\queue_unique\Queue\SystemSetQueue::claimItem()

File

src/Queue/SystemSetQueue.php, line 60
Contains \Drupal\queue_unique\Queue\SystemSetQueue

Class

SystemSetQueue

Namespace

Drupal\queue_unique\Queue

Code

public function claimItem($lease_time = 30) {

  // Claim an item by updating its expire field. If the claim is not
  // successful, another thread may have claimed the item in the meantime.
  // Therefore, loop until an item is successfully claimed or we are
  // reasonably sure there are no unclaimed items left.
  while (TRUE) {
    $item = db_query_range('SELECT data, item_id FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(
      ':name' => $this->name,
    ))->fetchObject();
    if ($item) {

      // Try to update the item. Only one thread can succeed in UPDATEing the
      // same row. We cannot rely on REQUEST_TIME because items might be
      // claimed by a single consumer which runs longer than 1 second. If we
      // continue to use REQUEST_TIME instead of the current time(), we steal
      // time from the lease, and will tend to reset items before the lease
      // should really expire.
      $update = db_update(static::TABLE_NAME)
        ->fields(array(
          'expire' => time() + $lease_time,
        ))
        ->condition('item_id', $item->item_id)
        ->condition('expire', 0);

      // If there are affected rows, this update succeeded.
      if ($update->execute()) {
        $item->data = unserialize($item->data);
        return $item;
      }
    }
    else {

      // No items currently available to claim.
      return FALSE;
    }
  }
}
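
The claim logic above can be tried outside Drupal with a minimal sketch. It assumes a hypothetical in-memory class, InMemoryLeaseQueue, which is not part of Queue Unique: an array stands in for the queue table, and markClaimed() imitates the conditional UPDATE (item_id matches and expire is still 0). The $now parameter is also an assumption, added only so the lease arithmetic is deterministic; the real method calls time() directly.

```php
<?php
// Hypothetical in-memory stand-in for the queue table. Illustration only.
final class InMemoryLeaseQueue {
  private $rows = array();
  private $nextId = 1;

  // Adds an unclaimed item (expire = 0), stored serialized like the table row.
  public function createItem($data, $created) {
    $id = $this->nextId++;
    $this->rows[$id] = array(
      'data' => serialize($data),
      'created' => $created,
      'expire' => 0,
    );
    return $id;
  }

  // Imitates: UPDATE ... SET expire = :expire WHERE item_id = :id AND expire = 0.
  // Returns the number of affected rows (0 or 1).
  public function markClaimed($id, $expire) {
    if (isset($this->rows[$id]) && $this->rows[$id]['expire'] === 0) {
      $this->rows[$id]['expire'] = $expire;
      return 1;
    }
    return 0;
  }

  // Imitates: SELECT ... WHERE expire = 0 ORDER BY created, item_id ASC LIMIT 1.
  private function findOldestUnclaimed() {
    $best = NULL;
    // Rows are iterated in ascending item_id order, so the strict "<"
    // comparison breaks ties on "created" in favor of the lower item_id.
    foreach ($this->rows as $id => $row) {
      if ($row['expire'] !== 0) {
        continue;
      }
      if ($best === NULL || $row['created'] < $this->rows[$best]['created']) {
        $best = $id;
      }
    }
    return $best;
  }

  public function expireOf($id) {
    return $this->rows[$id]['expire'];
  }

  public function claimItem($lease_time = 30, $now = NULL) {
    $now = ($now === NULL) ? time() : $now;
    while (TRUE) {
      $id = $this->findOldestUnclaimed();
      if ($id === NULL) {
        // No items currently available to claim.
        return FALSE;
      }
      // Only a claimant that still sees expire = 0 affects a row.
      if ($this->markClaimed($id, $now + $lease_time) === 1) {
        $item = new stdClass();
        $item->item_id = $id;
        $item->data = unserialize($this->rows[$id]['data']);
        return $item;
      }
    }
  }
}

$queue = new InMemoryLeaseQueue();
$queue->createItem('first', 100);
$queue->createItem('second', 100);

$a = $queue->claimItem(30, 1000);           // claims 'first', lease ends at 1030
$b = $queue->claimItem(30, 1000);           // claims 'second'
$c = $queue->claimItem(30, 1000);           // nothing left to claim
$lost = $queue->markClaimed($a->item_id, 2000); // a late claimant affects 0 rows
```

Because the sketch is single-threaded, the retry branch of the loop never runs here; the explicit second markClaimed() call shows what a competing claimant would see. In the real method, zero affected rows sends the loop back to select the next unclaimed item.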