You are here

message_subscribe.module in Message Subscribe 7

Same filename and directory in other branches
  1. 8 message_subscribe.module

Subscribe API for the Message and Message notify modules.

File

message_subscribe.module
View source
<?php

/**
 * @file
 * Subscribe API for the Message and Message notify modules.
 */

/**
 * Process a message, and send to subscribed users.
 *
 * @param $entity_type
 *   The entity type.
 * @param $entity
 *   The entity object.
 * @param $message
 *   The Message object.
 * @param $notify_options
 *   Optional; Array of options to pass to message_notify_send_message()
 *   keyed by the notifier name.
 * @param $subscribe_options
 *   Optional; Array with the following optional values:
 *   - "save message": Determine if the Message should be saved. Defaults
 *     to TRUE.
 *   - "skip context": Determine if extracting basic context should be
 *     skipped in message_subscribe_get_subscribers(). Defaults to FALSE.
 *   - "last uid": The last user ID to query.
 *   - "uids": Array of user IDs to be processed. Setting this, will cause
 *     skipping message_subscribe_get_subscribers() to get the subscribed
 *     users. For example:
 *     $subscribe_options['uids'] = array(
 *       1 => array(
 *        'notifiers' => array('email'),
 *       ),
 *     );
 *   - "range": The number of items to fetch in the query.
 *   - "end time": The timestamp of the time limit for the function to
 *     execute. Defaults to FALSE, meaning there is no time limitation.
 *   - "use queue": Determine if queue API should be used to
 *   - "queue": Set to TRUE to indicate the processing is done via a queue
 *     worker. see message_subscribe_queue_worker().
 *   - "entity access": Determine if access to view the entity should be applied
 *     when getting the list of subscribed users. Defaults to TRUE.
 *   - "notify blocked users": Determine whether blocked users
 *      should be notified. Typically this should be used in conjunction with
 *      "entity access" to ensure that blocked users don't receive notifications
 *      about entities which they used to have access to
 *      before they were blocked. Defaults to FALSE.
 *   - "notify message owner": Determines if the user that created the entity
 *      gets notified of their own action. If TRUE the author will get notified.
 *      Defaults to FALSE.
 *
 * @param $context
 *   Optional; Array keyed with the entity type and array of entity IDs as
 *   the value. For example, if the event is related to a node
 *   entity, the implementing module might pass along with the node
 *   itself, the node author and related taxonomy terms.
 *
 * @code
 *   $context = array(
 *     'node' => array(1),
 *     // The node author.
 *     'user' => array(10),
 *     // Related taxonomy terms.
 *     'taxonomy_term' => array(100, 200, 300)
 *   );
 * @endcode
 *
 * @return Message
 *  The message object.
 */
function message_subscribe_send_message($entity_type, $entity, Message $message, $notify_options = array(), $subscribe_options = array(), $context = array()) {
  $use_queue = isset($subscribe_options['use queue']) ? $subscribe_options['use queue'] : variable_get('message_subscribe_use_queue', FALSE);
  $notify_message_owner = isset($subscribe_options['notify message owner']) ? $subscribe_options['notify message owner'] : variable_get('message_subscribe_notify_own_actions', FALSE);

  // Save message by default.
  $subscribe_options += array(
    'save message' => TRUE,
    'skip context' => FALSE,
    'last uid' => 0,
    'uids' => array(),
    'range' => $use_queue ? 100 : FALSE,
    'end time' => FALSE,
    'use queue' => $use_queue,
    'queue' => FALSE,
    'entity access' => TRUE,
    'notify blocked users' => FALSE,
    'notify message owner' => $notify_message_owner,
  );
  if (empty($message->mid) && $subscribe_options['save message']) {
    $message
      ->save();
  }
  if ($use_queue) {
    $queue = DrupalQueue::get('message_subscribe');
    list($id) = entity_extract_ids($entity_type, $entity);
  }
  if ($use_queue && empty($subscribe_options['queue'])) {
    if (empty($message->mid)) {
      throw new Exception('Cannot add a non-saved message to the queue.');
    }

    // Get the context once, so we don't need to process it every time
    // a worker claims the item.
    $context = $context ? $context : message_subscribe_get_basic_context($entity_type, $entity, $subscribe_options, $context);
    $subscribe_options['skip context'] = TRUE;

    // Add item to the queue.
    $task = array(
      'mid' => $message->mid,
      'entity_type' => $entity_type,
      'entity_id' => $id,
      'notify_options' => $notify_options,
      'subscribe_options' => $subscribe_options,
      'context' => $context,
      // Add the user ID for the queue.
      'uid' => $subscribe_options['last uid'],
    );

    // Exit now, as messages will be processed via queue API.
    return $queue
      ->createItem($task);
  }
  $message->message_subscribe = array();

  // Retrieve all users subscribed.
  $uids = array();
  if ($subscribe_options['uids']) {

    // We got a list of user IDs directly from the implementing module,
    // However we need to adhere to the range.
    // If 'last uid' is provided, we need to start from next 'uid' key in array.
    if (!empty($subscribe_options['last uid'])) {
      $index = array_search($subscribe_options['last uid'], array_keys($subscribe_options['uids'])) + 1;
    }
    else {
      $index = 0;
    }
    $uids = $subscribe_options['range'] ? array_slice($subscribe_options['uids'], $index, $subscribe_options['range'], TRUE) : $subscribe_options['uids'];
  }
  if (empty($uids) && !($uids = message_subscribe_get_subscribers($entity_type, $entity, $message, $subscribe_options, $context))) {

    // If we use a queue, it will be deleted.
    return $message;
  }
  foreach ($uids as $uid => $values) {
    $last_uid = $uid;

    // Clone the message in case it will need to be saved, it won't
    // overwrite the existing one.
    $cloned_message = clone $message;

    // Push a copy of the original message into the new one.
    $cloned_message->original = $message;
    unset($cloned_message->mid);
    $cloned_message->uid = $uid;
    $values += array(
      'notifiers' => array(),
    );

    // Send the message using the required notifiers.
    foreach ($values['notifiers'] as $notifier_name) {
      $options = !empty($notify_options[$notifier_name]) ? $notify_options[$notifier_name] : array();
      $options += array(
        'save on fail' => FALSE,
        'save on success' => FALSE,
      );
      message_notify_send_message($cloned_message, $options, $notifier_name);

      // Check we didn't timeout.
      if ($use_queue && $subscribe_options['queue']['end time'] && time() < $subscribe_options['queue']['end time']) {
        continue 2;
      }
    }
  }
  if ($use_queue) {

    // Add item to the queue.
    $task = array(
      'mid' => $message->mid,
      'entity_type' => $entity_type,
      'entity_id' => $id,
      'notify_options' => $notify_options,
      'subscribe_options' => $subscribe_options,
      'context' => $context,
      'uid' => $last_uid,
    );
    $task['subscribe_options']['last uid'] = $last_uid;

    // Create a new queue item, with the last user ID.
    $queue
      ->createItem($task);
  }
  return $message;
}

/**
 * Get a list of user IDs that need to recieve the message.
 *
 * @param $entity_type
 *   The entity type.
 * @param $entity
 *   The entity object.
 * @param $message
 *   The Message object.
 * @param $subscribe_options
 *   Optional; The options array to pass to
 *   message_subscribe_get_basic_context().
 * @param $context
 *   Optional; The context array, passed by reference.
 *
 * @return
 *   Array keyed with the user IDs to send email, and array with the flags
 *   the used it for the subscription, and the notifier names.
 *
 * @code
 *   array(
 *     1 => array(
 *       'flags' => array('subscribe_node', 'subscribe_user'),
 *       'notifiers' => array('email', 'sms'),
 *     ),
 *   );
 * @endcode
 */
function message_subscribe_get_subscribers($entity_type, $entity, Message $message, $subscribe_options = array(), &$context = array()) {
  $context = $context ? $context : message_subscribe_get_basic_context($entity_type, $entity, $subscribe_options, $context);
  $notify_message_owner = isset($subscribe_options['notify message owner']) ? $subscribe_options['notify message owner'] : variable_get('message_subscribe_notify_own_actions', FALSE);
  $uids = array();

  // We don't use module_invoke_all() is we want to retain the array keys,
  // which are the user IDs.
  foreach (module_implements('message_subscribe_get_subscribers') as $module) {
    $function = $module . '_message_subscribe_get_subscribers';
    $result = $function($message, $subscribe_options, $context);
    $uids += $result;
  }

  // If we're not notifying blocked users, exclude those users from the result
  // set now so that we avoid unnecessarily loading those users later.
  if (empty($subscribe_options['notify blocked users']) && !empty($uids)) {
    $query = new EntityFieldQuery();
    $results = $query
      ->entityCondition('entity_type', 'user')
      ->propertyCondition('status', 1)
      ->propertyCondition('uid', array_keys($uids), 'IN')
      ->execute();
    if (!empty($results['user'])) {
      $uids = array_intersect_key($uids, $results['user']);
    }
    else {

      // There are no not blocked users to notify.
      $uids = array();
    }
  }

  // See if the author of the entity gets notified.
  if (isset($entity->uid) && !$notify_message_owner && array_key_exists($entity->uid, $uids)) {
    unset($uids[$entity->uid]);
  }
  $accounts = user_load_multiple(array_keys($uids));
  foreach ($uids as $uid => $values) {
    if (!empty($subscribe_options['entity access'])) {
      if (!entity_access('view', $entity_type, $entity, $accounts[$uid])) {

        // User doesn't have access to view the entity.
        unset($uids[$uid]);
      }
    }
  }
  $values = array(
    'context' => $context,
    'entity_type' => $entity_type,
    'entity' => $entity,
    'message' => $message,
    'subscribe_options' => $subscribe_options,
  );
  _message_subscribe_add_default_notifiers($uids);
  ksort($uids);
  drupal_alter('message_subscribe_get_subscribers', $uids, $values);
  return $uids;
}

/**
 * Get context from entity type.
 *
 * This is a naive implementation, which extracts context from an entity.
 * For example, given a node we extract the node author and related
 * taxonomy terms.
 *
 * @param $entity_type
 *   The entity type.
 * @param $entity
 *   The entity object.
 * @param $subscribe_options
 *   Optional; The options array to pass to
 *   message_subscribe_get_basic_context().
 * @param $context
 *   Optional; The context array.
 *
 * @return
 *   Array keyed with the entity type and array of entity IDs as the
 *   value.
 */
function message_subscribe_get_basic_context($entity_type, $entity, $subscribe_options = array(), $context = array()) {
  if (!$context) {
    list($id) = entity_extract_ids($entity_type, $entity);
    $context[$entity_type][$id] = $id;
  }
  if (!empty($subscribe_options['skip context'])) {
    return $context;
  }
  $context += array(
    'node' => array(),
    'user' => array(),
    'taxonomy_term' => array(),
  );

  // Default context for comments.
  if ($entity_type == 'comment') {
    $context['node'][$entity->nid] = $entity->nid;
    $context['user'][$entity->uid] = $entity->uid;
  }
  if (empty($context['node'])) {
    return $context;
  }
  $nodes = node_load_multiple($context['node']);
  if (module_exists('og')) {

    // Iterate over existing nodes to extract the related groups.
    foreach ($nodes as $node) {
      foreach (og_get_entity_groups('node', $node) as $group_type => $gids) {
        foreach ($gids as $gid) {
          $context[$group_type][$gid] = $gid;
        }
      }
    }
  }
  $nodes = node_load_multiple($context['node']);
  foreach ($nodes as $node) {
    $context['user'][$node->uid] = $node->uid;
    if (module_exists('taxonomy')) {
      $context['taxonomy_term'] = !empty($context['taxonomy_term']) ? $context['taxonomy_term'] : array();

      // Iterate over all taxonomy term reference fields, or entity-reference
      // fields that reference terms.
      foreach (array_keys(field_info_instances('node', $node->type)) as $field_name) {
        $field = field_info_field($field_name);
        if ($field['type'] == 'taxonomy_term_reference' || $field['type'] == 'entityreference' && $field['settings']['target_type'] == 'taxonomy_term') {
          $wrapper = entity_metadata_wrapper('node', $node);
          if ($tids = $wrapper->{$field_name}
            ->value(array(
            'identifier' => TRUE,
          ))) {
            $tids = $field['cardinality'] == 1 ? array(
              $tids,
            ) : $tids;
            foreach ($tids as $tid) {
              $context['taxonomy_term'][$tid] = $tid;
            }
          }
        }
      }
    }
  }
  return $context;
}

/**
 * Implements hook_message_subscribe_get_subscribers().
 *
 * @param $context
 *   Array keyed with the entity type and array of entity IDs as the
 *   value. According to this context this function will retrieve the
 *   related subscribers.
 * @param $message
 *   The message object.
 *
 * @return
 *   Array keyed with the user ID and the value:
 *   - "flags": Array with the flag names that resulted with including
 *   the user.
 *   - "notifiers": Array with the Message notifier name plugins.
 *
 * @see flag_get_content_flags()
 */
function message_subscribe_message_subscribe_get_subscribers(Message $message, $subscribe_options = array(), $context = array()) {
  $uids = array();
  $subscribe_options += array(
    'last uid' => 0,
    'range' => FALSE,
  );

  // Determine if a range is needed for the query.
  $range = $subscribe_options['range'];

  // Find the users that subscribed to each context.
  foreach ($context as $entity_type => $entity_ids) {
    if (!$entity_ids) {
      continue;
    }

    // Get all flags on given entity type.
    if (!($flags = message_subscribe_flag_get_flags($entity_type))) {
      continue;
    }
    $fids = array();
    foreach ($flags as $flag) {
      $fids[$flag->fid] = $flag->name;
    }

    // Query all the entity IDs inside the given flags. We don't use
    // flag_get_content_flags() as we want to get all the flaggings of an
    // entity-type in a single query.
    if (FLAG_API_VERSION == 2) {
      $query = db_select('flag_content', 'fc')
        ->condition('content_type', $entity_type)
        ->condition('content_id', $entity_ids, 'IN');
    }
    else {
      $query = db_select('flagging', 'fc')
        ->condition('entity_type', $entity_type)
        ->condition('entity_id', $entity_ids, 'IN');
    }
    $query
      ->fields('fc')
      ->condition('fid', array_keys($fids), 'IN')
      ->condition('fc.uid', $subscribe_options['last uid'], '>')
      ->orderBy('fc.uid', 'ASC');
    if ($range) {
      $query
        ->range(0, $range);

      // If we're mpt notifying blocked users, we need to take this into account
      // in this query so that the range is accurate.
      if (empty($subscribe_options['notify blocked users'])) {
        $query
          ->join('users', 'users', 'users.uid = fc.uid');
        $query
          ->condition('users.status', '1', '=');
      }
    }
    $result = $query
      ->execute();
    foreach ($result as $row) {
      $uids[$row->uid] = !empty($uids[$row->uid]) ? $uids[$row->uid] : array(
        'notifiers' => array(),
      );

      // Register the flag name.
      $flag_name = $fids[$row->fid];
      $uids[$row->uid]['flags'][] = $flag_name;
      if ($range) {
        --$range;
        if ($range == 0) {

          // We've reach the requested item.
          return $uids;
        }
      }
    }
  }
  return $uids;
}

/**
 * Get Message subscribe related flags.
 *
 * Return Flags related to message subscribe using a name convention --
 * the flag name should start with "subscribe_".
 *
 * @param $content_type
 *   Optional. The type of content for which to load the flags. Usually 'node'.
 * @param $content_subtype
 *   Optional. The node type for which to load the flags.
 * @param $account
 *   Optional. The user account to filter available flags. If not set, all
 *   flags for will this node will be returned.
 * @param $reset
 *   Optional. Reset the internal query cache.
 *
 * @return $flags
 *   An array of the structure [fid] = flag_object.
 *
 * @see flag_get_flags()
 */
function message_subscribe_flag_get_flags($content_type = NULL, $content_subtype = NULL, $account = NULL, $reset = FALSE) {
  $flags = flag_get_flags($content_type, $content_subtype, $account, $reset);
  $ms_flags = array();
  $prefix = variable_get('message_subscribe_flag_prefix', 'subscribe') . '_';
  foreach ($flags as $flag_name => $flag) {

    // Check that the flag is using name convention.
    if (strpos($flag_name, $prefix) === 0) {
      $ms_flags[$flag_name] = $flag;
    }
  }
  return $ms_flags;
}

/**
 * Implements hook_flag_default_flags().
 */
function message_subscribe_flag_default_flags() {
  $flags = array();

  // Exported flag: "Content".
  $flags['subscribe_node'] = array(
    'entity_type' => 'node',
    'title' => 'Content',
    'global' => '0',
    'types' => array(),
    'flag_short' => 'Subscribe',
    'flag_long' => '',
    'flag_message' => 'You are now subscribed to this item.',
    'unflag_short' => 'Unsubscribe',
    'unflag_long' => '',
    'unflag_message' => 'You are no longer subscribed to this item.',
    'unflag_denied_text' => '',
    'link_type' => 'toggle',
    'import_roles' => array(
      'flag' => array(
        0 => '2',
      ),
      'unflag' => array(
        0 => '2',
      ),
    ),
    'weight' => 0,
    'show_on_form' => 0,
    'access_author' => '',
    'show_on_page' => 1,
    'show_on_teaser' => 1,
    'show_contextual_link' => 0,
    'i18n' => 0,
    'module' => 'message_subscribe',
    'locked' => array(
      0 => 'name',
    ),
    'api_version' => 3,
    'status' => FALSE,
  );

  // Exported flag: "Terms".
  $flags['subscribe_term'] = array(
    'entity_type' => 'taxonomy_term',
    'title' => 'Terms',
    'global' => '0',
    'types' => array(),
    'flag_short' => 'Subscribe',
    'flag_long' => '',
    'flag_message' => 'You are now subscribed to this item.',
    'unflag_short' => 'Unsubscribe',
    'unflag_long' => '',
    'unflag_message' => 'You are no longer subscribed to this item.',
    'unflag_denied_text' => '',
    'link_type' => 'toggle',
    'import_roles' => array(
      'flag' => array(
        0 => '2',
      ),
      'unflag' => array(
        0 => '2',
      ),
    ),
    'weight' => 0,
    'show_on_entity' => TRUE,
    'show_on_form' => 0,
    'access_author' => '',
    'module' => 'message_subscribe',
    'locked' => array(
      0 => 'name',
    ),
    'api_version' => 3,
    'status' => FALSE,
  );

  // Exported flag: "Users".
  $flags['subscribe_user'] = array(
    'entity_type' => 'user',
    'title' => 'Users',
    'global' => '0',
    'types' => array(),
    'flag_short' => 'Subscribe',
    'flag_long' => '',
    'flag_message' => 'You are now subscribed to this item.',
    'unflag_short' => 'Unsubscribe',
    'unflag_long' => '',
    'unflag_message' => 'You are no longer subscribed to this item.',
    'unflag_denied_text' => '',
    'link_type' => 'toggle',
    'import_roles' => array(
      'flag' => array(
        0 => '2',
      ),
      'unflag' => array(
        0 => '2',
      ),
    ),
    'weight' => 0,
    'show_on_form' => 0,
    'access_author' => '',
    'show_on_profile' => TRUE,
    'access_uid' => '',
    'module' => 'message_subscribe',
    'locked' => array(
      0 => 'name',
    ),
    'api_version' => 3,
    'status' => FALSE,
  );
  if (module_exists('og')) {

    // Exported flag: "Groups".
    $flags['subscribe_og'] = array(
      'entity_type' => 'node',
      'title' => 'Groups',
      'global' => '0',
      'types' => array(),
      'flag_short' => 'Subscribe',
      'flag_long' => '',
      'flag_message' => 'You are now subscribed to this item.',
      'unflag_short' => 'Unsubscribe',
      'unflag_long' => '',
      'unflag_message' => 'You are no longer subscribed to this item.',
      'unflag_denied_text' => '',
      'link_type' => 'toggle',
      'import_roles' => array(
        'flag' => array(
          0 => '2',
        ),
        'unflag' => array(
          0 => '2',
        ),
      ),
      'weight' => 0,
      'show_on_form' => 0,
      'access_author' => '',
      'show_on_page' => 1,
      'show_on_teaser' => 1,
      'show_contextual_link' => 0,
      'i18n' => 0,
      'module' => 'message_subscribe',
      'locked' => array(
        0 => 'name',
      ),
      'api_version' => 3,
      'status' => FALSE,
    );
  }
  if (FLAG_API_VERSION == 2) {
    $flags['subscribe_node']['api_version'] = $flags['subscribe_term']['api_version'] = $flags['subscribe_user']['api_version'] = 2;
    $flags['subscribe_node']['content_type'] = 'node';
    $flags['subscribe_term']['content_type'] = 'taxonomy_term';
    $flags['subscribe_user']['content_type'] = 'user';
    $flags['subscribe_node']['roles'] = $flags['subscribe_node']['import_roles'];
    $flags['subscribe_term']['roles'] = $flags['subscribe_term']['import_roles'];
    $flags['subscribe_user']['roles'] = $flags['subscribe_user']['import_roles'];
    unset($flags['subscribe_node']['import_roles'], $flags['subscribe_term']['import_roles'], $flags['subscribe_user']['import_roles']);
    unset($flags['subscribe_node']['entity_type'], $flags['subscribe_term']['entity_type'], $flags['subscribe_user']['entity_type']);
    if (module_exists('og')) {
      $flags['subscribe_og']['api_version'] = 2;
      $flags['subscribe_og']['content_type'] = 'node';
      $flags['subscribe_og']['roles'] = $flags['subscribe_og']['import_roles'];
      unset($flags['subscribe_og']['entity_type'], $flags['subscribe_og']['import_roles']);
    }
  }
  return $flags;
}

/**
 * Helper function that adds default notifiers to the user IDs array.
 *
 * @param $uids
 *   Array detailing notification info for users.
 *
 * @see message_subscribe_get_subscribers().
 */
function _message_subscribe_add_default_notifiers(&$uids) {
  $notifiers = variable_get('message_subscribe_default_notifiers', array(
    'email' => 'email',
  ));
  if (empty($notifiers)) {
    return;
  }

  // Add the notifiers to each user.
  foreach ($uids as $delta => $uid) {
    $uids[$delta]['notifiers'] += $notifiers;
  }
}

/**
 * Implements hook_menu().
 */
function message_subscribe_menu() {
  $items = array();
  $items['admin/config/system/message-subscribe'] = array(
    'title' => 'Message subscribe settings',
    'description' => 'Administer message subscribe',
    'page callback' => 'drupal_get_form',
    'page arguments' => array(
      'message_subscribe_admin_settings',
    ),
    'access arguments' => array(
      'administer message subscribe',
    ),
    'file' => 'includes/message_subscribe.admin.inc',
  );
  return $items;
}

/**
 * Implements hook_cron_queue_info().
 */
function message_subscribe_cron_queue_info() {
  $items['message_subscribe'] = array(
    'title' => t('Message subscribe'),
    'worker callback' => 'message_subscribe_queue_worker',
  );
  return $items;
}

/**
 * Queue API worker; Process a queue item.
 *
 * The item holds the Message ID, and user ID of the last user the email
 * was sent to.
 */
function message_subscribe_queue_worker($data, $end_time = FALSE) {
  extract($data);
  $entity = entity_load_single($entity_type, $entity_id);
  if (!$entity) {

    // Abort if the entity could not be loaded to avoid errors.
    return;
  }
  if (!($message = message_load($mid))) {

    // Abort if the message has been deleted to avoid errors.
    return;
  }
  $subscribe_options['queue'] = TRUE;
  message_subscribe_send_message($entity_type, $entity, $message, $notify_options, $subscribe_options, $context);
}

Functions

Namesort descending Description
message_subscribe_cron_queue_info Implements hook_cron_queue_info().
message_subscribe_flag_default_flags Implements hook_flag_default_flags().
message_subscribe_flag_get_flags Get Message subscribe related flags.
message_subscribe_get_basic_context Get context from entity type.
message_subscribe_get_subscribers Get a list of user IDs that need to recieve the message.
message_subscribe_menu Implements hook_menu().
message_subscribe_message_subscribe_get_subscribers Implements hook_message_subscribe_get_subscribers().
message_subscribe_queue_worker Queue API worker; Process a queue item.
message_subscribe_send_message Process a message, and send to subscribed users.
_message_subscribe_add_default_notifiers Helper function that adds default notifiers to the user IDs array.