You are here

class Predis in Redis 8

Same name in this branch
  1. 8 src/Lock/Predis.php \Drupal\redis\Lock\Predis
  2. 8 src/Flood/Predis.php \Drupal\redis\Flood\Predis
  3. 8 src/Queue/Predis.php \Drupal\redis\Queue\Predis
  4. 8 src/Client/Predis.php \Drupal\redis\Client\Predis
  5. 8 src/Cache/Predis.php \Drupal\redis\Cache\Predis
  6. 8 src/PersistentLock/Predis.php \Drupal\redis\PersistentLock\Predis

Redis queue implementation using Predis library backend.

Hierarchy

Expanded class hierarchy of Predis

3 string references to 'Predis'
ClientFactory::getClientInterface in src/ClientFactory.php
Lazy instantiates client proxy depending on the actual configuration.
Predis::getName in src/Client/Predis.php
Get underlying library name used.
RedisCacheTagsChecksum::doInvalidateTags in src/Cache/RedisCacheTagsChecksum.php
Marks cache items with any of the specified tags as invalid.

File

src/Queue/Predis.php, line 10

Namespace

Drupal\redis\Queue
View source
class Predis extends QueueBase {

  /**
   * The Redis connection.
   *
   * @var \Predis\Client $client
   */
  protected $client;

  /**
   * Constructs a \Drupal\redis\Queue\Predis object.
   *
   * @param string $name
   *   The name of the queue.
   * @param array $settings
   *   Array of Redis-related settings for this queue.
   * @param \Predis\Client $client
   *   The Predis client.
   */
  public function __construct($name, array $settings, \Predis\Client $client) {
    parent::__construct($name, $settings);
    $this->client = $client;
  }

  /**
   * {@inheritdoc}
   */
  public function createItem($data) {

    // TODO: Fixme
    $record = new \stdClass();
    $record->data = $data;
    $record->qid = $this
      ->incrementId();

    // We cannot rely on REQUEST_TIME because many items might be created
    // by a single request which takes longer than 1 second.
    $record->timestamp = time();
    if (!$this->client
      ->hsetnx($this->availableItems, $record->qid, serialize($record))) {
      return FALSE;
    }
    $start_len = $this->client
      ->lLen($this->availableListKey);
    if ($start_len < $this->client
      ->lpush($this->availableListKey, $record->qid)) {
      return $record->qid;
    }
  }

  /**
   * Gets next serial ID for Redis queue items.
   *
   * @return int
   *   Next serial ID for Redis queue item.
   */
  protected function incrementId() {
    return $this->client
      ->incr($this->incrementCounterKey);
  }

  /**
   * {@inheritdoc}
   */
  public function numberOfItems() {
    return $this->client
      ->lLen($this->availableListKey) + $this->client
      ->lLen($this->claimedListKey);
  }

  /**
   * {@inheritdoc}
   */
  public function claimItem($lease_time = 30) {

    // Is it OK to do garbage collection here (we need to loop list of claimed
    // items)?
    $this
      ->garbageCollection();
    $item = FALSE;
    if ($this->reserveTimeout !== NULL) {

      // A blocking version of claimItem to be used with long-running queue workers.
      $qid = $this->client
        ->brpoplpush($this->availableListKey, $this->claimedListKey, $this->reserveTimeout);
    }
    else {
      $qid = $this->client
        ->rpoplpush($this->availableListKey, $this->claimedListKey);
    }
    if ($qid) {
      $job = $this->client
        ->hget($this->availableItems, $qid);
      if ($job) {
        $item = unserialize($job);
        $this->client
          ->setex($this->leasedKeyPrefix . $item->qid, $lease_time, '1');
      }
    }
    return $item;
  }

  /**
   * {@inheritdoc}
   */
  public function releaseItem($item) {
    $this->client
      ->lrem($this->claimedListKey, -1, $item->qid);
    $this->client
      ->lpush($this->availableListKey, $item->qid);
  }

  /**
   * {@inheritdoc}
   */
  public function deleteItem($item) {
    $this->client
      ->lrem($this->claimedListKey, -1, $item->qid);
    $this->client
      ->lrem($this->availableListKey, -1, $item->qid);
    $this->client
      ->hdel($this->availableItems, $item->qid);
  }

  /**
   * {@inheritdoc}
   */
  public function deleteQueue() {

    // TODO: Fixme
    $keys_to_remove = [
      $this->claimedListKey,
      $this->availableListKey,
      $this->availableItems,
      $this->incrementCounterKey,
    ];
    foreach ($this->client
      ->keys($this->leasedKeyPrefix . '*') as $key) {
      $keys_to_remove[] = $key;
    }
    $this->client
      ->del($keys_to_remove);
  }

  /**
   * Automatically release items, that have been claimed and exceeded lease time.
   */
  protected function garbageCollection() {
    foreach ($this->client
      ->lrange($this->claimedListKey, 0, -1) as $qid) {
      if (!$this->client
        ->exists($this->leasedKeyPrefix . $qid)) {

        // The lease expired for this ID.
        $this->client
          ->lrem($this->claimedListKey, $qid, -1);
        $this->client
          ->lpush($this->availableListKey, $qid);
      }
    }
  }

}

Members

Namesort descending Modifiers Type Description Overrides
Predis::$client protected property The Redis connection.
Predis::claimItem public function Claims an item in the queue for processing. Overrides QueueInterface::claimItem
Predis::createItem public function Adds a queue item and store it directly to the queue. Overrides QueueInterface::createItem
Predis::deleteItem public function Deletes a finished item from the queue. Overrides QueueInterface::deleteItem
Predis::deleteQueue public function Deletes a queue and every item in the queue. Overrides QueueInterface::deleteQueue
Predis::garbageCollection protected function Automatically release items, that have been claimed and exceeded lease time.
Predis::incrementId protected function Gets next serial ID for Redis queue items.
Predis::numberOfItems public function Retrieves the number of items in the queue. Overrides QueueInterface::numberOfItems
Predis::releaseItem public function Releases an item that the worker could not process. Overrides QueueInterface::releaseItem
Predis::__construct public function Constructs a \Drupal\redis\Queue\Predis object. Overrides QueueBase::__construct
QueueBase::$availableItems protected property Key for hash table of available queue items.
QueueBase::$availableListKey protected property Key for list of available items.
QueueBase::$claimedListKey protected property Key for list of claimed items.
QueueBase::$incrementCounterKey protected property Key of increment counter key.
QueueBase::$leasedKeyPrefix protected property Key prefix for items that are used to track expiration of leased items.
QueueBase::$name protected property The name of the queue this instance is working with.
QueueBase::$reserveTimeout protected property Reserve timeout for blocking item claim.
QueueBase::createQueue public function Creates a queue. Overrides QueueInterface::createQueue
QueueBase::KEY_PREFIX constant Prefix used with all keys.