View source
<?php
namespace Drupal\redis\Queue;
/**
 * Redis queue implementation using the Predis client.
 *
 * Queue state is spread over several Redis keys whose names come from
 * properties that are not defined in this class (presumably set up by
 * ReliableQueueBase — verify there):
 * - availableListKey: list of item ids waiting to be claimed.
 * - claimedListKey: list of item ids currently claimed.
 * - availableItems: hash mapping an item id to its serialized record.
 * - leasedKeyPrefix: prefix of the per-item lease marker keys.
 * - incrementCounterKey: counter used to generate item ids.
 */
class ReliablePredis extends ReliableQueueBase {

  /**
   * The Predis client used for every Redis command issued by this queue.
   *
   * @var \Predis\Client
   */
  protected $client;

  /**
   * Constructs a ReliablePredis queue.
   *
   * @param string $name
   *   The queue name, passed through to the parent constructor.
   * @param array $settings
   *   Queue settings, passed through to the parent constructor.
   * @param \Predis\Client $client
   *   The Predis client.
   */
  public function __construct($name, array $settings, \Predis\Client $client) {
    parent::__construct($name, $settings);
    $this->client = $client;
  }

  /**
   * Adds an item to the queue.
   *
   * @param mixed $data
   *   Arbitrary payload; it is stored serialized inside the item record.
   *
   * @return int|false
   *   The id of the new item, or FALSE if it could not be stored.
   */
  public function createItem($data) {
    $record = new \stdClass();
    $record->data = $data;
    $record->qid = $this->incrementId();
    $record->timestamp = time();

    // The payload is written before the id is pushed, so a consumer that pops
    // the id will find the record in the hash.
    $pipe = $this->client->pipeline();
    $pipe->hsetnx($this->availableItems, $record->qid, serialize($record));
    $pipe->lpush($this->availableListKey, $record->qid);
    $result = $pipe->execute();

    // LPUSH replies with the list length after the push. Success is derived
    // from that reply alone: comparing it against a separately fetched LLEN is
    // racy, because a pipeline is not atomic and a consumer popping in between
    // would make a successful push look like a failure.
    $success = $result[0] && is_numeric($result[1]) && $result[1] > 0;
    return $success ? $record->qid : FALSE;
  }

  /**
   * Generates the id for a new queue item.
   *
   * @return int
   *   The value of the id counter after incrementing it.
   */
  protected function incrementId() {
    return $this->client->incr($this->incrementCounterKey);
  }

  /**
   * Returns the number of items in the queue.
   *
   * @return int
   *   The combined length of the available and the claimed lists.
   */
  public function numberOfItems() {
    return $this->client->lLen($this->availableListKey) + $this->client->lLen($this->claimedListKey);
  }

  /**
   * Claims the next available item.
   *
   * @param int $lease_time
   *   Number of seconds the claim is held; used as the expiry of the lease
   *   marker key.
   *
   * @return object|false
   *   The unserialized item record, or FALSE if nothing could be claimed.
   */
  public function claimItem($lease_time = 30) {
    // Requeue items whose lease has expired before looking for work.
    $this->garbageCollection();

    // Move the oldest id from the available list to the claimed list; the
    // blocking variant is used when a reserve timeout is configured.
    if ($this->reserveTimeout !== NULL) {
      $qid = $this->client->brpoplpush($this->availableListKey, $this->claimedListKey, $this->reserveTimeout);
    }
    else {
      $qid = $this->client->rpoplpush($this->availableListKey, $this->claimedListKey);
    }

    if (!$qid) {
      return FALSE;
    }

    $job = $this->client->hget($this->availableItems, $qid);
    if (!$job) {
      // The id has no stored payload. Drop it from the claimed list; left
      // there without a lease, garbage collection would requeue it on every
      // claim and it could never be processed.
      $this->client->lrem($this->claimedListKey, -1, $qid);
      return FALSE;
    }

    $item = unserialize($job);
    // The lease marker expires on its own; garbage collection treats a claimed
    // id without a marker as abandoned.
    $this->client->setex($this->leasedKeyPrefix . $item->qid, $lease_time, '1');
    return $item;
  }

  /**
   * Releases a claimed item so it can be claimed again.
   *
   * @param object $item
   *   The item as returned by claimItem(); only its qid property is read.
   *
   * @return bool
   *   TRUE if the item was in the claimed list and has been moved back to the
   *   available list, FALSE otherwise.
   */
  public function releaseItem($item) {
    // A count of -1 removes at most one occurrence, searching from the tail.
    $removed = $this->client->lrem($this->claimedListKey, -1, $item->qid);
    if ($removed > 0) {
      $this->client->lpush($this->availableListKey, $item->qid);
      return TRUE;
    }
    // The id was not claimed any more (for example it was already requeued by
    // garbage collection); pushing it again would queue it twice.
    return FALSE;
  }

  /**
   * Deletes an item from the queue.
   *
   * @param object $item
   *   The item as returned by claimItem(); only its qid property is read.
   */
  public function deleteItem($item) {
    $this->client->pipeline()
      ->lrem($this->claimedListKey, -1, $item->qid)
      ->lrem($this->availableListKey, -1, $item->qid)
      ->hdel($this->availableItems, $item->qid)
      // Remove the lease marker as well instead of leaving it until it expires.
      ->del($this->leasedKeyPrefix . $item->qid)
      ->execute();
  }

  /**
   * Deletes the queue and every Redis key that belongs to it.
   */
  public function deleteQueue() {
    $keys_to_remove = [
      $this->claimedListKey,
      $this->availableListKey,
      $this->availableItems,
      $this->incrementCounterKey,
    ];
    // Lease markers are discovered by pattern. KEYS walks the whole keyspace,
    // which is tolerable here only because deleting a queue is rare.
    foreach ($this->client->keys($this->leasedKeyPrefix . '*') as $key) {
      $keys_to_remove[] = $key;
    }
    $this->client->del($keys_to_remove);
  }

  /**
   * Moves claimed items whose lease has expired back to the available list.
   */
  protected function garbageCollection() {
    foreach ($this->client->lrange($this->claimedListKey, 0, -1) as $qid) {
      if ($this->client->exists($this->leasedKeyPrefix . $qid)) {
        // Lease still active.
        continue;
      }
      // Requeue only if this call actually removed the id. When several
      // workers collect at the same time, only one LREM reports a removal, so
      // the id is not pushed more than once.
      if ($this->client->lrem($this->claimedListKey, -1, $qid) > 0) {
        $this->client->lpush($this->availableListKey, $qid);
      }
    }
  }

}