<?php
namespace Drupal\message_digest\Plugin\Notifier;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Database\Connection;
use Drupal\Core\Database\Database;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Logger\LoggerChannelInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Render\RendererInterface;
use Drupal\Core\State\StateInterface;
use Drupal\message\MessageInterface;
use Drupal\message_digest\Exception\InvalidDigestGroupingException;
use Drupal\message_notify\Plugin\Notifier\MessageNotifierBase;
use Drupal\user\UserInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
 * Base class for notifier plugins that queue messages into a digest.
 *
 * deliver() does not send anything itself: it records the message in the
 * 'message_digest' table. The other methods query that table for recipients
 * with unsent rows, group a recipient's unsent rows, flag rows as sent, and
 * keep a per-plugin "last run" timestamp in the state service.
 */
abstract class DigestBase extends MessageNotifierBase implements ContainerFactoryPluginInterface, DigestInterface {

  /**
   * The database connection used to read and write 'message_digest' rows.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $connection;

  /**
   * The digest interval, taken from the plugin definition.
   *
   * processDigest() prefixes this with '-' and hands it to strtotime(), so it
   * has to be a relative-time string that strtotime() understands.
   *
   * @var string
   */
  protected $digestInterval;

  /**
   * The time service, used to obtain the request time.
   *
   * @var \Drupal\Component\Datetime\TimeInterface
   */
  protected $time;

  /**
   * The state service, used to store the time of the last digest run.
   *
   * @var \Drupal\Core\State\StateInterface
   */
  protected $state;

  /**
   * Constructs the digest notifier plugin.
   *
   * Note that $message declares a NULL default but is followed by required
   * parameters, so callers always have to pass it explicitly (NULL is
   * accepted).
   *
   * @param array $configuration
   *   Plugin configuration. 'entity_type' and 'entity_id' default to empty
   *   strings when not provided.
   * @param string $plugin_id
   *   The plugin ID.
   * @param mixed $plugin_definition
   *   The plugin definition; its 'digest_interval' entry is read here.
   * @param \Drupal\Core\Logger\LoggerChannelInterface $logger
   *   The logger channel, passed on to the parent constructor.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager, passed on to the parent constructor.
   * @param \Drupal\Core\Render\RendererInterface $renderer
   *   The renderer, passed on to the parent constructor.
   * @param \Drupal\message\MessageInterface|null $message
   *   The message entity to deliver, or NULL.
   * @param \Drupal\Core\State\StateInterface $state
   *   The state service.
   * @param \Drupal\Core\Database\Connection $connection
   *   The database connection.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   The time service.
   */
  public function __construct(array $configuration, $plugin_id, $plugin_definition, LoggerChannelInterface $logger, EntityTypeManagerInterface $entity_type_manager, RendererInterface $renderer, MessageInterface $message = NULL, StateInterface $state, Connection $connection, TimeInterface $time) {
    // Fill in the grouping keys so later reads never hit an undefined index.
    $configuration += [
      'entity_type' => '',
      'entity_id' => '',
    ];
    parent::__construct($configuration, $plugin_id, $plugin_definition, $logger, $entity_type_manager, $renderer, $message);

    $this->connection = $connection;
    $this->digestInterval = $plugin_definition['digest_interval'];
    $this->time = $time;
    $this->state = $state;
  }

  /**
   * Instantiates the plugin with its services pulled from the container.
   *
   * @param \Symfony\Component\DependencyInjection\ContainerInterface $container
   *   The service container.
   * @param array $configuration
   *   Plugin configuration.
   * @param string $plugin_id
   *   The plugin ID.
   * @param mixed $plugin_definition
   *   The plugin definition.
   * @param \Drupal\message\MessageInterface|null $message
   *   (optional) The message entity to deliver.
   *
   * @return static
   *   A new plugin instance.
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MessageInterface $message = NULL) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('logger.channel.message_notify'),
      $container->get('entity_type.manager'),
      $container->get('renderer'),
      $message,
      $container->get('state'),
      $container->get('database'),
      $container->get('datetime.time')
    );
  }

  /**
   * Records the current message in the 'message_digest' table.
   *
   * @param array $output
   *   Not used by this implementation.
   *
   * @return bool
   *   Always TRUE once the row has been inserted.
   *
   * @throws \Drupal\message_digest\Exception\InvalidDigestGroupingException
   *   When exactly one of the 'entity_type' / 'entity_id' configuration values
   *   is set.
   */
  public function deliver(array $output = []) {
    $entity_type = $this->configuration['entity_type'];
    $entity_id = $this->configuration['entity_id'];

    // Grouping is all-or-nothing: reject half-specified groupings before
    // doing any other work.
    if ($entity_type xor $entity_id) {
      throw new InvalidDigestGroupingException(sprintf('Tried to create a message digest without both entity_type (%s) and entity_id (%s). These either both need to be empty, or have values.', $entity_type, $entity_id));
    }

    $message = $this->message;

    // Fall back to the ID of $message->original_message when the message
    // itself has none.
    $mid = $message->id();
    if (!$mid && isset($message->original_message)) {
      $mid = $message->original_message->id();
    }
    assert(!empty($mid), 'The message entity (or $message->original_message) must be saved in order to create a digest entry.');

    $this->connection
      ->insert('message_digest')
      ->fields([
        'receiver' => $message->getOwnerId(),
        'entity_type' => $entity_type,
        'entity_id' => $entity_id,
        'notifier' => $this->getPluginId(),
        'timestamp' => $message->getCreatedTime(),
        'mid' => $mid,
      ])
      ->execute();

    return TRUE;
  }

  /**
   * Returns the digest interval from the plugin definition.
   *
   * @return string
   *   The value of the plugin definition's 'digest_interval' entry.
   */
  public function getInterval() {
    return $this->digestInterval;
  }

  /**
   * Lists the receivers that have unsent digest rows for this notifier.
   *
   * Only rows with a timestamp at or before getEndTime() are considered.
   *
   * @return array
   *   Distinct values of the 'receiver' column.
   */
  public function getRecipients() {
    $query = $this->connection->select('message_digest', 'md');
    $query->fields('md', ['receiver']);
    $query->condition('timestamp', $this->getEndTime(), '<=');
    $query->condition('sent', 0);
    $query->condition('notifier', $this->getPluginId());
    $query->distinct();

    return $query->execute()->fetchCol();
  }

  /**
   * Groups a receiver's unsent message IDs by entity type and entity ID.
   *
   * @param int|string $uid
   *   The value to match against the 'receiver' column.
   * @param int $end
   *   Only rows with a timestamp at or before this value are included.
   *
   * @return array
   *   Message IDs keyed first by the row's entity_type, then by its entity_id,
   *   in ascending order of the row 'id' column. Rows with an empty 'mid' are
   *   skipped.
   */
  public function aggregate($uid, $end) {
    $message_groups = [];

    $query = $this->connection->select('message_digest', 'md');
    $query
      ->fields('md')
      ->condition('timestamp', $end, '<=')
      ->condition('receiver', $uid)
      ->condition('sent', 0)
      ->condition('notifier', $this->getPluginId());
    $query->orderBy('id');

    foreach ($query->execute() as $row) {
      if (!empty($row->mid)) {
        $message_groups[$row->entity_type][$row->entity_id][] = $row->mid;
      }
    }

    return $message_groups;
  }

  /**
   * Determines whether enough time has passed to process this digest again.
   *
   * @return bool
   *   TRUE when the stored last-run time is earlier than the request time
   *   minus the digest interval, plus a 30 second buffer.
   */
  public function processDigest() {
    $last_run = $this->state->get($this->lastRunStateKey(), 0);

    // The buffer moves the cut-off 30 seconds later, so a run that happened
    // slightly less than a full interval ago still counts as due. Presumably
    // this absorbs small timing differences between scheduled runs.
    $buffer = 30;
    $cutoff = strtotime('-' . $this->getInterval(), $this->time->getRequestTime()) + $buffer;

    return $last_run < $cutoff;
  }

  /**
   * Flags digest rows as sent for an account.
   *
   * @param \Drupal\user\UserInterface $account
   *   The account whose ID is matched against the 'receiver' column.
   * @param int $last_mid
   *   Rows with a 'mid' less than or equal to this value are marked as sent.
   */
  public function markSent(UserInterface $account, $last_mid) {
    $this->connection
      ->update('message_digest')
      ->fields(['sent' => 1])
      ->condition('receiver', $account->id())
      ->condition('notifier', $this->getPluginId())
      ->condition('mid', $last_mid, '<=')
      ->execute();
  }

  /**
   * Returns the upper timestamp bound used when selecting digest rows.
   *
   * @return int
   *   The request time reported by the time service.
   */
  public function getEndTime() {
    return $this->time->getRequestTime();
  }

  /**
   * Stores the request time as the last-run time for this plugin.
   */
  public function setLastSent() {
    $this->state->set($this->lastRunStateKey(), $this->time->getRequestTime());
  }

  /**
   * Limits serialization to plain data, leaving out the injected services.
   *
   * @return string[]
   *   The names of the properties to serialize.
   */
  public function __sleep() {
    return [
      'configuration',
      'pluginId',
      'pluginDefinition',
      'message',
      'digestInterval',
    ];
  }

  /**
   * Re-attaches the services that __sleep() leaves out of the serialized data.
   */
  public function __wakeup() {
    $this->connection = Database::getConnection();

    $container = \Drupal::getContainer();
    $this->entityTypeManager = $container->get('entity_type.manager');
    $this->logger = $container->get('logger.channel.message_notify');
    $this->renderer = $container->get('renderer');
    $this->state = $container->get('state');
    $this->time = $container->get('datetime.time');
  }

  /**
   * Builds the state key under which the last-run time is stored.
   *
   * Shared by processDigest() and setLastSent() so that the reader and the
   * writer always agree on the key.
   *
   * @return string
   *   The plugin ID followed by '_last_run'.
   */
  private function lastRunStateKey() {
    return $this->getPluginId() . '_last_run';
  }

}