public function InternalReplicator::replicate in Workspace 8
Perform the replication from the source to target workspace.
Parameters
\Drupal\workspace\WorkspacePointerInterface $source: The workspace to replicate from.
\Drupal\workspace\WorkspacePointerInterface $target: The workspace to replicate to.
mixed $task: Optional information that defines the replication task to perform.
Return value
\Drupal\replication\Entity\ReplicationLog The replication log entry.
Overrides ReplicatorInterface::replicate
File
- src/InternalReplicator.php, line 97
Class
- InternalReplicator
- A replicator within the current Drupal runtime.
Namespace
Drupal\workspace
Code
/**
 * Replicates revisions from the source workspace into the target workspace.
 *
 * The method temporarily makes the source workspace the active one, collects
 * the source changes since the last fully successful replication, asks the
 * target which of those revisions it is missing, copies the missing revisions
 * over in batches, and finally records the outcome in a replication log
 * entity. The workspace that was active on entry is restored on exit, even
 * when replication fails with an exception.
 *
 * @param \Drupal\workspace\WorkspacePointerInterface $source
 *   Pointer to the workspace to replicate from.
 * @param \Drupal\workspace\WorkspacePointerInterface $target
 *   Pointer to the workspace to replicate to.
 * @param mixed $task
 *   (optional) A ReplicationTaskInterface instance describing the filter,
 *   parameters and document IDs to replicate. When NULL, a new
 *   ReplicationTask with its defaults is used.
 *
 * @return \Drupal\replication\Entity\ReplicationLogInterface
 *   The saved replication log entry.
 *
 * @throws UnexpectedTypeException
 *   When $task is neither NULL nor a ReplicationTaskInterface.
 */
public function replicate(WorkspacePointerInterface $source, WorkspacePointerInterface $target, $task = NULL) {
  if ($task !== NULL && !$task instanceof ReplicationTaskInterface) {
    throw new UnexpectedTypeException($task, 'ReplicationTaskInterface');
  }

  // Counters reported in the replication history.
  $missing_found = 0;
  $docs_read = 0;
  $docs_written = 0;
  $doc_write_failures = 0;

  // Get the source and target workspaces.
  $source_workspace = $source->getWorkspace();
  $target_workspace = $target->getWorkspace();

  // Set active workspace to source.
  // @todo Avoid modifying the user's active workspace.
  $current_active = $this->workspaceManager->getActiveWorkspace();
  try {
    $this->workspaceManager->setActiveWorkspace($source_workspace);
  }
  catch (\Throwable $e) {
    // Best effort: the failure is logged and shown to the user, and the
    // replication continues with whatever workspace is currently active.
    $this->logger->error('%type: @message in %function (line %line of %file).', Error::decodeException($e));
    drupal_set_message($e->getMessage(), 'error');
  }

  try {
    // Fetch the site time.
    $start_time = new \DateTime();

    // If no task sent, create an empty task for its defaults.
    if ($task === NULL) {
      $task = new ReplicationTask();
    }

    $replication_log_id = $source->generateReplicationId($target, $task);
    $replication_logs = \Drupal::entityTypeManager()
      ->getStorage('replication_log')
      ->loadByProperties(['uuid' => $replication_log_id]);
    /** @var \Drupal\replication\Entity\ReplicationLogInterface $replication_log */
    $replication_log = reset($replication_logs);

    // Only resume from the last recorded sequence when the previous run wrote
    // every revision it found missing; otherwise start again from 0 so that
    // revisions which failed to replicate are retried.
    $since = 0;
    if (!empty($replication_log) && $replication_log->get('ok')->value == TRUE && ($replication_log_history = $replication_log->getHistory())) {
      $dw = $replication_log_history[0]['docs_written'];
      $mf = $replication_log_history[0]['missing_found'];
      if ($dw !== NULL && $mf !== NULL && $dw == $mf) {
        $since = $replication_log->getSourceLastSeq() ?: $since;
      }
    }

    // Get changes on the source workspace.
    $parameters = $task->getParameters();
    if (!isset($parameters['doc_ids']) && ($doc_ids = $task->getDocIds())) {
      $parameters['doc_ids'] = $doc_ids;
    }
    $source_changes = $this->changesFactory
      ->get($source_workspace)
      ->filter($task->getFilter())
      ->parameters($parameters)
      ->setSince($since)
      ->getNormal();

    // Map each changed document ID to the list of its changed revisions.
    $revision_ids = [];
    foreach ($source_changes as $source_change) {
      $revision_ids[$source_change['id']] = [];
      foreach ($source_change['changes'] as $change) {
        $revision_ids[$source_change['id']][] = $change['rev'];
      }
    }

    // Get revisions the target workspace is missing.
    $revs_diff = $this->revisionDiffFactory
      ->get($target_workspace)
      ->setRevisionIds($revision_ids)
      ->getMissing();

    // Process the missing revisions in batches of 100 documents;
    // array_splice() removes each batch from $revs_diff as it is taken.
    while (!empty($revs_diff)) {
      $entities = [];
      $process_revs = array_splice($revs_diff, 0, 100);
      foreach ($process_revs as $uuid => $revs) {
        foreach ($revs['missing'] as $rev) {
          $missing_found++;
          $item = $this->revIndex
            ->useWorkspace($source_workspace->id())
            ->get("{$uuid}:{$rev}");
          // Skip revisions the index cannot resolve instead of aborting the
          // whole replication. The revision stays counted in $missing_found,
          // so the next run will not advance past it.
          if (empty($item['entity_type_id']) || empty($item['revision_id'])) {
            continue;
          }
          $entity = $this->entityTypeManager
            ->getStorage($item['entity_type_id'])
            ->loadRevision($item['revision_id']);
          if ($entity instanceof ContentEntityInterface) {
            $docs_read++;
            $entities[] = $this->serializer->normalize($entity, 'json', [
              'new_revision_id' => TRUE,
            ]);
          }
        }
      }

      // Save all entities in bulk.
      $bulk_data = [
        'new_edits' => FALSE,
        'docs' => $entities,
      ];
      $bulk_docs = $this->serializer->denormalize($bulk_data, 'Drupal\\replication\\BulkDocs\\BulkDocs', 'json', [
        'workspace' => $target_workspace,
      ]);
      $bulk_docs->save();

      foreach ($bulk_docs->getResult() as $result) {
        if (isset($result['error'])) {
          $doc_write_failures++;
        }
        elseif (!empty($result['ok'])) {
          $docs_written++;
        }
      }
    }

    $end_time = new \DateTime();
    // NOTE(review): 'start_last_seq' is read here, after replication has run,
    // and 'session_id' differs from the session ID set on the log below;
    // confirm both are intended.
    $history = [
      'docs_read' => $docs_read,
      'docs_written' => $docs_written,
      'doc_write_failures' => $doc_write_failures,
      'missing_checked' => count($source_changes),
      'missing_found' => $missing_found,
      'start_time' => $start_time->format('D, d M Y H:i:s e'),
      'end_time' => $end_time->format('D, d M Y H:i:s e'),
      'session_id' => \md5((string) $start_time->getTimestamp()),
      'start_last_seq' => $source_workspace->getUpdateSeq(),
    ];

    $replication_log_id = $source->generateReplicationId($target, $task);
    /** @var \Drupal\replication\Entity\ReplicationLogInterface $replication_log */
    $replication_log = ReplicationLog::loadOrCreate($replication_log_id);
    $replication_log->set('ok', TRUE);
    $replication_log->setSessionId(\md5((string) (int) (microtime(TRUE) * 1000000)));
    $replication_log->setSourceLastSeq($source_workspace->getUpdateSeq());
    $replication_log->setHistory($history);
    $replication_log->save();

    return $replication_log;
  }
  finally {
    // Switch back to the workspace that was originally active, whether the
    // replication succeeded or threw.
    $this->workspaceManager->setActiveWorkspace($current_active);
  }
}