public function PushQueue::claimItems in Salesforce Suite 5.0.x

Same name and namespace in other branches
  1. 8.4 modules/salesforce_push/src/PushQueue.php \Drupal\salesforce_push\PushQueue::claimItems()
  2. 8.3 modules/salesforce_push/src/PushQueue.php \Drupal\salesforce_push\PushQueue::claimItems()

Claim up to $n items from the current queue.

If the queue is empty, or no items are currently available to claim, return an empty array.

Parameters

int $n: Number of items to claim.

int $fail_limit: Do not claim items with this many or more failures.

int $lease_time: Time, in seconds, for which to hold this claim.

Return value

array Zero to $n items, indexed by item_id.
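As a rough illustration of the return shape, the sketch below keys a list of row objects by item_id and iterates over the result. The rows, their field values, and the mapping name are made up for illustration; only the item_id, name, and failures columns are taken from the query in the code listing.

```php
<?php

// Hypothetical rows standing in for queue records; the values are made up.
$rows = [
  (object) ['item_id' => 12, 'name' => 'example_mapping', 'failures' => 0],
  (object) ['item_id' => 15, 'name' => 'example_mapping', 'failures' => 2],
];

// Key each row object by its item_id, mirroring the documented return shape.
$items = array_column($rows, NULL, 'item_id');

// A caller can then use the array key as the item identifier.
foreach ($items as $item_id => $item) {
  printf("item %d has %d failure(s)\n", $item_id, $item->failures);
}
```

An empty array simply skips the loop, so a caller does not need a separate check for an empty queue.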

Overrides PushQueueInterface::claimItems

See also

DatabaseQueue::claimItem

1 call to PushQueue::claimItems()
PushQueue::processQueue in modules/salesforce_push/src/PushQueue.php
Given a salesforce mapping, process all its push queue entries.

File

modules/salesforce_push/src/PushQueue.php, line 274

Class

PushQueue
Salesforce push queue.

Namespace

Drupal\salesforce_push

Code

public function claimItems($n, $fail_limit = self::DEFAULT_MAX_FAILS, $lease_time = self::DEFAULT_LEASE_TIME) {
  while (TRUE) {
    try {
      if ($n <= 0) {
        // If $n is zero or negative, claim up to the global limit.
        $n = $this->globalLimit;
      }

      // @TODO: convert items to content entities.
      // @see \Drupal::entityQuery()
      $items = $this->connection
        ->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name AND failures < :fail_limit ORDER BY created, item_id ASC', 0, $n, [
          ':name' => $this->name,
          ':fail_limit' => $fail_limit,
        ])
        ->fetchAllAssoc('item_id');
    } catch (\Exception $e) {
      $this->catchException($e);

      // If the table does not exist there are no items currently available to
      // claim.
      return [];
    }
    if ($items) {
      // Try to claim the items. Only one thread can succeed in UPDATEing the
      // same row, because the update only matches rows where expire is still 0.
      // Caveat: the lease is computed from the request time, not the current
      // time. A consumer that runs longer than 1 second therefore steals
      // time from the lease, and items will tend to be reset before the lease
      // should really expire.
      $update = $this->connection
        ->update(static::TABLE_NAME)
        ->fields([
          'expire' => $this->time->getRequestTime() + $lease_time,
        ])
        ->condition('item_id', array_keys($items), 'IN')
        ->condition('expire', 0);

      // If there are affected rows, this update succeeded.
      if ($update->execute()) {
        return $items;
      }
    }
    else {
      // No items currently available to claim.
      return [];
    }
  }
}
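
The select-then-mark pattern in the listing above can be sketched without a database. The class below is a minimal in-memory stand-in, not the module's API: InMemoryClaimSketch, add(), claim(), and the explicit $now argument are all hypothetical names introduced for illustration. It leaves out the retry loop, the exception handling, and the fallback to the global limit when $n is zero or negative.

```php
<?php

/**
 * In-memory stand-in for the queue table. Illustrative only.
 */
final class InMemoryClaimSketch {

  /** @var array<int, array<string, int>> Rows keyed by item_id. */
  private array $rows = [];

  public function add(int $item_id, int $created, int $failures = 0): void {
    $this->rows[$item_id] = [
      'item_id' => $item_id,
      'created' => $created,
      'failures' => $failures,
      'expire' => 0,
    ];
  }

  /**
   * Claims up to $n unclaimed rows with fewer than $fail_limit failures.
   */
  public function claim(int $n, int $fail_limit, int $lease_time, int $now): array {
    // Candidates are unclaimed (expire is 0) and below the failure limit.
    $candidates = array_filter(
      $this->rows,
      static fn(array $row): bool => $row['expire'] === 0 && $row['failures'] < $fail_limit
    );
    // Oldest first, ties broken by item_id, as in the ORDER BY clause.
    uasort(
      $candidates,
      static fn(array $a, array $b): int => [$a['created'], $a['item_id']] <=> [$b['created'], $b['item_id']]
    );
    $claimed = array_slice($candidates, 0, $n, TRUE);
    // Record the lease on the stored rows so that a later claim skips them.
    foreach (array_keys($claimed) as $item_id) {
      $this->rows[$item_id]['expire'] = $now + $lease_time;
    }
    // As in the listing, the returned rows are the ones read before the update.
    return $claimed;
  }

}

$queue = new InMemoryClaimSketch();
$queue->add(1, 100);
$queue->add(2, 90);
// Item 3 has reached the failure limit used below, so it is never claimed.
$queue->add(3, 80, 10);

$first = $queue->claim(5, 10, 300, 1000);
$second = $queue->claim(5, 10, 300, 1000);
```

Here $first holds items 2 and 1, in that order, and $second is empty because both rows now carry a non-zero expire value. A real queue also needs the conditional UPDATE shown in the listing, since two consumers may read the same candidate rows concurrently.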