public function PushQueue::processQueue in Salesforce Suite 8.4

Same name and namespace in other branches
  1. 8.3 modules/salesforce_push/src/PushQueue.php \Drupal\salesforce_push\PushQueue::processQueue()
  2. 5.0.x modules/salesforce_push/src/PushQueue.php \Drupal\salesforce_push\PushQueue::processQueue()

Given a Salesforce mapping, process all its push queue entries.

Parameters

\Drupal\salesforce_mapping\Entity\SalesforceMappingInterface $mapping: Salesforce mapping.

Return value

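int The number of items processed, or -1 if there was any error, in which case a SalesforceEvents::ERROR event is also dispatched. In the code listed on this page, no path returns -1: an unexpected exception dispatches the ERROR event and the loop continues.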

Throws

\Drupal\Component\Plugin\Exception\PluginException

1 call to PushQueue::processQueue()
PushQueue::processQueues in modules/salesforce_push/src/PushQueue.php
Process Salesforce queues.
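
As a rough illustration of how a caller such as processQueues might consume the integer returned per mapping, the sketch below sums the counts and stops once a cap is reached. Everything in it is a hypothetical stand-in: process_all(), the $processOne callback, the mapping ids, and the cap are assumptions for illustration, not the module's actual implementation.

```php
<?php

/**
 * Hypothetical caller: runs a per-mapping processor over several mappings.
 *
 * @param string[] $mappingIds
 *   Stand-ins for Salesforce mapping ids.
 * @param callable $processOne
 *   Stand-in for processQueue(); takes an id and returns an int count.
 * @param int $globalLimit
 *   Assumed cap on the number of items handled in one run.
 *
 * @return int
 *   Total number of items reported by the processor.
 */
function process_all(array $mappingIds, callable $processOne, int $globalLimit): int {
  $total = 0;
  foreach ($mappingIds as $id) {
    $total += $processOne($id);
    // Stop visiting further mappings once the cap is reached.
    if ($total >= $globalLimit) {
      break;
    }
  }
  return $total;
}

// Made-up per-mapping counts, used only to exercise the sketch.
$counts = ['contact' => 4, 'account' => 0, 'lead' => 7, 'case' => 2];
$total = process_all(array_keys($counts), function (string $id) use ($counts): int {
  return $counts[$id];
}, 10);
```

With these made-up counts the run stops after the third mapping, so the fourth is never visited and the total can exceed the cap by part of a batch.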

File

modules/salesforce_push/src/PushQueue.php, line 434

Class

PushQueue
Salesforce push queue.

Namespace

Drupal\salesforce_push

Code

public function processQueue(SalesforceMappingInterface $mapping) {
  if (!$this->connection->schema()->tableExists(static::TABLE_NAME)) {
    return 0;
  }
  $this->garbageCollection();
  static $queue_processor = FALSE;

  // Check mapping frequency before proceeding.
  if ($mapping->getNextPushTime() > $this->time->getRequestTime()) {
    return 0;
  }
  if (!$queue_processor) {
    // @TODO push queue processor could be set globally, or per-mapping.
    // Exposing some UI setting would probably be better than this:
    $plugin_name = $this->state->get('salesforce.push_queue_processor', static::DEFAULT_QUEUE_PROCESSOR);
    $queue_processor = $this->queueManager->createInstance($plugin_name);
  }
  $i = 0;

  // Set the queue name, which is the mapping id.
  $this->setName($mapping->id());

  // Iterate through items in this queue (mapping) until we run out or hit
  // the mapping limit, then move to the next queue. If we hit the global
  // limit, return immediately.
  while (TRUE) {
    // Claim as many items as we can from this queue and advance our counter.
    // If this queue is empty, move to the next mapping.
    $items = $this->claimItems($mapping->push_limit, $mapping->push_retries);
    if (empty($items)) {
      $mapping->setLastPushTime($this->time->getRequestTime());
      return $i;
    }

    // Hand them to the queue processor.
    try {
      $queue_processor->process($items);
    }
    catch (RequeueException $e) {
      // Getting a Requeue here is weird for a group of items, but we'll
      // deal with it.
      $this->releaseItems($items);
      $this->eventDispatcher->dispatch(SalesforceEvents::WARNING, new SalesforceErrorEvent($e));
      continue;
    }
    catch (SuspendQueueException $e) {
      // Getting a SuspendQueue is more likely, e.g. because of a network
      // or authorization error. Release items and move on to the next
      // mapping in this case.
      $this->releaseItems($items);
      $this->eventDispatcher->dispatch(SalesforceEvents::WARNING, new SalesforceErrorEvent($e));
      return $i;
    }
    catch (\Exception $e) {
      // In case of any other kind of exception, log it and leave the item
      // in the queue to be processed again later.
      // @TODO: this is how Cron.php queue works, but I don't really
      // understand why it doesn't get re-queued.
      $this->eventDispatcher->dispatch(SalesforceEvents::ERROR, new SalesforceErrorEvent($e));
    }
    finally {
      // If we've reached our limit, we're done. Otherwise, continue to next
      // items.
      $i += count($items);
      if ($i >= $this->globalLimit) {
        return $i;
      }
    }
  }
  return $i;
}
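
The interaction between the catch branches and the finally block in the loop above is easy to misread, so here is a minimal standalone sketch of the same control flow. It assumes plain PHP with no Drupal services: drain_queue(), its parameters, and the use of \RuntimeException in place of RequeueException are hypothetical stand-ins, not part of the module.

```php
<?php

/**
 * Hypothetical stand-in for the claim/process loop; not the module's API.
 *
 * @param array $queue
 *   Pending items, consumed from the front.
 * @param int $batchSize
 *   Stand-in for the per-mapping push limit.
 * @param int $globalLimit
 *   Stand-in for the global limit.
 * @param callable $process
 *   Receives one batch of items; may throw.
 *
 * @return int
 *   Number of items claimed, whether or not processing succeeded.
 */
function drain_queue(array &$queue, int $batchSize, int $globalLimit, callable $process): int {
  $count = 0;
  while (TRUE) {
    // Claim up to $batchSize items; an empty claim ends the run.
    $items = array_splice($queue, 0, $batchSize);
    if (empty($items)) {
      return $count;
    }
    try {
      $process($items);
    }
    catch (\RuntimeException $e) {
      // The finally block still runs before this continue takes effect.
      continue;
    }
    finally {
      // Runs after success and after a caught exception alike, so failed
      // batches are counted too.
      $count += count($items);
      if ($count >= $globalLimit) {
        return $count;
      }
    }
  }
}

// Ten made-up items, batches of three, global limit of five.
$queue = range(1, 10);
$processed = drain_queue($queue, 3, 5, function (array $batch): void {
  if (in_array(2, $batch, TRUE)) {
    throw new \RuntimeException('batch failed');
  }
});
```

In this run the first batch fails and the second succeeds, yet both are counted. Because the limit is only checked after a whole batch has been added, the returned count can exceed the limit.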