View source
<?php
declare (strict_types=1);
namespace Drupal\entity_share_cron;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Component\Serialization\Json;
use Drupal\entity_share\EntityShareUtility;
use Drupal\entity_share_client\ImportContext;
use Drupal\entity_share_client\Service\ImportServiceInterface;
/**
 * Synchronizes Entity Share channels page by page.
 *
 * Pages of a channel are fetched through the Entity Share import service.
 * When the configured page limit is reached, the next page URL is pushed to
 * the pending queue so that a later run can continue from there.
 */
class EntityShareCronService implements EntityShareCronServiceInterface {

  /**
   * The 'entity_share_cron.settings' configuration object.
   *
   * @var \Drupal\Core\Config\ImmutableConfig
   */
  protected $config;

  /**
   * The queue factory.
   *
   * @var \Drupal\Core\Queue\QueueFactory
   */
  protected $queueFactory;

  /**
   * The entity type manager.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * The Entity Share import service.
   *
   * @var \Drupal\entity_share_client\Service\ImportServiceInterface
   */
  protected $importService;

  /**
   * Constructs the service.
   *
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
   *   The config factory, used to load 'entity_share_cron.settings'.
   * @param \Drupal\Core\Queue\QueueFactory $queue_factory
   *   The queue factory.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   * @param \Drupal\entity_share_client\Service\ImportServiceInterface $import_service
   *   The Entity Share import service.
   */
  public function __construct(ConfigFactoryInterface $config_factory, QueueFactory $queue_factory, EntityTypeManagerInterface $entity_type_manager, ImportServiceInterface $import_service) {
    $this->config = $config_factory->get('entity_share_cron.settings');
    $this->queueFactory = $queue_factory;
    $this->entityTypeManager = $entity_type_manager;
    $this->importService = $import_service;
  }

  /**
   * Adds a channel page to the pending queue.
   *
   * @param string $remote_id
   *   The remote ID.
   * @param string $channel_id
   *   The channel ID.
   * @param string|null $url
   *   The URL of the page to synchronize. sync() falls back to the channel
   *   URL when it receives NULL.
   */
  public function enqueue($remote_id, $channel_id, $url) {
    $this->queueFactory
      ->get(EntityShareCronServiceInterface::PENDING_QUEUE_NAME)
      ->createItem([
        'remote_id' => $remote_id,
        'channel_id' => $channel_id,
        'url' => $url,
      ]);
  }

  /**
   * Imports the entities of a channel, starting at the given page.
   *
   * Nothing is imported when the channel has no 'import_config' setting or
   * when the import service cannot prepare the import.
   *
   * @param string $remote_id
   *   The remote ID.
   * @param string $channel_id
   *   The channel ID.
   * @param string|null $url
   *   The URL of the first page to fetch, or NULL to start from the channel
   *   URL of the runtime import context.
   */
  public function sync($remote_id, $channel_id, $url) {
    $channel_config = $this->getChannelConfig($remote_id, $channel_id);

    // getChannelConfig() returns an empty array for an unknown remote or
    // channel; without an import config there is nothing to build the import
    // context from.
    if (empty($channel_config['import_config'])) {
      return;
    }

    $import_context = new ImportContext($remote_id, $channel_id, $channel_config['import_config']);
    if (!$this->importService->prepareImport($import_context)) {
      return;
    }

    if (is_null($url)) {
      $url = $this->importService
        ->getRuntimeImportContext()
        ->getChannelUrl();
    }

    // A limit of 0 (or an unset limit) means "no limit".
    $page_limit = (int) $this->config->get('page_limit');

    // Collect the pages first and merge them once, instead of re-copying the
    // accumulated list on every iteration.
    $pages = [];
    $next_page = 1;
    while ($url) {
      $page_data = $this->getPage($url);
      $pages[] = $page_data['data'];
      $next_page++;
      $url = $page_data['next'];

      if ($url && $page_limit !== 0 && $next_page > $page_limit) {
        // Page limit reached: defer the remaining pages to the queue.
        $this->enqueue($remote_id, $channel_id, $url);
        break;
      }
    }
    $data_to_import = $pages ? array_merge(...$pages) : [];

    // With creation disabled only already existing entities may be imported;
    // with updates disabled only not yet existing ones.
    if (empty($channel_config['operations']['create'])) {
      $this->filterDataToImport($data_to_import, TRUE);
    }
    if (empty($channel_config['operations']['update'])) {
      $this->filterDataToImport($data_to_import, FALSE);
    }

    $this->importService->importEntityListData($data_to_import);
  }

  /**
   * Fetches one page of channel data.
   *
   * @param string $url
   *   The URL of the page.
   *
   * @return array
   *   An array with the keys:
   *   - data: the list of entity data found on the page (possibly empty).
   *   - next: the URL of the next page, or FALSE when there is none.
   */
  protected function getPage($url) {
    $data = [
      'data' => [],
      'next' => FALSE,
    ];

    $response = $this->importService->jsonApiRequest('GET', $url);
    // NOTE(review): the client service is expected to yield no response when
    // the request fails - confirm. Treat that as an empty last page rather
    // than calling a method on a non-object.
    if (!$response) {
      return $data;
    }

    $json = Json::decode((string) $response->getBody());
    // An undecodable body or a document without a 'data' member (for example
    // an error document) carries nothing to import.
    if (!is_array($json) || !isset($json['data']) || !is_array($json['data'])) {
      return $data;
    }

    $data['data'] = EntityShareUtility::prepareData($json['data']);
    $data['next'] = !empty($json['links']['next']['href']) ? $json['links']['next']['href'] : FALSE;
    return $data;
  }

  /**
   * Filters the data to import by whether the entities already exist locally.
   *
   * @param array $data
   *   The list of entity data, filtered in place. Each item needs a 'type'
   *   ("<entity type>--<bundle>") and an 'id' (the UUID).
   * @param bool $keep_existing
   *   TRUE to keep only items whose UUID belongs to a local entity, FALSE to
   *   keep only items whose UUID does not.
   */
  protected function filterDataToImport(array &$data, $keep_existing) {
    // Group the UUIDs by entity type so each storage is queried only once.
    $uuid_by_type = [];
    foreach ($data as $entity_data) {
      $entity_type = explode('--', $entity_data['type'])[0];
      $uuid_by_type[$entity_type][] = $entity_data['id'];
    }

    // Find out which of those UUIDs belong to local entities.
    $existing_uuids = [];
    foreach ($uuid_by_type as $entity_type => $uuids) {
      $uuid_property = $this->entityTypeManager
        ->getDefinition($entity_type)
        ->getKey('uuid');
      $existing_entities = $this->entityTypeManager
        ->getStorage($entity_type)
        ->loadByProperties([$uuid_property => $uuids]);
      foreach ($existing_entities as $entity) {
        $uuid = $entity->uuid();
        $existing_uuids[$uuid] = $uuid;
      }
    }

    // Keep an item when its existence matches what the caller asked for.
    $keep_existing = (bool) $keep_existing;
    $data_updated = [];
    foreach ($data as $entity_data) {
      $exists = !empty($existing_uuids[$entity_data['id']]);
      if ($exists === $keep_existing) {
        $data_updated[] = $entity_data;
      }
    }
    $data = $data_updated;
  }

  /**
   * Returns the cron settings of a channel.
   *
   * @param string $remote_id
   *   The remote ID.
   * @param string $channel_id
   *   The channel ID.
   *
   * @return array
   *   The settings stored under remotes.<remote>.channels.<channel>, or an
   *   empty array when there are none.
   */
  protected function getChannelConfig($remote_id, $channel_id) {
    $remotes = $this->config->get('remotes');
    return !empty($remotes[$remote_id]['channels'][$channel_id])
      ? $remotes[$remote_id]['channels'][$channel_id]
      : [];
  }

}