View source
<?php
namespace Drupal\acquia_purge\Plugin\Purge\Purger;
use Drupal\acquia_purge\AcquiaCloud\Hash;
use Drupal\acquia_purge\AcquiaCloud\PlatformInfoInterface;
use Drupal\acquia_purge\Plugin\Purge\TagsHeader\TagsHeaderValue;
use Drupal\purge\Plugin\Purge\Invalidation\InvalidationInterface;
use Drupal\purge\Plugin\Purge\Purger\PurgerBase;
use Drupal\purge\Plugin\Purge\Purger\PurgerInterface;
use GuzzleHttp\ClientInterface;
use GuzzleHttp\Pool;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
 * Purger that invalidates cached content on the Acquia Cloud balancers.
 *
 * Every supported invalidation type is translated into HTTP requests (BAN or
 * PURGE) which are sent to each balancer address reported by the platform
 * information object. An invalidation only succeeds when the request to every
 * balancer succeeded.
 */
class AcquiaCloudPurger extends PurgerBase implements DebuggerAwareInterface, PurgerInterface {
  use DebuggerAwareTrait;

  /**
   * Maximum number of requests the Guzzle pool keeps in flight at once.
   */
  const CONCURRENCY = 6;

  /**
   * Value for Guzzle's 'connect_timeout' request option (seconds).
   */
  const CONNECT_TIMEOUT = 1.5;

  /**
   * Value for Guzzle's 'timeout' request option (seconds).
   */
  const TIMEOUT = 3.0;

  /**
   * Maximum number of cache tags bundled into a single BAN request.
   */
  const TAGS_GROUPED_BY = 15;

  /**
   * Source of the balancer addresses, balancer token and site identifier.
   *
   * @var \Drupal\acquia_purge\AcquiaCloud\PlatformInfoInterface
   */
  protected $platformInfo;

  /**
   * The HTTP client used for all balancer requests.
   *
   * @var \GuzzleHttp\ClientInterface
   */
  protected $httpClient;

  /**
   * Constructs the purger.
   *
   * @param \Drupal\acquia_purge\AcquiaCloud\PlatformInfoInterface $acquia_purge_platforminfo
   *   Information object describing the Acquia Cloud platform.
   * @param \GuzzleHttp\ClientInterface $http_client
   *   The HTTP client.
   * @param array $configuration
   *   Plugin configuration, handed to the parent constructor.
   * @param string $plugin_id
   *   The plugin ID, handed to the parent constructor.
   * @param mixed $plugin_definition
   *   The plugin definition, handed to the parent constructor.
   */
  public final function __construct(PlatformInfoInterface $acquia_purge_platforminfo, ClientInterface $http_client, array $configuration, $plugin_id, $plugin_definition) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    $this->platformInfo = $acquia_purge_platforminfo;
    $this->httpClient = $http_client;
  }

  /**
   * Instantiates the purger with its services taken from the container.
   *
   * @param \Symfony\Component\DependencyInjection\ContainerInterface $container
   *   Container providing 'acquia_purge.platforminfo' and 'http_client'.
   * @param array $configuration
   *   Plugin configuration.
   * @param string $plugin_id
   *   The plugin ID.
   * @param mixed $plugin_definition
   *   The plugin definition.
   *
   * @return static
   *   The new purger instance.
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $container->get('acquia_purge.platforminfo'),
      $container->get('http_client'),
      $configuration,
      $plugin_id,
      $plugin_definition
    );
  }

  /**
   * Builds the Guzzle request options shared by all balancer requests.
   *
   * @param array $extra
   *   Options merged over the defaults; on key collisions $extra wins.
   *
   * @return array
   *   The request options.
   */
  protected function getGlobalOptions(array $extra = []) {
    $opt = [
      // NOTE(review): with 'http_errors' disabled Guzzle itself does not
      // throw on 4xx/5xx; presumably the balancer middleware enabled below
      // is what turns bad responses into failures - confirm.
      'http_errors' => FALSE,
      'connect_timeout' => self::CONNECT_TIMEOUT,
      'timeout' => self::TIMEOUT,
      // Certificate verification is switched off. NOTE(review): presumably
      // because balancers are addressed by IP - confirm.
      'verify' => FALSE,
      'acquia_purge_balancer_middleware' => TRUE,
    ];

    // Hand the debugger to the request stack only when debugging is on.
    if ($this->debugger()->enabled()) {
      $opt['acquia_purge_debugger'] = $this->debugger();
    }
    return array_merge($opt, $extra);
  }

  /**
   * Runs a set of requests through a Guzzle pool and collects the outcomes.
   *
   * @param \Closure $requests
   *   Closure returning an iterable (the callers pass generators) that yields
   *   "result ID => callable" pairs. Several requests may share one result ID.
   *
   * @return array
   *   Lists of booleans keyed by result ID: TRUE for every fulfilled request
   *   and FALSE for every rejected one. IDs without any settled request are
   *   absent.
   */
  protected function getResultsConcurrently(\Closure $requests) {
    $this->debugger()->callerAdd(__METHOD__);

    $results = [];
    $pool = new Pool($this->httpClient, $requests(), [
      'options' => $this->getGlobalOptions(),
      'concurrency' => self::CONCURRENCY,
      'fulfilled' => function ($response, $result_id) use (&$results) {
        $this->debugger()->callerAdd(__METHOD__ . '::fulfilled');
        $results[$result_id][] = TRUE;
        $this->debugger()->callerRemove(__METHOD__ . '::fulfilled');
      },
      'rejected' => function ($exception, $result_id) use (&$results) {
        $this->debugger()->callerAdd(__METHOD__ . '::rejected');
        $this->debugger()->logFailedRequest($exception);
        $results[$result_id][] = FALSE;
        $this->debugger()->callerRemove(__METHOD__ . '::rejected');
      },
    ]);

    // Block until every request in the pool has settled.
    $pool->promise()->wait();

    $this->debugger()->callerRemove(__METHOD__);
    return $results;
  }

  /**
   * Reports how many invalidations should be processed in one go.
   *
   * @return int
   *   200 divided by the number of balancers (rounded up), or 100 when no
   *   balancer addresses are known.
   */
  public function getIdealConditionsLimit() {
    $balancers = count($this->platformInfo->getBalancerAddresses());
    if ($balancers) {
      return intval(ceil(200 / $balancers));
    }
    return 100;
  }

  /**
   * Reports whether this purger supports runtime measurement.
   *
   * @return bool
   *   Always TRUE.
   */
  public function hasRuntimeMeasurement() {
    return TRUE;
  }

  /**
   * Fallback for invalidation types this purger has no dedicated method for.
   *
   * This is the method name routeTypeToMethod() returns for unknown types.
   *
   * @param array $invalidations
   *   Unused.
   *
   * @throws \Exception
   *   Always.
   */
  public function invalidate(array $invalidations) {
    throw new \Exception("Malum consilium quod mutari non potest!");
  }

  /**
   * Invalidates cache tags by sending grouped BAN requests to each balancer.
   *
   * Tags containing a space are failed and logged. The remaining tags are
   * split into groups of at most self::TAGS_GROUPED_BY; each group becomes
   * one request per balancer and succeeds or fails as a whole.
   *
   * @param array $invalidations
   *   Invalidation objects whose expression is a cache tag; their state is
   *   updated in place.
   */
  public function invalidateTags(array $invalidations) {
    $this->debugger()->callerAdd(__METHOD__);

    // Reject tags with spaces up front, everything else goes to PROCESSING.
    foreach ($invalidations as $invalidation) {
      $tag = $invalidation->getExpression();
      if (strpos($tag, ' ') !== FALSE) {
        $invalidation->setState(InvalidationInterface::FAILED);
        $this->logger->error("Tag '%tag' contains a space, this is forbidden.", [
          '%tag' => $tag,
        ]);
      }
      else {
        $invalidation->setState(InvalidationInterface::PROCESSING);
      }
    }

    // Partition the accepted tags into groups of limited size.
    $group = 0;
    $groups = [];
    foreach ($invalidations as $invalidation) {
      if ($invalidation->getState() !== InvalidationInterface::PROCESSING) {
        continue;
      }
      // Roll over to the next group before initialising, so that every
      // group is created explicitly with both of its lists.
      if (isset($groups[$group]) && count($groups[$group]['tags']) >= self::TAGS_GROUPED_BY) {
        $group++;
      }
      if (!isset($groups[$group])) {
        $groups[$group] = [
          'tags' => [],
          'objects' => [],
        ];
      }
      $groups[$group]['objects'][] = $invalidation;
      $groups[$group]['tags'][] = $invalidation->getExpression();
    }

    // Nothing valid to send: fail everything and stop, but still pair the
    // callerAdd() above with a callerRemove().
    if (!count($groups)) {
      $this->setStates($invalidations, InvalidationInterface::FAILED);
      $this->debugger()->callerRemove(__METHOD__);
      return;
    }

    $site = $this->platformInfo->getSiteIdentifier();
    $ipv4_addresses = $this->platformInfo->getBalancerAddresses();

    // Lazily yield one request per (group, balancer), keyed by group ID so
    // that the results of all balancers are collected per group.
    $requests = function () use ($groups, $ipv4_addresses, $site) {
      foreach ($groups as $group_id => $group) {
        $tags = new TagsHeaderValue($group['tags'], Hash::cacheTags($group['tags']));
        foreach ($ipv4_addresses as $ipv4) {
          yield $group_id => function ($poolopt) use ($site, $tags, $ipv4) {
            $opt = [
              'headers' => [
                'X-Acquia-Purge' => $site,
                'X-Acquia-Purge-Tags' => (string) $tags,
                'Accept-Encoding' => 'gzip',
                'User-Agent' => 'Acquia Purge',
              ],
            ];
            if ($this->debugger()->enabled()) {
              $opt['acquia_purge_tags'] = $tags;
            }
            // Request specific options win over the pool wide options.
            if (is_array($poolopt) && count($poolopt)) {
              $opt = array_merge($poolopt, $opt);
            }
            return $this->httpClient->requestAsync('BAN', "http://{$ipv4}/tags", $opt);
          };
        }
      }
    };

    $results = $this->getResultsConcurrently($requests);
    foreach ($groups as $group_id => $group) {
      $this->setStates($group['objects'], $this->resolveState($results, $group_id));
    }

    $this->debugger()->callerRemove(__METHOD__);
  }

  /**
   * Invalidates individual URLs with a PURGE request to each balancer.
   *
   * @param array $invalidations
   *   Invalidation objects whose expression is an absolute URL; their state
   *   is updated in place.
   */
  public function invalidateUrls(array $invalidations) {
    $this->debugger()->callerAdd(__METHOD__);
    $this->invalidateUrlsOnBalancers($invalidations, 'PURGE', FALSE);
    $this->debugger()->callerRemove(__METHOD__);
  }

  /**
   * Invalidates wildcard URLs with a BAN request to each balancer.
   *
   * Unlike invalidateUrls(), a leading "https://" is downgraded to "http://"
   * before the request is built.
   *
   * @param array $invalidations
   *   Invalidation objects whose expression is an absolute URL; their state
   *   is updated in place.
   */
  public function invalidateWildcardUrls(array $invalidations) {
    $this->debugger()->callerAdd(__METHOD__);
    $this->invalidateUrlsOnBalancers($invalidations, 'BAN', TRUE);
    $this->debugger()->callerRemove(__METHOD__);
  }

  /**
   * Invalidates the entire site with a BAN on '/site' for every balancer.
   *
   * Requests are sent one after another. All invalidations succeed only when
   * at least one balancer is known and no request threw an exception.
   *
   * @param array $invalidations
   *   Invalidation objects; their state is updated in place.
   */
  public function invalidateEverything(array $invalidations) {
    $this->debugger()->callerAdd(__METHOD__);
    $this->setStates($invalidations, InvalidationInterface::PROCESSING);

    $opt = $this->getGlobalOptions();
    $opt['headers'] = [
      'X-Acquia-Purge' => $this->platformInfo->getSiteIdentifier(),
      'Accept-Encoding' => 'gzip',
      'User-Agent' => 'Acquia Purge',
    ];

    // Without balancers nothing gets invalidated, which must not be reported
    // as a success; this matches the tag and URL methods, which fail
    // invalidations for which no request was made.
    $balancers = $this->platformInfo->getBalancerAddresses();
    $overall_success = count($balancers) > 0;
    foreach ($balancers as $ip_address) {
      try {
        $this->httpClient->request('BAN', 'http://' . $ip_address . '/site', $opt);
      }
      catch (\Exception $e) {
        $this->debugger()->logFailedRequest($e);
        $overall_success = FALSE;
      }
    }

    $state = $overall_success ? InvalidationInterface::SUCCEEDED : InvalidationInterface::FAILED;
    $this->setStates($invalidations, $state);

    $this->debugger()->callerRemove(__METHOD__);
  }

  /**
   * Maps an invalidation type onto the method of this class that handles it.
   *
   * @param string $type
   *   The invalidation type, e.g. 'tag', 'url', 'wildcardurl', 'everything'.
   *
   * @return string
   *   The method name; 'invalidate' for any type not listed.
   */
  public function routeTypeToMethod($type) {
    $methods = [
      'tag' => 'invalidateTags',
      'url' => 'invalidateUrls',
      'wildcardurl' => 'invalidateWildcardUrls',
      'everything' => 'invalidateEverything',
    ];
    return isset($methods[$type]) ? $methods[$type] : 'invalidate';
  }

  /**
   * Shared implementation of invalidateUrls() and invalidateWildcardUrls().
   *
   * For every invalidation one request per balancer is made, in which the
   * host of the URL is swapped for the balancer address while the original
   * host travels in the 'Host' header.
   *
   * @param array $invalidations
   *   Invalidation objects whose expression is an absolute URL.
   * @param string $http_method
   *   The HTTP method to use ('PURGE' or 'BAN').
   * @param bool $force_http
   *   Whether a leading "https://" is rewritten to "http://".
   */
  protected function invalidateUrlsOnBalancers(array $invalidations, $http_method, $force_http) {
    $this->setStates($invalidations, InvalidationInterface::PROCESSING);

    $ipv4_addresses = $this->platformInfo->getBalancerAddresses();
    $token = $this->platformInfo->getBalancerToken();

    // Lazily yield one request per (invalidation, balancer), keyed by the
    // invalidation ID so that results of all balancers are collected per
    // invalidation.
    $requests = function () use ($invalidations, $ipv4_addresses, $token, $http_method, $force_http) {
      foreach ($invalidations as $inv) {
        foreach ($ipv4_addresses as $ipv4) {
          yield $inv->getId() => function ($poolopt) use ($inv, $ipv4, $token, $http_method, $force_http) {
            $uri = $inv->getExpression();
            // Only the scheme itself is downgraded; a "https://" further on
            // (e.g. inside a query string) is part of the URL to invalidate.
            if ($force_http && strpos($uri, 'https://') === 0) {
              $uri = 'http://' . substr($uri, strlen('https://'));
            }
            $host = parse_url($uri, PHP_URL_HOST);
            $uri = $this->replaceUriHost($uri, $host, $ipv4);
            $opt = [
              'headers' => [
                'X-Acquia-Purge' => $token,
                'Accept-Encoding' => 'gzip',
                'User-Agent' => 'Acquia Purge',
                'Host' => $host,
              ],
            ];
            // Request specific options win over the pool wide options.
            if (is_array($poolopt) && count($poolopt)) {
              $opt = array_merge($poolopt, $opt);
            }
            return $this->httpClient->requestAsync($http_method, $uri, $opt);
          };
        }
      }
    };

    $results = $this->getResultsConcurrently($requests);
    foreach ($invalidations as $invalidation) {
      $invalidation->setState($this->resolveState($results, $invalidation->getId()));
    }
  }

  /**
   * Replaces the host in the authority part of a URI, and nothing else.
   *
   * Occurrences of the host name in the path, query or fragment are left
   * alone, as rewriting those would address a different URL.
   *
   * @param string $uri
   *   The URI to rewrite.
   * @param string|null|false $host
   *   The host as returned by parse_url() for $uri.
   * @param string $replacement
   *   The address to put in place of the host.
   *
   * @return string
   *   The rewritten URI, or $uri unchanged when $host is empty or not found.
   */
  protected function replaceUriHost($uri, $host, $replacement) {
    if (!is_string($host) || $host === '') {
      return $uri;
    }

    // The authority starts after the first '//' and runs up to the first
    // '/', '?' or '#'.
    $slashes = strpos($uri, '//');
    $offset = ($slashes === FALSE) ? 0 : $slashes + 2;
    $authority = substr($uri, $offset, strcspn($uri, '/?#', $offset));

    // Skip "user:password@" so credentials containing the host are not hit.
    $at = strrpos($authority, '@');
    if ($at !== FALSE) {
      $offset += $at + 1;
    }

    $position = strpos($uri, $host, $offset);
    if ($position === FALSE) {
      return $uri;
    }
    return substr_replace($uri, $replacement, $position, strlen($host));
  }

  /**
   * Derives the final invalidation state from collected request results.
   *
   * @param array $results
   *   The return value of getResultsConcurrently().
   * @param int|string $result_id
   *   The result ID to look up.
   *
   * @return int
   *   InvalidationInterface::SUCCEEDED when at least one result exists for
   *   the ID and none of them is FALSE, InvalidationInterface::FAILED
   *   otherwise.
   */
  protected function resolveState(array $results, $result_id) {
    if (!isset($results[$result_id]) || !count($results[$result_id])) {
      return InvalidationInterface::FAILED;
    }
    if (in_array(FALSE, $results[$result_id], TRUE)) {
      return InvalidationInterface::FAILED;
    }
    return InvalidationInterface::SUCCEEDED;
  }

  /**
   * Puts every given invalidation object into the same state.
   *
   * @param array $invalidations
   *   The invalidation objects to update.
   * @param int $state
   *   One of the InvalidationInterface state constants.
   */
  protected function setStates(array $invalidations, $state) {
    foreach ($invalidations as $invalidation) {
      $invalidation->setState($state);
    }
  }

}