You are here

public function InternalReplicator::replicate in Workspace 8

Perform the replication from the source to target workspace.

Parameters

\Drupal\workspace\WorkspacePointerInterface $source: The workspace to replicate from.

\Drupal\workspace\WorkspacePointerInterface $target: The workspace to replicate to.

mixed $task: Optional information that defines the replication task to perform.

Return value

\Drupal\replication\Entity\ReplicationLog The replication log entry.

Overrides ReplicatorInterface::replicate

File

src/InternalReplicator.php, line 97

Class

InternalReplicator
A replicator within the current Drupal runtime.

Namespace

Drupal\workspace

Code

/**
 * Replicates missing revisions from the source workspace into the target.
 *
 * The method temporarily makes the source workspace the active one, collects
 * the changes recorded on the source since the last successful replication,
 * asks the target which of those revisions it is missing, and writes the
 * missing revisions to the target in batches of 100 documents. The outcome is
 * stored in a replication log entity, which is returned.
 *
 * The workspace that was active when the method was called is restored before
 * returning, including when an exception interrupts the replication.
 *
 * @param \Drupal\workspace\WorkspacePointerInterface $source
 *   The workspace to replicate from.
 * @param \Drupal\workspace\WorkspacePointerInterface $target
 *   The workspace to replicate to.
 * @param mixed $task
 *   (optional) A ReplicationTaskInterface describing the replication task to
 *   perform. When NULL, a new ReplicationTask with its defaults is used.
 *
 * @return \Drupal\replication\Entity\ReplicationLog
 *   The replication log entry.
 *
 * @throws UnexpectedTypeException
 *   When $task is neither NULL nor a ReplicationTaskInterface.
 */
public function replicate(WorkspacePointerInterface $source, WorkspacePointerInterface $target, $task = NULL) {
  if ($task !== NULL && !$task instanceof ReplicationTaskInterface) {
    throw new UnexpectedTypeException($task, 'ReplicationTaskInterface');
  }
  $missing_found = 0;
  $docs_read = 0;
  $docs_written = 0;
  $doc_write_failures = 0;

  // Get the source and target workspaces.
  $source_workspace = $source->getWorkspace();
  $target_workspace = $target->getWorkspace();

  // Set active workspace to source.
  // @todo Avoid modifying the user's active workspace.
  $current_active = $this->workspaceManager->getActiveWorkspace();
  try {
    $this->workspaceManager->setActiveWorkspace($source_workspace);
  }
  catch (\Throwable $e) {
    // Best effort: a failed switch is reported but does not abort the
    // replication.
    $this->logger->error('%type: @message in %function (line %line of %file).', Error::decodeException($e));
    drupal_set_message($e->getMessage(), 'error');
  }

  // Everything below runs with the source workspace active; the finally
  // clause guarantees the original workspace is restored even on failure.
  try {
    // Fetch the site time.
    $start_time = new \DateTime();

    // If no task sent, create an empty task for its defaults.
    if ($task === NULL) {
      $task = new ReplicationTask();
    }
    $replication_log_id = $source->generateReplicationId($target, $task);

    $replication_logs = $this->entityTypeManager
      ->getStorage('replication_log')
      ->loadByProperties(['uuid' => $replication_log_id]);
    /** @var \Drupal\replication\Entity\ReplicationLogInterface $replication_log */
    $replication_log = reset($replication_logs);

    // Only resume from the last recorded sequence when the previous run was
    // flagged ok and wrote every revision it found missing; otherwise start
    // from the beginning.
    $since = 0;
    if (!empty($replication_log) && $replication_log->get('ok')->value == TRUE && ($replication_log_history = $replication_log->getHistory())) {
      $dw = $replication_log_history[0]['docs_written'] ?? NULL;
      $mf = $replication_log_history[0]['missing_found'] ?? NULL;
      if ($dw !== NULL && $mf !== NULL && $dw == $mf) {
        $since = $replication_log->getSourceLastSeq() ?: $since;
      }
    }

    // Get changes on the source workspace.
    $parameters = $task->getParameters();
    if (!isset($parameters['doc_ids']) && ($doc_ids = $task->getDocIds())) {
      $parameters['doc_ids'] = $doc_ids;
    }
    $source_changes = $this->changesFactory
      ->get($source_workspace)
      ->filter($task->getFilter())
      ->parameters($parameters)
      ->setSince($since)
      ->getNormal();

    // Map each changed document ID to the list of its changed revisions.
    $revision_ids = [];
    foreach ($source_changes as $source_change) {
      $revision_ids[$source_change['id']] = [];
      foreach ($source_change['changes'] as $change) {
        $revision_ids[$source_change['id']][] = $change['rev'];
      }
    }

    // Get revisions the target workspace is missing.
    $revs_diff = $this->revisionDiffFactory
      ->get($target_workspace)
      ->setRevisionIds($revision_ids)
      ->getMissing();

    // Process the missing revisions in batches of 100 documents;
    // array_splice() removes each batch from $revs_diff.
    while (!empty($revs_diff)) {
      $entities = [];
      $process_revs = array_splice($revs_diff, 0, 100);
      foreach ($process_revs as $uuid => $revs) {
        foreach ($revs['missing'] as $rev) {
          $missing_found++;
          $item = $this->revIndex
            ->useWorkspace($source_workspace->id())
            ->get("{$uuid}:{$rev}");
          $entity_type_id = $item['entity_type_id'];
          $revision_id = $item['revision_id'];
          $storage = $this->entityTypeManager->getStorage($entity_type_id);
          $entity = $storage->loadRevision($revision_id);
          // Only content entities are counted as read and replicated.
          if ($entity instanceof ContentEntityInterface) {
            $docs_read++;
            $entities[] = $this->serializer->normalize($entity, 'json', [
              'new_revision_id' => TRUE,
            ]);
          }
        }
      }
      $bulk_data = [
        'new_edits' => FALSE,
        'docs' => $entities,
      ];

      // Save all entities in bulk.
      $bulk_docs = $this->serializer->denormalize($bulk_data, 'Drupal\\replication\\BulkDocs\\BulkDocs', 'json', [
        'workspace' => $target_workspace,
      ]);
      $bulk_docs->save();
      foreach ($bulk_docs->getResult() as $result) {
        if (isset($result['error'])) {
          $doc_write_failures++;
        }
        elseif (!empty($result['ok'])) {
          $docs_written++;
        }
      }
    }

    $end_time = new \DateTime();
    $history = [
      'docs_read' => $docs_read,
      'docs_written' => $docs_written,
      'doc_write_failures' => $doc_write_failures,
      'missing_checked' => count($source_changes),
      'missing_found' => $missing_found,
      'start_time' => $start_time->format('D, d M Y H:i:s e'),
      'end_time' => $end_time->format('D, d M Y H:i:s e'),
      'session_id' => \md5((string) $start_time->getTimestamp()),
      'start_last_seq' => $source_workspace->getUpdateSeq(),
    ];

    // Reuse the replication ID computed above: source, target and task are
    // unchanged since then.
    /** @var \Drupal\replication\Entity\ReplicationLogInterface $replication_log */
    $replication_log = ReplicationLog::loadOrCreate($replication_log_id);
    $replication_log->set('ok', TRUE);
    $replication_log->setSessionId(\md5((string) (int) (microtime(TRUE) * 1000000)));
    $replication_log->setSourceLastSeq($source_workspace->getUpdateSeq());
    $replication_log->setHistory($history);
    $replication_log->save();

    return $replication_log;
  }
  finally {
    // Switch back to the workspace that was originally active.
    $this->workspaceManager->setActiveWorkspace($current_active);
  }
}