public function PushQueue::processQueue in Salesforce Suite 8.4
Same name and namespace in other branches
- 8.3 modules/salesforce_push/src/PushQueue.php \Drupal\salesforce_push\PushQueue::processQueue()
- 5.0.x modules/salesforce_push/src/PushQueue.php \Drupal\salesforce_push\PushQueue::processQueue()
Given a Salesforce mapping, process all its push queue entries.
Parameters
\Drupal\salesforce_mapping\Entity\SalesforceMappingInterface $mapping: Salesforce mapping.
Return value
int The number of items processed, or -1 if there was any error, in which case a SalesforceEvents::ERROR event is also dispatched.
Throws
\Drupal\Component\Plugin\Exception\PluginException
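As a rough illustration of the return-value contract described above, a caller might tally results across several mappings as follows. This is a sketch under stated assumptions, not the module's processQueues() implementation: tallyPushResults() is a hypothetical helper, and the callable passed to it stands in for a call to processQueue().

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: sums per-mapping results, treating -1 as an error.
 *
 * @param string[] $mappingIds
 *   IDs of the mappings to process.
 * @param callable $process
 *   Stand-in for processQueue(); takes a mapping ID and returns an int.
 *
 * @return array
 *   ['processed' => int, 'failed' => string[]].
 */
function tallyPushResults(array $mappingIds, callable $process): array {
  $processed = 0;
  $failed = [];
  foreach ($mappingIds as $id) {
    $result = $process($id);
    if ($result === -1) {
      // The documented error value: record the mapping and move on.
      $failed[] = $id;
      continue;
    }
    $processed += $result;
  }
  return ['processed' => $processed, 'failed' => $failed];
}
```

Keeping the failed mapping IDs separate from the processed count avoids folding the -1 sentinel into the total.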
1 call to PushQueue::processQueue()
- PushQueue::processQueues in modules/salesforce_push/src/PushQueue.php - Process Salesforce queues.
File
- modules/salesforce_push/src/PushQueue.php, line 434
Class
- PushQueue
- Salesforce push queue.
Namespace
Drupal\salesforce_push
Code
public function processQueue(SalesforceMappingInterface $mapping) {
  if (!$this->connection->schema()->tableExists(static::TABLE_NAME)) {
    return 0;
  }
  $this->garbageCollection();
  static $queue_processor = FALSE;
  // Check mapping frequency before proceeding.
  if ($mapping->getNextPushTime() > $this->time->getRequestTime()) {
    return 0;
  }
  if (!$queue_processor) {
    // @TODO push queue processor could be set globally, or per-mapping. Exposing some UI setting would probably be better than this:
    $plugin_name = $this->state->get('salesforce.push_queue_processor', static::DEFAULT_QUEUE_PROCESSOR);
    $queue_processor = $this->queueManager->createInstance($plugin_name);
  }
  $i = 0;
  // Set the queue name, which is the mapping id.
  $this->setName($mapping->id());
  // Iterate through items in this queue (mapping) until we run out or hit
  // the mapping limit, then move to the next queue. If we hit the global
  // limit, return immediately.
  while (TRUE) {
    // Claim as many items as we can from this queue and advance our counter.
    // If this queue is empty, move to the next mapping.
    $items = $this->claimItems($mapping->push_limit, $mapping->push_retries);
    if (empty($items)) {
      $mapping->setLastPushTime($this->time->getRequestTime());
      return $i;
    }
    // Hand them to the queue processor.
    try {
      $queue_processor->process($items);
    } catch (RequeueException $e) {
      // Getting a Requeue here is weird for a group of items, but we'll
      // deal with it.
      $this->releaseItems($items);
      $this->eventDispatcher->dispatch(SalesforceEvents::WARNING, new SalesforceErrorEvent($e));
      continue;
    } catch (SuspendQueueException $e) {
      // Getting a SuspendQueue is more likely, e.g. because of a network
      // or authorization error. Release items and move on to the next
      // mapping in this case.
      $this->releaseItems($items);
      $this->eventDispatcher->dispatch(SalesforceEvents::WARNING, new SalesforceErrorEvent($e));
      return $i;
    } catch (\Exception $e) {
      // In case of any other kind of exception, log it and leave the item
      // in the queue to be processed again later.
      // @TODO: this is how Cron.php queue works, but I don't really understand why it doesn't get re-queued.
      $this->eventDispatcher->dispatch(SalesforceEvents::ERROR, new SalesforceErrorEvent($e));
    } finally {
      // If we've reached our limit, we're done. Otherwise, continue to next
      // items.
      $i += count($items);
      if ($i >= $this->globalLimit) {
        return $i;
      }
    }
  }
  return $i;
}
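The interaction between continue, return, and the finally block in the loop above is easy to misread. The following is a minimal standalone sketch, not the module's code: drainQueue(), RetryLater, and StopQueue are hypothetical stand-ins for the loop, RequeueException, and SuspendQueueException. It relies only on standard PHP semantics: finally runs before a continue or return takes effect, and a return inside finally replaces an earlier one.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins for RequeueException and SuspendQueueException.
class RetryLater extends \Exception {}
class StopQueue extends \Exception {}

/**
 * Drains batches using the same try/catch/finally shape as processQueue().
 *
 * @param array $batches
 *   Each batch is ['items' => int, 'throws' => string|null], where 'throws'
 *   names an exception class to raise while "processing" that batch.
 * @param int $globalLimit
 *   Stop once this many items have been counted.
 *
 * @return int
 *   The counter value handed back to the caller.
 */
function drainQueue(array $batches, int $globalLimit): int {
  $i = 0;
  while (TRUE) {
    $batch = array_shift($batches);
    if ($batch === NULL) {
      // Queue is empty.
      return $i;
    }
    try {
      if ($batch['throws'] !== NULL) {
        $class = $batch['throws'];
        throw new $class();
      }
    }
    catch (RetryLater $e) {
      // The finally block still runs before the loop continues.
      continue;
    }
    catch (StopQueue $e) {
      // $i is evaluated here; finally can only change the result by
      // returning a value itself.
      return $i;
    }
    finally {
      // Runs for every batch, including ones that threw.
      $i += $batch['items'];
      if ($i >= $globalLimit) {
        return $i;
      }
    }
  }
}

$ok = ['items' => 2, 'throws' => NULL];
// A batch that raises RetryLater is still counted: 3 + 2.
var_dump(drainQueue([['items' => 3, 'throws' => 'RetryLater'], $ok], 100)); // int(5)
// StopQueue returns the counter as it was before finally incremented it.
var_dump(drainQueue([['items' => 3, 'throws' => 'StopQueue'], $ok], 100)); // int(0)
// Unless the limit is reached, in which case the return in finally wins.
var_dump(drainQueue([['items' => 3, 'throws' => 'StopQueue'], $ok], 3)); // int(3)
```

In this sketch the counter advances for every claimed batch regardless of outcome, so the value returned reflects items attempted rather than items successfully pushed.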