public function WorkspaceReplication::processItem in Workspace 8
Throws
\Drupal\Core\Entity\EntityStorageException
Overrides QueueWorkerInterface::processItem
File
- src/
Plugin/ QueueWorker/ WorkspaceReplication.php, line 164
Class
- WorkspaceReplication
- Class WorkspaceReplication.
Namespace
Drupal\workspace\Plugin\QueueWorkerCode
public function processItem($data) {
if ($this->state
->get('workspace.last_replication_failed', FALSE)) {
throw new SuspendQueueException('Replication is blocked!');
}
/** @var \Drupal\workspace\Entity\Replication $replication */
$replication = $data['replication'];
if ($replication_new = $this->entityTypeManager
->getStorage('replication')
->load($replication
->id())) {
$replication = $replication_new;
}
$replication_status = $replication
->getReplicationStatus();
if ($replication_status == Replication::QUEUED) {
$account = User::load(1);
$this->accountSwitcher
->switchTo($account);
$replication
->setReplicationStatusReplicating()
->save();
$this->logger
->info('Replication "@replication" has started.', [
'@replication' => $replication
->label(),
]);
$this->eventDispatcher
->dispatch(ReplicationEvents::PRE_REPLICATION, new ReplicationEvent($replication));
$response = FALSE;
try {
$response = $this->replicatorManager
->doReplication($replication, $data['task']);
} catch (PeerNotReachableException $e) {
$this->logger
->error('The deployment could not start. Reason: ' . $e
->getMessage());
$replication
->setReplicationFailInfo($e
->getMessage())
->setReplicationStatusQueued()
->save();
throw new SuspendQueueException('Peer not reachable. Reason: ' . $e
->getMessage());
} catch (\Exception $e) {
// When exception is thrown during replication process we want
// replication to be marked as failed and removed from queue.
$this->logger
->error('%type: @message in %function (line %line of %file).', $variables = Error::decodeException($e));
$replication
->setReplicationFailInfo($e
->getMessage())
->setArchiveSource(FALSE)
->save();
}
if ($response instanceof ReplicationLogInterface && $response
->get('ok')->value == TRUE) {
$default_workspace = Workspace::load($this->workspaceDefault);
if ($replication
->getArchiveSource() && !empty($replication
->get('source')->entity
->getWorkspace())) {
$source_workspace = $replication
->get('source')->entity
->getWorkspace();
$source_workspace
->setUnpublished()
->save();
if ($source_workspace
->id() != $this->workspaceDefault) {
$this->workspaceManager
->setActiveWorkspace($default_workspace);
$this
->messenger()
->addMessage($this
->t('Workspace %workspace has been archived and workspace %default has been set as active.', [
'%workspace' => $replication
->get('source')->entity
->label(),
'%default' => $default_workspace
->label(),
]));
}
else {
$this
->messenger()
->addMessage($this
->t('Workspace %workspace has been archived.', [
'%workspace' => $replication
->get('source')->entity
->label(),
]));
}
}
$replication
->setReplicationStatusReplicated();
$replication
->set('replicated', $this->time
->getRequestTime());
$replication
->save();
$this->logger
->info('Replication "@replication" has finished successfully.', [
'@replication' => $replication
->label(),
]);
}
else {
if ($response instanceof ReplicationLogInterface && !empty($response->history->fail_info)) {
$replication
->setReplicationFailInfo($response->history->fail_info);
}
$replication
->setReplicationStatusFailed()
->set('replicated', $this->time
->getRequestTime())
->setArchiveSource(FALSE)
->save();
$this->state
->set('workspace.last_replication_failed', TRUE);
$this->logger
->info('Replication "@replication" has failed.', [
'@replication' => $replication
->label(),
]);
}
$this->eventDispatcher
->dispatch(ReplicationEvents::POST_REPLICATION, new ReplicationEvent($replication));
$this->accountSwitcher
->switchBack();
}
elseif ($replication_status == Replication::FAILED) {
// If the replication has been marked as failed before it started to be
// processed, do nothing, the item will just be removed from the queue.
}
elseif ($replication_status == Replication::REPLICATING) {
$limit = $this->replicationConfig
->get('replication_execution_limit');
$limit = $limit ?: 1;
$request_time = $this->time
->getRequestTime();
if ($request_time - $replication
->getChangedTime() > 60 * 60 * $limit) {
$replication
->setReplicationFailInfo($this
->t('Replication "@replication" took too much time', [
'@replication' => $replication
->label(),
]))
->setReplicationStatusFailed()
->set('replicated', $this->time
->getRequestTime())
->setArchiveSource(FALSE)
->save();
$this->state
->set('workspace.last_replication_failed', TRUE);
$this->logger
->info('Replication "@replication" exceeded the running time of @limit hours, because of that it is considered as FAILED.', [
'@replication' => $replication
->label(),
'@limit' => $limit,
]);
}
else {
// Log this only when the verbose logging is enabled because in some
// rare cases a replication can fail in a way when we can't handle to
// set the correct failed status, but it will stay in the queue as in
// progress until it exceeds the replication_execution_limit limit.
// This will avoid spamming watchdog with lots of replication in
// progress messages when they are not wanted.
if ($this->replicationConfig
->get('verbose_logging')) {
$this->logger
->info('Replication "@replication" is already in progress.', [
'@replication' => $replication
->label(),
]);
}
throw new SuspendQueueException('Replication is already in progress!');
}
}
}