View source
<?php
namespace Drupal\feeds\EventSubscriber;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Drupal\Core\Url as CoreUrl;
use Drupal\feeds\Component\HttpHelpers;
use Drupal\feeds\Event\DeleteFeedsEvent;
use Drupal\feeds\Event\FeedsEvents;
use Drupal\feeds\Event\FetchEvent;
use Drupal\feeds\FeedInterface;
use Drupal\feeds\Result\FetcherResultInterface;
use Drupal\feeds\Result\HttpFetcherResultInterface;
use Drupal\feeds\SubscriptionInterface;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\Url;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
class PubSubHubbub implements EventSubscriberInterface {
use StringTranslationTrait;
protected $storage;
public function __construct(EntityTypeManagerInterface $entity_type_manager) {
$this->storage = $entity_type_manager
->getStorage('feeds_subscription');
}
public static function getSubscribedEvents() {
$events = [];
$events[FeedsEvents::FETCH][] = [
'onPostFetch',
FeedsEvents::AFTER,
];
$events[FeedsEvents::FEEDS_DELETE][] = 'onDeleteMultipleFeeds';
return $events;
}
protected function batchSet(array $batch_definition) {
return batch_set($batch_definition);
}
public function onPostFetch(FetchEvent $event) {
$feed = $event
->getFeed();
$fetcher = $feed
->getType()
->getFetcher();
$subscription = $this->storage
->load($feed
->id());
if (!$fetcher
->getConfiguration('use_pubsubhubbub')) {
return $this
->unsubscribe($feed, $subscription);
}
if (!($hub = $this
->findRelation($event
->getFetcherResult(), 'hub'))) {
$hub = $fetcher
->getConfiguration('fallback_hub');
}
if (!$hub) {
return $this
->unsubscribe($feed, $subscription);
}
$source_url = Url::fromString($feed
->getSource());
$hub = (string) $source_url
->combine($hub);
if ($topic = $this
->findRelation($event
->getFetcherResult(), 'self')) {
$topic = (string) $source_url
->combine($topic);
$feed
->setSource($topic);
}
else {
$topic = $feed
->getSource();
}
if (!$subscription) {
$subscription = $this->storage
->create([
'fid' => $feed
->id(),
'topic' => $topic,
'hub' => $hub,
]);
return $this
->subscribe($feed, $subscription);
}
if ($topic !== $subscription
->getTopic() || $subscription
->getHub() !== $hub || $subscription
->getState() !== 'subscribed') {
$this
->unsubscribe($feed, $subscription);
$subscription = $this->storage
->create([
'fid' => $feed
->id(),
'topic' => $topic,
'hub' => $hub,
]);
return $this
->subscribe($feed, $subscription);
}
}
protected function subscribe(FeedInterface $feed, SubscriptionInterface $subscription) {
$subscription
->subscribe();
$batch = [
'title' => $this
->t('Subscribing to: %title', [
'%title' => $feed
->label(),
]),
'init_message' => $this
->t('Subscribing to: %title', [
'%title' => $feed
->label(),
]),
'operations' => [
[
'Drupal\\feeds\\EventSubscriber\\PubSubHubbub::runSubscribeBatch',
[
$subscription,
],
],
],
'progress_message' => $this
->t('Subscribing: %title', [
'%title' => $feed
->label(),
]),
'error_message' => $this
->t('An error occored while subscribing to %title.', [
'%title' => $feed
->label(),
]),
];
$this
->batchSet($batch);
}
protected function unsubscribe(FeedInterface $feed, SubscriptionInterface $subscription = NULL) {
if (!$subscription) {
return;
}
$subscription
->unsubscribe();
$batch = [
'title' => $this
->t('Unsubscribing from: %title', [
'%title' => $feed
->label(),
]),
'init_message' => $this
->t('Unsubscribing from: %title', [
'%title' => $feed
->label(),
]),
'operations' => [
[
'Drupal\\feeds\\EventSubscriber\\PubSubHubbub::runSubscribeBatch',
[
$subscription,
],
],
],
'progress_message' => $this
->t('Unsubscribing: %title', [
'%title' => $feed
->label(),
]),
'error_message' => $this
->t('An error occored while unsubscribing from %title.', [
'%title' => $feed
->label(),
]),
];
$this
->batchSet($batch);
}
public static function runSubscribeBatch(SubscriptionInterface $subscription) {
switch ($subscription
->getState()) {
case 'subscribing':
$mode = 'subscribe';
break;
case 'unsubscribing':
$mode = 'unsubscribe';
$id = $subscription
->getToken() . ':' . $subscription
->id();
\Drupal::keyValueExpirable('feeds_push_unsubscribe')
->setWithExpire($id, $subscription, 3600);
break;
default:
throw new \LogicException('A subscription was found in an invalid state.');
}
$args = [
'feeds_subscription_id' => $subscription
->id(),
'feeds_push_token' => $subscription
->getToken(),
];
$callback = CoreUrl::fromRoute('entity.feeds_feed.subscribe', $args, [
'absolute' => TRUE,
])
->toString();
$post_body = [
'hub.callback' => $callback,
'hub.mode' => $mode,
'hub.topic' => $subscription
->getTopic(),
'hub.secret' => $subscription
->getSecret(),
];
$response = static::retry($subscription, $post_body);
if (!$response || $response
->getStatusCode() != 202) {
switch ($subscription
->getState()) {
case 'subscribing':
$subscription
->delete();
break;
case 'unsubscribing':
break;
}
}
}
protected static function retry(SubscriptionInterface $subscription, array $body, $retries = 3) {
$tries = 0;
do {
$tries++;
try {
return \Drupal::httpClient()
->post($subscription
->getHub(), [
'body' => $body,
]);
} catch (RequestException $e) {
\Drupal::logger('feeds')
->warning('Subscription error: %error', [
'%error' => $e
->getMessage(),
]);
}
} while ($tries <= $retries);
}
protected function findRelation(FetcherResultInterface $fetcher_result, $relation) {
if ($fetcher_result instanceof HttpFetcherResultInterface) {
if ($rel = HttpHelpers::findLinkHeader($fetcher_result
->getHeaders(), $relation)) {
return $rel;
}
}
return HttpHelpers::findRelationFromXml($fetcher_result
->getRaw(), $relation);
}
public function onDeleteMultipleFeeds(DeleteFeedsEvent $event) {
$subscriptions = $this->storage
->loadMultiple(array_keys($event
->getFeeds()));
foreach ($event
->getFeeds() as $feed) {
if (!isset($subscriptions[$feed
->id()])) {
continue;
}
$this
->unsubscribe($feed, $subscriptions[$feed
->id()]);
}
}
}