View source
<?php
namespace Drupal\acquia_contenthub_subscriber;
use Drupal\acquia_contenthub\AcquiaContentHubStatusMetricsTrait;
use Drupal\Core\Database\Connection;
use Drupal\Core\Entity\EntityInterface;
class SubscriberTracker {
use AcquiaContentHubStatusMetricsTrait;
const QUEUED = 'queued';
const IMPORTED = 'imported';
const AUTO_UPDATE_DISABLED = 'auto_update_disabled';
const IMPORT_TRACKING_TABLE = 'acquia_contenthub_subscriber_import_tracking';
public function __construct(Connection $database) {
$this->database = $database;
}
public function isTracked($uuids) {
if (!is_array($uuids)) {
$uuids = [
$uuids,
];
}
$query = $this->database
->select(self::IMPORT_TRACKING_TABLE, 't');
$query
->fields('t', [
'entity_type',
'entity_id',
'entity_uuid',
]);
$query
->condition('entity_uuid', $uuids, 'IN');
$results = $query
->execute()
->fetchAll();
if (array_diff($uuids, array_column($results, 'entity_uuid'))) {
return FALSE;
}
return TRUE;
}
public function getUntracked(array $uuids) {
$query = $this->database
->select(self::IMPORT_TRACKING_TABLE, 't')
->fields('t');
$query
->condition('t.entity_uuid', $uuids, 'IN');
$query
->condition('t.entity_id', NULL, 'IS NOT NULL');
$results = $query
->execute();
$uuids = array_combine($uuids, $uuids);
foreach ($results as $result) {
if ($result->hash) {
unset($uuids[$result->entity_uuid]);
}
}
return array_values($uuids);
}
public function track(EntityInterface $entity, string $hash, $remote_uuid = NULL) {
$values = [
'entity_uuid' => $remote_uuid ?? $entity
->uuid(),
'entity_type' => $entity
->getEntityTypeId(),
'entity_id' => $entity
->id(),
'last_imported' => date('c'),
];
$this
->insertOrUpdate($values, self::IMPORTED, $hash);
}
public function queue($uuid) {
$values = [
'entity_uuid' => $uuid,
];
$this
->insertOrUpdate($values, self::QUEUED);
}
protected function insertOrUpdate(array $values, $status, $hash = "") {
if (empty($values['entity_uuid'])) {
throw new \Exception("Cannot track a subscription without an entity uuid.");
}
$values['status'] = $status;
if ($hash) {
$values['hash'] = $hash;
}
$query = $this->database
->select(self::IMPORT_TRACKING_TABLE, 't')
->fields('t', [
'first_imported',
]);
$query
->condition('entity_uuid', $values['entity_uuid']);
$results = $query
->execute()
->fetchObject();
if ($results) {
$query = $this->database
->update(self::IMPORT_TRACKING_TABLE)
->fields($values);
$query
->condition('entity_uuid', $values['entity_uuid']);
return $query
->execute();
}
$values['first_imported'] = date('c');
return $this->database
->insert(self::IMPORT_TRACKING_TABLE)
->fields($values)
->execute();
}
public function getEntityByRemoteIdAndHash($uuid, $hash = NULL) {
$query = $this->database
->select(self::IMPORT_TRACKING_TABLE, 't')
->fields('t', [
'entity_type',
'entity_id',
]);
$query
->condition('entity_uuid', $uuid);
if (NULL !== $hash) {
$query
->condition('hash', $hash);
}
$result = $query
->execute()
->fetchObject();
if ($result && $result->entity_type && $result->entity_id) {
return \Drupal::entityTypeManager()
->getStorage($result->entity_type)
->load($result->entity_id);
}
}
public function delete(string $uuid) : void {
$query = $this->database
->delete(self::IMPORT_TRACKING_TABLE);
$query
->condition('entity_uuid', $uuid);
$query
->execute();
}
public function getStatusByTypeId(string $type, string $id) {
$query = $this->database
->select(self::IMPORT_TRACKING_TABLE, 't')
->fields('t', [
'status',
]);
$query
->condition('entity_type', $type);
$query
->condition('entity_id', $id);
$result = $query
->execute()
->fetchObject();
if ($result && $result->status) {
return $result->status;
}
}
public function getStatusByUuid(string $uuid) {
$query = $this->database
->select(self::IMPORT_TRACKING_TABLE, 't')
->fields('t', [
'status',
]);
$query
->condition('entity_uuid', $uuid);
$result = $query
->execute()
->fetchObject();
if ($result && $result->status) {
return $result->status;
}
}
public function setStatusByUuid(string $uuid, string $status) {
$acceptable_statuses = [
$this::AUTO_UPDATE_DISABLED,
$this::IMPORTED,
$this::QUEUED,
];
if (!in_array($status, $acceptable_statuses)) {
throw new \Exception(sprintf("The '%s' status is not valid. Please pass one of the following options: '%s'", $status, implode("', '", $acceptable_statuses)));
}
if (!$this
->isTracked($uuid)) {
return;
}
$query = $this->database
->update(self::IMPORT_TRACKING_TABLE);
$query
->fields([
'status' => $status,
]);
$query
->condition('entity_uuid', $uuid);
$query
->execute();
}
public function setQueueItemByUuids(array $uuids, string $queue_id) {
if (!$this
->isTracked($uuids)) {
return;
}
$query = $this->database
->update(self::IMPORT_TRACKING_TABLE);
$query
->fields([
'queue_id' => $queue_id,
]);
$query
->condition('entity_uuid', $uuids, 'IN');
$query
->execute();
}
public function setStatusByTypeId(string $type, string $id, string $status) {
$query = $this->database
->select(self::IMPORT_TRACKING_TABLE, 't')
->fields('t', [
'entity_uuid',
]);
$query
->condition('entity_type', $type);
$query
->condition('entity_id', $id);
$result = $query
->execute()
->fetchObject();
if (!(bool) $result) {
return;
}
$this
->setStatusByUuid($result->entity_uuid, $status);
}
public function listTrackedEntities(string $status, $entity_type_id = '') : array {
$query = $this->database
->select(self::IMPORT_TRACKING_TABLE, 'ci')
->fields('ci')
->condition('status', $status);
if (!empty($entity_type_id)) {
$query = $query
->condition('entity_type', $entity_type_id);
}
return $query
->execute()
->fetchAll(\PDO::FETCH_ASSOC);
}
}