View source
<?php
namespace Drupal\radioactivity;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Entity\EntityPublishedInterface;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\State\StateInterface;
use Drupal\field\FieldStorageConfigInterface;
use Drupal\radioactivity\Event\EnergyBelowCutoffEvent;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
/**
 * Queues and applies energy decay and incidents for radioactivity fields.
 *
 * The process*() methods collect work and push it onto queues in chunks of
 * self::QUEUE_CHUNK_SIZE; the queueProcess*() methods take one queued chunk
 * and update the affected entities.
 */
class RadioactivityProcessor implements RadioactivityProcessorInterface {

  /**
   * The entity type manager.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * The state service; holds the timestamp of the last decay processing run.
   *
   * @var \Drupal\Core\State\StateInterface
   */
  protected $state;

  /**
   * The logger channel obtained for self::LOGGER_CHANNEL.
   *
   * @var \Psr\Log\LoggerInterface
   */
  protected $log;

  /**
   * The incident storage returned by StorageFactory::getConfiguredStorage().
   *
   * @var object
   */
  protected $storage;

  /**
   * The request time as reported by the time service.
   *
   * @var int
   */
  protected $requestTime;

  /**
   * The queue factory.
   *
   * @var \Drupal\Core\Queue\QueueFactory
   */
  protected $queue;

  /**
   * The event dispatcher.
   *
   * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
   */
  protected $eventDispatcher;

  /**
   * Constructs a RadioactivityProcessor.
   *
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   * @param \Drupal\Core\State\StateInterface $state
   *   The state service.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
   *   The logger channel factory.
   * @param \Drupal\radioactivity\StorageFactory $storage
   *   The storage factory; the configured storage is resolved immediately.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   The time service; the request time is captured immediately.
   * @param \Drupal\Core\Queue\QueueFactory $queue
   *   The queue factory.
   * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
   *   The event dispatcher.
   */
  public function __construct(EntityTypeManagerInterface $entity_type_manager, StateInterface $state, LoggerChannelFactoryInterface $logger_factory, StorageFactory $storage, TimeInterface $time, QueueFactory $queue, EventDispatcherInterface $event_dispatcher) {
    $this->entityTypeManager = $entity_type_manager;
    $this->state = $state;
    $this->log = $logger_factory->get(self::LOGGER_CHANNEL);
    $this->storage = $storage->getConfiguredStorage();
    $this->requestTime = $time->getRequestTime();
    $this->queue = $queue;
    $this->eventDispatcher = $event_dispatcher;
  }

  /**
   * Queues decay processing for every eligible radioactivity field.
   *
   * A field storage is eligible when it has data, uses the 'linear' or
   * 'decay' profile and has reached its granularity threshold. When at least
   * one field was handled, the request time is stored under
   * self::LAST_PROCESSED_STATE_KEY.
   *
   * @return int
   *   The number of entities queued for decay. Returns 0 without logging when
   *   no radioactivity field storage exists.
   */
  public function processDecay() {
    $resultCount = 0;
    $processed = FALSE;

    $fieldConfigs = $this->entityTypeManager
      ->getStorage('field_storage_config')
      ->loadByProperties(['type' => 'radioactivity']);

    if (empty($fieldConfigs)) {
      return 0;
    }

    foreach ($fieldConfigs as $fieldConfig) {
      $profile = $fieldConfig->getSetting('profile');
      if ($fieldConfig->hasData()
        && ($profile === 'linear' || $profile === 'decay')
        && $this->hasReachedGranularityThreshold($fieldConfig)) {
        $resultCount += $this->processFieldDecay($fieldConfig);
        $processed = TRUE;
      }
    }

    if ($processed) {
      $this->state->set(self::LAST_PROCESSED_STATE_KEY, $this->requestTime);
    }

    $this->log->notice('Processed @count radioactivity decays.', [
      '@count' => $resultCount,
    ]);

    return $resultCount;
  }

  /**
   * Determines whether a field is due for decay processing.
   *
   * The threshold is the start of the granularity interval following the one
   * in which the last processing run happened.
   *
   * @param \Drupal\field\FieldStorageConfigInterface $fieldConfig
   *   The field storage whose 'granularity' setting is checked.
   *
   * @return bool
   *   TRUE when the granularity is 0 (or unset) or when the request time has
   *   reached the threshold; FALSE otherwise.
   */
  private function hasReachedGranularityThreshold(FieldStorageConfigInterface $fieldConfig) {
    $granularity = $fieldConfig->getSetting('granularity');
    // Loose comparison on purpose: a missing (NULL) granularity also means
    // "always process" and avoids a modulo by zero below.
    if ($granularity == 0) {
      return TRUE;
    }

    $lastCronTimestamp = $this->state->get(self::LAST_PROCESSED_STATE_KEY, 0);
    $threshold = $lastCronTimestamp - $lastCronTimestamp % $granularity + $granularity;

    return $this->requestTime >= $threshold;
  }

  /**
   * Queues the entities of one field for decay processing.
   *
   * Selects entities whose field timestamp is not in the future and whose
   * energy is set and greater than 0, then enqueues their IDs in chunks of
   * self::QUEUE_CHUNK_SIZE on the self::QUEUE_WORKER_DECAY queue.
   *
   * @param \Drupal\field\FieldStorageConfigInterface $fieldConfig
   *   The field storage to process.
   *
   * @return int
   *   The number of entities queued.
   */
  private function processFieldDecay(FieldStorageConfigInterface $fieldConfig) {
    $fieldName = $fieldConfig->get('field_name');
    $entityType = $fieldConfig->getTargetEntityTypeId();

    $entityIds = $this->entityTypeManager
      ->getStorage($entityType)
      ->getQuery()
      ->condition($fieldName . '.timestamp', $this->requestTime, '<=')
      ->condition($fieldName . '.energy', NULL, 'IS NOT NULL')
      ->condition($fieldName . '.energy', 0, '>')
      ->accessCheck(FALSE)
      ->execute();

    if (!empty($entityIds)) {
      // Fetch the queue once; it is the same queue for every chunk.
      $queue = $this->queue->get(self::QUEUE_WORKER_DECAY);
      // Keys are preserved so each chunk keeps the query's original keys.
      foreach (array_chunk($entityIds, self::QUEUE_CHUNK_SIZE, TRUE) as $chunk) {
        $queue->createItem([
          'field_config' => $fieldConfig,
          'entity_ids' => $chunk,
        ]);
      }
    }

    return count($entityIds);
  }

  /**
   * Applies energy decay to a chunk of entities.
   *
   * Entities implementing EntityPublishedInterface that are unpublished are
   * skipped. When the decayed energy is not above the 'cutoff' setting, the
   * energy is reset to 0 and the 'radioactivity_field_cutoff' event is
   * dispatched. Entities are saved without creating a new revision.
   *
   * @param \Drupal\field\FieldStorageConfigInterface $fieldConfig
   *   The field storage providing the field name and the 'profile',
   *   'halflife' and 'cutoff' settings.
   * @param array $entityIds
   *   The IDs of the entities to update.
   */
  public function queueProcessDecay(FieldStorageConfigInterface $fieldConfig, array $entityIds) {
    $entityType = $fieldConfig->getTargetEntityTypeId();
    $fieldName = $fieldConfig->get('field_name');
    $profile = $fieldConfig->getSetting('profile');
    $halfLife = $fieldConfig->getSetting('halflife');
    $cutoff = $fieldConfig->getSetting('cutoff');

    $entities = $this->entityTypeManager
      ->getStorage($entityType)
      ->loadMultiple($entityIds);

    foreach ($entities as $entity) {
      if ($entity instanceof EntityPublishedInterface && !$entity->isPublished()) {
        continue;
      }

      $values = $entity->get($fieldName)->getValue();
      // The field may have been emptied between queueing and processing;
      // fall back to defaults instead of reading undefined offsets.
      $timestamp = $values[0]['timestamp'] ?? NULL;
      $energy = $values[0]['energy'] ?? 0;
      $elapsed = $timestamp ? $this->requestTime - $timestamp : 0;

      switch ($profile) {
        case 'linear':
          // Lose one unit of energy per elapsed second, never below 0.
          $energy = $energy > $elapsed ? $energy - $elapsed : 0;
          break;

        case 'decay':
          // A zero or missing half-life would divide by zero, which is a
          // fatal DivisionByZeroError on PHP 8. Treat it as instant decay.
          $energy = empty($halfLife) ? 0 : $energy * pow(2, -$elapsed / $halfLife);
          break;
      }

      if ($energy > $cutoff) {
        $entity->get($fieldName)->setValue([
          'energy' => $energy,
          'timestamp' => $this->requestTime,
        ]);
      }
      else {
        $entity->get($fieldName)->setValue([
          'energy' => 0,
          'timestamp' => $this->requestTime,
        ]);
        // The event object goes first and the event name second; passing the
        // name first is deprecated in Drupal 9.1 and fails on Drupal 10.
        $event = new EnergyBelowCutoffEvent($entity);
        $this->eventDispatcher->dispatch($event, 'radioactivity_field_cutoff');
      }

      if ($entity->getEntityType()->isRevisionable()) {
        $entity->setNewRevision(FALSE);
      }

      // Flag the save as a radioactivity update before saving.
      $entity->radioactivityUpdate = TRUE;
      $entity->save();
    }
  }

  /**
   * Moves stored incidents onto the incident queue.
   *
   * Incidents are read from the configured storage grouped by entity type,
   * the storage is cleared, and the incidents are enqueued in chunks of
   * self::QUEUE_CHUNK_SIZE on the self::QUEUE_WORKER_INCIDENTS queue.
   *
   * @return int
   *   The sum of count($incidents) over all entity types.
   */
  public function processIncidents() {
    $resultCount = 0;

    $incidentsByType = $this->storage->getIncidentsByType();
    $this->storage->clearIncidents();

    foreach ($incidentsByType as $entityType => $incidents) {
      $chunks = array_chunk($incidents, self::QUEUE_CHUNK_SIZE, TRUE);
      if (!empty($chunks)) {
        // Fetch the queue once per entity type rather than once per chunk.
        $queue = $this->queue->get(self::QUEUE_WORKER_INCIDENTS);
        foreach ($chunks as $chunk) {
          $queue->createItem([
            'entity_type' => $entityType,
            'incidents' => $chunk,
          ]);
        }
      }
      $resultCount += count($incidents);
    }

    $this->log->notice('Processed @count radioactivity incidents.', [
      '@count' => $resultCount,
    ]);

    return $resultCount;
  }

  /**
   * Adds the energy of queued incidents to their entities.
   *
   * Entities are saved without creating a new revision.
   *
   * @param string $entityType
   *   The entity type ID of the entities to update.
   * @param array $entityIncidents
   *   Lists of incidents keyed by entity ID. Each incident must provide
   *   getFieldName() and getEnergy().
   */
  public function queueProcessIncidents($entityType, array $entityIncidents) {
    $entities = $this->entityTypeManager
      ->getStorage($entityType)
      ->loadMultiple(array_keys($entityIncidents));

    foreach ($entities as $entity) {
      foreach ($entityIncidents[$entity->id()] as $incident) {
        $entity->get($incident->getFieldName())->energy += $incident->getEnergy();
      }

      if ($entity->getEntityType()->isRevisionable()) {
        $entity->setNewRevision(FALSE);
      }

      // Flag the save as a radioactivity update before saving.
      $entity->radioactivityUpdate = TRUE;
      $entity->save();
    }
  }

}