<?php
namespace Drupal\acquia_contenthub_subscriber\Plugin\QueueWorker;
use Acquia\Hmac\Key;
use Drupal\acquia_contenthub\AcquiaContentHubEvents;
use Drupal\acquia_contenthub\Client\ClientFactory;
use Drupal\acquia_contenthub\Event\HandleWebhookEvent;
use Drupal\Core\Logger\LoggerChannelInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\HttpFoundation\Request;
/**
 * Queue worker that executes a Content Hub filter for this subscriber.
 *
 * For each queue item the worker scrolls through every entity matched by the
 * filter named in the item, then dispatches synthetic "update" webhook events
 * (in fixed-size batches) so the regular webhook handlers process them.
 *
 * NOTE(review): no QueueWorker plugin annotation is visible in this view of
 * the file; plugin discovery needs one on this docblock - confirm it exists
 * in the real source.
 */
class ContentHubFilterExecuteWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * Page size handed to startScrollByFilter().
   */
  protected const SCROLL_SIZE = 100;

  /**
   * Time-window argument handed to the scroll calls.
   *
   * Presumably means "keep the scroll alive for ten minutes" - confirm
   * against the client documentation.
   */
  protected const SCROLL_TIME_WINDOW = '10m';

  /**
   * Maximum number of assets placed in one dispatched webhook payload.
   */
  protected const ASSETS_PER_PAYLOAD = 50;

  /**
   * Event dispatcher used to fire the HANDLE_WEBHOOK events.
   *
   * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
   */
  protected $dispatcher;

  /**
   * Factory the client was obtained from.
   *
   * @var \Drupal\acquia_contenthub\Client\ClientFactory
   */
  protected $factory;

  /**
   * Whatever $factory->getClient() returned at construction time.
   *
   * @var mixed
   */
  protected $client;

  /**
   * Logger channel used to report configuration problems.
   *
   * @var \Drupal\Core\Logger\LoggerChannelInterface
   */
  protected $loggerChannel;

  /**
   * Constructs the worker and resolves the client from the factory.
   *
   * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $dispatcher
   *   Event dispatcher.
   * @param \Drupal\acquia_contenthub\Client\ClientFactory $factory
   *   Client factory; getClient() is called on it immediately.
   * @param \Drupal\Core\Logger\LoggerChannelInterface $logger_channel
   *   Logger channel.
   * @param array $configuration
   *   Plugin configuration, forwarded to the parent constructor.
   * @param string $plugin_id
   *   Plugin id, forwarded to the parent constructor.
   * @param mixed $plugin_definition
   *   Plugin definition, forwarded to the parent constructor.
   */
  public function __construct(EventDispatcherInterface $dispatcher, ClientFactory $factory, LoggerChannelInterface $logger_channel, array $configuration, string $plugin_id, $plugin_definition) {
    $this->dispatcher = $dispatcher;
    $this->factory = $factory;
    $this->loggerChannel = $logger_channel;
    $this->client = $factory->getClient();
    parent::__construct($configuration, $plugin_id, $plugin_definition);
  }

  /**
   * Builds the worker from services in the container.
   *
   * @param \Symfony\Component\DependencyInjection\ContainerInterface $container
   *   Service container.
   * @param array $configuration
   *   Plugin configuration.
   * @param string $plugin_id
   *   Plugin id.
   * @param mixed $plugin_definition
   *   Plugin definition.
   *
   * @return static
   *   A new worker instance.
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $container->get('event_dispatcher'),
      $container->get('acquia_contenthub.client.factory'),
      $container->get('acquia_contenthub_subscriber.logger_channel'),
      $configuration,
      $plugin_id,
      $plugin_definition
    );
  }

  /**
   * Pulls uuid/type pairs out of one page of a scroll response.
   *
   * @param array $filter_response
   *   One page of results; hits are read from ['hits']['hits'], each hit
   *   carrying ['_source']['uuid'] and ['_source']['data']['type'].
   *
   * @return array
   *   Map of uuid => ['uuid' => ..., 'type' => ...]. Empty for a final
   *   (hit-less) page.
   */
  private function extractEntityUuids(array $filter_response): array {
    if ($this->isFinalPage($filter_response)) {
      return [];
    }

    $uuids = [];
    foreach ($filter_response['hits']['hits'] as $item) {
      $uuid = $item['_source']['uuid'] ?? NULL;
      if ($uuid === NULL) {
        // A hit without a uuid cannot be addressed as an asset; skip it
        // instead of emitting a notice and an entry keyed by ''.
        continue;
      }
      $uuids[$uuid] = [
        'uuid' => $uuid,
        'type' => $item['_source']['data']['type'] ?? NULL,
      ];
    }
    return $uuids;
  }

  /**
   * Tells whether a scroll page carries no hits.
   *
   * @param array $filter_response
   *   One page of results.
   *
   * @return bool
   *   TRUE when ['hits']['hits'] is missing or empty.
   */
  private function isFinalPage(array $filter_response): bool {
    return empty($filter_response['hits']['hits']);
  }

  /**
   * Scrolls through every page matched by a filter and collects the entities.
   *
   * The scroll is cancelled in all cases (including when a page request
   * throws) as long as a scroll id was ever received.
   *
   * @param mixed $filter_uuid
   *   Identifier of the filter, passed straight to startScrollByFilter().
   * @param array $interest_list
   *   Result of getInterestsByWebhook() for the current webhook.
   *
   * @return array
   *   Map of uuid => ['uuid' => ..., 'type' => ...] across all pages.
   */
  private function collectFilterMatches($filter_uuid, array $interest_list): array {
    $uuids = [];
    $scroll_id = NULL;
    try {
      $page = $this->client->startScrollByFilter($filter_uuid, self::SCROLL_TIME_WINDOW, self::SCROLL_SIZE);
      while (TRUE) {
        // Each page may carry a refreshed scroll id; keep the last known one
        // when a page omits it.
        $scroll_id = $page['_scroll_id'] ?? $scroll_id;
        if ($this->isFinalPage($page)) {
          break;
        }
        // NOTE(review): the interest list is merged into the page but the
        // extraction only reads ['hits']['hits'], so the merge looks like a
        // no-op. It may have been meant to exclude entities already on the
        // interest list - confirm intent. Kept as-is to preserve behavior.
        $uuids = array_merge($uuids, $this->extractEntityUuids(array_merge($page, $interest_list)));
        if ($scroll_id === NULL) {
          // Without a scroll id there is nothing to continue from.
          break;
        }
        $page = $this->client->continueScroll($scroll_id, self::SCROLL_TIME_WINDOW);
      }
    }
    finally {
      if ($scroll_id !== NULL) {
        $this->client->cancelScroll($scroll_id);
      }
    }
    return $uuids;
  }

  /**
   * Executes the filter named in the queue item.
   *
   * @param mixed $data
   *   Queue item; must be an object exposing a filter_uuid property.
   *
   * @throws \Exception
   *   When the item has no filter uuid or no client is available.
   */
  public function processItem($data): void {
    if (!isset($data->filter_uuid)) {
      throw new \Exception('Filter uuid not found.');
    }

    $client = $this->client;
    if (!$client) {
      // Presumably the factory can hand back a falsy value when Content Hub
      // is not configured - confirm against ClientFactory::getClient().
      // Throwing an \Exception here avoids a fatal "member function on bool".
      throw new \Exception('Content Hub client is not available.');
    }

    $settings = $client->getSettings();
    $webhook_uuid = $settings->getWebhook('uuid');
    if (!$webhook_uuid) {
      $this->loggerChannel->critical('Your webhook is not properly setup. ContentHub cannot execute filters without an active webhook. Please set your webhook up properly and try again.');
      return;
    }

    $interest_list = $client->getInterestsByWebhook($webhook_uuid);
    $uuids = $this->collectFilterMatches($data->filter_uuid, $interest_list);
    if (!$uuids) {
      return;
    }

    // The signing key does not vary between batches, so build it once.
    $key = new Key($settings->getApiKey(), $settings->getSecretKey());
    foreach (array_chunk($uuids, self::ASSETS_PER_PAYLOAD) as $chunk) {
      $payload = [
        'status' => 'successful',
        'crud' => 'update',
        'assets' => $chunk,
        'initiator' => 'some-initiator',
      ];
      $request = new Request([], [], [], [], [], [], json_encode($payload));
      $event = new HandleWebhookEvent($request, $payload, $key, $client);
      // NOTE(review): this is the legacy (event name, event) argument order;
      // newer Symfony dispatchers expect (event, event name) - confirm
      // against the supported Drupal core version before changing.
      $this->dispatcher->dispatch(AcquiaContentHubEvents::HANDLE_WEBHOOK, $event);
    }
  }

}