View source  
  <?php
namespace Drupal\salesforce_push;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Database\Connection;
use Drupal\Core\Database\Query\Merge;
use Drupal\Core\Entity\EntityInterface;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Queue\DatabaseQueue;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\State\StateInterface;
use Drupal\salesforce\EntityNotFoundException;
use Drupal\salesforce\Event\SalesforceErrorEvent;
use Drupal\salesforce\Event\SalesforceEvents;
use Drupal\salesforce\Event\SalesforceNoticeEvent;
use Drupal\salesforce_mapping\Entity\SalesforceMappingInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class PushQueue extends DatabaseQueue implements PushQueueInterface {
  
  const TABLE_NAME = 'salesforce_push_queue';
  
  const DEFAULT_GLOBAL_LIMIT = 10000;
  
  const DEFAULT_QUEUE_PROCESSOR = 'rest';
  
  const DEFAULT_MAX_FAILS = 10;
  
  const DEFAULT_LEASE_TIME = 300;
  
  protected $globalLimit;
  
  protected $maxFails;
  
  protected $connection;
  
  protected $state;
  
  protected $queueManager;
  
  protected $eventDispatcher;
  
  protected $garbageCollected;
  
  protected $mappingStorage;
  
  protected $mappedObjectStorage;
  
  protected $time;
  
  protected $config;
  
  protected $etm;
  
  public function __construct(Connection $connection, StateInterface $state, PushQueueProcessorPluginManager $queue_manager, EntityTypeManagerInterface $etm, EventDispatcherInterface $event_dispatcher, TimeInterface $time, ConfigFactoryInterface $config) {
    $this->connection = $connection;
    $this->state = $state;
    $this->queueManager = $queue_manager;
    $this->etm = $etm;
    $this->mappingStorage = $etm
      ->getStorage('salesforce_mapping');
    $this->mappedObjectStorage = $etm
      ->getStorage('salesforce_mapped_object');
    $this->eventDispatcher = $event_dispatcher;
    $this->time = $time;
    $this->config = $config
      ->get('salesforce.settings');
    $this->globalLimit = $this->config
      ->get('global_push_limit') ?: static::DEFAULT_GLOBAL_LIMIT;
    if (empty($this->globalLimit)) {
      $this->globalLimit = static::DEFAULT_GLOBAL_LIMIT;
    }
    $this->maxFails = $state
      ->get('salesforce.push_queue_max_fails', static::DEFAULT_MAX_FAILS);
    if (empty($this->maxFails)) {
      $this->maxFails = static::DEFAULT_MAX_FAILS;
    }
    $this->garbageCollected = FALSE;
  }
  
  public static function create(ContainerInterface $container) {
    return new static($container
      ->get('database'), $container
      ->get('state'), $container
      ->get('plugin.manager.salesforce_push_queue_processor'), $container
      ->get('entity_type.manager'), $container
      ->get('event_dispatcher'), $container
      ->get('datetime.time'), $container
      ->get('config.factory'));
  }
  
  public function setName($name) {
    $this->name = $name;
    return $this;
  }
  
  protected function doCreateItem($data) {
    
    if (empty($data['name']) || empty($data['entity_id']) || empty($data['op'])) {
      throw new \Exception('Salesforce push queue data values are required for "name", "entity_id" and "op"');
    }
    $this->name = $data['name'];
    $time = $this->time
      ->getRequestTime();
    $fields = [
      'name' => $this->name,
      'entity_id' => $data['entity_id'],
      'op' => $data['op'],
      'updated' => $time,
      'failures' => empty($data['failures']) ? 0 : $data['failures'],
      'mapped_object_id' => empty($data['mapped_object_id']) ? 0 : $data['mapped_object_id'],
    ];
    $query = $this->connection
      ->merge(static::TABLE_NAME)
      ->key([
      'name' => $this->name,
      'entity_id' => $data['entity_id'],
    ])
      ->fields($fields);
    
    $ret = $query
      ->execute();
    
    if ($ret == Merge::STATUS_INSERT) {
      $this->connection
        ->merge(static::TABLE_NAME)
        ->key([
        'name' => $this->name,
        'entity_id' => $data['entity_id'],
      ])
        ->fields([
        'created' => $time,
      ])
        ->execute();
    }
    return $ret;
  }
  
  public function claimItems($n, $fail_limit = self::DEFAULT_MAX_FAILS, $lease_time = self::DEFAULT_LEASE_TIME) {
    while (TRUE) {
      try {
        if ($n <= 0) {
          
          $n = $this->globalLimit;
        }
        
        $items = $this->connection
          ->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name AND failures < :fail_limit ORDER BY created, item_id ASC', 0, $n, [
          ':name' => $this->name,
          ':fail_limit' => $fail_limit,
        ])
          ->fetchAllAssoc('item_id');
      } catch (\Exception $e) {
        $this
          ->catchException($e);
        
        return [];
      }
      if ($items) {
        
        $update = $this->connection
          ->update(static::TABLE_NAME)
          ->fields([
          'expire' => $this->time
            ->getRequestTime() + $lease_time,
        ])
          ->condition('item_id', array_keys($items), 'IN')
          ->condition('expire', 0);
        
        if ($update
          ->execute()) {
          return $items;
        }
      }
      else {
        
        return [];
      }
    }
  }
  
  public function claimItem($lease_time = NULL) {
    throw new \Exception('This queue is designed to process multiple items at once. Please use "claimItems" instead.');
  }
  
  public function schemaDefinition() {
    return [
      'description' => 'Drupal entities to push to Salesforce.',
      'fields' => [
        'item_id' => [
          'type' => 'serial',
          'unsigned' => TRUE,
          'not null' => TRUE,
          'description' => 'Primary Key: Unique item ID.',
        ],
        'name' => [
          'type' => 'varchar_ascii',
          'length' => 255,
          'not null' => TRUE,
          'default' => '',
          'description' => 'The salesforce mapping id',
        ],
        'entity_id' => [
          'type' => 'int',
          'not null' => TRUE,
          'default' => 0,
          'description' => 'The entity id',
        ],
        'mapped_object_id' => [
          'type' => 'int',
          'not null' => TRUE,
          'default' => 0,
          'description' => 'Foreign key for salesforce_mapped_object table.',
        ],
        'op' => [
          'type' => 'varchar_ascii',
          'length' => 16,
          'not null' => TRUE,
          'default' => '',
          'description' => 'The operation which triggered this push',
        ],
        'failures' => [
          'type' => 'int',
          'not null' => TRUE,
          'default' => 0,
          'description' => 'Number of failed push attempts for this queue item.',
        ],
        'expire' => [
          'type' => 'int',
          'not null' => TRUE,
          'default' => 0,
          'description' => 'Timestamp when the claim lease expires on the item.',
        ],
        'created' => [
          'type' => 'int',
          'not null' => TRUE,
          'default' => 0,
          'description' => 'Timestamp when the item was created.',
        ],
        'updated' => [
          'type' => 'int',
          'not null' => TRUE,
          'default' => 0,
          'description' => 'Timestamp when the item was created.',
        ],
      ],
      'primary key' => [
        'item_id',
      ],
      'unique keys' => [
        'name_entity_id' => [
          'name',
          'entity_id',
        ],
      ],
      'indexes' => [
        'entity_id' => [
          'entity_id',
        ],
        'name_created' => [
          'name',
          'created',
        ],
        'expire' => [
          'expire',
        ],
      ],
    ];
  }
  
  public function processQueues($mappings = []) {
    if (empty($mappings)) {
      $mappings = $this->mappingStorage
        ->loadPushMappings();
    }
    if (empty($mappings)) {
      return $this;
    }
    $i = 0;
    foreach ($mappings as $mapping) {
      $i += $this
        ->processQueue($mapping);
      if ($i >= $this->globalLimit) {
        break;
      }
    }
    return $this;
  }
  
  public function processQueue(SalesforceMappingInterface $mapping) {
    if (!$this->connection
      ->schema()
      ->tableExists(static::TABLE_NAME)) {
      return 0;
    }
    $this
      ->garbageCollection();
    static $queue_processor = FALSE;
    
    if ($mapping
      ->getNextPushTime() > $this->time
      ->getRequestTime()) {
      return 0;
    }
    if (!$queue_processor) {
      
      $plugin_name = $this->state
        ->get('salesforce.push_queue_processor', static::DEFAULT_QUEUE_PROCESSOR);
      $queue_processor = $this->queueManager
        ->createInstance($plugin_name);
    }
    $i = 0;
    
    $this
      ->setName($mapping
      ->id());
    
    while (TRUE) {
      
      $items = $this
        ->claimItems($mapping->push_limit, $mapping->push_retries);
      if (empty($items)) {
        $mapping
          ->setLastPushTime($this->time
          ->getRequestTime());
        return $i;
      }
      
      try {
        $queue_processor
          ->process($items);
      } catch (RequeueException $e) {
        
        $this
          ->releaseItems($items);
        $this->eventDispatcher
          ->dispatch(SalesforceEvents::WARNING, new SalesforceErrorEvent($e));
        continue;
      } catch (SuspendQueueException $e) {
        
        $this
          ->releaseItems($items);
        $this->eventDispatcher
          ->dispatch(SalesforceEvents::WARNING, new SalesforceErrorEvent($e));
        return $i;
      } catch (\Exception $e) {
        
        $this->eventDispatcher
          ->dispatch(SalesforceEvents::ERROR, new SalesforceErrorEvent($e));
      } finally {
        
        $i += count($items);
        if ($i >= $this->globalLimit) {
          return $i;
        }
      }
    }
    return $i;
  }
  
  public function failItem(\Throwable $e, \stdClass $item) {
    $mapping = $this->mappingStorage
      ->load($item->name);
    if ($e instanceof EntityNotFoundException) {
      
      $message = 'Exception while loading entity %type %id for salesforce mapping %mapping. Queue item deleted.';
      $args = [
        '%type' => $mapping
          ->get('drupal_entity_type'),
        '%id' => $item->entity_id,
        '%mapping' => $mapping
          ->id(),
      ];
      $this->eventDispatcher
        ->dispatch(SalesforceEvents::NOTICE, new SalesforceNoticeEvent(NULL, $message, $args));
      $this
        ->deleteItem($item);
      return;
    }
    $item->failures++;
    $message = $e
      ->getMessage();
    if ($item->failures >= $this->maxFails) {
      $message = 'Permanently failed queue item %item failed %fail times. Exception while pushing entity %type %id for salesforce mapping %mapping. ' . $message;
    }
    else {
      $message = 'Queue item %item failed %fail times. Exception while pushing entity %type %id for salesforce mapping %mapping. ' . $message;
    }
    $args = [
      '%type' => $mapping
        ->get('drupal_entity_type'),
      '%id' => $item->entity_id,
      '%mapping' => $mapping
        ->id(),
      '%item' => $item->item_id,
      '%fail' => $item->failures,
    ];
    $this->eventDispatcher
      ->dispatch(SalesforceEvents::NOTICE, new SalesforceNoticeEvent(NULL, $message, $args));
    
    $this
      ->doCreateItem(get_object_vars($item));
  }
  
  public function releaseItems(array $items) {
    try {
      $update = $this->connection
        ->update(static::TABLE_NAME)
        ->fields([
        'expire' => 0,
      ])
        ->condition('item_id', array_keys($items), 'IN');
      return $update
        ->execute();
    } catch (\Exception $e) {
      $this->eventDispatcher
        ->dispatch(SalesforceEvents::ERROR, new SalesforceErrorEvent($e));
      $this
        ->catchException($e);
      
      return TRUE;
    }
  }
  
  public function deleteItemByEntity(EntityInterface $entity) {
    try {
      $this->connection
        ->delete(static::TABLE_NAME)
        ->condition('entity_id', $entity
        ->id())
        ->condition('name', $this->name)
        ->execute();
    } catch (\Exception $e) {
      $this
        ->catchException($e);
    }
  }
  
  public function deleteTable() {
    $this->connection
      ->schema()
      ->dropTable(static::TABLE_NAME);
  }
  
  public function garbageCollection() {
    if ($this->garbageCollected) {
      
      return;
    }
    try {
      
      $this->connection
        ->update(static::TABLE_NAME)
        ->fields([
        'expire' => 0,
      ])
        ->condition('expire', 0, '<>')
        ->condition('expire', $this->time
        ->getRequestTime(), '<')
        ->execute();
      $this->garbageCollected = TRUE;
    } catch (\Exception $e) {
      $this
        ->catchException($e);
    }
  }
}