View source
<?php
/**
 * Item status: labelled 'Queued' by _advancedqueue_status_options().
 */
define('ADVANCEDQUEUE_STATUS_QUEUED', -1);

/**
 * Item status: labelled 'Processing'.
 *
 * _advancedqueue_release_stale_items() selects rows in this status whose
 * 'expire' value is old enough and releases them.
 */
define('ADVANCEDQUEUE_STATUS_PROCESSING', 0);

/**
 * Item status: labelled 'Processed'.
 *
 * Assigned by advancedqueue_process_item() when the worker callback returns a
 * truthy, non-array value.
 */
define('ADVANCEDQUEUE_STATUS_SUCCESS', 1);

/**
 * Item status: labelled 'Failed'.
 *
 * Assigned by advancedqueue_process_item() when the worker callback returns a
 * falsy, non-array value, throws an exception, or exhausts its attempts.
 */
define('ADVANCEDQUEUE_STATUS_FAILURE', 2);

/**
 * Item status: labelled 'Retry'.
 *
 * When a worker reports this status and the queue defines 'retry after',
 * advancedqueue_process_item() requeues the item instead of completing it.
 */
define('ADVANCEDQUEUE_STATUS_FAILURE_RETRY', 3);
/**
 * Implements hook_entity_info().
 *
 * Declares the 'advancedqueue_item' entity type, backed by the
 * 'advancedqueue' table and keyed by 'item_id' (label column: 'title').
 *
 * @return array
 *   Entity info keyed by entity type.
 */
function advancedqueue_entity_info() {
  $label = t('Advanced queue item');

  // Use the richer Entity API controller when that module is enabled, and
  // fall back to the core controller otherwise.
  if (module_exists('entity')) {
    $controller = 'EntityAPIController';
  }
  else {
    $controller = 'DrupalDefaultEntityController';
  }

  return array(
    'advancedqueue_item' => array(
      'label' => $label,
      'controller class' => $controller,
      'metadata controller class' => 'EntityDefaultMetadataController',
      'base table' => 'advancedqueue',
      'module' => 'advancedqueue',
      'entity keys' => array(
        'id' => 'item_id',
        'label' => 'title',
      ),
    ),
  );
}
/**
 * Implements hook_entity_property_info_alter().
 *
 * Gives every known property of the 'advancedqueue_item' entity the
 * 'entity_property_verbatim_set' setter callback and attaches the status
 * options list to the 'status' property.
 *
 * @param array $info
 *   Entity property info, keyed by entity type. Altered in place.
 */
function advancedqueue_entity_property_info_alter(&$info) {
  $aq =& $info['advancedqueue_item'];

  // Iterate over keys instead of using a by-reference foreach: this leaves no
  // dangling reference behind and does not warn when no properties have been
  // declared yet.
  if (!empty($aq['properties']) && is_array($aq['properties'])) {
    foreach (array_keys($aq['properties']) as $name) {
      $aq['properties'][$name]['setter callback'] = 'entity_property_verbatim_set';
    }
  }

  $aq['properties']['status']['options list'] = '_advancedqueue_status_options';
}
/**
 * Options list callback for the 'status' property of queue items.
 *
 * @return array
 *   Labels keyed by the ADVANCEDQUEUE_STATUS_* constants.
 */
function _advancedqueue_status_options() {
  $labels = array();
  $labels[ADVANCEDQUEUE_STATUS_QUEUED] = 'Queued';
  $labels[ADVANCEDQUEUE_STATUS_PROCESSING] = 'Processing';
  $labels[ADVANCEDQUEUE_STATUS_SUCCESS] = 'Processed';
  $labels[ADVANCEDQUEUE_STATUS_FAILURE] = 'Failed';
  $labels[ADVANCEDQUEUE_STATUS_FAILURE_RETRY] = 'Retry';
  return $labels;
}
/**
 * Implements hook_menu().
 *
 * Registers the module settings page, whose form builder lives in
 * advancedqueue.admin.inc.
 *
 * @return array
 *   Menu router items keyed by path.
 */
function advancedqueue_menu() {
  return array(
    'admin/config/system/advancedqueue' => array(
      'title' => 'Advanced Queue',
      'description' => 'Configure settings for the Advanced Queue module.',
      'page callback' => 'drupal_get_form',
      'page arguments' => array('advancedqueue_settings_form'),
      'access arguments' => array('administer site configuration'),
      'type' => MENU_NORMAL_ITEM,
      'file' => 'advancedqueue.admin.inc',
    ),
  );
}
/**
 * Implements hook_cron().
 *
 * Always runs the table cleanup. When the 'advancedqueue_use_cron' variable
 * is enabled, it then claims and processes items from every declared queue
 * until the 'advancedqueue_processing_timeout_cron' budget (60 seconds by
 * default) has elapsed.
 */
function advancedqueue_cron() {
  _advancedqueue_cleanup_table();

  // Queue processing through cron is opt-in.
  if (!variable_get('advancedqueue_use_cron', FALSE)) {
    return;
  }

  $queues = advancedqueue_get_queues_info();
  if (!$queues) {
    return;
  }

  $deadline = time() + variable_get('advancedqueue_processing_timeout_cron', 60);

  foreach ($queues as $queue_name => $queue_info) {
    $queue = DrupalQueue::get($queue_name);

    while (TRUE) {
      $item = $queue->claimItem($queue_info['lease time']);
      if (!$item) {
        // Nothing left to claim in this queue; move on to the next one.
        break;
      }

      advancedqueue_process_item($queue, $queue_name, $queue_info, $item, $deadline);

      // The budget is only checked after an item has been processed, and
      // exceeding it ends the whole cron run, not just this queue.
      if (time() > $deadline) {
        return;
      }
    }
  }
}
/**
 * Implements hook_form_FORM_ID_alter() for system_cron_settings.
 *
 * Adds a checkbox bound to the 'advancedqueue_use_cron' variable, which
 * advancedqueue_cron() checks before processing queues.
 *
 * @param array $form
 *   The form being altered.
 * @param array $form_state
 *   The current form state (unused).
 * @param string $form_id
 *   The form identifier (unused).
 */
function advancedqueue_form_system_cron_settings_alter(&$form, $form_state, $form_id) {
  // '#multiple' is a property of select elements and does nothing on a
  // checkbox, so it is not set here.
  $form['advancedqueue_use_cron'] = array(
    '#type' => 'checkbox',
    '#title' => t('Process Advanced Queue via Cron'),
    '#description' => t('Enable to allow queue items to be processed using Cron. This is a "poor man\'s" option that allows processing the queue, as the better solution would be to execute the Drush command via the command line.'),
    '#default_value' => variable_get('advancedqueue_use_cron', FALSE),
  );
}
/**
 * Collects the queue definitions declared through hook_advanced_queue_info().
 *
 * Definitions are completed with default settings, passed through
 * drupal_alter('advanced_queue_info'), sorted by weight and cached with
 * drupal_static() for later calls.
 *
 * @param array $queue_names
 *   (optional) When non-empty, only definitions whose name appears as a KEY of
 *   this array are returned (the filter is array_intersect_key()). The values
 *   are ignored.
 *
 * @return array
 *   Queue definitions keyed by queue name.
 */
function advancedqueue_get_queues_info($queue_names = array()) {
  $queues_info =& drupal_static(__FUNCTION__, array());

  if (empty($queues_info)) {
    $queues_info = module_invoke_all('advanced_queue_info');

    $defaults = array(
      'delete when completed' => TRUE,
      'retry after' => FALSE,
      'max attempts' => 10,
      'lease time' => 30,
      'skip hooks' => FALSE,
      'groups' => array(),
    );

    // Merge the defaults by key rather than with a by-reference foreach. A
    // leftover reference to the last definition would be copied into the
    // array returned below, letting callers overwrite the static cache.
    foreach (array_keys($queues_info) as $name) {
      $queues_info[$name] += $defaults;
    }

    drupal_alter('advanced_queue_info', $queues_info);
    uasort($queues_info, 'drupal_sort_weight');
  }

  if ($queue_names) {
    return array_intersect_key($queues_info, $queue_names);
  }

  return $queues_info;
}
/**
 * Runs the worker callback of a queue on one claimed item.
 *
 * Invokes the pre/post execute hooks (unless the queue sets 'skip hooks'),
 * logs progress, requeues the item when a retry is requested and allowed, and
 * otherwise deletes it when the queue sets 'delete when completed'.
 *
 * @param object $queue
 *   Queue object; requeueItem() and deleteItem() are called on it.
 * @param string $queue_name
 *   Name of the queue, used in hooks and log messages.
 * @param array $queue_info
 *   Queue definition. Reads 'skip hooks', 'worker callback', 'worker include',
 *   'retry after', 'max attempts' and 'delete when completed'.
 * @param object $item
 *   The claimed item. Its status, result, created and data properties may be
 *   modified.
 * @param int|false $end_time
 *   (optional) Passed through to the worker callback as its second argument.
 */
function advancedqueue_process_item($queue, $queue_name, $queue_info, $item, $end_time = FALSE) {
  if (!$queue_info['skip hooks']) {
    // Go through Rules when available, plain module hooks otherwise.
    $hook_func = module_exists('rules') ? 'rules_invoke_all' : 'module_invoke_all';
    $hook_func('advancedqueue_pre_execute', $queue_name, $item);
  }

  $function = $queue_info['worker callback'];
  if (!empty($queue_info['worker include'])) {
    // 'worker include' holds the argument list for module_load_include().
    call_user_func_array('module_load_include', $queue_info['worker include']);
  }

  $params = array(
    '@queue' => $queue_name,
    '@id' => $item->item_id,
    '@title' => !empty($item->title) ? $item->title : 'untitled',
  );
  advancedqueue_log_message(format_string('[@queue:@id] Starting processing item @title.', $params));

  // Clear every static cache before handing control to the worker.
  drupal_static_reset();

  try {
    $output = $function($item, $end_time);
    if (is_array($output)) {
      // A worker may return only part of the array. Read both keys
      // defensively so a missing one yields NULL without a PHP notice.
      $item->status = isset($output['status']) ? $output['status'] : NULL;
      $item->result = isset($output['result']) ? $output['result'] : NULL;
    }
    else {
      $item->status = $output ? ADVANCEDQUEUE_STATUS_SUCCESS : ADVANCEDQUEUE_STATUS_FAILURE;
    }
  }
  catch (Exception $e) {
    $item->status = ADVANCEDQUEUE_STATUS_FAILURE;
    $params['!message'] = (string) $e;
    advancedqueue_log_message(format_string('[@queue:@id] failed processing: !message', $params), WATCHDOG_ERROR);
  }

  if (!$queue_info['skip hooks']) {
    // Re-evaluated on purpose: the static reset and the worker ran in between.
    $hook_func = module_exists('rules') ? 'rules_invoke_all' : 'module_invoke_all';
    $hook_func('advancedqueue_post_execute', $queue_name, $item);
  }

  $params['@status'] = $item->status;
  advancedqueue_log_message(format_string('[@queue:@id] Processing ended with result @status.', $params));

  if ($item->status == ADVANCEDQUEUE_STATUS_FAILURE_RETRY && !empty($queue_info['retry after'])) {
    // Count this attempt and push the item 'retry after' seconds into the
    // future.
    $item->data['advancedqueue_attempt'] = isset($item->data['advancedqueue_attempt']) ? $item->data['advancedqueue_attempt'] + 1 : 1;
    $item->created = time() + $queue_info['retry after'];

    // A 'max attempts' of 0 (or unset) means unlimited retries.
    $max_attempts = isset($queue_info['max attempts']) ? $queue_info['max attempts'] : 0;
    if (!$max_attempts || $item->data['advancedqueue_attempt'] <= $max_attempts) {
      $queue->requeueItem($item);
      advancedqueue_log_message(format_string('[@queue:@id] failed processing and has been requeued.', $params), WATCHDOG_WARNING);
      return;
    }
    else {
      $item->status = ADVANCEDQUEUE_STATUS_FAILURE;
      advancedqueue_log_message(format_string('[@queue:@id] The maximum number of attempts has been reached, aborting.', $params), WATCHDOG_ERROR);
    }
  }

  if ($queue_info['delete when completed'] && empty($item->skip_deletion)) {
    $queue->deleteItem($item);
  }
}
/**
 * Logs a message under the 'advancedqueue' watchdog type.
 *
 * @param string $message
 *   The message, with any placeholders already substituted by the caller.
 * @param int $severity
 *   (optional) One of the WATCHDOG_* severity constants. Defaults to
 *   WATCHDOG_DEBUG.
 */
function advancedqueue_log_message($message, $severity = WATCHDOG_DEBUG) {
  // The message is already formatted, so pass NULL as the variables. An
  // array, even an empty one, marks the message as a translatable template
  // and makes the log viewer run t() on every dynamic string.
  watchdog('advancedqueue', $message, NULL, $severity);
}
/**
 * Form element validation handler for whole-number inputs.
 *
 * An empty string, zero and any positive integer are accepted. Anything else
 * (non-numeric input, fractions, negative numbers) sets a form error on the
 * element.
 *
 * @param array $element
 *   The form element; its '#value' is checked and its '#title' is used in the
 *   error message.
 * @param array $form_state
 *   The current form state (unused).
 */
function advancedqueue_element_validate_integer_positive($element, &$form_state) {
  $input = $element['#value'];

  // An empty field is not this validator's concern.
  if ($input === '') {
    return;
  }

  $acceptable = is_numeric($input) && intval($input) == $input && !($input < 0);
  if ($acceptable) {
    return;
  }

  $replacements = array('%name' => $element['#title']);
  form_error($element, t('%name must be a positive integer.', $replacements));
}
/**
 * Implements hook_views_api().
 *
 * @return array
 *   The Views API version (2) and the module's 'views' sub-directory.
 */
function advancedqueue_views_api() {
  $module_dir = drupal_get_path('module', 'advancedqueue');

  $api = array();
  $api['api'] = 2;
  $api['path'] = $module_dir . '/views';
  return $api;
}
/**
 * Runs the housekeeping tasks whose settings are enabled.
 *
 * Each task is driven by one variable: a falsy value (the default, 0) skips
 * the task, any other value is handed to the task as its argument.
 */
function _advancedqueue_cleanup_table() {
  // Variable name => task callback, in execution order.
  $tasks = array(
    'advancedqueue_threshold' => '_advancedqueue_purge_history',
    'advancedqueue_release_timeout' => '_advancedqueue_release_stale_items',
  );

  foreach ($tasks as $variable => $callback) {
    $setting = variable_get($variable, 0);
    if ($setting) {
      $callback($setting);
    }
  }
}
/**
 * Deletes old completed rows from the 'advancedqueue' table.
 *
 * Only rows in the SUCCESS or FAILURE status are considered. The 'created'
 * value of the $preserve_rows-th most recent such row is used as a cut-off,
 * and completed rows created strictly before it are removed.
 *
 * @param int $preserve_rows
 *   Number of most recent completed rows to keep. The caller only passes
 *   non-zero values.
 */
function _advancedqueue_purge_history($preserve_rows) {
  $completed = array(
    ADVANCEDQUEUE_STATUS_SUCCESS,
    ADVANCEDQUEUE_STATUS_FAILURE,
  );

  // Fetch the creation time of the oldest row that must be kept.
  $lookup = db_select('advancedqueue', 'a')
    ->fields('a', array('created'))
    ->condition('status', $completed, 'IN')
    ->orderBy('created', 'DESC')
    ->range($preserve_rows - 1, 1);
  $cutoff = $lookup->execute()->fetchField();

  // No cut-off means there are fewer completed rows than the limit.
  if (!$cutoff) {
    return;
  }

  db_delete('advancedqueue')
    ->condition('created', $cutoff, '<')
    ->condition('status', $completed, 'IN')
    ->execute();
}
/**
 * Releases items that have been in the PROCESSING status for too long.
 *
 * Selects rows in the PROCESSING status whose 'expire' value is at or before
 * REQUEST_TIME minus $timeout, and calls releaseItem() on the queue each row
 * belongs to.
 *
 * @param int $timeout
 *   Number of seconds subtracted from REQUEST_TIME to compute the cut-off.
 */
function _advancedqueue_release_stale_items($timeout) {
  $before = REQUEST_TIME - $timeout;

  $items = db_select('advancedqueue', 'a')
    ->fields('a', array(
      'item_id',
      'name',
    ))
    ->condition('status', ADVANCEDQUEUE_STATUS_PROCESSING)
    ->condition('expire', $before, '<=')
    ->orderBy('name')
    ->execute();

  foreach ($items as $item) {
    // Each row carries only 'item_id' and 'name'; the queue is looked up by
    // name for every row.
    $queue = DrupalQueue::get($item->name);
    $queue->releaseItem($item);
  }
}