public function EntityShareAsyncWorker::processItem in Entity Share 8.2

Same name and namespace in other branches
  1. 8.3 modules/entity_share_async/src/Plugin/QueueWorker/EntityShareAsyncWorker.php \Drupal\entity_share_async\Plugin\QueueWorker\EntityShareAsyncWorker::processItem()

Works on a single queue item.

Parameters

mixed $data: The data that was passed to \Drupal\Core\Queue\QueueInterface::createItem() when the item was queued.

Throws

\Drupal\Core\Queue\RequeueException Processing is not yet finished. This will allow another process to claim the item immediately.

\Exception A QueueWorker plugin may throw an exception to indicate there was a problem. The cron process will log the exception, and leave the item in the queue to be processed again later.

\Drupal\Core\Queue\SuspendQueueException More specifically, a SuspendQueueException should be thrown when a QueueWorker plugin is aware that the problem will affect all subsequent workers of its queue. For example, a callback that makes HTTP requests may find that the remote server is not responding. The cron process will behave as with a normal Exception, and in addition will not attempt to process further items from the current item's queue during the current cron run.
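
The three outcomes described above can be sketched with a small stand-alone runner. This is not Drupal's cron code: `DemoRequeueException`, `DemoSuspendQueueException`, and `demo_run_queue()` are hypothetical stand-ins defined here so the example runs without Drupal, and the log strings only paraphrase the descriptions above.

```php
<?php

// Hypothetical stand-ins for the Drupal exception classes named above.
class DemoRequeueException extends \RuntimeException {}
class DemoSuspendQueueException extends \RuntimeException {}

/**
 * Hypothetical runner: feeds items to a worker and records each outcome.
 */
function demo_run_queue(array $items, callable $worker): array {
  $log = [];
  foreach ($items as $item) {
    try {
      $worker($item);
      $log[] = "$item: processed";
    }
    catch (DemoRequeueException $e) {
      // Not finished yet: the item can be claimed again immediately.
      $log[] = "$item: requeued";
    }
    catch (DemoSuspendQueueException $e) {
      // The problem affects every later item: stop working on this queue.
      $log[] = "$item: left in queue, queue suspended";
      break;
    }
    catch (\Exception $e) {
      // Ordinary failure: log it and leave the item for a later run.
      $log[] = "$item: left in queue";
    }
  }
  return $log;
}

// Example worker: item "b" discovers that the remote server is down.
$worker = function (string $item): void {
  if ($item === 'b') {
    throw new DemoSuspendQueueException('Remote server is not responding.');
  }
};
$log = demo_run_queue(['a', 'b', 'c'], $worker);
```

In this sketch item "c" is never attempted, which matches the description of a suspended queue: no further items from that queue are processed during the current run.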

Overrides QueueWorkerInterface::processItem

See also

\Drupal\Core\Cron::processQueues()

File

modules/entity_share_async/src/Plugin/QueueWorker/EntityShareAsyncWorker.php, line 117

Class

EntityShareAsyncWorker
Asynchronous import queue worker.

Namespace

Drupal\entity_share_async\Plugin\QueueWorker

Code

public function processItem($item) {
  $async_states = $this->stateStorage
    ->get(QueueHelperInterface::STATE_ID, []);
  $remote = $this->entityTypeManager
    ->getStorage('remote')
    ->load($item['remote_id']);
  $channel_infos = $this->remoteManager
    ->getChannelsInfos($remote);
  $this->jsonapiHelper
    ->setRemote($remote);
  $http_client = $this->remoteManager
    ->prepareJsonApiClient($remote);
  $url = $channel_infos[$item['channel_id']]['url'];
  $parsed_url = UrlHelper::parse($url);
  $query = $parsed_url['query'];
  $query['filter']['uuid-filter'] = [
    'condition' => [
      'path' => 'id',
      'operator' => 'IN',
      'value' => [
        $item['uuid'],
      ],
    ],
  ];
  $query = UrlHelper::buildQuery($query);
  $prepared_url = $parsed_url['path'] . '?' . $query;

  // Get the entity json data.
  $response = $this->request
    ->request($http_client, 'GET', $prepared_url);
  $json = Json::decode((string) $response
    ->getBody());

  // Import the entity.
  $id = $this->jsonapiHelper
    ->importEntityListData(EntityShareUtility::prepareData($json['data']));
  if (empty($id)) {
    $this->loggerChannelFactory
      ->get('entity_share_async')
      ->warning("Cannot synchronise item @uuid from channel @channel_id of remote @remote_id", [
        '@uuid' => $item['uuid'],
        '@channel_id' => $item['channel_id'],
        '@remote_id' => $item['remote_id'],
      ]);
  }
  if (isset($async_states[$item['remote_id']][$item['channel_id']][$item['uuid']])) {
    unset($async_states[$item['remote_id']][$item['channel_id']][$item['uuid']]);
  }

  // Update states.
  $this->stateStorage
    ->set(QueueHelperInterface::STATE_ID, $async_states);
}
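
The URL preparation step in the listing above (parse the channel URL, add an `IN` filter on `id`, rebuild the query string) can be tried outside Drupal with plain PHP. This is only a sketch: `demo_add_uuid_filter()` is a hypothetical helper, the example URL and UUID are made up, and `parse_url()`, `parse_str()`, and `http_build_query()` stand in for `UrlHelper::parse()` and `UrlHelper::buildQuery()`, whose encoding details may differ.

```php
<?php

/**
 * Hypothetical helper: adds an "IN" filter on "id" to a channel URL.
 */
function demo_add_uuid_filter(string $url, array $uuids): string {
  $parts = parse_url($url);

  // Keep any query parameters the channel URL already carries.
  $query = [];
  if (isset($parts['query'])) {
    parse_str($parts['query'], $query);
  }

  // Same filter structure as in processItem().
  $query['filter']['uuid-filter'] = [
    'condition' => [
      'path' => 'id',
      'operator' => 'IN',
      'value' => array_values($uuids),
    ],
  ];

  // Rebuild the part of the URL that precedes the query string.
  $base = '';
  if (isset($parts['scheme'], $parts['host'])) {
    $base = $parts['scheme'] . '://' . $parts['host'];
    if (isset($parts['port'])) {
      $base .= ':' . $parts['port'];
    }
  }
  $base .= $parts['path'] ?? '';

  return $base . '?' . http_build_query($query);
}

// Made-up channel URL and UUID, for illustration only.
$prepared_url = demo_add_uuid_filter(
  'https://example.com/jsonapi/node/article?page[limit]=50',
  ['11111111-2222-3333-4444-555555555555']
);
```

Restricting the request to a single UUID means the response should contain at most the one entity that the queue item refers to, while the channel's existing query parameters are preserved.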