View source
<?php
namespace Drupal\salesforce_push;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Database\Connection;
use Drupal\Core\Database\Query\Merge;
use Drupal\Core\Entity\EntityInterface;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Queue\DatabaseQueue;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\State\StateInterface;
use Drupal\salesforce\EntityNotFoundException;
use Drupal\salesforce\Event\SalesforceErrorEvent;
use Drupal\salesforce\Event\SalesforceEvents;
use Drupal\salesforce\Event\SalesforceNoticeEvent;
use Drupal\salesforce_mapping\Entity\SalesforceMappingInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class PushQueue extends DatabaseQueue implements PushQueueInterface {
const TABLE_NAME = 'salesforce_push_queue';
const DEFAULT_GLOBAL_LIMIT = 10000;
const DEFAULT_QUEUE_PROCESSOR = 'rest';
const DEFAULT_MAX_FAILS = 10;
const DEFAULT_LEASE_TIME = 300;
protected $globalLimit;
protected $maxFails;
protected $connection;
protected $state;
protected $queueManager;
protected $eventDispatcher;
protected $garbageCollected;
protected $mappingStorage;
protected $mappedObjectStorage;
protected $time;
protected $config;
protected $etm;
public function __construct(Connection $connection, StateInterface $state, PushQueueProcessorPluginManager $queue_manager, EntityTypeManagerInterface $etm, EventDispatcherInterface $event_dispatcher, TimeInterface $time, ConfigFactoryInterface $config) {
$this->connection = $connection;
$this->state = $state;
$this->queueManager = $queue_manager;
$this->etm = $etm;
$this->mappingStorage = $etm
->getStorage('salesforce_mapping');
$this->mappedObjectStorage = $etm
->getStorage('salesforce_mapped_object');
$this->eventDispatcher = $event_dispatcher;
$this->time = $time;
$this->config = $config
->get('salesforce.settings');
$this->globalLimit = $this->config
->get('global_push_limit') ?: static::DEFAULT_GLOBAL_LIMIT;
if (empty($this->globalLimit)) {
$this->globalLimit = static::DEFAULT_GLOBAL_LIMIT;
}
$this->maxFails = $state
->get('salesforce.push_queue_max_fails', static::DEFAULT_MAX_FAILS);
if (empty($this->maxFails)) {
$this->maxFails = static::DEFAULT_MAX_FAILS;
}
$this->garbageCollected = FALSE;
}
public static function create(ContainerInterface $container) {
return new static($container
->get('database'), $container
->get('state'), $container
->get('plugin.manager.salesforce_push_queue_processor'), $container
->get('entity_type.manager'), $container
->get('event_dispatcher'), $container
->get('datetime.time'), $container
->get('config.factory'));
}
public function setName($name) {
$this->name = $name;
return $this;
}
protected function doCreateItem($data) {
if (empty($data['name']) || empty($data['entity_id']) || empty($data['op'])) {
throw new \Exception('Salesforce push queue data values are required for "name", "entity_id" and "op"');
}
$this->name = $data['name'];
$time = $this->time
->getRequestTime();
$fields = [
'name' => $this->name,
'entity_id' => $data['entity_id'],
'op' => $data['op'],
'updated' => $time,
'failures' => empty($data['failures']) ? 0 : $data['failures'],
'mapped_object_id' => empty($data['mapped_object_id']) ? 0 : $data['mapped_object_id'],
];
$query = $this->connection
->merge(static::TABLE_NAME)
->key([
'name' => $this->name,
'entity_id' => $data['entity_id'],
])
->fields($fields);
$ret = $query
->execute();
if ($ret == Merge::STATUS_INSERT) {
$this->connection
->merge(static::TABLE_NAME)
->key([
'name' => $this->name,
'entity_id' => $data['entity_id'],
])
->fields([
'created' => $time,
])
->execute();
}
return $ret;
}
public function claimItems($n, $fail_limit = self::DEFAULT_MAX_FAILS, $lease_time = self::DEFAULT_LEASE_TIME) {
while (TRUE) {
try {
if ($n <= 0) {
$n = $this->globalLimit;
}
$items = $this->connection
->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name AND failures < :fail_limit ORDER BY created, item_id ASC', 0, $n, [
':name' => $this->name,
':fail_limit' => $fail_limit,
])
->fetchAllAssoc('item_id');
} catch (\Exception $e) {
$this
->catchException($e);
return [];
}
if ($items) {
$update = $this->connection
->update(static::TABLE_NAME)
->fields([
'expire' => $this->time
->getRequestTime() + $lease_time,
])
->condition('item_id', array_keys($items), 'IN')
->condition('expire', 0);
if ($update
->execute()) {
return $items;
}
}
else {
return [];
}
}
}
public function claimItem($lease_time = NULL) {
throw new \Exception('This queue is designed to process multiple items at once. Please use "claimItems" instead.');
}
public function schemaDefinition() {
return [
'description' => 'Drupal entities to push to Salesforce.',
'fields' => [
'item_id' => [
'type' => 'serial',
'unsigned' => TRUE,
'not null' => TRUE,
'description' => 'Primary Key: Unique item ID.',
],
'name' => [
'type' => 'varchar_ascii',
'length' => 255,
'not null' => TRUE,
'default' => '',
'description' => 'The salesforce mapping id',
],
'entity_id' => [
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'The entity id',
],
'mapped_object_id' => [
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Foreign key for salesforce_mapped_object table.',
],
'op' => [
'type' => 'varchar_ascii',
'length' => 16,
'not null' => TRUE,
'default' => '',
'description' => 'The operation which triggered this push',
],
'failures' => [
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Number of failed push attempts for this queue item.',
],
'expire' => [
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Timestamp when the claim lease expires on the item.',
],
'created' => [
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Timestamp when the item was created.',
],
'updated' => [
'type' => 'int',
'not null' => TRUE,
'default' => 0,
'description' => 'Timestamp when the item was created.',
],
],
'primary key' => [
'item_id',
],
'unique keys' => [
'name_entity_id' => [
'name',
'entity_id',
],
],
'indexes' => [
'entity_id' => [
'entity_id',
],
'name_created' => [
'name',
'created',
],
'expire' => [
'expire',
],
],
];
}
public function processQueues($mappings = []) {
if (empty($mappings)) {
$mappings = $this->mappingStorage
->loadPushMappings();
}
if (empty($mappings)) {
return $this;
}
$i = 0;
foreach ($mappings as $mapping) {
$i += $this
->processQueue($mapping);
if ($i >= $this->globalLimit) {
break;
}
}
return $this;
}
public function processQueue(SalesforceMappingInterface $mapping) {
if (!$this->connection
->schema()
->tableExists(static::TABLE_NAME)) {
return 0;
}
$this
->garbageCollection();
static $queue_processor = FALSE;
if ($mapping
->getNextPushTime() > $this->time
->getRequestTime()) {
return 0;
}
if (!$queue_processor) {
$plugin_name = $this->state
->get('salesforce.push_queue_processor', static::DEFAULT_QUEUE_PROCESSOR);
$queue_processor = $this->queueManager
->createInstance($plugin_name);
}
$i = 0;
$this
->setName($mapping
->id());
while (TRUE) {
$items = $this
->claimItems($mapping->push_limit, $mapping->push_retries);
if (empty($items)) {
$mapping
->setLastPushTime($this->time
->getRequestTime());
return $i;
}
try {
$queue_processor
->process($items);
} catch (RequeueException $e) {
$this
->releaseItems($items);
$this->eventDispatcher
->dispatch(SalesforceEvents::WARNING, new SalesforceErrorEvent($e));
continue;
} catch (SuspendQueueException $e) {
$this
->releaseItems($items);
$this->eventDispatcher
->dispatch(SalesforceEvents::WARNING, new SalesforceErrorEvent($e));
return $i;
} catch (\Exception $e) {
$this->eventDispatcher
->dispatch(SalesforceEvents::ERROR, new SalesforceErrorEvent($e));
} finally {
$i += count($items);
if ($i >= $this->globalLimit) {
return $i;
}
}
}
return $i;
}
public function failItem(\Exception $e, \stdClass $item) {
$mapping = $this->mappingStorage
->load($item->name);
if ($e instanceof EntityNotFoundException) {
$message = 'Exception while loading entity %type %id for salesforce mapping %mapping. Queue item deleted.';
$args = [
'%type' => $mapping
->get('drupal_entity_type'),
'%id' => $item->entity_id,
'%mapping' => $mapping
->id(),
];
$this->eventDispatcher
->dispatch(SalesforceEvents::NOTICE, new SalesforceNoticeEvent(NULL, $message, $args));
$this
->deleteItem($item);
return;
}
$item->failures++;
$message = $e
->getMessage();
if ($item->failures >= $this->maxFails) {
$message = 'Permanently failed queue item %item failed %fail times. Exception while pushing entity %type %id for salesforce mapping %mapping. ' . $message;
}
else {
$message = 'Queue item %item failed %fail times. Exception while pushing entity %type %id for salesforce mapping %mapping. ' . $message;
}
$args = [
'%type' => $mapping
->get('drupal_entity_type'),
'%id' => $item->entity_id,
'%mapping' => $mapping
->id(),
'%item' => $item->item_id,
'%fail' => $item->failures,
];
$this->eventDispatcher
->dispatch(SalesforceEvents::NOTICE, new SalesforceNoticeEvent(NULL, $message, $args));
$this
->doCreateItem(get_object_vars($item));
}
public function releaseItems(array $items) {
try {
$update = $this->connection
->update(static::TABLE_NAME)
->fields([
'expire' => 0,
])
->condition('item_id', array_keys($items), 'IN');
return $update
->execute();
} catch (\Exception $e) {
$this->eventDispatcher
->dispatch(SalesforceEvents::ERROR, new SalesforceErrorEvent($e));
$this
->catchException($e);
return TRUE;
}
}
public function deleteItemByEntity(EntityInterface $entity) {
try {
$this->connection
->delete(static::TABLE_NAME)
->condition('entity_id', $entity
->id())
->condition('name', $this->name)
->execute();
} catch (\Exception $e) {
$this
->catchException($e);
}
}
public function deleteTable() {
$this->connection
->schema()
->dropTable(static::TABLE_NAME);
}
public function garbageCollection() {
if ($this->garbageCollected) {
return;
}
try {
$this->connection
->update(static::TABLE_NAME)
->fields([
'expire' => 0,
])
->condition('expire', 0, '<>')
->condition('expire', REQUEST_TIME, '<')
->execute();
$this->garbageCollected = TRUE;
} catch (\Exception $e) {
$this
->catchException($e);
}
}
}