<?php
declare (strict_types=1);
namespace Drupal\entity_share_cron;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Component\Serialization\Json;
use Drupal\entity_share\EntityShareUtility;
use Drupal\entity_share_client\ImportContext;
use Drupal\entity_share_client\Service\ImportServiceInterface;
/**
 * Imports the entities of an Entity Share channel page by page.
 *
 * Settings are read from the 'entity_share_cron.settings' configuration
 * object. When the configured page limit is reached, the URL of the next page
 * is pushed onto the pending queue instead of being fetched immediately.
 */
class EntityShareCronService implements EntityShareCronServiceInterface {

  /**
   * The 'entity_share_cron.settings' configuration object.
   *
   * @var \Drupal\Core\Config\ImmutableConfig
   */
  protected $config;

  /**
   * The queue factory.
   *
   * @var \Drupal\Core\Queue\QueueFactory
   */
  protected $queueFactory;

  /**
   * The entity type manager.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * The Entity Share client import service.
   *
   * @var \Drupal\entity_share_client\Service\ImportServiceInterface
   */
  protected $importService;

  /**
   * Constructs the service.
   *
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
   *   The config factory, used to load 'entity_share_cron.settings'.
   * @param \Drupal\Core\Queue\QueueFactory $queue_factory
   *   The queue factory.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   * @param \Drupal\entity_share_client\Service\ImportServiceInterface $import_service
   *   The import service.
   */
  public function __construct(ConfigFactoryInterface $config_factory, QueueFactory $queue_factory, EntityTypeManagerInterface $entity_type_manager, ImportServiceInterface $import_service) {
    $this->config = $config_factory->get('entity_share_cron.settings');
    $this->queueFactory = $queue_factory;
    $this->entityTypeManager = $entity_type_manager;
    $this->importService = $import_service;
  }

  /**
   * Adds a channel page to the pending queue.
   *
   * @param string $remote_id
   *   The ID of the remote the channel belongs to.
   * @param string $channel_id
   *   The ID of the channel.
   * @param string|null $url
   *   The URL of the page to import; stored as-is in the queue item.
   */
  public function enqueue($remote_id, $channel_id, $url) {
    $this->queueFactory
      ->get(EntityShareCronServiceInterface::PENDING_QUEUE_NAME)
      ->createItem([
        'remote_id' => $remote_id,
        'channel_id' => $channel_id,
        'url' => $url,
      ]);
  }

  /**
   * Imports the pages of a channel, starting at the given URL.
   *
   * Pages are fetched by following their "next" links. If a non-zero
   * 'page_limit' is configured and that many pages have been fetched while
   * another page remains, the remaining page URL is enqueued and fetching
   * stops. The collected entities are then filtered according to the channel
   * 'operations' settings and handed to the import service.
   *
   * Nothing is imported when the channel has no 'import_config' setting or
   * when the import service fails to prepare the import.
   *
   * @param string $remote_id
   *   The ID of the remote the channel belongs to.
   * @param string $channel_id
   *   The ID of the channel.
   * @param string|null $url
   *   The URL of the first page to fetch, or NULL to start from the channel
   *   URL provided by the runtime import context.
   */
  public function sync($remote_id, $channel_id, $url) {
    $channel_config = $this->getChannelConfig($remote_id, $channel_id);

    // getChannelConfig() returns an empty array for unknown remotes/channels;
    // without an import configuration ID there is nothing to build an import
    // context from, so bail out instead of passing NULL along.
    $import_config_id = $channel_config['import_config'] ?? NULL;
    if ($import_config_id === NULL || $import_config_id === '') {
      return;
    }

    $import_context = new ImportContext($remote_id, $channel_id, $import_config_id);
    if (!$this->importService->prepareImport($import_context)) {
      return;
    }

    if (is_null($url)) {
      $url = $this->importService
        ->getRuntimeImportContext()
        ->getChannelUrl();
    }

    // A limit of 0 (or no configured limit) means "fetch every page".
    $page_limit = (int) $this->config->get('page_limit');
    $pages = [];
    $pages_fetched = 0;
    while ($url) {
      $page_data = $this->getPage($url);
      $pages[] = $page_data['data'];
      $pages_fetched++;
      $url = $page_data['next'];

      if ($url && $page_limit !== 0 && $pages_fetched >= $page_limit) {
        // Limit reached: defer the remaining pages to the pending queue.
        $this->enqueue($remote_id, $channel_id, $url);
        $url = FALSE;
      }
    }

    // Merge once at the end rather than re-copying the growing list on every
    // iteration of the loop.
    $data_to_import = $pages ? array_merge(...$pages) : [];

    // With "create" disabled only entities that already exist locally are
    // kept; with "update" disabled only entities that do not exist yet.
    if (empty($channel_config['operations']['create'])) {
      $this->filterDataToImport($data_to_import, TRUE);
    }
    if (empty($channel_config['operations']['update'])) {
      $this->filterDataToImport($data_to_import, FALSE);
    }

    $this->importService->importEntityListData($data_to_import);
  }

  /**
   * Fetches one page of a channel and extracts its entities and next link.
   *
   * @param string $url
   *   The URL of the page to request.
   *
   * @return array
   *   An array with two keys:
   *   - data: the page entities as returned by
   *     EntityShareUtility::prepareData(), or an empty array when the
   *     response has no array "data" member.
   *   - next: the URL found in links.next.href, or FALSE when there is none.
   */
  protected function getPage($url) {
    $page = [
      'data' => [],
      'next' => FALSE,
    ];

    $response = $this->importService->jsonApiRequest('GET', $url);
    $json = Json::decode((string) $response->getBody());

    // The body may not decode to an array (e.g. it is not valid JSON); treat
    // that as an empty last page.
    if (!is_array($json)) {
      return $page;
    }

    if (isset($json['data']) && is_array($json['data'])) {
      $page['data'] = EntityShareUtility::prepareData($json['data']);
    }
    if (!empty($json['links']['next']['href'])) {
      $page['next'] = $json['links']['next']['href'];
    }

    return $page;
  }

  /**
   * Filters entity data by whether the entities already exist locally.
   *
   * Existence is determined by loading entities whose UUID matches the "id"
   * of the entity data, grouped by the entity type taken from the part of
   * "type" before the "--" separator.
   *
   * @param array $data
   *   The list of entity data to filter. Modified in place and re-indexed.
   * @param bool $keep_existing
   *   TRUE to keep only data of entities that exist locally, FALSE to keep
   *   only data of entities that do not.
   */
  protected function filterDataToImport(array &$data, $keep_existing) {
    if (!$data) {
      return;
    }

    // Group the UUIDs by entity type so each storage is queried only once.
    $uuid_by_type = [];
    foreach ($data as $entity_data) {
      if (!isset($entity_data['type'], $entity_data['id'])) {
        continue;
      }
      $entity_type = explode('--', $entity_data['type'])[0];
      $uuid_by_type[$entity_type][] = $entity_data['id'];
    }

    // Collect the UUIDs that are already present locally.
    $existing_uuids = [];
    foreach ($uuid_by_type as $entity_type => $uuids) {
      $uuid_property = $this->entityTypeManager
        ->getDefinition($entity_type)
        ->getKey('uuid');
      // Entity types without a UUID key cannot be matched by UUID.
      if (!$uuid_property) {
        continue;
      }

      $existing_entities = $this->entityTypeManager
        ->getStorage($entity_type)
        ->loadByProperties([$uuid_property => $uuids]);
      foreach ($existing_entities as $entity) {
        $existing_uuids[$entity->uuid()] = TRUE;
      }
    }

    // Keep an item exactly when its existence matches what was asked for.
    $keep_existing = (bool) $keep_existing;
    $data = array_values(array_filter($data, static function ($entity_data) use ($existing_uuids, $keep_existing) {
      $uuid = $entity_data['id'] ?? NULL;
      $exists = $uuid !== NULL && isset($existing_uuids[$uuid]);
      return $exists === $keep_existing;
    }));
  }

  /**
   * Returns the settings stored for a channel of a remote.
   *
   * @param string $remote_id
   *   The ID of the remote.
   * @param string $channel_id
   *   The ID of the channel.
   *
   * @return array
   *   The value of remotes.<remote_id>.channels.<channel_id> in the module
   *   settings, or an empty array when it is missing or empty.
   */
  protected function getChannelConfig($remote_id, $channel_id) {
    $remotes = $this->config->get('remotes');
    if (empty($remotes[$remote_id]['channels'][$channel_id])) {
      return [];
    }
    return $remotes[$remote_id]['channels'][$channel_id];
  }

}