<?php
namespace Drupal\purge_drush\Commands;
use Consolidation\AnnotatedCommand\AnnotationData;
use Consolidation\OutputFormatters\StructuredData\RowsOfFields;
use Consolidation\SiteAlias\SiteAliasManagerAwareInterface;
use Consolidation\SiteAlias\SiteAliasManagerAwareTrait;
use Drupal\Component\Plugin\Exception\PluginNotFoundException;
use Drupal\purge\Plugin\Purge\Invalidation\Exception\InvalidExpressionException;
use Drupal\purge\Plugin\Purge\Invalidation\Exception\MissingExpressionException;
use Drupal\purge\Plugin\Purge\Invalidation\Exception\TypeUnsupportedException;
use Drupal\purge\Plugin\Purge\Invalidation\InvalidationsServiceInterface;
use Drupal\purge\Plugin\Purge\Processor\ProcessorsServiceInterface;
use Drupal\purge\Plugin\Purge\Purger\Exception\CapacityException;
use Drupal\purge\Plugin\Purge\Purger\Exception\DiagnosticsException;
use Drupal\purge\Plugin\Purge\Purger\Exception\LockException;
use Drupal\purge\Plugin\Purge\Purger\PurgersServiceInterface;
use Drupal\purge\Plugin\Purge\Queue\QueueServiceInterface;
use Drupal\purge\Plugin\Purge\Queue\StatsTrackerInterface;
use Drupal\purge\Plugin\Purge\Queuer\QueuersServiceInterface;
use Drush\Commands\DrushCommands;
use Drush\Drush;
use Drush\Exceptions\UserAbortException;
use Symfony\Component\Console\Helper\Table;
use Symfony\Component\Console\Helper\TableCell;
use Symfony\Component\Console\Helper\TableSeparator;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\BufferedOutput;
class QueueCommands extends DrushCommands implements SiteAliasManagerAwareInterface {
use SiteAliasManagerAwareTrait;
/**
 * The processors service, used to look up the queue-work processor.
 *
 * @var \Drupal\purge\Plugin\Purge\Processor\ProcessorsServiceInterface
 */
protected $purgeProcessors;

/**
 * The purgers service, used to execute invalidations.
 *
 * @var \Drupal\purge\Plugin\Purge\Purger\PurgersServiceInterface
 */
protected $purgePurgers;

/**
 * The invalidation factory, used to build invalidation objects.
 *
 * @var \Drupal\purge\Plugin\Purge\Invalidation\InvalidationsServiceInterface
 */
protected $purgeInvalidationFactory;

/**
 * The queue service.
 *
 * @var \Drupal\purge\Plugin\Purge\Queue\QueueServiceInterface
 */
protected $purgeQueue;

/**
 * The queue statistics tracker.
 *
 * @var \Drupal\purge\Plugin\Purge\Queue\StatsTrackerInterface
 */
protected $purgeQueueStats;

/**
 * The queuers service, used to look up the queue-add queuer.
 *
 * @var \Drupal\purge\Plugin\Purge\Queuer\QueuersServiceInterface
 */
protected $purgeQueuers;

/**
 * Construct a QueueCommands object.
 *
 * @param \Drupal\purge\Plugin\Purge\Processor\ProcessorsServiceInterface $purge_processors
 *   The processors service.
 * @param \Drupal\purge\Plugin\Purge\Purger\PurgersServiceInterface $purge_purgers
 *   The purgers service.
 * @param \Drupal\purge\Plugin\Purge\Invalidation\InvalidationsServiceInterface $purge_invalidation_factory
 *   The invalidation factory.
 * @param \Drupal\purge\Plugin\Purge\Queue\QueueServiceInterface $purge_queue
 *   The queue service.
 * @param \Drupal\purge\Plugin\Purge\Queue\StatsTrackerInterface $purge_queue_stats
 *   The queue statistics tracker.
 * @param \Drupal\purge\Plugin\Purge\Queuer\QueuersServiceInterface $purge_queuers
 *   The queuers service.
 */
public function __construct(ProcessorsServiceInterface $purge_processors, PurgersServiceInterface $purge_purgers, InvalidationsServiceInterface $purge_invalidation_factory, QueueServiceInterface $purge_queue, StatsTrackerInterface $purge_queue_stats, QueuersServiceInterface $purge_queuers) {
  parent::__construct();

  // Queue related services.
  $this->purgeQueue = $purge_queue;
  $this->purgeQueueStats = $purge_queue_stats;
  $this->purgeQueuers = $purge_queuers;

  // Services involved in building and executing invalidations.
  $this->purgeInvalidationFactory = $purge_invalidation_factory;
  $this->purgeProcessors = $purge_processors;
  $this->purgePurgers = $purge_purgers;
}
/**
 * Normalize the raw 'expressions' argument into type/expression pairs.
 *
 * All argument tokens are joined with single spaces and split on commas.
 * Every resulting chunk is read as "<type> [<expression>]": the first
 * whitespace-delimited token is the type and everything after it is the
 * expression (NULL when absent). The parsed list replaces the original
 * 'expressions' argument on the input object.
 *
 * NOTE(review): presumably registered as a Drush hook for the queue-add
 * command; the annotation is not visible in this source - confirm.
 *
 * @param \Symfony\Component\Console\Input\InputInterface $input
 *   The input object whose 'expressions' argument gets rewritten.
 * @param \Consolidation\AnnotatedCommand\AnnotationData $annotationData
 *   Annotation data of the command; not used here.
 */
public function queueAddParseExpressions(InputInterface $input, AnnotationData $annotationData) {
  $raw = trim(implode(' ', $input->getArguments()['expressions']));
  $expressions = [];
  if ($raw) {
    foreach (array_map('trim', explode(',', $raw)) as $chunk) {
      // Split into at most two parts on any run of whitespace, so that an
      // expression containing spaces is kept whole instead of being silently
      // truncated, and repeated spaces after the type do not yield an empty
      // expression.
      $parts = preg_split('/\s+/', $chunk, 2);
      $expressions[] = [
        'type' => $parts[0],
        'expression' => isset($parts[1]) ? $parts[1] : NULL,
      ];
    }
  }
  $input->setArgument('expressions', $expressions);
}
/**
 * Add one or more invalidation expressions to the queue.
 *
 * Each entry of $expressions is expected to be an array with the keys 'type'
 * and 'expression'; entries with an empty type are skipped. When the output
 * format is 'string' and the type is 'everything', the user is asked to
 * confirm before continuing.
 *
 * @param array $expressions
 *   List of ['type' => ..., 'expression' => ...] arrays.
 * @param array $options
 *   Command options; only 'format' is read here.
 *
 * @throws \Exception
 *   When the 'drush_purge_queue_add' queuer is missing, when no usable
 *   expressions were given, or when an invalidation object cannot be created.
 *   The originating exception, if any, is attached as the previous exception.
 * @throws \Drush\Exceptions\UserAbortException
 *   When the user declines the 'everything' confirmation.
 */
public function queueAdd(array $expressions, array $options = [
  'format' => 'string',
]) {
  $queuer = $this->purgeQueuers->get('drush_purge_queue_add');
  if (!$queuer) {
    throw new \Exception(dt("Please add the required queuer:\ndrush p:queuer-add drush_purge_queue_add"));
  }
  if (empty($expressions)) {
    throw new \Exception(dt("Please provide one or more expressions."));
  }

  $interactive = $options['format'] === 'string';
  $invalidations = [];
  foreach ($expressions as $item) {
    $type = $item['type'];
    // empty() already covers NULL, so no separate is_null() check is needed.
    if (empty($type)) {
      continue;
    }

    // Translate factory failures into plain exceptions with user-facing
    // messages, keeping the original exception attached for debugging.
    try {
      $invalidations[] = $this->purgeInvalidationFactory->get($type, $item['expression']);
    }
    catch (PluginNotFoundException $e) {
      throw new \Exception(dt("Type '@type' does not exist, see 'drush p:types' for available types.", [
        '@type' => $type,
      ]), 0, $e);
    }
    catch (InvalidExpressionException $e) {
      throw new \Exception($e->getMessage(), 0, $e);
    }
    catch (TypeUnsupportedException $e) {
      throw new \Exception(dt("There is no purger supporting '@type', please install one!", [
        '@type' => $type,
      ]), 0, $e);
    }
    catch (MissingExpressionException $e) {
      throw new \Exception($e->getMessage(), 0, $e);
    }

    if ($interactive && $type === 'everything') {
      $this->io()->caution(dt("Invalidating everything will mass-clear potentially" . " thousands of pages, which could temporarily make your site really" . " slow as external caches will have to warm up again.\n"));
      if (!$this->io()->confirm(dt("Are you really sure?"))) {
        throw new UserAbortException();
      }
    }
  }

  // Every entry had an empty type (for example the input was just ","):
  // treat it like missing input rather than reporting "Added 0 item(s)".
  if (!$invalidations) {
    throw new \Exception(dt("Please provide one or more expressions."));
  }

  $this->purgeQueue->add($queuer, $invalidations);
  if ($interactive) {
    $this->io()->success(dt('Added @count item(s) to the queue.', [
      '@count' => count($invalidations),
    ]));
  }
}
/**
 * Browse the queue contents, as structured data or with an interactive pager.
 *
 * When the format is not 'table', or 'no-interaction' is set, a single page
 * is returned as rows. Otherwise each page is rendered as a table and the
 * method waits for a key press: left arrow goes back, right arrow or space
 * goes forward, 'q' (or right arrow on the last page) quits.
 *
 * NOTE(review): 'no-interaction' is read but has no default here; presumably
 * Drush supplies it as a global option - confirm.
 *
 * @param array $options
 *   Command options: 'format', 'limit', 'page', 'no-translations' and
 *   'no-interaction'.
 *
 * @return \Consolidation\OutputFormatters\StructuredData\RowsOfFields|null
 *   The rows of the requested page in non-interactive mode, NULL after the
 *   interactive pager ends.
 */
public function queueBrowse(array $options = [
  'format' => 'table',
  'limit' => 30,
  'page' => 1,
  'no-translations' => FALSE,
]) {
  $options['limit'] = (int) $options['limit'];
  $options['page'] = (int) $options['page'];
  $this->purgeQueue->selectPageLimit($options['limit']);

  // Non-interactive mode: collect one page of data and hand it back.
  if ($options['format'] !== 'table' || $options['no-interaction']) {
    $output = [];
    foreach ($this->purgeQueue->selectPage($options['page']) as $immutable) {
      if ($options['format'] === 'list') {
        $output[] = $immutable->getExpression();
      }
      elseif ($options['no-translations']) {
        $output[] = [
          'type' => $immutable->getType(),
          'state' => $immutable->getStateString(),
          'expression' => $immutable->getExpression(),
        ];
      }
      else {
        $output[] = [
          'type' => (string) $immutable->getPluginDefinition()['label'],
          'state' => (string) $immutable->getStateStringTranslated(),
          'expression' => $immutable->getExpression(),
        ];
      }
    }
    return new RowsOfFields($output);
  }

  // Interactive mode. The terminal settings are captured right before the
  // first switch to cbreak mode and restored in the finally block, so the
  // terminal is not left in non-canonical mode when the pager ends.
  $stty_state = NULL;
  try {
    for ($page = $options['page']; $page <= ($max = $this->purgeQueue->selectPageMax()); $page++) {
      $pgrprev = ($prev = $page !== 1) ? '[←]' : ' ';
      $pgrnext = ($next = $page !== $max) ? '[→]' : ' ';
      $pgrpage = dt("page") . ' ' . sprintf("%d/%d", $page, $max);
      $pager = sprintf("%s %s %s", $pgrprev, $pgrpage, $pgrnext);

      // Fetch the rows of this page through the non-interactive code path.
      $options['page'] = $page;
      $options['format'] = 'json';
      $options['no-interaction'] = TRUE;
      $rows = (array) $this->queueBrowse($options);
      $rows[] = new TableSeparator();
      $rows[] = [
        new TableCell(dt("[q]uit")),
        new TableCell($pager, [
          'colspan' => 2,
        ]),
      ];

      // Render into a buffer first, so the number of lines is known and the
      // table can be erased again when switching pages.
      $output = new BufferedOutput();
      $table = new Table($output);
      $table->setHeaders([
        dt("Type"),
        dt("State"),
        dt("Expression"),
      ]);
      $table->setColumnWidths([
        15,
        12,
        40,
      ]);
      $table->setRows($rows);
      $table->render();
      $table = $output->fetch();
      $table_lines = substr_count($table, "\n");
      $this->output->write($table);

      if ($stty_state === NULL) {
        $stty_state = trim((string) shell_exec('stty -g'));
      }
      system('stty cbreak');
      while (TRUE) {
        $input_char = fgetc(STDIN);
        // fgetc() yields FALSE once STDIN is exhausted or closed; without
        // this check the loop would spin forever on ord(FALSE) === 0.
        if ($input_char === FALSE) {
          return NULL;
        }
        $char = ord($input_char);

        // Erase whatever the key press echoed on the current line.
        $this->output->write("\33[2K");
        $this->output->write("\r");

        // Skip the ESC and '[' bytes that prefix arrow key sequences.
        if (in_array($char, [27, 91], TRUE)) {
          continue;
        }

        // Left arrow ('D'): step back two, the for loop adds one again.
        if ($prev && $char === 68) {
          $page = $page - 2;
          $this->output->write(str_repeat("\33[1A\33[2K", $table_lines));
          break;
        }
        // Space or right arrow ('C'): continue with the next page.
        elseif ($next && in_array($char, [32, 67], TRUE)) {
          $this->output->write(str_repeat("\33[1A\33[2K", $table_lines));
          break;
        }
        // 'q', or right arrow on the last page: stop browsing.
        elseif ($char === 113 || !$next && $char === 67) {
          return NULL;
        }
      }
    }
  }
  finally {
    if ($stty_state !== NULL && $stty_state !== '') {
      system('stty ' . escapeshellarg($stty_state));
    }
  }
  return NULL;
}
/**
 * Clear the queue and report how many items it held.
 *
 * With the 'string' format and a non-empty queue, the user has to confirm the
 * deletion first. The item count is read from the statistics tracker before
 * the queue is emptied.
 *
 * @param array $options
 *   Command options; only 'format' is read here.
 *
 * @return string|null
 *   The number of items as a string for formats other than 'string',
 *   otherwise whatever the success message call returns.
 *
 * @throws \Drush\Exceptions\UserAbortException
 *   When the user declines the confirmation.
 */
public function queueEmpty(array $options = [
  'format' => 'string',
]) {
  $counter = $this->purgeQueueStats->numberOfItems();
  $total = (int) $counter->get();

  if ($total && $options['format'] === 'string') {
    $confirmed = $this->io()->confirm(dt("Are you certain you want to delete @total items?", [
      '@total' => $total,
    ]));
    if (!$confirmed) {
      throw new UserAbortException();
    }
  }

  $this->purgeQueue->emptyQueue();

  // Machine readable formats only get the number of removed items.
  if ($options['format'] != 'string') {
    return (string) $total;
  }

  if ($total === 0) {
    return $this->io()->success(dt('The queue was empty, nothing to clear!'));
  }
  return $this->io()->success(dt('Cleared @total items from the queue.', [
    '@total' => $total,
  ]));
}
/**
 * Show the queue statistics, or reset the total counters.
 *
 * With 'reset-totals' set, the totals are reset and nothing is returned; the
 * 'table' format asks for confirmation first. Without it, the 'table' format
 * prints a human readable overview and every other format returns the raw
 * numbers keyed by statistic id.
 *
 * @param array $options
 *   Command options: 'format' and 'reset-totals'.
 *
 * @return array|null
 *   Statistic id => integer value for non-table formats, NULL otherwise.
 *
 * @throws \Drush\Exceptions\UserAbortException
 *   When the user declines the reset confirmation.
 */
public function queueStatistics(array $options = [
  'format' => 'table',
  'reset-totals' => FALSE,
]) {
  if ($options['reset-totals']) {
    // Only the human facing table format asks before resetting.
    if ($options['format'] === 'table') {
      $this->output()->writeln(dt("You are about to reset all total counters...\n"));
      if (!$this->io()->confirm(dt("Are you really sure?"))) {
        throw new UserAbortException();
      }
    }
    $this->purgeQueueStats->resetTotals();
    return;
  }

  if ($options['format'] === 'table') {
    $table = [];
    // str_pad() leaves values longer than $size untouched, whereas a manual
    // str_repeat() with a negative count fails for such values.
    $align_right = function ($input, $size = 20) {
      return str_pad((string) $input, $size, ' ', STR_PAD_LEFT);
    };
    foreach ($this->purgeQueueStats as $statistic) {
      // Four rows per statistic: id and title, value, description, spacer.
      $table[] = [
        'left' => $align_right(strtoupper($statistic->getId())),
        'right' => $statistic->getTitle(),
      ];
      $table[] = [
        'left' => $align_right($statistic->getInteger()),
        'right' => '',
      ];
      $table[] = [
        'left' => '',
        'right' => wordwrap($statistic->getDescription(), 80),
      ];
      $table[] = [
        'left' => '',
        'right' => '',
      ];
    }
    $this->io()->table([], $table);
    return;
  }

  $statistics = [];
  foreach ($this->purgeQueueStats as $statistic) {
    $statistics[$statistic->getId()] = $statistic->getInteger();
  }
  return $statistics;
}
/**
 * Report the number of items currently in the queue.
 *
 * @param array $options
 *   Command options; only 'format' is read here.
 *
 * @return string
 *   A translated sentence for the 'string' format, otherwise the bare number
 *   as a string.
 */
public function queueVolume(array $options = [
  'format' => 'string',
]) {
  $volume = $this->purgeQueue->numberOfItems();

  // Anything but the 'string' format just gets the number.
  if ($options['format'] != 'string') {
    return (string) $volume;
  }

  return dt('There are @total items in the queue.', [
    '@total' => $volume,
  ]);
}
/**
 * Claim a chunk of items from the queue and process them.
 *
 * When called without 'finish' and with 'no-interaction', a single chunk is
 * processed in this process and its result structure returned. In every other
 * case the 'p:queue-work' command is run as a subprocess (JSON format, not
 * finishing, non-interactive) and its decoded output is collected; with
 * 'finish' this repeats until a run reports an error or an empty queue.
 *
 * NOTE(review): 'no-interaction' is read but has no default here; presumably
 * Drush supplies it as a global option - confirm.
 *
 * @param array $options
 *   Command options: 'format', 'finish' and 'no-interaction'.
 *
 * @return array|\Consolidation\OutputFormatters\StructuredData\RowsOfFields|null
 *   The result of a single chunk, the collected runs for non-string formats,
 *   or NULL for the 'string' format.
 *
 * @throws \Exception
 *   When the subprocess output is not a JSON array, when the very first run
 *   finds nothing to claim, or when a run reports an error.
 */
public function queueWork(array $options = [
  'format' => 'string',
  'finish' => FALSE,
]) {
  // Direct invocation: do the actual work for exactly one chunk.
  if ($options['finish'] === FALSE && $options['no-interaction']) {
    return $this->queueWorkChunk();
  }

  $self = $this->siteAliasManager()->getSelf();
  $child_options = [
    'format' => 'json',
    'finish' => FALSE,
    'no-interaction' => TRUE,
  ];
  $human = $options['format'] == 'string';
  $runs = [];

  while (TRUE) {
    $process = Drush::drush($self, 'p:queue-work', [], $child_options);
    $process->run();
    $result = json_decode($process->getOutput(), TRUE);
    if (!is_array($result)) {
      throw new \Exception("Inter-process communication failure!");
    }
    $runs[] = $result;

    // Nothing left to claim: an error on the first run, otherwise the end.
    if ($result['error'] == 'empty') {
      if (count($runs) === 1) {
        throw new \Exception(dt("The queue is empty or has only locked items!"));
      }
      break;
    }

    if ($human) {
      $summary = [
        [dt("Succeeded"), $result['succeeded']],
        [dt("Failed"), $result['failed']],
        [dt("Currently invalidating"), $result['processing']],
        [dt("Not supported"), $result['not_supported']],
      ];
      $this->io()->table([], $summary);
    }

    if ($result['error']) {
      throw new \Exception($result['error']);
    }

    // Only keep going when asked to finish and the last run was clean.
    if (!$options['finish'] || !is_null($result['error'])) {
      break;
    }
  }

  if ($human) {
    return NULL;
  }
  return new RowsOfFields($runs);
}
/**
 * Process a single chunk of queue items and summarize the outcome.
 *
 * The method doubles as a factory for its own result structure: called with
 * a non-NULL argument it only returns a zeroed structure, using the argument
 * as 'error' when it is a string and NULL otherwise.
 *
 * @param mixed $returnstruct
 *   NULL to perform the work. Any other value returns an empty structure.
 *
 * @return array
 *   Associative array with the keys 'error', 'total', 'processing',
 *   'succeeded', 'failed' and 'not_supported'.
 */
protected function queueWorkChunk($returnstruct = NULL) {
  if ($returnstruct !== NULL) {
    $error = NULL;
    if (is_string($returnstruct)) {
      $error = $returnstruct;
    }
    return [
      'error' => $error,
      'total' => 0,
      'processing' => 0,
      'succeeded' => 0,
      'failed' => 0,
      'not_supported' => 0,
    ];
  }

  $processor = $this->purgeProcessors->get('drush_purge_queue_work');
  if (!$processor) {
    return $this->queueWorkChunk(dt("Please add the required processor:\ndrush p:processor-add drush_purge_queue_work"));
  }

  $claims = $this->purgeQueue->claim();
  if (!$claims) {
    return $this->queueWorkChunk('empty');
  }

  // Whatever happens during invalidation, the claims are always handed back
  // to the queue by the finally block.
  try {
    $this->purgePurgers->invalidate($processor, $claims);
  }
  catch (DiagnosticsException $e) {
    return $this->queueWorkChunk($e->getMessage());
  }
  catch (CapacityException $e) {
    return $this->queueWorkChunk($e->getMessage());
  }
  catch (LockException $e) {
    return $this->queueWorkChunk($e->getMessage());
  }
  finally {
    $this->purgeQueue->handleResults($claims);
  }

  // Tally the claims by their lowercased state string.
  $result = $this->queueWorkChunk(TRUE);
  $result['total'] = count($claims);
  foreach ($claims as $claim) {
    $state = strtolower($claim->getStateString());
    $result[$state]++;
  }

  if ($result['succeeded'] === 0 && $result['processing'] === 0) {
    $result['error'] = dt("Not a single invalidation was successful!");
  }
  // Checked last, so this message wins when both conditions apply.
  $failure_ratio = $result['failed'] / $result['total'];
  if ($failure_ratio > 0.4) {
    $result['error'] = dt("Over 40% failed, please check the logs!");
  }
  return $result;
}
}