class QueueHandler in Salesforce Suite 5.0.x

Same name and namespace in other branches
  1. 8.4 modules/salesforce_pull/src/QueueHandler.php \Drupal\salesforce_pull\QueueHandler
  2. 8.3 modules/salesforce_pull/src/QueueHandler.php \Drupal\salesforce_pull\QueueHandler

Handles pull cron queue set up.


3 files declare their use of QueueHandler
PullController.php in modules/salesforce_pull/src/Controller/PullController.php
QueueHandlerTest.php in modules/salesforce_pull/tests/src/Unit/QueueHandlerTest.php
SalesforcePullCommands.php in modules/salesforce_pull/src/Commands/SalesforcePullCommands.php
1 string reference to 'QueueHandler' in modules/salesforce_pull/
1 service uses QueueHandler
salesforce_pull.queue_handler in modules/salesforce_pull/


modules/salesforce_pull/src/QueueHandler.php, line 26


class QueueHandler {
  const PULL_MAX_QUEUE_SIZE = 100000;
  const PULL_QUEUE_NAME = 'cron_salesforce_pull';

  /**
   * Salesforce client.
   *
   * @var \Drupal\salesforce\Rest\RestClientInterface
   */
  protected $sfapi;

  /**
   * Queue service.
   *
   * @var \Drupal\Core\Queue\QueueInterface
   */
  protected $queue;

  /**
   * All pull mappings.
   *
   * @var \Drupal\salesforce_mapping\Entity\SalesforceMapping[]
   */
  protected $mappings;

  /**
   * Config service.
   *
   * @var \Drupal\Core\Config\Config
   */
  protected $config;

  /**
   * The current request.
   *
   * @var \Symfony\Component\HttpFoundation\Request
   */
  protected $request;

  /**
   * Event dispatcher service.
   *
   * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
   */
  protected $eventDispatcher;

  /**
   * QueueHandler constructor.
   *
   * @param \Drupal\salesforce\Rest\RestClientInterface $sfapi
   *   Salesforce service.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   Entity type manager service.
   * @param \Drupal\Core\Queue\QueueDatabaseFactory $queue_factory
   *   Queue service.
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config
   *   Config service.
   * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
   *   Event dispatcher service.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   Time service.
   *
   * @throws \Drupal\Component\Plugin\Exception\InvalidPluginDefinitionException
   * @throws \Drupal\Component\Plugin\Exception\PluginNotFoundException
   */
  public function __construct(RestClientInterface $sfapi, EntityTypeManagerInterface $entity_type_manager, QueueDatabaseFactory $queue_factory, ConfigFactoryInterface $config, EventDispatcherInterface $event_dispatcher, TimeInterface $time) {
    $this->sfapi = $sfapi;
    $this->queue = $queue_factory->get(self::PULL_QUEUE_NAME);
    $this->config = $config->get('salesforce.settings');
    $this->eventDispatcher = $event_dispatcher;
    $this->time = $time;
    $this->mappings = $entity_type_manager
      ->getStorage('salesforce_mapping')
      ->loadCronPullMappings();
  }

  /**
   * Pull updated records from Salesforce and place them in the queue.
   *
   * Executes a SOQL query based on defined mappings, loops through the results,
   * and places each updated SF object into the queue for later processing.
   *
   * @param bool $force_pull
   *   Whether to force the queried records to be pulled.
   * @param int $start
   *   Timestamp of starting window from which to pull records. If omitted, use
   *   ::getLastPullTime().
   * @param int $stop
   *   Timestamp of ending window from which to pull records. If omitted, use
   *   "now".
   *
   * @return bool
   *   TRUE if there was room to add items, FALSE otherwise.
   */
  public function getUpdatedRecords($force_pull = FALSE, $start = 0, $stop = 0) {
    // Avoid overloading the processing queue and pass this time around if it's
    // over a configurable limit.
    $max_size = $this->config->get('pull_max_queue_size') ?: static::PULL_MAX_QUEUE_SIZE;
    if ($max_size && $this->queue->numberOfItems() > $max_size) {
      $message = 'Pull Queue contains %noi items, exceeding the max size of %max items. Pull processing will be blocked until the number of items in the queue is reduced to below the max size.';
      $args = [
        '%noi' => $this->queue->numberOfItems(),
        '%max' => $max_size,
      ];
      $this->eventDispatcher->dispatch(new SalesforceNoticeEvent(NULL, $message, $args), SalesforceEvents::NOTICE);
      return FALSE;
    }

    // Iterate over each field mapping to determine our query parameters.
    foreach ($this->mappings as $mapping) {
      $this->getUpdatedRecordsForMapping($mapping, $force_pull, $start, $stop);
    }
    return TRUE;
  }

  /**
   * Fetch and enqueue records from Salesforce.
   *
   * Given a mapping and optional timeframe, perform an API query for updated
   * records and enqueue them into the pull queue.
   *
   * @param \Drupal\salesforce_mapping\Entity\SalesforceMappingInterface $mapping
   *   The salesforce mapping for which to query.
   * @param bool $force_pull
   *   Whether to force the queried records to be pulled.
   * @param int $start
   *   Timestamp of starting window from which to pull records. If omitted, use
   *   ::getLastPullTime().
   * @param int $stop
   *   Timestamp of ending window from which to pull records. If omitted, use
   *   "now".
   *
   * @return false|int
   *   The number of records fetched by the pull query, or FALSE if no query
   *   was executed.
   *
   * @see SalesforceMappingInterface
   */
  public function getUpdatedRecordsForMapping(SalesforceMappingInterface $mapping, $force_pull = FALSE, $start = 0, $stop = 0) {
    if (!$mapping->doesPull()) {
      return FALSE;
    }

    if ($start == 0 && $mapping->getNextPullTime() > $this->time->getRequestTime()) {
      // Skip this mapping, based on pull frequency.
      return FALSE;
    }

    $results = $this->doSfoQuery($mapping, [], $start, $stop);
    if ($results) {
      $this->enqueueAllResults($mapping, $results, $force_pull);
      return $results->size();
    }
  }

  /**
   * Given a single mapping/id pair, enqueue it.
   *
   * @param \Drupal\salesforce_mapping\Entity\SalesforceMappingInterface $mapping
   *   The mapping.
   * @param \Drupal\salesforce\SFID $id
   *   The record id.
   * @param bool $force_pull
   *   Whether to force a pull. TRUE by default.
   *
   * @return bool
   *   TRUE if the record was enqueued successfully. Otherwise FALSE.
   */
  public function getSingleUpdatedRecord(SalesforceMappingInterface $mapping, SFID $id, $force_pull = TRUE) {
    if (!$mapping->doesPull()) {
      return FALSE;
    }
    $record = $this->sfapi->objectRead($mapping->getSalesforceObjectType(), (string) $id);
    if ($record) {
      $results = SelectQueryResult::createSingle($record);
      $this->enqueueAllResults($mapping, $results, $force_pull);
      return TRUE;
    }
    return FALSE;
  }

  /**
   * Perform the SFO Query for a mapping and its mapped fields.
   *
   * @param \Drupal\salesforce_mapping\Entity\SalesforceMappingInterface $mapping
   *   Mapping for which to execute pull.
   * @param array $mapped_fields
   *   Fetch only these fields, if given, otherwise fetch all mapped fields.
   * @param int $start
   *   Timestamp of starting window from which to pull records. If omitted, use
   *   ::getLastPullTime().
   * @param int $stop
   *   Timestamp of ending window from which to pull records. If omitted, use
   *   "now".
   *
   * @return \Drupal\salesforce\SelectQueryResult
   *   The result object returned from Salesforce.
   *
   * @see SalesforceMappingInterface
   */
  public function doSfoQuery(SalesforceMappingInterface $mapping, array $mapped_fields = [], $start = 0, $stop = 0) {
    // @TODO figure out the new way to build the query.
    // Execute query.
    try {
      $soql = $mapping->getPullQuery($mapped_fields, $start, $stop);
      $this->eventDispatcher->dispatch(new SalesforceQueryEvent($mapping, $soql), SalesforceEvents::PULL_QUERY);
      return $this->sfapi->query($soql);
    }
    catch (\Exception $e) {
      $message = '%type: @message in %function (line %line of %file).';
      $args = Error::decodeException($e);
      $this->eventDispatcher->dispatch(new SalesforceErrorEvent($e, $message, $args), SalesforceEvents::ERROR);
    }
  }

  /**
   * Inserts the given records into pull queue.
   *
   * @param \Drupal\salesforce_mapping\Entity\SalesforceMappingInterface $mapping
   *   Mapping.
   * @param \Drupal\salesforce\SelectQueryResult $results
   *   Results.
   * @param bool $force_pull
   *   Force flag.
   */
  public function enqueueAllResults(SalesforceMappingInterface $mapping, SelectQueryResult $results, $force_pull = FALSE) {
    while (!$this->enqueueResultSet($mapping, $results, $force_pull)) {
      try {
        $results = $this->sfapi->queryMore($results);
      }
      catch (\Exception $e) {
        $message = '%type: @message in %function (line %line of %file).';
        $args = Error::decodeException($e);
        $this->eventDispatcher->dispatch(new SalesforceErrorEvent($e, $message, $args), SalesforceEvents::ERROR);
        // @TODO do we really want to eat this exception here?
        // Stop paging so a failed request cannot loop forever.
        return;
      }
    }
  }

  /**
   * Enqueue a set of results into pull queue.
   *
   * @param \Drupal\salesforce_mapping\Entity\SalesforceMappingInterface $mapping
   *   Mapping object currently being processed.
   * @param \Drupal\salesforce\SelectQueryResult $results
   *   Result record set.
   * @param bool $force_pull
   *   Whether to force pull for enqueued items.
   *
   * @return bool
   *   Returns results->done(): TRUE if there are no more results, or FALSE if
   *   there are additional records to be queried.
   */
  public function enqueueResultSet(SalesforceMappingInterface $mapping, SelectQueryResult $results, $force_pull = FALSE) {
    $max_time = 0;
    $triggerField = $mapping->getPullTriggerDate();
    try {
      foreach ($results->records() as $record) {
        // @TODO? Pull Queue Enqueue Event
        $this->enqueueRecord($mapping, $record, $force_pull);
        $record_time = strtotime($record->field($triggerField));
        if ($max_time < $record_time) {
          $max_time = $record_time;
        }
      }
      return $results->done();
    }
    catch (\Exception $e) {
      $message = '%type: @message in %function (line %line of %file).';
      $args = Error::decodeException($e);
      $this->eventDispatcher->dispatch(new SalesforceErrorEvent($e, $message, $args), SalesforceEvents::ERROR);
    }
  }

  /**
   * Enqueue a single record for pull.
   *
   * @param \Drupal\salesforce_mapping\Entity\SalesforceMappingInterface $mapping
   *   Mapping.
   * @param \Drupal\salesforce\SObject $record
   *   Salesforce data.
   * @param bool $force_pull
   *   If TRUE, ignore timestamps and force data to be pulled.
   *
   * @throws \Exception
   */
  public function enqueueRecord(SalesforceMappingInterface $mapping, SObject $record, $force_pull = FALSE) {
    $this->queue->createItem(new PullQueueItem($record, $mapping, $force_pull));
  }

}
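
The paging behaviour of enqueueAllResults() and enqueueResultSet() can be illustrated outside Drupal. The following is a minimal sketch, not the module's code: FakeResultPage, enqueuePage(), enqueueAllPages() and the $fetch_next callable are hypothetical stand-ins for SelectQueryResult, enqueueResultSet(), enqueueAllResults() and the client's next-page request, and the record IDs are made up.

```php
<?php

/**
 * Hypothetical stand-in for \Drupal\salesforce\SelectQueryResult: one page of
 * records plus a flag saying whether this is the last page.
 */
final class FakeResultPage {

  private array $records;
  private bool $done;

  public function __construct(array $records, bool $done) {
    $this->records = $records;
    $this->done = $done;
  }

  public function records(): array {
    return $this->records;
  }

  public function done(): bool {
    return $this->done;
  }

}

/**
 * Enqueues one page and reports whether it was the last one.
 */
function enqueuePage(array &$queue, FakeResultPage $page): bool {
  foreach ($page->records() as $record) {
    $queue[] = $record;
  }
  return $page->done();
}

/**
 * Keeps fetching pages until a page reports done().
 *
 * $fetch_next is a hypothetical callable standing in for the REST client's
 * "fetch the next page" request.
 */
function enqueueAllPages(array &$queue, FakeResultPage $page, callable $fetch_next): void {
  while (!enqueuePage($queue, $page)) {
    $page = $fetch_next($page);
  }
}

// Two follow-up pages of made-up record IDs; only the last is marked done.
$remaining = [
  new FakeResultPage(['rec-3'], FALSE),
  new FakeResultPage(['rec-4', 'rec-5'], TRUE),
];
$fetch_next = function (FakeResultPage $previous) use (&$remaining): FakeResultPage {
  return array_shift($remaining);
};

$queue = [];
enqueueAllPages($queue, new FakeResultPage(['rec-1', 'rec-2'], FALSE), $fetch_next);
print implode(', ', $queue) . PHP_EOL;
```

Because the loop condition itself enqueues the current page, every page is processed exactly once, including the final one that ends the loop.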



Name                                       Modifiers  Type      Description  Overrides
QueueHandler::$config                      protected  property  Config service.
QueueHandler::$eventDispatcher             protected  property  Event dispatcher service.
QueueHandler::$mappings                    protected  property  All pull mappings.
QueueHandler::$queue                       protected  property  Queue service.
QueueHandler::$request                     protected  property  The current request.
QueueHandler::$sfapi                       protected  property  Salesforce client.
QueueHandler::doSfoQuery                   public     function  Perform the SFO Query for a mapping and its mapped fields.
QueueHandler::enqueueAllResults            public     function  Inserts the given records into pull queue.
QueueHandler::enqueueRecord                public     function  Enqueue a single record for pull.
QueueHandler::enqueueResultSet             public     function  Enqueue a set of results into pull queue.
QueueHandler::getSingleUpdatedRecord       public     function  Given a single mapping/id pair, enqueue it.
QueueHandler::getUpdatedRecords            public     function  Pull updated records from Salesforce and place them in the queue.
QueueHandler::getUpdatedRecordsForMapping  public     function  Fetch and enqueue records from Salesforce.
QueueHandler::PULL_MAX_QUEUE_SIZE                     constant
QueueHandler::PULL_QUEUE_NAME                         constant
QueueHandler::__construct                  public     function  QueueHandler constructor.
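
The queue-size guard at the top of getUpdatedRecords() can be read as a small pure function. The sketch below restates that check in isolation; queueHasRoom() and its parameter names are hypothetical and not part of the module, and only the 100000 default and the `?:` fallback come from the class source.

```php
<?php

// Mirrors QueueHandler::PULL_MAX_QUEUE_SIZE.
const PULL_MAX_QUEUE_SIZE = 100000;

/**
 * Decides whether a pull may add more items to the queue.
 *
 * $configured_max stands in for the 'pull_max_queue_size' setting. A falsy
 * value (NULL or 0) falls back to the default, as the ?: operator does in
 * getUpdatedRecords().
 */
function queueHasRoom(int $items_in_queue, ?int $configured_max): bool {
  $max_size = $configured_max ?: PULL_MAX_QUEUE_SIZE;
  // The comparison is strict: a queue holding exactly $max_size items still
  // has room.
  return !($items_in_queue > $max_size);
}

var_dump(queueHasRoom(100000, NULL));
var_dump(queueHasRoom(100001, NULL));
var_dump(queueHasRoom(50, 10));
```

One consequence worth noting: under this reading, setting the limit to 0 does not disable the guard, since 0 is falsy and the default of 100000 applies instead.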