You are here

class ReliablePredis in Redis 8

Redis queue implementation using Predis library backend.

Hierarchy

Expanded class hierarchy of ReliablePredis

File

src/Queue/ReliablePredis.php, line 10

Namespace

Drupal\redis\Queue
View source
class ReliablePredis extends ReliableQueueBase {

  /**
   * The Redis connection.
   *
   * @var \Predis\Client $client
   */
  protected $client;

  /**
   * Constructs a \Drupal\redis\Queue\Predis object.
   *
   * @param string $name
   *   The name of the queue.
   * @param array $settings
   *   Array of Redis-related settings for this queue.
   * @param \Predis\Client $client
   *   The Predis client.
   */
  public function __construct($name, array $settings, \Predis\Client $client) {
    parent::__construct($name, $settings);
    $this->client = $client;
  }

  /**
   * {@inheritdoc}
   */
  public function createItem($data) {
    $record = new \stdClass();
    $record->data = $data;
    $record->qid = $this
      ->incrementId();

    // We cannot rely on REQUEST_TIME because many items might be created
    // by a single request which takes longer than 1 second.
    $record->timestamp = time();
    $pipe = $this->client
      ->pipeline();
    $pipe
      ->hsetnx($this->availableItems, $record->qid, serialize($record));
    $pipe
      ->lLen($this->availableListKey);
    $pipe
      ->lpush($this->availableListKey, $record->qid);
    $result = $pipe
      ->execute();
    $success = $result[0] && $result[2] > $result[1];
    return $success ? $record->qid : FALSE;
  }

  /**
   * Gets next serial ID for Redis queue items.
   *
   * @return int
   *   Next serial ID for Redis queue item.
   */
  protected function incrementId() {
    return $this->client
      ->incr($this->incrementCounterKey);
  }

  /**
   * {@inheritdoc}
   */
  public function numberOfItems() {
    return $this->client
      ->lLen($this->availableListKey) + $this->client
      ->lLen($this->claimedListKey);
  }

  /**
   * {@inheritdoc}
   */
  public function claimItem($lease_time = 30) {

    // Is it OK to do garbage collection here (we need to loop list of claimed
    // items)?
    $this
      ->garbageCollection();
    $item = FALSE;
    if ($this->reserveTimeout !== NULL) {

      // A blocking version of claimItem to be used with long-running queue workers.
      $qid = $this->client
        ->brpoplpush($this->availableListKey, $this->claimedListKey, $this->reserveTimeout);
    }
    else {
      $qid = $this->client
        ->rpoplpush($this->availableListKey, $this->claimedListKey);
    }
    if ($qid) {
      $job = $this->client
        ->hget($this->availableItems, $qid);
      if ($job) {
        $item = unserialize($job);
        $this->client
          ->setex($this->leasedKeyPrefix . $item->qid, $lease_time, '1');
      }
    }
    return $item;
  }

  /**
   * {@inheritdoc}
   */
  public function releaseItem($item) {
    $this->client
      ->pipeline()
      ->lrem($this->claimedListKey, -1, $item->qid)
      ->lpush($this->availableListKey, $item->qid)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function deleteItem($item) {
    $this->client
      ->pipeline()
      ->lrem($this->claimedListKey, -1, $item->qid)
      ->lrem($this->availableListKey, -1, $item->qid)
      ->hdel($this->availableItems, $item->qid)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function deleteQueue() {

    // TODO: Fixme
    $keys_to_remove = [
      $this->claimedListKey,
      $this->availableListKey,
      $this->availableItems,
      $this->incrementCounterKey,
    ];
    foreach ($this->client
      ->keys($this->leasedKeyPrefix . '*') as $key) {
      $keys_to_remove[] = $key;
    }
    $this->client
      ->del($keys_to_remove);
  }

  /**
   * Automatically release items, that have been claimed and exceeded lease time.
   */
  protected function garbageCollection() {
    foreach ($this->client
      ->lrange($this->claimedListKey, 0, -1) as $qid) {
      if (!$this->client
        ->exists($this->leasedKeyPrefix . $qid)) {

        // The lease expired for this ID.
        $this->client
          ->lrem($this->claimedListKey, -1, $qid);
        $this->client
          ->lpush($this->availableListKey, $qid);
      }
    }
  }

}

Members

Namesort descending Modifiers Type Description Overrides
QueueBase::$availableItems protected property Key for hash table of available queue items.
QueueBase::$availableListKey protected property Key for list of available items.
QueueBase::$claimedListKey protected property Key for list of claimed items.
QueueBase::$incrementCounterKey protected property Key of increment counter key.
QueueBase::$leasedKeyPrefix protected property Key prefix for items that are used to track expiration of leased items.
QueueBase::$name protected property The name of the queue this instance is working with.
QueueBase::$reserveTimeout protected property Reserve timeout for blocking item claim.
QueueBase::createQueue public function Creates a queue. Overrides QueueInterface::createQueue
QueueBase::KEY_PREFIX constant Prefix used with all keys.
ReliablePredis::$client protected property The Redis connection.
ReliablePredis::claimItem public function Claims an item in the queue for processing. Overrides QueueInterface::claimItem
ReliablePredis::createItem public function Adds a queue item and store it directly to the queue. Overrides QueueInterface::createItem
ReliablePredis::deleteItem public function Deletes a finished item from the queue. Overrides QueueInterface::deleteItem
ReliablePredis::deleteQueue public function Deletes a queue and every item in the queue. Overrides QueueInterface::deleteQueue
ReliablePredis::garbageCollection protected function Automatically release items, that have been claimed and exceeded lease time.
ReliablePredis::incrementId protected function Gets next serial ID for Redis queue items.
ReliablePredis::numberOfItems public function Retrieves the number of items in the queue. Overrides QueueInterface::numberOfItems
ReliablePredis::releaseItem public function Releases an item that the worker could not process. Overrides QueueInterface::releaseItem
ReliablePredis::__construct public function Constructs a \Drupal\redis\Queue\Predis object. Overrides QueueBase::__construct