class Predis in Redis 8
Same name in this branch
- 8 src/Lock/Predis.php \Drupal\redis\Lock\Predis
- 8 src/Flood/Predis.php \Drupal\redis\Flood\Predis
- 8 src/Queue/Predis.php \Drupal\redis\Queue\Predis
- 8 src/Client/Predis.php \Drupal\redis\Client\Predis
- 8 src/Cache/Predis.php \Drupal\redis\Cache\Predis
- 8 src/PersistentLock/Predis.php \Drupal\redis\PersistentLock\Predis
Redis queue implementation using Predis library backend.
Hierarchy
- class \Drupal\redis\Queue\QueueBase implements QueueInterface
- class \Drupal\redis\Queue\Predis
Expanded class hierarchy of Predis
3 string references to 'Predis'
- ClientFactory::getClientInterface in src/
ClientFactory.php - Lazy instantiates client proxy depending on the actual configuration.
- Predis::getName in src/
Client/ Predis.php - Get underlying library name used.
- RedisCacheTagsChecksum::doInvalidateTags in src/
Cache/ RedisCacheTagsChecksum.php - Marks cache items with any of the specified tags as invalid.
File
- src/
Queue/ Predis.php, line 10
Namespace
Drupal\redis\QueueView source
class Predis extends QueueBase {
/**
* The Redis connection.
*
* @var \Predis\Client $client
*/
protected $client;
/**
* Constructs a \Drupal\redis\Queue\Predis object.
*
* @param string $name
* The name of the queue.
* @param array $settings
* Array of Redis-related settings for this queue.
* @param \Predis\Client $client
* The Predis client.
*/
public function __construct($name, array $settings, \Predis\Client $client) {
parent::__construct($name, $settings);
$this->client = $client;
}
/**
* {@inheritdoc}
*/
public function createItem($data) {
// TODO: Fixme
$record = new \stdClass();
$record->data = $data;
$record->qid = $this
->incrementId();
// We cannot rely on REQUEST_TIME because many items might be created
// by a single request which takes longer than 1 second.
$record->timestamp = time();
if (!$this->client
->hsetnx($this->availableItems, $record->qid, serialize($record))) {
return FALSE;
}
$start_len = $this->client
->lLen($this->availableListKey);
if ($start_len < $this->client
->lpush($this->availableListKey, $record->qid)) {
return $record->qid;
}
}
/**
* Gets next serial ID for Redis queue items.
*
* @return int
* Next serial ID for Redis queue item.
*/
protected function incrementId() {
return $this->client
->incr($this->incrementCounterKey);
}
/**
* {@inheritdoc}
*/
public function numberOfItems() {
return $this->client
->lLen($this->availableListKey) + $this->client
->lLen($this->claimedListKey);
}
/**
* {@inheritdoc}
*/
public function claimItem($lease_time = 30) {
// Is it OK to do garbage collection here (we need to loop list of claimed
// items)?
$this
->garbageCollection();
$item = FALSE;
if ($this->reserveTimeout !== NULL) {
// A blocking version of claimItem to be used with long-running queue workers.
$qid = $this->client
->brpoplpush($this->availableListKey, $this->claimedListKey, $this->reserveTimeout);
}
else {
$qid = $this->client
->rpoplpush($this->availableListKey, $this->claimedListKey);
}
if ($qid) {
$job = $this->client
->hget($this->availableItems, $qid);
if ($job) {
$item = unserialize($job);
$this->client
->setex($this->leasedKeyPrefix . $item->qid, $lease_time, '1');
}
}
return $item;
}
/**
* {@inheritdoc}
*/
public function releaseItem($item) {
$this->client
->lrem($this->claimedListKey, -1, $item->qid);
$this->client
->lpush($this->availableListKey, $item->qid);
}
/**
* {@inheritdoc}
*/
public function deleteItem($item) {
$this->client
->lrem($this->claimedListKey, -1, $item->qid);
$this->client
->lrem($this->availableListKey, -1, $item->qid);
$this->client
->hdel($this->availableItems, $item->qid);
}
/**
* {@inheritdoc}
*/
public function deleteQueue() {
// TODO: Fixme
$keys_to_remove = [
$this->claimedListKey,
$this->availableListKey,
$this->availableItems,
$this->incrementCounterKey,
];
foreach ($this->client
->keys($this->leasedKeyPrefix . '*') as $key) {
$keys_to_remove[] = $key;
}
$this->client
->del($keys_to_remove);
}
/**
* Automatically release items, that have been claimed and exceeded lease time.
*/
protected function garbageCollection() {
foreach ($this->client
->lrange($this->claimedListKey, 0, -1) as $qid) {
if (!$this->client
->exists($this->leasedKeyPrefix . $qid)) {
// The lease expired for this ID.
$this->client
->lrem($this->claimedListKey, $qid, -1);
$this->client
->lpush($this->availableListKey, $qid);
}
}
}
}
Members
Name | Modifiers | Type | Description | Overrides |
---|---|---|---|---|
Predis:: |
protected | property | The Redis connection. | |
Predis:: |
public | function |
Claims an item in the queue for processing. Overrides QueueInterface:: |
|
Predis:: |
public | function |
Adds a queue item and store it directly to the queue. Overrides QueueInterface:: |
|
Predis:: |
public | function |
Deletes a finished item from the queue. Overrides QueueInterface:: |
|
Predis:: |
public | function |
Deletes a queue and every item in the queue. Overrides QueueInterface:: |
|
Predis:: |
protected | function | Automatically release items, that have been claimed and exceeded lease time. | |
Predis:: |
protected | function | Gets next serial ID for Redis queue items. | |
Predis:: |
public | function |
Retrieves the number of items in the queue. Overrides QueueInterface:: |
|
Predis:: |
public | function |
Releases an item that the worker could not process. Overrides QueueInterface:: |
|
Predis:: |
public | function |
Constructs a \Drupal\redis\Queue\Predis object. Overrides QueueBase:: |
|
QueueBase:: |
protected | property | Key for hash table of available queue items. | |
QueueBase:: |
protected | property | Key for list of available items. | |
QueueBase:: |
protected | property | Key for list of claimed items. | |
QueueBase:: |
protected | property | Key of increment counter key. | |
QueueBase:: |
protected | property | Key prefix for items that are used to track expiration of leased items. | |
QueueBase:: |
protected | property | The name of the queue this instance is working with. | |
QueueBase:: |
protected | property | Reserve timeout for blocking item claim. | |
QueueBase:: |
public | function |
Creates a queue. Overrides QueueInterface:: |
|
QueueBase:: |
constant | Prefix used with all keys. |