You are here

public function MemoryLimitDatabaseQueue::claimItem in Acquia Content Hub 8

Claims an item in the queue for processing.

Parameters

$lease_time: How long the processing is expected to take in seconds, defaults to an hour. After this lease expires, the item will be reset and another consumer can claim the item. For idempotent tasks (which can be run multiple times without side effects), shorter lease times would result in lower latency in case a consumer fails. For tasks that should not be run more than once (non-idempotent), a larger lease time will make it more rare for a given task to run multiple times in cases of failure, at the cost of higher latency.

Return value

On success we return an item object. If the queue is unable to claim an item it returns false. This implies a best effort to retrieve an item and either the queue is empty or there is some other non-recoverable problem.

If returned, the object will have at least the following properties:

  • data: the same as what what passed into createItem().
  • item_id: the unique ID returned from createItem().
  • created: timestamp when the item was put into the queue.

Overrides DatabaseQueue::claimItem

File

src/Queue/MemoryLimitDatabaseQueue.php, line 77

Class

MemoryLimitDatabaseQueue
Overridden queue implementation that evaluates memory usage.

Namespace

Drupal\acquia_contenthub\Queue

Code

public function claimItem($lease_time = 30) {

  // Allow for the queue to be paused via Drupal state. Useful when the queue
  // is processed by multiple workers and distributed across machines, this
  // central switch allow for a graceful pause of the entire processing.
  $paused = \Drupal::state()
    ->get('acquia_contenthub.export_queue.paused');
  if ($paused) {
    Drush::output()
      ->writeln(dt("Acquia Content Hub export queue is currently paused."));
    return FALSE;
  }

  // Claim an item by updating its expire fields. If claim is not successful
  // another thread may have claimed the item in the meantime. Therefore loop
  // until an item is successfully claimed or we are reasonably sure there
  // are no unclaimed items left.
  while ($this
    ->attemptMemoryReclaim() * 2 <= $this
    ->memoryLimit()) {
    try {
      $item = $this->connection
        ->queryRange('SELECT data, created, item_id FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, [
        ':name' => $this->name,
      ])
        ->fetchObject();
    } catch (\Exception $e) {
      $this
        ->catchException($e);

      // If the table does not exist there are no items currently available to
      // claim.
      return FALSE;
    }
    if ($item) {

      // Try to update the item. Only one thread can succeed in UPDATEing the
      // same row. We cannot rely on REQUEST_TIME because items might be
      // claimed by a single consumer which runs longer than 1 second. If we
      // continue to use REQUEST_TIME instead of the current time(), we steal
      // time from the lease, and will tend to reset items before the lease
      // should really expire.
      $update = $this->connection
        ->update(static::TABLE_NAME)
        ->fields([
        'expire' => time() + $lease_time,
      ])
        ->condition('item_id', $item->item_id)
        ->condition('expire', 0);

      // If there are affected rows, this update succeeded.
      if ($update
        ->execute()) {
        $item->data = unserialize($item->data);
        return $item;
      }
    }
    else {

      // No items currently available to claim.
      return FALSE;
    }
  }
  if ($this
    ->numberOfItems()) {
    drush_log(dt("The queue operation has run out of memory. There are @number items left in the queue. Restart the queue to continue processing.", [
      '@number' => $this
        ->numberOfItems(),
    ]));
  }
}