View source
<?php
namespace Drupal\workspace\Plugin\QueueWorker;
use Drupal\Component\Datetime\Time;
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Messenger\MessengerTrait;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\Session\AccountSwitcherInterface;
use Drupal\Core\State\StateInterface;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Drupal\Core\Utility\Error;
use Drupal\multiversion\Entity\Workspace;
use Drupal\multiversion\Workspace\WorkspaceManagerInterface;
use Drupal\replication\Entity\ReplicationLogInterface;
use Drupal\user\Entity\User;
use Drupal\workspace\Entity\Replication;
use Drupal\workspace\Event\ReplicationEvent;
use Drupal\workspace\Event\ReplicationEvents;
use Drupal\workspace\ReplicatorManager;
use Relaxed\Replicator\Exception\PeerNotReachableException;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
/**
 * Queue worker that executes queued workspace replications.
 *
 * Each queue item carries a replication entity and a replication task. The
 * worker runs the replication as user 1, records the outcome on the
 * replication entity and suspends the queue when a replication is blocked,
 * unreachable or still in progress.
 *
 * NOTE(review): the queue worker plugin annotation is not visible in this
 * copy of the file; confirm that the plugin definition is declared.
 */
class WorkspaceReplication extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  use StringTranslationTrait;
  use MessengerTrait;

  /**
   * The replicator manager that performs the actual replication.
   *
   * @var \Drupal\workspace\ReplicatorManager
   */
  protected $replicatorManager;

  /**
   * The time service, used for the request time.
   *
   * @var \Drupal\Component\Datetime\Time
   */
  protected $time;

  /**
   * The account switcher used to run the replication as user 1.
   *
   * @var \Drupal\Core\Session\AccountSwitcherInterface
   */
  protected $accountSwitcher;

  /**
   * The state store holding the 'workspace.last_replication_failed' flag.
   *
   * @var \Drupal\Core\State\StateInterface
   */
  private $state;

  /**
   * The 'workspace' logger channel.
   *
   * @var \Psr\Log\LoggerInterface
   */
  protected $logger;

  /**
   * The entity type manager.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * The workspace manager.
   *
   * @var \Drupal\multiversion\Workspace\WorkspaceManagerInterface
   */
  protected $workspaceManager;

  /**
   * The value of the 'workspace.default' container parameter.
   *
   * Used as the ID passed to Workspace::load().
   *
   * @var mixed
   */
  protected $workspaceDefault;

  /**
   * The event dispatcher.
   *
   * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
   */
  protected $eventDispatcher;

  /**
   * The 'replication.settings' configuration object.
   *
   * @var \Drupal\Core\Config\Config|\Drupal\Core\Config\ImmutableConfig
   */
  protected $replicationConfig;

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('workspace.replicator_manager'),
      $container->get('datetime.time'),
      $container->get('account_switcher'),
      $container->get('state'),
      $container->get('logger.factory'),
      $container->get('entity_type.manager'),
      $container->get('workspace.manager'),
      $container->getParameter('workspace.default'),
      $container->get('event_dispatcher'),
      $container->get('config.factory')
    );
  }

  /**
   * Constructs a WorkspaceReplication queue worker.
   *
   * @param array $configuration
   *   The plugin configuration.
   * @param string $plugin_id
   *   The plugin ID.
   * @param mixed $plugin_definition
   *   The plugin definition.
   * @param \Drupal\workspace\ReplicatorManager $replicator_manager
   *   The replicator manager.
   * @param \Drupal\Component\Datetime\Time $time
   *   The time service.
   * @param \Drupal\Core\Session\AccountSwitcherInterface $account_switcher
   *   The account switcher.
   * @param \Drupal\Core\State\StateInterface $state
   *   The state store.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger
   *   The logger channel factory; the 'workspace' channel is taken from it.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   * @param \Drupal\multiversion\Workspace\WorkspaceManagerInterface $workspace_manager
   *   The workspace manager.
   * @param mixed $workspace_default
   *   The value of the 'workspace.default' container parameter.
   * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
   *   The event dispatcher.
   * @param \Drupal\Core\Config\ConfigFactoryInterface $config_factory
   *   The config factory; 'replication.settings' is read from it.
   */
  public function __construct(array $configuration, $plugin_id, $plugin_definition, ReplicatorManager $replicator_manager, Time $time, AccountSwitcherInterface $account_switcher, StateInterface $state, LoggerChannelFactoryInterface $logger, EntityTypeManagerInterface $entity_type_manager, WorkspaceManagerInterface $workspace_manager, $workspace_default, EventDispatcherInterface $event_dispatcher, ConfigFactoryInterface $config_factory) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    $this->replicatorManager = $replicator_manager;
    $this->time = $time;
    $this->accountSwitcher = $account_switcher;
    $this->state = $state;
    $this->logger = $logger->get('workspace');
    $this->entityTypeManager = $entity_type_manager;
    $this->workspaceManager = $workspace_manager;
    $this->workspaceDefault = $workspace_default;
    $this->eventDispatcher = $event_dispatcher;
    $this->replicationConfig = $config_factory->get('replication.settings');
  }

  /**
   * Processes one replication queue item.
   *
   * @param mixed $data
   *   The queue item. It is read as an array with the keys 'replication' (the
   *   replication entity) and 'task' (passed through to the replicator
   *   manager).
   *
   * @throws \Drupal\Core\Queue\SuspendQueueException
   *   When the 'workspace.last_replication_failed' state flag is set, when
   *   the replication peer is not reachable, or when the replication is
   *   already running and has not exceeded the execution limit.
   */
  public function processItem($data) {
    if ($this->state->get('workspace.last_replication_failed', FALSE)) {
      throw new SuspendQueueException('Replication is blocked!');
    }

    $replication = $data['replication'];
    // Prefer the stored copy of the entity over the one serialized into the
    // queue item; fall back to the queued copy when it cannot be loaded.
    $stored_replication = $this->entityTypeManager
      ->getStorage('replication')
      ->load($replication->id());
    if ($stored_replication) {
      $replication = $stored_replication;
    }

    $replication_status = $replication->getReplicationStatus();
    if ($replication_status == Replication::QUEUED) {
      $this->processQueuedReplication($replication, $data);
    }
    elseif ($replication_status == Replication::FAILED) {
      // Nothing to do for a replication that has already failed.
    }
    elseif ($replication_status == Replication::REPLICATING) {
      $this->processRunningReplication($replication);
    }
  }

  /**
   * Runs a queued replication as user 1 and records its outcome.
   *
   * @param object $replication
   *   The replication entity in the queued status.
   * @param mixed $data
   *   The queue item; its 'task' entry is passed to the replicator manager.
   *
   * @throws \Drupal\Core\Queue\SuspendQueueException
   *   When the replication peer is not reachable. The replication is put back
   *   into the queued status before the exception is thrown.
   */
  protected function processQueuedReplication($replication, $data) {
    $account = User::load(1);
    $this->accountSwitcher->switchTo($account);

    // Everything below runs with user 1 privileges. The finally block
    // guarantees the original account is restored even when the queue is
    // suspended or a save/event subscriber throws; otherwise the elevated
    // account would leak into whatever runs next in this request.
    try {
      $replication->setReplicationStatusReplicating()->save();
      $this->logger->info('Replication "@replication" has started.', [
        '@replication' => $replication->label(),
      ]);
      $this->eventDispatcher->dispatch(ReplicationEvents::PRE_REPLICATION, new ReplicationEvent($replication));

      $response = FALSE;
      try {
        $response = $this->replicatorManager->doReplication($replication, $data['task']);
      }
      catch (PeerNotReachableException $e) {
        // The peer may come back later: re-queue the replication and suspend
        // the queue instead of marking the replication as failed.
        $this->logger->error('The deployment could not start. Reason: ' . $e->getMessage());
        $replication
          ->setReplicationFailInfo($e->getMessage())
          ->setReplicationStatusQueued()
          ->save();
        throw new SuspendQueueException('Peer not reachable. Reason: ' . $e->getMessage());
      }
      catch (\Exception $e) {
        // $response stays FALSE, so the failure branch below marks the
        // replication as failed.
        $this->logger->error('%type: @message in %function (line %line of %file).', Error::decodeException($e));
        $replication
          ->setReplicationFailInfo($e->getMessage())
          ->setArchiveSource(FALSE)
          ->save();
      }

      if ($response instanceof ReplicationLogInterface && $response->get('ok')->value == TRUE) {
        $this->archiveSourceWorkspace($replication);

        $replication->setReplicationStatusReplicated();
        $replication->set('replicated', $this->time->getRequestTime());
        $replication->save();
        $this->logger->info('Replication "@replication" has finished successfully.', [
          '@replication' => $replication->label(),
        ]);
      }
      else {
        if ($response instanceof ReplicationLogInterface && !empty($response->history->fail_info)) {
          $replication->setReplicationFailInfo($response->history->fail_info);
        }
        $replication
          ->setReplicationStatusFailed()
          ->set('replicated', $this->time->getRequestTime())
          ->setArchiveSource(FALSE)
          ->save();
        // Block further replications until the flag is cleared.
        $this->state->set('workspace.last_replication_failed', TRUE);
        $this->logger->info('Replication "@replication" has failed.', [
          '@replication' => $replication->label(),
        ]);
      }

      $this->eventDispatcher->dispatch(ReplicationEvents::POST_REPLICATION, new ReplicationEvent($replication));
    }
    finally {
      $this->accountSwitcher->switchBack();
    }
  }

  /**
   * Archives the source workspace of a successful replication if requested.
   *
   * The source workspace is unpublished when the replication asks for it. If
   * that workspace is not the default one, the default workspace is made
   * active. A status message is added in both cases.
   *
   * @param object $replication
   *   The replication entity that finished successfully.
   */
  protected function archiveSourceWorkspace($replication) {
    $default_workspace = Workspace::load($this->workspaceDefault);
    if (!$replication->getArchiveSource()) {
      return;
    }

    $source_pointer = $replication->get('source')->entity;
    $source_workspace = $source_pointer->getWorkspace();
    if (empty($source_workspace)) {
      return;
    }

    $source_workspace->setUnpublished()->save();
    if ($source_workspace->id() != $this->workspaceDefault) {
      $this->workspaceManager->setActiveWorkspace($default_workspace);
      $this->messenger()->addMessage($this->t('Workspace %workspace has been archived and workspace %default has been set as active.', [
        '%workspace' => $source_pointer->label(),
        '%default' => $default_workspace->label(),
      ]));
    }
    else {
      $this->messenger()->addMessage($this->t('Workspace %workspace has been archived.', [
        '%workspace' => $source_pointer->label(),
      ]));
    }
  }

  /**
   * Handles a replication that is already in the replicating status.
   *
   * The replication is marked as failed once it has been unchanged for longer
   * than the 'replication_execution_limit' setting (in hours, 1 when unset).
   * Otherwise the queue is suspended so the item is retried later.
   *
   * @param object $replication
   *   The replication entity in the replicating status.
   *
   * @throws \Drupal\Core\Queue\SuspendQueueException
   *   When the replication is still within its execution limit.
   */
  protected function processRunningReplication($replication) {
    $limit = $this->replicationConfig->get('replication_execution_limit');
    $limit = $limit ?: 1;
    $request_time = $this->time->getRequestTime();

    if ($request_time - $replication->getChangedTime() > 60 * 60 * $limit) {
      $replication
        ->setReplicationFailInfo($this->t('Replication "@replication" took too much time', [
          '@replication' => $replication->label(),
        ]))
        ->setReplicationStatusFailed()
        ->set('replicated', $request_time)
        ->setArchiveSource(FALSE)
        ->save();
      $this->state->set('workspace.last_replication_failed', TRUE);
      $this->logger->info('Replication "@replication" exceeded the running time of @limit hours, because of that it is considered as FAILED.', [
        '@replication' => $replication->label(),
        '@limit' => $limit,
      ]);
      return;
    }

    if ($this->replicationConfig->get('verbose_logging')) {
      $this->logger->info('Replication "@replication" is already in progress.', [
        '@replication' => $replication->label(),
      ]);
    }
    throw new SuspendQueueException('Replication is already in progress!');
  }

}