View source
<?php
// Base menu path; every route registered in queue_ui_menu() is built on it.
define('QUEUE_UI_BASE', 'admin/config/system/queue-ui');
/**
 * Implements hook_permission().
 *
 * Declares the single 'admin queue_ui' permission.
 *
 * @return array
 *   Permission definitions keyed by permission name.
 */
function queue_ui_permission() {
  $permissions = array();
  $permissions['admin queue_ui'] = array(
    'title' => t('Administer queues'),
    'description' => t('View, run, and delete queues'),
  );
  return $permissions;
}
/**
 * Implements hook_menu().
 *
 * Registers the queue manager form plus the inspect, view, release and delete
 * routes. Every route requires the 'admin queue_ui' permission.
 *
 * @return array
 *   Menu item definitions keyed by path.
 */
function queue_ui_menu() {
  // The same access arguments guard every route below.
  $access = array('admin queue_ui');
  $items = array();

  // Landing page: the queue_ui_page form.
  $items[QUEUE_UI_BASE] = array(
    'title' => 'Queue manager',
    'description' => 'View and manage queues',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('queue_ui_page'),
    'access arguments' => $access,
    'file' => 'queue_ui.pages.inc',
  );

  // Inspect a single queue. The wildcard directly follows 'inspect', so it is
  // handed to the title and the page callback as path position 5.
  $items[QUEUE_UI_BASE . '/inspect/%'] = array(
    'title' => 'View Queue: @name',
    'title arguments' => array('@name' => 5),
    'page callback' => 'queue_ui_view_queue',
    'page arguments' => array(5),
    'access arguments' => $access,
    'type' => MENU_NORMAL_ITEM,
    'file' => 'queue_ui.forms.inc',
  );

  // Item-level routes: positions 4 and 6 are the two wildcards on either side
  // of the operation name (view / release / delete).
  $items[QUEUE_UI_BASE . '/%/view/%queue_ui_queue_item'] = array(
    'title' => 'View Queue Item',
    'description' => 'View the details of an individual queue item',
    'page callback' => 'queue_ui_view_queue_item',
    'page arguments' => array(4, 6),
    'access arguments' => $access,
    'type' => MENU_NORMAL_ITEM,
    'file' => 'queue_ui.forms.inc',
  );

  $items[QUEUE_UI_BASE . '/%/release/%queue_ui_queue_item'] = array(
    'title' => 'Release items',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('queue_ui_release_item_form', 4, 6),
    'access arguments' => $access,
    'file' => 'queue_ui.forms.inc',
  );

  $items[QUEUE_UI_BASE . '/%/delete/%queue_ui_queue_item'] = array(
    'title' => 'Delete items',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('queue_ui_delete_item_form', 4, 6),
    'access arguments' => $access,
    'file' => 'queue_ui.forms.inc',
  );

  return $items;
}
/**
 * Fetches the queue object for a queue name.
 *
 * @param string $name
 *   Queue name, passed unchanged to DrupalQueue::get().
 *
 * @return mixed
 *   Whatever DrupalQueue::get() returns for $name.
 */
function queue_ui_queue_load($name) {
  $queue = DrupalQueue::get($name);
  return $queue;
}
/**
 * Pass-through loader named for the %queue_ui_queue_item menu wildcard.
 *
 * No lookup or validation happens here; the value is handed back as received.
 *
 * @param mixed $queue_item
 *   The value supplied for the wildcard.
 *
 * @return mixed
 *   $queue_item, unchanged.
 */
function queue_ui_queue_item_load($queue_item) {
return $queue_item;
}
/**
 * Fills the 'queue ui test_me' queue with a random number of items.
 *
 * Creates the queue, then adds between 0 and 99 items whose payload is the
 * current Unix timestamp.
 */
function queue_ui_test() {
  $queue = DrupalQueue::get('queue ui test_me');
  $queue->createQueue();

  $remaining = mt_rand(0, 99);
  while ($remaining-- > 0) {
    $queue->createItem(time());
  }
}
/**
 * Resolves the QueueUI handler that matches a queue's backend class.
 *
 * The handler name is the queue object's class name prefixed with 'QueueUI'.
 *
 * @param string $queue_name
 *   Name of the queue to look up.
 *
 * @return mixed
 *   Whatever QueueUI::get() returns for the derived handler name.
 */
function _queue_ui_queueclass($queue_name) {
  $backend_class = get_class(DrupalQueue::get($queue_name));
  return QueueUI::get('QueueUI' . $backend_class);
}
/**
 * Implements hook_queue_class_info().
 *
 * Describes the two queue classes this module knows about: SystemQueue, which
 * can be inspected and supports item operations, and MemoryQueue, which
 * cannot be inspected.
 *
 * @return array
 *   Class descriptions keyed by queue class name.
 */
function queue_ui_queue_class_info() {
  $info = array();

  $info['SystemQueue'] = array(
    'title' => t('System Queue (database Queue)'),
    'inspect' => TRUE,
    'operations' => array('view', 'release', 'delete', 'deleteall'),
  );

  $info['MemoryQueue'] = array(
    'title' => t('Memory queue (non-reliable)'),
    'inspect' => FALSE,
  );

  return $info;
}
/**
 * Returns the registered description of a queue class.
 *
 * Descriptions are collected once per request from every
 * hook_queue_class_info() implementation and kept in drupal_static().
 *
 * @param string $class
 *   Queue class name, e.g. 'SystemQueue'.
 *
 * @return array|null
 *   The description for $class, or NULL when no module has described it.
 */
function queue_ui_get_queue_class_info($class) {
  $classes = &drupal_static(__FUNCTION__);
  if (!isset($classes)) {
    $classes = module_invoke_all('queue_class_info');
  }
  // A queue may use a class that no module has described; return NULL for it
  // instead of reading a missing index and raising a PHP notice.
  return isset($classes[$class]) ? $classes[$class] : NULL;
}
/**
 * Lists the distinct queue names present in the {queue} table.
 *
 * @return array
 *   The fetched result rows; callers read the queue name from each row's
 *   'name' property.
 */
function queue_ui_queue_names() {
  return db_query("SELECT DISTINCT name FROM {queue}")->fetchAll();
}
/**
 * Collects every queue found in the database, grouped by backend class.
 *
 * @return array
 *   Nested array keyed by queue class name, then by queue name. Each entry
 *   holds:
 *   - queue: the queue object from DrupalQueue::get().
 *   - items: the value of numberOfItems() for that queue.
 *   Empty when no queue names were found.
 */
function queue_ui_queues() {
  $grouped = array();

  $rows = queue_ui_queue_names();
  if (empty($rows)) {
    return $grouped;
  }

  foreach ($rows as $row) {
    $queue_name = $row->name;
    $queue = DrupalQueue::get($queue_name);
    $backend_class = get_class($queue);
    $grouped[$backend_class][$queue_name] = array(
      'queue' => $queue,
      'items' => $queue->numberOfItems(),
    );
  }

  return $grouped;
}
/**
 * Returns all queue definitions declared through hook_queue_info().
 *
 * The hook is invoked at most once per request; the result is kept in
 * drupal_static().
 *
 * @return array
 *   The merged return values of every hook_queue_info() implementation.
 */
function queue_ui_defined_queues() {
  $definitions = &drupal_static(__FUNCTION__);
  if (isset($definitions)) {
    return $definitions;
  }

  $definitions = module_invoke_all('queue_info');
  return $definitions;
}
/**
 * Implements hook_cron().
 *
 * Runs the cron callback of every defined queue that declares one, currently
 * holds items, and has its 'queue_ui_cron_<name>' variable set to a truthy
 * value. The queue object is passed to the callback as the first argument,
 * ahead of any configured 'args'.
 */
function queue_ui_cron() {
  $definitions = queue_ui_defined_queues();
  if (empty($definitions)) {
    return;
  }

  foreach ($definitions as $queue_name => $info) {
    $queue = DrupalQueue::get($queue_name);

    // Skip queues without a cron definition, without a usable queue object,
    // or with nothing to process.
    if (!isset($info['cron']) || !is_object($queue) || !$queue->numberOfItems()) {
      continue;
    }

    // Cron processing is opt-in per queue.
    if (!variable_get('queue_ui_cron_' . $queue_name, FALSE)) {
      continue;
    }

    $callback = $info['cron']['callback'];
    $arguments = isset($info['cron']['args']) ? $info['cron']['args'] : array();
    array_unshift($arguments, $queue);
    call_user_func_array($callback, $arguments);
  }
}
/**
 * Implements hook_queue_info().
 *
 * Declares the 'queue ui test_me' queue together with its batch definition
 * and its cron callback, queue_ui_test_process(), which is configured with a
 * single argument of 10.
 *
 * @return array
 *   Queue definitions keyed by queue name.
 */
function queue_ui_queue_info() {
  $definition = array(
    'title' => t('Test queue'),
  );

  $definition['batch'] = array(
    'operations' => array(
      array('queue_ui_batch_test', array()),
    ),
    'finished' => 'queue_ui_batch_finished',
    'title' => t('Processing test queue'),
  );

  $definition['cron'] = array(
    'callback' => 'queue_ui_test_process',
    'args' => array(10),
  );

  return array('queue ui test_me' => $definition);
}
/**
 * Cron callback for the test queue: claims and deletes items.
 *
 * Makes at most $limit claim attempts and stops early once as many items have
 * been deleted as the queue reported at the start. A failed claim still uses
 * up one attempt.
 *
 * @param object $queue
 *   Queue object providing numberOfItems(), claimItem() and deleteItem().
 * @param int $limit
 *   Maximum number of claim attempts.
 */
function queue_ui_test_process($queue, $limit = 20) {
  $remaining = $queue->numberOfItems();
  $attempt = 0;

  while ($attempt < $limit && $remaining > 0) {
    $attempt++;

    // Claim with a lease time of 20.
    $claimed = $queue->claimItem(20);
    if (!$claimed) {
      continue;
    }

    $queue->deleteItem($claimed);
    $remaining--;
  }
}
/**
 * Batch operation for the test queue: claims and deletes up to 20 items.
 *
 * The first call records the queue size in the sandbox. Every pass then makes
 * up to 20 claim attempts, counting each attempt as progress whether or not
 * an item was claimed. $context['finished'] is only written while progress
 * differs from the recorded size.
 *
 * @param object $queue
 *   Queue object providing numberOfItems(), claimItem() and deleteItem().
 * @param array $context
 *   Batch context; 'sandbox' and 'finished' are updated in place.
 */
function queue_ui_batch_test($queue, &$context) {
  if (empty($context['sandbox'])) {
    $context['sandbox']['progress'] = 0;
    $context['sandbox']['current'] = 0;
    $context['sandbox']['max'] = $queue->numberOfItems();
  }

  $step = 0;
  while ($step < 20 && $context['sandbox']['current'] < $context['sandbox']['max']) {
    $claimed = $queue->claimItem(20);
    if ($claimed) {
      $queue->deleteItem($claimed);
    }
    // Count the attempt even when nothing was claimed.
    $context['sandbox']['progress']++;
    $context['sandbox']['current']++;
    $step++;
  }

  $done = $context['sandbox']['progress'];
  $total = $context['sandbox']['max'];
  if ($done != $total) {
    $context['finished'] = $done / $total;
  }
}
/**
 * Batch 'finished' callback for the test queue.
 *
 * Reports the outcome of the batch to the user. A failure is shown as an
 * error message rather than with the default status styling.
 *
 * @param bool $success
 *   TRUE when the batch completed without errors.
 * @param array $results
 *   Results collected by the batch operations (unused).
 * @param array $operations
 *   Operations that remained unprocessed (unused).
 */
function queue_ui_batch_finished($success, $results, $operations) {
  if ($success) {
    // Route the text through t() so it can be translated like the failure
    // message.
    drupal_set_message(t('success'));
  }
  else {
    // The message text is kept unchanged so existing translations still match.
    drupal_set_message(t('An error occured @todo.'), 'error');
  }
}
/**
 * Batch operation that runs a cron queue's worker over its items.
 *
 * Claims items and hands each item's data to the worker callback until the
 * time budget runs out or no item can be claimed. Items are deleted after the
 * worker returns; when the worker throws, the exception is logged and shown
 * to the user and the item is not deleted. The batch is marked finished once
 * a claim comes back empty.
 *
 * @param string $queue_name
 *   Name of the queue to process.
 * @param array $cron_queue_info
 *   Queue definition; 'worker callback' is required, 'time' (seconds per
 *   pass) is optional and defaults to 15.
 * @param array $context
 *   Batch context; 'sandbox', 'finished' and 'message' are updated in place.
 */
function queue_ui_cron_queue_batch_process($queue_name, $cron_queue_info, &$context) {
  $queue = DrupalQueue::get($queue_name);
  $worker = $cron_queue_info['worker callback'];

  if (empty($context['sandbox'])) {
    $context['sandbox']['progress'] = 0;
    $context['sandbox']['max'] = $queue->numberOfItems();
  }

  // Time budget for this pass, in seconds.
  $budget = 15;
  if (isset($cron_queue_info['time'])) {
    $budget = $cron_queue_info['time'];
  }
  $deadline = time() + $budget;

  while (time() < $deadline && ($item = $queue->claimItem())) {
    try {
      $worker($item->data);
      $queue->deleteItem($item);
    }
    catch (Exception $e) {
      watchdog_exception('queue_ui', $e);
      drupal_set_message($e->getMessage(), 'error', FALSE);
    }
    // Failed items count as progress too.
    $context['sandbox']['progress']++;
  }

  // An empty $item means the last claim returned nothing (or none was made);
  // otherwise the time budget ran out with items possibly remaining.
  $context['finished'] = empty($item) ? 1 : 0;

  $context['message'] = t('Processed !count items in queue @name.', array(
    '!count' => $context['sandbox']['progress'],
    '@name' => $queue_name,
  ));
}