Changes.php in Replication 8
File
src/Changes/Changes.php
View source
<?php
namespace Drupal\replication\Changes;
use Drupal\Core\DependencyInjection\DependencySerializationTrait;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\multiversion\Entity\Index\SequenceIndexInterface;
use Drupal\multiversion\Entity\WorkspaceInterface;
use Drupal\replication\Plugin\ReplicationFilterManagerInterface;
use Symfony\Component\Serializer\SerializerInterface;
/**
 * Builds the list of changes recorded in the sequence index for a workspace.
 *
 * Configure an instance through the fluent setters (filter(), parameters(),
 * includeDocs(), setSince(), setStop(), setLimit()) and then call getNormal()
 * or getLongpoll().
 */
class Changes implements ChangesInterface {

  use DependencySerializationTrait;

  /**
   * The sequence index the changes are read from.
   *
   * @var \Drupal\multiversion\Entity\Index\SequenceIndexInterface
   */
  protected $sequenceIndex;

  /**
   * ID of the workspace that was passed to the constructor.
   *
   * @var mixed
   */
  protected $workspaceId;

  /**
   * Used to obtain the storage handler for each sequence's entity type.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * Used to normalize revisions when documents are included in the output.
   *
   * @var \Symfony\Component\Serializer\SerializerInterface
   */
  protected $serializer;

  /**
   * Creates the replication filter plugin instances.
   *
   * @var \Drupal\replication\Plugin\ReplicationFilterManagerInterface
   */
  protected $filterManager;

  /**
   * Plugin ID of the filter to apply; only used when it is a non-empty string.
   *
   * @var string|null
   */
  protected $filter;

  /**
   * Configuration handed to the filter plugin.
   *
   * The keys 'doc_ids' and 'uuids' additionally select a default filter when
   * no explicit filter ID has been set.
   *
   * @var array|null
   */
  protected $parameters;

  /**
   * Whether each change should carry the normalized revision under 'doc'.
   *
   * @var bool
   */
  protected $includeDocs = FALSE;

  /**
   * Sequence ID used as the start of the range requested from the index.
   *
   * @var int
   */
  protected $since = 0;

  /**
   * Sequence ID used as the end of the range; NULL is passed through as-is.
   *
   * @var int|null
   */
  protected $stop = NULL;

  /**
   * Maximum number of distinct entities to return; a falsy value disables it.
   *
   * @var int|null
   */
  protected $limit = NULL;

  /**
   * Microseconds to wait between two empty polls in getLongpoll().
   *
   * @var int
   */
  protected $longpollInterval = 250000;

  /**
   * Constructs a Changes object.
   *
   * @param \Drupal\multiversion\Entity\Index\SequenceIndexInterface $sequence_index
   *   The sequence index to read from.
   * @param \Drupal\multiversion\Entity\WorkspaceInterface $workspace
   *   The workspace; only its ID is kept.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   * @param \Symfony\Component\Serializer\SerializerInterface $serializer
   *   The serializer.
   * @param \Drupal\replication\Plugin\ReplicationFilterManagerInterface $filter_manager
   *   The replication filter plugin manager.
   */
  public function __construct(SequenceIndexInterface $sequence_index, WorkspaceInterface $workspace, EntityTypeManagerInterface $entity_type_manager, SerializerInterface $serializer, ReplicationFilterManagerInterface $filter_manager) {
    $this->sequenceIndex = $sequence_index;
    $this->workspaceId = $workspace->id();
    $this->entityTypeManager = $entity_type_manager;
    $this->serializer = $serializer;
    $this->filterManager = $filter_manager;
  }

  /**
   * Sets the ID of the filter plugin to apply in getNormal().
   *
   * @param string $filter
   *   The filter plugin ID.
   *
   * @return $this
   */
  public function filter($filter) {
    $this->filter = $filter;
    return $this;
  }

  /**
   * Sets the parameters passed to the filter plugin as configuration.
   *
   * @param array|null $parameters
   *   The filter parameters; anything but an array is treated as empty by
   *   getNormal().
   *
   * @return $this
   */
  public function parameters(array $parameters = NULL) {
    $this->parameters = $parameters;
    return $this;
  }

  /**
   * Sets whether the normalized revision is added to every change.
   *
   * @param bool $include_docs
   *   TRUE to add a 'doc' key to every change.
   *
   * @return $this
   */
  public function includeDocs($include_docs) {
    $this->includeDocs = $include_docs;
    return $this;
  }

  /**
   * Sets the sequence ID the change range starts from.
   *
   * @param int $seq
   *   The sequence ID.
   *
   * @return $this
   */
  public function setSince($seq) {
    $this->since = $seq;
    return $this;
  }

  /**
   * Returns the sequence ID the change range starts from.
   *
   * @return int
   *   The value last given to setSince(), or 0 by default.
   */
  public function getSince() {
    return $this->since;
  }

  /**
   * Sets the sequence ID the change range ends at.
   *
   * @param int|null $seq
   *   The sequence ID.
   *
   * @return $this
   */
  public function setStop($seq) {
    $this->stop = $seq;
    return $this;
  }

  /**
   * Sets the maximum number of distinct entities returned by getNormal().
   *
   * @param int|null $limit
   *   The limit; a falsy value means no limit.
   *
   * @return $this
   */
  public function setLimit($limit) {
    $this->limit = $limit;
    return $this;
  }

  /**
   * Returns the changes between the configured 'since' and 'stop' sequences.
   *
   * Sequences flagged as local or as stubs are skipped, as is the sequence
   * whose ID equals a positive 'since'. Only one entry is kept per entity
   * UUID; a later sequence for the same UUID replaces the earlier entry.
   *
   * @return array
   *   A list of changes ordered by ascending 'seq'. Each change has the keys
   *   'changes' (a list holding one ['rev' => ...] array), 'id' (the entity
   *   UUID) and 'seq', plus 'deleted' => TRUE for deleted entities and 'doc'
   *   (the normalized revision) when documents are included.
   */
  public function getNormal() {
    $sequences = $this->sequenceIndex
      ->useWorkspace($this->workspaceId)
      ->getRange($this->since, $this->stop);

    $parameters = is_array($this->parameters) ? $this->parameters : [];

    // An explicit filter ID wins; otherwise the presence of 'doc_ids' or
    // 'uuids' in the parameters selects the matching built-in filter.
    $filter = NULL;
    if (is_string($this->filter) && $this->filter) {
      $filter = $this->filterManager->createInstance($this->filter, $parameters);
    }
    elseif (isset($parameters['doc_ids'])) {
      $filter = $this->filterManager->createInstance('_doc_ids', $parameters);
    }
    elseif (isset($parameters['uuids'])) {
      $filter = $this->filterManager->createInstance('uuid', $parameters);
    }

    $changes = [];
    $count = 0;
    foreach ($sequences as $sequence) {
      if (!empty($sequence['local']) || !empty($sequence['is_stub'])) {
        continue;
      }

      // The range may contain the 'since' sequence itself; it is not a new
      // change for the caller.
      if ($this->since > 0 && $sequence['seq'] == $this->since) {
        continue;
      }

      // The revision is only needed for filtering or for the 'doc' key.
      $revision = NULL;
      if ($this->includeDocs == TRUE || $filter !== NULL) {
        $storage = $this->entityTypeManager->getStorage($sequence['entity_type_id']);
        $storage->useWorkspace($this->workspaceId);
        try {
          $revision = $storage->loadRevision($sequence['revision_id']);
        }
        finally {
          // Always undo the workspace override, even when loading throws, so
          // the storage handler is not left pinned to this workspace.
          $storage->useWorkspace(NULL);
        }
      }

      // Note: when the revision could not be loaded the filter is not
      // consulted and the change is kept.
      if ($revision && $filter !== NULL && !$filter->filter($revision)) {
        continue;
      }

      if ($this->limit && $count >= $this->limit) {
        break;
      }

      // The limit counts distinct entities, not sequences.
      $uuid = $sequence['entity_uuid'];
      if (!isset($changes[$uuid])) {
        $count++;
      }

      $changes[$uuid] = [
        'changes' => [
          [
            'rev' => $sequence['rev'],
          ],
        ],
        'id' => $uuid,
        'seq' => $sequence['seq'],
      ];
      // Guarded like the 'local' and 'is_stub' flags above so a record
      // without the key does not raise an undefined index notice.
      if (!empty($sequence['deleted'])) {
        $changes[$uuid]['deleted'] = TRUE;
      }
      if ($this->includeDocs == TRUE) {
        $changes[$uuid]['doc'] = $this->serializer->normalize($revision);
      }
    }

    $return = array_values($changes);
    // Compare explicitly instead of returning the difference: usort() casts
    // the callback result to int, which misorders fractional or very large
    // differences.
    usort($return, function ($a, $b) {
      if ($a['seq'] == $b['seq']) {
        return 0;
      }
      return $a['seq'] < $b['seq'] ? -1 : 1;
    });
    return $return;
  }

  /**
   * Waits until the sequence index has entries from 'since' onwards.
   *
   * The index is polled repeatedly, pausing for $this->longpollInterval
   * microseconds after every empty result. There is no timeout. The filter,
   * stop, limit and include-docs settings are not applied here.
   *
   * @return array
   *   The non-empty result of the sequence index range query, returned as-is
   *   (not formatted like the result of getNormal()).
   */
  public function getLongpoll() {
    while (TRUE) {
      $change = $this->sequenceIndex
        ->useWorkspace($this->workspaceId)
        ->getRange($this->since, NULL);
      if (!empty($change)) {
        return $change;
      }
      // Pause before asking again so an idle workspace does not cause a tight
      // query loop.
      usleep($this->longpollInterval);
    }
  }

}