You are here

advancedqueue.module in Advanced Queue 7

Same filename and directory in other branches
  1. 8 advancedqueue.module

Helper module for advanced queuing.

File

advancedqueue.module
View source
<?php

/**
 * @file
 * Helper module for advanced queuing.
 */

/**
 * Status indicating item was added to the queue.
 */
define('ADVANCEDQUEUE_STATUS_QUEUED', -1);

/**
 * Status indicating item is currently being processed.
 */
define('ADVANCEDQUEUE_STATUS_PROCESSING', 0);

/**
 * Status indicating item was processed successfully.
 */
define('ADVANCEDQUEUE_STATUS_SUCCESS', 1);

/**
 * Status indicating item processing failed.
 */
define('ADVANCEDQUEUE_STATUS_FAILURE', 2);

/**
 * Status indicating item processing failed, and should be retried.
 */
define('ADVANCEDQUEUE_STATUS_FAILURE_RETRY', 3);

/**
 * Implements hook_advancedqueue_entity_info().
 */
function advancedqueue_entity_info() {
  $entity_info['advancedqueue_item'] = array(
    'label' => t('Advanced queue item'),
    'controller class' => module_exists('entity') ? 'EntityAPIController' : 'DrupalDefaultEntityController',
    'metadata controller class' => 'EntityDefaultMetadataController',
    'base table' => 'advancedqueue',
    'module' => 'advancedqueue',
    'entity keys' => array(
      'id' => 'item_id',
      'label' => 'title',
    ),
  );
  return $entity_info;
}

/**
 * Implements hook_entity_property_info_alter().
 */
function advancedqueue_entity_property_info_alter(&$info) {
  $aq =& $info['advancedqueue_item'];
  foreach ($aq['properties'] as $name => &$prop) {
    $prop['setter callback'] = 'entity_property_verbatim_set';
  }
  $aq['properties']['status']['options list'] = '_advancedqueue_status_options';
}
function _advancedqueue_status_options() {
  static $options = array(
    ADVANCEDQUEUE_STATUS_QUEUED => 'Queued',
    ADVANCEDQUEUE_STATUS_PROCESSING => 'Processing',
    ADVANCEDQUEUE_STATUS_SUCCESS => 'Processed',
    ADVANCEDQUEUE_STATUS_FAILURE => 'Failed',
    ADVANCEDQUEUE_STATUS_FAILURE_RETRY => 'Retry',
  );
  return $options;
}

/**
 * Implements hook_menu().
 */
function advancedqueue_menu() {
  $items = array();
  $items['admin/config/system/advancedqueue'] = array(
    'title' => 'Advanced Queue',
    'description' => 'Configure settings for the Advanced Queue module.',
    'page callback' => 'drupal_get_form',
    'page arguments' => array(
      'advancedqueue_settings_form',
    ),
    'access arguments' => array(
      'administer site configuration',
    ),
    'type' => MENU_NORMAL_ITEM,
    'file' => 'advancedqueue.admin.inc',
  );
  return $items;
}

/**
 * Implements hook_cron().
 */
function advancedqueue_cron() {

  // Delete older entries and make sure there are no stale items in the table.
  _advancedqueue_cleanup_table();
  if (!variable_get('advancedqueue_use_cron', FALSE)) {
    return;
  }
  if (!($queues = advancedqueue_get_queues_info())) {
    return;
  }
  $end = time() + variable_get('advancedqueue_processing_timeout_cron', 60);
  foreach ($queues as $queue_name => $queue_info) {
    $queue = DrupalQueue::get($queue_name);
    while ($item = $queue
      ->claimItem($queue_info['lease time'])) {
      advancedqueue_process_item($queue, $queue_name, $queue_info, $item, $end);
      if (time() > $end) {

        // We've reached max execution time.
        return;
      }
    }
  }
}

/**
 * Implements hook_form_FORM_ID_alter().
 *
 * Add Advanced queue setting to use cron, to the cron settings page.
 */
function advancedqueue_form_system_cron_settings_alter(&$form, $form_state, $form_id) {
  $form['advancedqueue_use_cron'] = array(
    '#type' => 'checkbox',
    '#title' => t('Process Advanced Queue via Cron'),
    '#multiple' => TRUE,
    '#description' => t('Enable to allow queue items to to be processed using Cron. This is a "poor man\'s" option that allows processing the queue, as the better solution would be to execute the Drush command via the command line.'),
    '#default_value' => variable_get('advancedqueue_use_cron', FALSE),
  );
}

/**
 * Return queue(s) info.
 *
 * @params $queue_names
 *   Optional; Array with the queue names. If empty, return all the queues.
 */
function advancedqueue_get_queues_info($queue_names = array()) {
  $queues_info =& drupal_static(__FUNCTION__, array());
  if (empty($queues_info)) {
    $queues_info = module_invoke_all('advanced_queue_info');

    // Add default values.
    foreach ($queues_info as &$queue_info) {
      $queue_info += array(
        'delete when completed' => TRUE,
        'retry after' => FALSE,
        'max attempts' => 10,
        'lease time' => 30,
        'skip hooks' => FALSE,
        'groups' => array(),
      );
    }
    drupal_alter('advanced_queue_info', $queues_info);
    uasort($queues_info, 'drupal_sort_weight');
  }
  if ($queue_names) {
    return array_intersect_key($queues_info, $queue_names);
  }
  return $queues_info;
}

/**
 * Process a queue item.
 *
 * @param $queue
 *   The queue object.
 * @param $queue_name
 *   The queue name.
 * @param $queue_info
 *   The queue info.
 * @param $item
 *   The item to process.
 * @param $end_time
 *   (Optional) The time the process should end.
 */
function advancedqueue_process_item($queue, $queue_name, $queue_info, $item, $end_time = FALSE) {

  // First our pre-execute hook.
  if (!$queue_info['skip hooks']) {
    $hook_func = module_exists('rules') ? 'rules_invoke_all' : 'module_invoke_all';
    $hook_func('advancedqueue_pre_execute', $queue_name, $item);
  }
  $function = $queue_info['worker callback'];
  if (!empty($queue_info['worker include'])) {

    // This parameter, if set, is the 3 args to module_load_include, in order.
    // This allows worker callbacks to live outside the .module file.
    call_user_func_array('module_load_include', $queue_info['worker include']);
  }
  $params = array(
    '@queue' => $queue_name,
    '@id' => $item->item_id,
    '@title' => !empty($item->title) ? $item->title : 'untitled',
  );
  advancedqueue_log_message(format_string('[@queue:@id] Starting processing item @title.', $params));

  // Clear Drupal's static caches (including entity controllers) before
  // processing, so that each queue item can have a relatively fresh
  // start.
  drupal_static_reset();
  try {

    // Pass the claimed item itself and end date along to the worker
    // callback.
    $output = $function($item, $end_time);
    if (is_array($output)) {
      $item->status = $output['status'];
      $item->result = $output['result'];
    }
    else {
      $item->status = $output ? ADVANCEDQUEUE_STATUS_SUCCESS : ADVANCEDQUEUE_STATUS_FAILURE;
    }
  } catch (Exception $e) {
    $item->status = ADVANCEDQUEUE_STATUS_FAILURE;
    $params['!message'] = (string) $e;
    advancedqueue_log_message(format_string('[@queue:@id] failed processing: !message', $params), WATCHDOG_ERROR);
  }

  // Once we have a result, run the post-execute hook. Adventurers can use this
  // to override the result of jobs (stored on the $item object.)
  if (!$queue_info['skip hooks']) {
    $hook_func = module_exists('rules') ? 'rules_invoke_all' : 'module_invoke_all';
    $hook_func('advancedqueue_post_execute', $queue_name, $item);
  }
  $params['@status'] = $item->status;
  advancedqueue_log_message(format_string('[@queue:@id] Processing ended with result @status.', $params));

  // Requeue in case of failure.
  if ($item->status == ADVANCEDQUEUE_STATUS_FAILURE_RETRY && !empty($queue_info['retry after'])) {
    $item->data['advancedqueue_attempt'] = isset($item->data['advancedqueue_attempt']) ? $item->data['advancedqueue_attempt'] + 1 : 1;
    $item->created = time() + $queue_info['retry after'];

    // "max attempts" is optional, skip the attempts check if it's missing.
    $max_attempts = isset($queue_info['max attempts']) ? $queue_info['max attempts'] : 0;
    if (!$max_attempts || $item->data['advancedqueue_attempt'] <= $max_attempts) {
      $queue
        ->requeueItem($item);
      advancedqueue_log_message(format_string('[@queue:@id] failed processing and has been requeued.', $params), WATCHDOG_WARNING);
      return;
    }
    else {
      $item->status = ADVANCEDQUEUE_STATUS_FAILURE;
      advancedqueue_log_message(format_string('[@queue:@id] The maximum number of attempts has been reached, aborting.', $params), WATCHDOG_ERROR);
    }
  }
  if ($queue_info['delete when completed'] && empty($item->skip_deletion)) {

    // Item was processed, so we can "delete" it. This is not removing the
    // item from the database, but rather updates it with the status. Both
    // overall queues and individual items can skip the deletion process.
    $queue
      ->deleteItem($item);
  }
}

/**
 * Helper function to log a message.
 *
 * @param string $message
 *   The message to log.
 * @param int $severity
 *   The severity as one of the WATCHDOG_* constants.
 *
 * @see watchdog()
 */
function advancedqueue_log_message($message, $severity = WATCHDOG_DEBUG) {

  // The $variables argument to watchdog() is ignored in this case; the default
  // messages are easier to search if they are saved as plain strings.
  watchdog('advancedqueue', $message, array(), $severity);
}

/**
 * Form element validation handler for integer elements that must be positive.
 *
 * Similar to element_validate_integer_positive(), but allows for 0 values.
 *
 * @see advancedqueue_settings_form()
 */
function advancedqueue_element_validate_integer_positive($element, &$form_state) {
  $value = $element['#value'];
  if ($value !== '' && (!is_numeric($value) || intval($value) != $value || $value < 0)) {
    form_error($element, t('%name must be a positive integer.', array(
      '%name' => $element['#title'],
    )));
  }
}

/**
 * Implements hook_views_api().
 */
function advancedqueue_views_api() {
  return array(
    'api' => 2,
    'path' => drupal_get_path('module', 'advancedqueue') . '/views',
  );
}

/**
 * Helper function to clean the advancedqueue table.
 */
function _advancedqueue_cleanup_table() {

  // Purge processed items beyond a chosen threshold.
  $preserve_rows = variable_get('advancedqueue_threshold', 0);
  if ($preserve_rows) {

    // No limit means we don't remove old entries.
    _advancedqueue_purge_history($preserve_rows);
  }

  // Clean up timed out items stuck in ADVANCEDQUEUE_STATUS_PROCESSING.
  $timeout = variable_get('advancedqueue_release_timeout', 0);
  if ($timeout) {

    // No timeout means we don't touch stale items.
    _advancedqueue_release_stale_items($timeout);
  }
}

/**
 * Helper function to remove data we don't need anymore.
 *
 * Removes old entries of processed items.
 */
function _advancedqueue_purge_history($preserve_rows) {

  // Item status we want to clean.
  $statuses = array(
    ADVANCEDQUEUE_STATUS_SUCCESS,
    ADVANCEDQUEUE_STATUS_FAILURE,
  );

  // Find the timestamp of the Xth row.
  $delete_before = db_select('advancedqueue', 'a')
    ->fields('a', array(
    'created',
  ))
    ->condition('status', $statuses, 'IN')
    ->orderBy('created', 'DESC')
    ->range($preserve_rows - 1, 1)
    ->execute()
    ->fetchField();

  // Remove all items created before the selected timestamp.
  if ($delete_before) {
    db_delete('advancedqueue')
      ->condition('created', $delete_before, '<')
      ->condition('status', $statuses, 'IN')
      ->execute();
  }
}

/**
 * Helper function to release stale items.
 *
 * Requeues long expired entries that are in processing state.
 * Items can be stuck in the ADVANCEDQUEUE_STATUS_PROCESSING state
 * if the PHP process crashes or is killed while processing an item.
 */
function _advancedqueue_release_stale_items($timeout) {
  $before = REQUEST_TIME - $timeout;
  $items = db_select('advancedqueue', 'a')
    ->fields('a', array(
    'item_id',
    'name',
  ))
    ->condition('status', ADVANCEDQUEUE_STATUS_PROCESSING)
    ->condition('expire', $before, '<=')
    ->orderBy('name')
    ->execute();
  $queues = array();

  // Releasing stale items to put them back in queued status.
  foreach ($items as $item) {

    // DrupalQueue::get() statically caches queues objects,
    // we wouldn't improve performance by grouping items by queue.
    $queue = DrupalQueue::get($item->name);
    $queue
      ->releaseItem($item);
  }
}

Functions

Namesort descending Description
advancedqueue_cron Implements hook_cron().
advancedqueue_element_validate_integer_positive Form element validation handler for integer elements that must be positive.
advancedqueue_entity_info Implements hook_advancedqueue_entity_info().
advancedqueue_entity_property_info_alter Implements hook_entity_property_info_alter().
advancedqueue_form_system_cron_settings_alter Implements hook_form_FORM_ID_alter().
advancedqueue_get_queues_info Return queue(s) info.
advancedqueue_log_message Helper function to log a message.
advancedqueue_menu Implements hook_menu().
advancedqueue_process_item Process a queue item.
advancedqueue_views_api Implements hook_views_api().
_advancedqueue_cleanup_table Helper function to clean the advancedqueue table.
_advancedqueue_purge_history Helper function to remove data we don't need anymore.
_advancedqueue_release_stale_items Helper function to release stale items.
_advancedqueue_status_options

Constants

Namesort descending Description
ADVANCEDQUEUE_STATUS_FAILURE Status indicating item processing failed.
ADVANCEDQUEUE_STATUS_FAILURE_RETRY Status indicating item processing failed, and should be retried.
ADVANCEDQUEUE_STATUS_PROCESSING Status indicating item is currently being processed.
ADVANCEDQUEUE_STATUS_QUEUED Status indicating item was added to the queue.
ADVANCEDQUEUE_STATUS_SUCCESS Status indicating item was processed successfully.