class MemoryLimitDatabaseQueue in Acquia Content Hub 8
Overridden queue implementation that evaluates memory usage.
Hierarchy
- class \Drupal\Core\Queue\DatabaseQueue implements QueueGarbageCollectionInterface, ReliableQueueInterface uses DependencySerializationTrait
- class \Drupal\acquia_contenthub\Queue\MemoryLimitDatabaseQueue
Expanded class hierarchy of MemoryLimitDatabaseQueue
File
- src/Queue/MemoryLimitDatabaseQueue.php, line 15
Namespace
Drupal\acquia_contenthub\Queue
View source
/**
 * Database queue that stops handing out items when memory runs low.
 *
 * Before every claim attempt the queue tries to free memory and then checks
 * that current usage is at most half of the PHP memory limit. Claiming can
 * also be paused through the 'acquia_contenthub.export_queue.paused' state
 * key.
 */
class MemoryLimitDatabaseQueue extends DatabaseQueue {

  /**
   * The memory limit of the php runtime.
   *
   * Stored in bytes; PHP_INT_MAX when the runtime has no limit ("-1").
   *
   * @var int
   */
  protected $memoryLimit;

  /**
   * Constructs \Drupal\acquia_contenthub\Queue\MemoryLimitDatabaseQueue.
   *
   * @param string $name
   *   The name of the queue.
   * @param \Drupal\Core\Database\Connection $connection
   *   The Connection object containing the key-value tables.
   */
  public function __construct($name, Connection $connection) {
    // Record the memory limit in bytes. A configured value of "-1" means
    // "unlimited", which is represented by the largest possible integer so
    // the usage check in claimItem() always passes.
    $limit = trim(ini_get('memory_limit'));
    $this->memoryLimit = ($limit === '-1') ? PHP_INT_MAX : Bytes::toInt($limit);
    parent::__construct($name, $connection);
  }

  /**
   * Find the php memory limit.
   *
   * @return int
   *   The memory limit recorded by the constructor.
   */
  protected function memoryLimit() {
    return $this->memoryLimit;
  }

  /**
   * Tries to reclaim memory.
   *
   * Resets the cache of every entity storage handler and then runs the
   * garbage collector.
   *
   * @return int
   *   The memory usage after reclaim, as reported by memory_get_usage().
   */
  protected function attemptMemoryReclaim() {
    // Entity storage can blow up with caches so clear them out.
    $manager = \Drupal::entityTypeManager();
    foreach (array_keys($manager->getDefinitions()) as $entity_type_id) {
      $manager->getStorage($entity_type_id)->resetCache();
    }
    // @todo explore resetting the container.
    // Run garbage collector to further reduce memory.
    gc_collect_cycles();
    return memory_get_usage();
  }

  /**
   * {@inheritdoc}
   *
   * Returns FALSE without claiming anything when the queue is paused, when no
   * unclaimed item exists, when the lookup query throws, or when memory usage
   * after a reclaim attempt exceeds half of the memory limit.
   */
  public function claimItem($lease_time = 30) {
    // Allow for the queue to be paused via Drupal state. Useful when the queue
    // is processed by multiple workers and distributed across machines, this
    // central switch allows for a graceful pause of the entire processing.
    $paused = \Drupal::state()->get('acquia_contenthub.export_queue.paused');
    if ($paused) {
      // dt() is a procedural Drush helper; only print the notice when it has
      // been loaded so that a non-Drush runtime does not hit an undefined
      // function error.
      if (function_exists('dt')) {
        Drush::output()->writeln(dt("Acquia Content Hub export queue is currently paused."));
      }
      return FALSE;
    }

    // Claim an item by updating its expire fields. If claim is not successful
    // another thread may have claimed the item in the meantime. Therefore loop
    // until an item is successfully claimed or we are reasonably sure there
    // are no unclaimed items left. The loop only continues while memory usage
    // (after a reclaim attempt) is at most half of the memory limit.
    while ($this->attemptMemoryReclaim() * 2 <= $this->memoryLimit()) {
      try {
        $item = $this->connection
          ->queryRange('SELECT data, created, item_id FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, [
            ':name' => $this->name,
          ])
          ->fetchObject();
      }
      catch (\Exception $e) {
        $this->catchException($e);
        // If the table does not exist there are no items currently available
        // to claim.
        return FALSE;
      }

      if (!$item) {
        // No items currently available to claim.
        return FALSE;
      }

      // Try to update the item. Only one thread can succeed in UPDATEing the
      // same row. We cannot rely on REQUEST_TIME because items might be
      // claimed by a single consumer which runs longer than 1 second. If we
      // continue to use REQUEST_TIME instead of the current time(), we steal
      // time from the lease, and will tend to reset items before the lease
      // should really expire.
      $update = $this->connection
        ->update(static::TABLE_NAME)
        ->fields([
          'expire' => time() + $lease_time,
        ])
        ->condition('item_id', $item->item_id)
        ->condition('expire', 0);
      // If there are affected rows, this update succeeded.
      if ($update->execute()) {
        $item->data = unserialize($item->data);
        return $item;
      }
      // Otherwise another consumer won the race; try the next item.
    }

    // The memory guard stopped the loop. Report how much work is left, asking
    // the queue for its size only once.
    $remaining = $this->numberOfItems();
    if ($remaining) {
      $message = "The queue operation has run out of memory. There are @number items left in the queue. Restart the queue to continue processing.";
      $arguments = ['@number' => $remaining];
      if (function_exists('drush_log') && function_exists('dt')) {
        drush_log(dt($message, $arguments));
      }
      else {
        // The Drush helpers are not loaded in this runtime; fall back to the
        // Drupal logger so the notice is not lost.
        \Drupal::logger('acquia_contenthub')->warning($message, $arguments);
      }
    }

    // Always report "nothing claimed" explicitly rather than falling off the
    // end of the method and returning NULL.
    return FALSE;
  }

}
Members
Name | Modifiers | Type | Description | Overrides |
---|---|---|---|---|
DatabaseQueue::$connection | protected | property | The database connection. | |
DatabaseQueue::$name | protected | property | The name of the queue this instance is working with. | |
DatabaseQueue::catchException | protected | function | Act on an exception when queue might be stale. | |
DatabaseQueue:: |
public | function |
Adds a queue item and store it directly to the queue. Overrides QueueInterface:: |
|
DatabaseQueue:: |
public | function |
Creates a queue. Overrides QueueInterface:: |
|
DatabaseQueue:: |
public | function |
Deletes a finished item from the queue. Overrides QueueInterface:: |
|
DatabaseQueue:: |
public | function |
Deletes a queue and every item in the queue. Overrides QueueInterface:: |
|
DatabaseQueue:: |
protected | function | Adds a queue item and store it directly to the queue. | |
DatabaseQueue:: |
protected | function | Check if the table exists and create it if not. | |
DatabaseQueue:: |
public | function |
Cleans queues of garbage. Overrides QueueGarbageCollectionInterface:: |
|
DatabaseQueue::numberOfItems | public | function | Retrieves the number of items in the queue. | Overrides QueueInterface::numberOfItems |
DatabaseQueue:: |
public | function |
Releases an item that the worker could not process. Overrides QueueInterface:: |
|
DatabaseQueue:: |
public | function | Defines the schema for the queue table. | |
DatabaseQueue::TABLE_NAME | | constant | The database table name. | |
DependencySerializationTrait:: |
protected | property | An array of entity type IDs keyed by the property name of their storages. | |
DependencySerializationTrait:: |
protected | property | An array of service IDs keyed by property name used for serialization. | |
DependencySerializationTrait:: |
public | function | 1 | |
DependencySerializationTrait:: |
public | function | 2 | |
MemoryLimitDatabaseQueue::$memoryLimit | protected | property | The memory limit of the php runtime. | |
MemoryLimitDatabaseQueue::attemptMemoryReclaim | protected | function | Tries to reclaim memory. | |
MemoryLimitDatabaseQueue::claimItem | public | function | Claims an item in the queue for processing. | Overrides DatabaseQueue::claimItem |
MemoryLimitDatabaseQueue::memoryLimit | protected | function | Find the php memory limit. | |
MemoryLimitDatabaseQueue::__construct | public | function | Constructs \Drupal\acquia_contenthub\Queue\MemoryLimitDatabaseQueue. | Overrides DatabaseQueue::__construct |