You are here

class Redis_Queue_PhpRedis in Redis 7.2

Same name and namespace in other branches
  1. 7.3 lib/Redis/Queue/PhpRedis.php \Redis_Queue_PhpRedis

@todo Set high expire value to the hash for rotation when memory is empty React upon cache clear all and rebuild path list?

Hierarchy

Expanded class hierarchy of Redis_Queue_PhpRedis

File

lib/Redis/Queue/PhpRedis.php, line 8

View source
class Redis_Queue_PhpRedis extends Redis_Queue_Base {
  public function createItem($data) {
    $client = $this
      ->getClient();
    $dKey = $this
      ->getKeyForData();
    $qKey = $this
      ->getKeyForQueue();

    // Identifier does not not need to be in the transaction,
    // in case of any error we'll just skip a value in the sequence.
    $id = $client
      ->hincrby($dKey, self::QUEUE_HKEY_SEQ, 1);
    $record = new stdClass();
    $record->qid = $id;
    $record->data = $data;
    $record->timestamp = time();
    $pipe = $client
      ->multi(Redis::PIPELINE);

    // Thanks to the redis_queue standalone module maintainers for
    // this piece of code, very effective. Note that we added the
    // pipeline thought.
    $pipe
      ->hsetnx($dKey, $id, serialize($record));
    $pipe
      ->llen($qKey);
    $pipe
      ->lpush($qKey, $id);
    $ret = $pipe
      ->exec();
    if (!($success = $ret[0] && $ret[1] < $ret[2])) {
      if ($ret[0]) {

        // HSETNEX worked but not the PUSH command we therefore
        // need to drop the inserted data. I would have prefered
        // a DISCARD instead but we are in pipelined transaction
        // we cannot actually do a DISCARD here.
        $client
          ->hdel($dKey, $id);
      }
    }
    return $success;
  }
  public function numberOfItems() {
    return $this
      ->getClient()
      ->llen($this
      ->getKeyForQueue());
  }
  public function claimItem($lease_time = 30) {

    // @todo Deal with lease
    $client = $this
      ->getClient();
    $id = $client
      ->rpoplpush($this
      ->getKeyForQueue(), $this
      ->getKeyForClaimed());
    if ($id) {
      if ($item = $client
        ->hget($this
        ->getKeyForData(), $id)) {
        if ($item = unserialize($item)) {
          return $item;
        }
      }
    }
    return false;
  }
  public function deleteItem($item) {
    $pipe = $this
      ->getClient()
      ->multi(Redis::PIPELINE);
    $pipe
      ->lrem($this
      ->getKeyForQueue(), $item->qid);
    $pipe
      ->lrem($this
      ->getKeyForClaimed(), $item->qid);
    $pipe
      ->hdel($this
      ->getKeyForData(), $item->qid);
    $pipe
      ->exec();
  }
  public function releaseItem($item) {
    $pipe = $this
      ->getClient()
      ->multi(Redis::PIPELINE);
    $pipe
      ->lrem($this
      ->getKeyForClaimed(), $item->qid, -1);
    $pipe
      ->lpush($this
      ->getKeyForQueue(), $item->qid);
    $ret = $pipe
      ->exec();
    return $ret[0] && $ret[1];
  }
  public function createQueue() {
  }
  public function deleteQueue() {
    $this
      ->getClient()
      ->del($this
      ->getKeyForQueue(), $this
      ->getKeyForClaimed(), $this
      ->getKeyForData());
  }

}

Members

Namesort descending Modifiers Type Description Overrides
Redis_AbstractBackend::$globalPrefix protected static property
Redis_AbstractBackend::$prefix private property
Redis_AbstractBackend::getClient public function Get redis client
Redis_AbstractBackend::getDefaultPrefix public static function Get global default prefix
Redis_AbstractBackend::getGlobalPrefix public static function Get site default global prefix
Redis_AbstractBackend::getKey public function Get full key name using the set prefix 2
Redis_AbstractBackend::getPrefix final public function Get prefix
Redis_AbstractBackend::KEY_SEPARATOR constant Key components name separator
Redis_AbstractBackend::setPrefix final public function Set prefix
Redis_Queue_Base::$name protected property Queue name
Redis_Queue_Base::getKeyForClaimed public function Get claimed LIST key
Redis_Queue_Base::getKeyForData public function Get data HASH key
Redis_Queue_Base::getKeyForQueue public function Get queued items LIST key
Redis_Queue_Base::QUEUE_HKEY_SEQ constant Data HASH sequence key name.
Redis_Queue_Base::QUEUE_KEY_PREFIX constant Key prefix for queue data.
Redis_Queue_Base::__construct public function Default contructor Overrides Redis_AbstractBackend::__construct
Redis_Queue_PhpRedis::claimItem public function Claim an item in the queue for processing. Overrides DrupalQueueInterface::claimItem
Redis_Queue_PhpRedis::createItem public function Add a queue item and store it directly to the queue. Overrides DrupalQueueInterface::createItem
Redis_Queue_PhpRedis::createQueue public function Create a queue. Overrides DrupalQueueInterface::createQueue
Redis_Queue_PhpRedis::deleteItem public function Delete a finished item from the queue. Overrides DrupalQueueInterface::deleteItem
Redis_Queue_PhpRedis::deleteQueue public function Delete a queue and every item in the queue. Overrides DrupalQueueInterface::deleteQueue
Redis_Queue_PhpRedis::numberOfItems public function Retrieve the number of items in the queue. Overrides DrupalQueueInterface::numberOfItems
Redis_Queue_PhpRedis::releaseItem public function Release an item that the worker could not process, so another worker can come in and process it before the timeout expires. Overrides DrupalQueueInterface::releaseItem