You are here

class MessageQueueManager in Courier 2.x

Same name and namespace in other branches
  1. 8 src/Service/MessageQueueManager.php \Drupal\courier\Service\MessageQueueManager

The message queue manager.

Hierarchy

Expanded class hierarchy of MessageQueueManager

1 string reference to 'MessageQueueManager'
courier.services.yml in ./courier.services.yml
courier.services.yml
1 service uses MessageQueueManager
courier.manager.message_queue in ./courier.services.yml
Drupal\courier\Service\MessageQueueManager

File

src/Service/MessageQueueManager.php, line 11

Namespace

Drupal\courier\Service
View source
class MessageQueueManager implements MessageQueueManagerInterface {

  /**
   * The logger for the Courier channel.
   *
   * @var \Psr\Log\LoggerInterface
   */
  protected $logger;

  /**
   * The identity channel manager.
   *
   * @var \Drupal\courier\Service\IdentityChannelManager
   */
  protected $identityChannelManager;

  /**
   * Constructs a message queue manager.
   *
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
   *   The logger factory service.
   * @param \Drupal\courier\Service\IdentityChannelManagerInterface $identity_channel_manager
   *   The identity channel manager.
   */
  public function __construct(LoggerChannelFactoryInterface $logger_factory, IdentityChannelManagerInterface $identity_channel_manager) {
    $this->logger = $logger_factory
      ->get('courier');
    $this->identityChannelManager = $identity_channel_manager;
  }

  /**
   * {@inheritdoc}
   */
  public function sendMessage(MessageQueueItemInterface $mqi) {
    $options = $mqi
      ->getOptions();
    $channel_options = array_key_exists('channels', $options) ? $options['channels'] : [];
    unset($options['channels']);

    // Instead of iterating over messages, get the identity' channel preferences
    // again. This ensures preference order is up to date since significant time
    // may have passed since adding to queue.
    $channels = $this->identityChannelManager
      ->getChannelsForIdentity($mqi
      ->getIdentity());
    $messages = [];
    foreach ($channels as $channel) {
      if ($message = $mqi
        ->getMessage($channel)) {
        $messages[] = $message;
      }
    }

    /** @var \Drupal\courier\ChannelInterface[] $messages */
    foreach ($messages as $message) {
      $message_options = $options;

      // Transform options based on channel.
      $channel = $message
        ->getEntityTypeId();
      if (array_key_exists($channel, $channel_options)) {
        $message_options = array_merge($message_options, $channel_options[$channel]);
      }
      $t_args = [
        '@channel' => $channel,
        '@identity' => $mqi
          ->getIdentity()
          ->label(),
      ];
      try {
        $message::sendMessages([
          $message,
        ], $message_options);
        $this->logger
          ->info('Successfully sent @channel to @identity', $t_args);
        $mqi
          ->delete();
        return $message;
      } catch (\Exception $e) {
        $t_args['@exception'] = $e
          ->getMessage();
        $this->logger
          ->warning('Failed to send @channel to @identity: @exception', $t_args);
        continue;
      }
      break;
    }
    return FALSE;
  }

}

Members

Namesort descending Modifiers Type Description Overrides
MessageQueueManager::$identityChannelManager protected property The identity channel manager.
MessageQueueManager::$logger protected property The logger for the Courier channel.
MessageQueueManager::sendMessage public function Attempts to send the messages in the message queue item. Overrides MessageQueueManagerInterface::sendMessage
MessageQueueManager::__construct public function Constructs a message queue manager.