You are here

class PubSubHubbub in Feeds 8.3

Event listener for PubSubHubbub subscriptions.

Hierarchy

Expanded class hierarchy of PubSubHubbub

1 string reference to 'PubSubHubbub'
feeds.services.yml in ./feeds.services.yml
feeds.services.yml
1 service uses PubSubHubbub
feeds.pubsubhubbub in ./feeds.services.yml
Drupal\feeds\EventSubscriber\PubSubHubbub

File

src/EventSubscriber/PubSubHubbub.php, line 23

Namespace

Drupal\feeds\EventSubscriber
View source
class PubSubHubbub implements EventSubscriberInterface {
  use StringTranslationTrait;

  /**
   * The subscription storage controller.
   *
   * @var \Drupal\Core\Entity\EntityStorageInterface
   */
  protected $storage;

  /**
   * Constructs a PubSubHubbub object.
   *
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   */
  public function __construct(EntityTypeManagerInterface $entity_type_manager) {
    $this->storage = $entity_type_manager
      ->getStorage('feeds_subscription');
  }

  /**
   * {@inheritdoc}
   */
  public static function getSubscribedEvents() {
    $events = [];
    $events[FeedsEvents::FETCH][] = [
      'onPostFetch',
      FeedsEvents::AFTER,
    ];
    $events[FeedsEvents::FEEDS_DELETE][] = 'onDeleteMultipleFeeds';
    return $events;
  }

  /**
   * Adds a new batch.
   *
   * @param array $batch_definition
   *   An associative array defining the batch.
   */
  protected function batchSet(array $batch_definition) {
    return batch_set($batch_definition);
  }

  /**
   * Subscribes to a feed.
   *
   * @param \Drupal\feeds\Event\FetchEvent $event
   *   The fetch event.
   */
  public function onPostFetch(FetchEvent $event) {
    $feed = $event
      ->getFeed();
    $fetcher = $feed
      ->getType()
      ->getFetcher();
    $subscription = $this->storage
      ->load($feed
      ->id());
    if (!$fetcher
      ->getConfiguration('use_pubsubhubbub')) {
      return $this
        ->unsubscribe($feed, $subscription);
    }
    if (!($hub = $this
      ->findRelation($event
      ->getFetcherResult(), 'hub'))) {
      $hub = $fetcher
        ->getConfiguration('fallback_hub');
    }

    // No hub found.
    if (!$hub) {
      return $this
        ->unsubscribe($feed, $subscription);
    }

    // Used to make other URLs absolute.
    $source_url = Url::fromString($feed
      ->getSource());
    $hub = (string) $source_url
      ->combine($hub);

    // If there is a rel="self" relation.
    if ($topic = $this
      ->findRelation($event
      ->getFetcherResult(), 'self')) {
      $topic = (string) $source_url
        ->combine($topic);
      $feed
        ->setSource($topic);
    }
    else {
      $topic = $feed
        ->getSource();
    }

    // Subscription does not exist yet.
    if (!$subscription) {
      $subscription = $this->storage
        ->create([
        'fid' => $feed
          ->id(),
        'topic' => $topic,
        'hub' => $hub,
      ]);
      return $this
        ->subscribe($feed, $subscription);
    }
    if ($topic !== $subscription
      ->getTopic() || $subscription
      ->getHub() !== $hub || $subscription
      ->getState() !== 'subscribed') {

      // Unsubscribe from the old feed.
      $this
        ->unsubscribe($feed, $subscription);
      $subscription = $this->storage
        ->create([
        'fid' => $feed
          ->id(),
        'topic' => $topic,
        'hub' => $hub,
      ]);
      return $this
        ->subscribe($feed, $subscription);
    }
  }

  /**
   * Subscribes a subscription to a hub in a batch.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed to which the subscription is linked.
   * @param \Drupal\feeds\SubscriptionInterface $subscription
   *   The subscription to subscribe.
   */
  protected function subscribe(FeedInterface $feed, SubscriptionInterface $subscription) {
    $subscription
      ->subscribe();
    $batch = [
      'title' => $this
        ->t('Subscribing to: %title', [
        '%title' => $feed
          ->label(),
      ]),
      'init_message' => $this
        ->t('Subscribing to: %title', [
        '%title' => $feed
          ->label(),
      ]),
      'operations' => [
        [
          'Drupal\\feeds\\EventSubscriber\\PubSubHubbub::runSubscribeBatch',
          [
            $subscription,
          ],
        ],
      ],
      'progress_message' => $this
        ->t('Subscribing: %title', [
        '%title' => $feed
          ->label(),
      ]),
      'error_message' => $this
        ->t('An error occored while subscribing to %title.', [
        '%title' => $feed
          ->label(),
      ]),
    ];
    $this
      ->batchSet($batch);
  }

  /**
   * Unsubscribes a subscription from a hub in a batch.
   *
   * @param \Drupal\feeds\FeedInterface $feed
   *   The feed to which the subscription is linked.
   * @param \Drupal\feeds\SubscriptionInterface $subscription
   *   The subscription to unsubscribe.
   */
  protected function unsubscribe(FeedInterface $feed, SubscriptionInterface $subscription = NULL) {
    if (!$subscription) {
      return;
    }
    $subscription
      ->unsubscribe();
    $batch = [
      'title' => $this
        ->t('Unsubscribing from: %title', [
        '%title' => $feed
          ->label(),
      ]),
      'init_message' => $this
        ->t('Unsubscribing from: %title', [
        '%title' => $feed
          ->label(),
      ]),
      'operations' => [
        [
          'Drupal\\feeds\\EventSubscriber\\PubSubHubbub::runSubscribeBatch',
          [
            $subscription,
          ],
        ],
      ],
      'progress_message' => $this
        ->t('Unsubscribing: %title', [
        '%title' => $feed
          ->label(),
      ]),
      'error_message' => $this
        ->t('An error occored while unsubscribing from %title.', [
        '%title' => $feed
          ->label(),
      ]),
    ];
    $this
      ->batchSet($batch);
  }

  /**
   * Subscribes to or unsubscribes from a hub.
   *
   * This method is used as callback for a batch.
   *
   * @param \Drupal\feeds\SubscriptionInterface $subscription
   *   The subscription entity.
   *
   * @see ::subscribe
   * @see ::unsubscribe
   */
  public static function runSubscribeBatch(SubscriptionInterface $subscription) {
    switch ($subscription
      ->getState()) {
      case 'subscribing':
        $mode = 'subscribe';
        break;
      case 'unsubscribing':
        $mode = 'unsubscribe';

        // The subscription has been deleted, store it for a bit to handle the
        // response.
        $id = $subscription
          ->getToken() . ':' . $subscription
          ->id();
        \Drupal::keyValueExpirable('feeds_push_unsubscribe')
          ->setWithExpire($id, $subscription, 3600);
        break;
      default:
        throw new \LogicException('A subscription was found in an invalid state.');
    }
    $args = [
      'feeds_subscription_id' => $subscription
        ->id(),
      'feeds_push_token' => $subscription
        ->getToken(),
    ];
    $callback = CoreUrl::fromRoute('entity.feeds_feed.subscribe', $args, [
      'absolute' => TRUE,
    ])
      ->toString();
    $post_body = [
      'hub.callback' => $callback,
      'hub.mode' => $mode,
      'hub.topic' => $subscription
        ->getTopic(),
      'hub.secret' => $subscription
        ->getSecret(),
    ];
    $response = static::retry($subscription, $post_body);

    // Response failed.
    if (!$response || $response
      ->getStatusCode() != 202) {
      switch ($subscription
        ->getState()) {
        case 'subscribing':

          // Deleting the subscription will make it re-subscribe on the next
          // import.
          $subscription
            ->delete();
          break;
        case 'unsubscribing':

          // Unsubscribe failed. The hub should give up eventually.
          break;
      }
    }
  }

  /**
   * Retries a POST request.
   *
   * @param \Drupal\feeds\SubscriptionInterface $subscription
   *   The subscription.
   * @param array $body
   *   The POST body.
   * @param int $retries
   *   (optional) The number of retries. Defaults to 3.
   *
   * @return \GuzzleHttp\Message\Response
   *   The Guzzle response.
   */
  protected static function retry(SubscriptionInterface $subscription, array $body, $retries = 3) {
    $tries = 0;
    do {
      $tries++;
      try {
        return \Drupal::httpClient()
          ->post($subscription
          ->getHub(), [
          'body' => $body,
        ]);
      } catch (RequestException $e) {
        \Drupal::logger('feeds')
          ->warning('Subscription error: %error', [
          '%error' => $e
            ->getMessage(),
        ]);
      }
    } while ($tries <= $retries);
  }

  /**
   * Finds a hub from a fetcher result.
   *
   * @param \Drupal\feeds\Result\FetcherResultInterface $fetcher_result
   *   The fetcher result.
   * @param string $relation
   *   The type of relation to find.
   *
   * @return string|null
   *   The hub URL or null if one wasn't found.
   */
  protected function findRelation(FetcherResultInterface $fetcher_result, $relation) {
    if ($fetcher_result instanceof HttpFetcherResultInterface) {
      if ($rel = HttpHelpers::findLinkHeader($fetcher_result
        ->getHeaders(), $relation)) {
        return $rel;
      }
    }
    return HttpHelpers::findRelationFromXml($fetcher_result
      ->getRaw(), $relation);
  }

  /**
   * Deletes subscriptions when feeds are deleted.
   *
   * @param \Drupal\feeds\Event\DeleteFeedsEvent $event
   *   The delete event.
   */
  public function onDeleteMultipleFeeds(DeleteFeedsEvent $event) {
    $subscriptions = $this->storage
      ->loadMultiple(array_keys($event
      ->getFeeds()));
    foreach ($event
      ->getFeeds() as $feed) {
      if (!isset($subscriptions[$feed
        ->id()])) {
        continue;
      }
      $this
        ->unsubscribe($feed, $subscriptions[$feed
        ->id()]);
    }
  }

}

Members

Namesort descending Modifiers Type Description Overrides
PubSubHubbub::$storage protected property The subscription storage controller.
PubSubHubbub::batchSet protected function Adds a new batch.
PubSubHubbub::findRelation protected function Finds a hub from a fetcher result.
PubSubHubbub::getSubscribedEvents public static function Returns an array of event names this subscriber wants to listen to.
PubSubHubbub::onDeleteMultipleFeeds public function Deletes subscriptions when feeds are deleted.
PubSubHubbub::onPostFetch public function Subscribes to a feed.
PubSubHubbub::retry protected static function Retries a POST request.
PubSubHubbub::runSubscribeBatch public static function Subscribes to or unsubscribes from a hub.
PubSubHubbub::subscribe protected function Subscribes a subscription to a hub in a batch.
PubSubHubbub::unsubscribe protected function Unsubscribes a subscription from a hub in a batch.
PubSubHubbub::__construct public function Constructs a PubSubHubbub object.
StringTranslationTrait::$stringTranslation protected property The string translation service. 1
StringTranslationTrait::formatPlural protected function Formats a string containing a count of items.
StringTranslationTrait::getNumberOfPlurals protected function Returns the number of plurals supported by a given language.
StringTranslationTrait::getStringTranslation protected function Gets the string translation service.
StringTranslationTrait::setStringTranslation public function Sets the string translation service to use. 2
StringTranslationTrait::t protected function Translates a string to the current language or to a given language.