View source
<?php
namespace Drupal\feeds;
use Drupal\Core\DependencyInjection\ContainerInjectionInterface;
use Drupal\Core\DependencyInjection\DependencySerializationTrait;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Messenger\MessengerInterface;
use Drupal\Core\Session\AccountProxy;
use Drupal\Core\Session\AccountSwitcherInterface;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Drupal\feeds\Event\CleanEvent;
use Drupal\feeds\Event\EventDispatcherTrait;
use Drupal\feeds\Event\FeedsEvents;
use Drupal\feeds\Event\FetchEvent;
use Drupal\feeds\Event\InitEvent;
use Drupal\feeds\Event\ParseEvent;
use Drupal\feeds\Event\ProcessEvent;
use Drupal\feeds\Exception\EmptyFeedException;
use Drupal\feeds\Exception\LockException;
use Drupal\feeds\Feeds\Item\ItemInterface;
use Drupal\feeds\Result\FetcherResultInterface;
use Exception;
use RuntimeException;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class FeedsExecutable implements FeedsExecutableInterface, ContainerInjectionInterface {
use DependencySerializationTrait;
use EventDispatcherTrait;
use StringTranslationTrait;
protected $entityTypeManager;
protected $accountSwitcher;
protected $messenger;
public function __construct(EntityTypeManagerInterface $entity_type_manager, EventDispatcherInterface $event_dispatcher, AccountSwitcherInterface $account_switcher, MessengerInterface $messenger) {
$this
->setEventDispatcher($event_dispatcher);
$this->accountSwitcher = $account_switcher;
$this->entityTypeManager = $entity_type_manager;
$this->messenger = $messenger;
}
public static function create(ContainerInterface $container) {
return new static($container
->get('entity_type.manager'), $container
->get('event_dispatcher'), $container
->get('account_switcher'), $container
->get('messenger'));
}
public function processItem(FeedInterface $feed, $stage, array $params = []) {
$feed
->getType();
$switcher = $this
->switchAccount($feed);
try {
switch ($stage) {
case static::BEGIN:
$this
->import($feed);
break;
case static::FETCH:
$this
->doFetch($feed);
break;
case static::PARSE:
$this
->doParse($feed, $params['fetcher_result']);
break;
case static::PROCESS:
$this
->doProcess($feed, $params['item']);
break;
case static::CLEAN:
$this
->doClean($feed);
break;
case static::FINISH:
$this
->finish($feed, $params['fetcher_result']);
break;
}
} catch (Exception $exception) {
return $this
->handleException($feed, $stage, $params, $exception);
} finally {
$switcher
->switchBack();
}
}
protected function createBatch(FeedInterface $feed, $stage) {
return new FeedsDirectBatch($this, $feed, $stage);
}
protected function switchAccount(FeedInterface $feed) {
$account = new AccountProxy($this
->getEventDispatcher());
$account
->setInitialAccountId($feed
->getOwnerId());
return $this->accountSwitcher
->switchTo($account);
}
protected function handleException(FeedInterface $feed, $stage, array $params, Exception $exception) {
$feed
->finishImport();
if ($exception instanceof EmptyFeedException) {
return;
}
if ($exception instanceof RuntimeException) {
$this->messenger
->addError($exception
->getMessage());
return;
}
throw $exception;
}
protected function import(FeedInterface $feed) {
try {
$feed
->lock();
} catch (LockException $e) {
$this->messenger
->addWarning($this
->t('The feed became locked before the import could begin.'));
return;
}
$feed
->clearStates();
$this
->createBatch($feed, static::FETCH)
->addOperation(static::FETCH)
->run();
}
protected function doFetch(FeedInterface $feed) {
$this
->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'fetch'));
$fetch_event = $this
->dispatchEvent(FeedsEvents::FETCH, new FetchEvent($feed));
$feed
->setState(StateInterface::PARSE, NULL);
$feed
->saveStates();
$this
->createBatch($feed, static::PARSE)
->addOperation(static::PARSE, [
'fetcher_result' => $fetch_event
->getFetcherResult(),
])
->run();
}
protected function doParse(FeedInterface $feed, FetcherResultInterface $fetcher_result) {
$this
->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'parse'));
$parse_event = $this
->dispatchEvent(FeedsEvents::PARSE, new ParseEvent($feed, $fetcher_result));
$feed
->saveStates();
$batch = $this
->createBatch($feed, static::PROCESS);
foreach ($parse_event
->getParserResult() as $item) {
$batch
->addOperation(static::PROCESS, [
'item' => $item,
]);
}
$batch
->addOperation(static::FINISH, [
'fetcher_result' => $fetcher_result,
]);
$batch
->run();
}
protected function doProcess(FeedInterface $feed, ItemInterface $item) {
$this
->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'process'));
$this
->dispatchEvent(FeedsEvents::PROCESS, new ProcessEvent($feed, $item));
$feed
->saveStates();
}
protected function doClean(FeedInterface $feed) {
$state = $feed
->getState(StateInterface::CLEAN);
$entity = $state
->nextEntity();
if ($entity) {
$this
->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'clean'));
$this
->dispatchEvent(FeedsEvents::CLEAN, new CleanEvent($feed, $entity));
}
if (!$state
->count()) {
$state
->setCompleted();
}
$feed
->saveStates();
}
protected function finish(FeedInterface $feed, FetcherResultInterface $fetcher_result) {
$feed
->save();
if ($feed
->progressParsing() !== StateInterface::BATCH_COMPLETE) {
$this
->createBatch($feed, static::PARSE)
->addOperation(static::PARSE, [
'fetcher_result' => $fetcher_result,
])
->run();
return FALSE;
}
elseif ($feed
->progressFetching() !== StateInterface::BATCH_COMPLETE) {
$this
->createBatch($feed, static::FETCH)
->addOperation(static::FETCH)
->run();
return FALSE;
}
elseif ($feed
->progressCleaning() !== StateInterface::BATCH_COMPLETE) {
$clean_state = $feed
->getState(StateInterface::CLEAN);
$batch = $this
->createBatch($feed, static::CLEAN);
for ($i = 0; $i < $clean_state
->count(); $i++) {
$batch
->addOperation(static::CLEAN);
}
$batch
->addOperation(static::FINISH, [
'fetcher_result' => $fetcher_result,
]);
$batch
->run();
return FALSE;
}
else {
$feed
->finishImport();
return TRUE;
}
}
}