function drush_mob_queue_mob_exe_queue in Drush Queue Handling 7
Same name and namespace in other branches
- 8 mob_queue.drush.inc \drush_mob_queue_mob_exe_queue()
Run the queued job.
File
- ./
mob_queue.drush.inc, line 43 - Drush commands for Drush Queue Handling.
Code
function drush_mob_queue_mob_exe_queue($time = 900) {
// Allow execution to continue even if the request gets canceled.
@ignore_user_abort(TRUE);
if (!drush_get_option('no-reset-expired', FALSE)) {
// Reset expired items in the default queue implementation table.
$updated = db_update('queue')
->fields(array(
'expire' => 0,
))
->condition('expire', 0, '<>')
->condition('expire', REQUEST_TIME, '<')
->execute();
drush_log(dt('!updated expired items reset.', array(
'!updated' => $updated,
)));
}
// Prevent session information from being saved while cron is running.
$original_session_saving = drupal_save_session();
drupal_save_session(FALSE);
// Force the current user to anonymous to ensure consistent permissions on
// cron runs.
$original_user = $GLOBALS['user'];
$GLOBALS['user'] = drupal_anonymous_user();
// Try to allocate enough time to run all the hook_cron implementations.
drupal_set_time_limit($time);
$return = FALSE;
// Grab the defined cron queues.
$queues = module_invoke_all('cron_queue_info');
drupal_alter('cron_queue_info', $queues);
foreach ($queues as $name => $queue) {
if (variable_get('mob_queue_' . $name, 0)) {
$queues[$name]['mob_queue'] = TRUE;
}
}
drupal_alter('mob_queue_cron_queue_info', $queues);
// Make sure every queue exists. There is no harm in trying to recreate an
// existing queue.
foreach ($queues as $queue_name => $info) {
DrupalQueue::get($queue_name)
->createQueue();
}
reset($queues);
while (TRUE) {
// Due to backwards incompatible changes on array handling from PHP 5.x to
// 7.x, looping code must be kept simple. For instance, current item pointer
// should be advanced manually, since foreach() does not starting from 7.0.
// @see #2974823 for more info.
$queue_name = key($queues);
$info = current($queues);
if ($queue_name === NULL && $info === FALSE) {
break;
}
next($queues);
// Allow other modules to alter the queues listing, order or next queue to process.
drupal_alter('mob_queue_queue_processing', $queue_name, $info, $queues);
if (empty($info['mob_queue'])) {
continue;
}
$function = $info['worker callback'];
$end = time() + (isset($info['time']) ? $info['time'] : $time);
$leasetime = isset($info['leasetime']) ? $info['leasetime'] : 30;
$queue = DrupalQueue::get($queue_name);
while (time() < $end) {
// Allow other modules to skip processing this queue.
$invoked = module_invoke_all('mob_queue_queue_item_processing', $queue_name, $info);
if (!empty($invoked)) {
if (in_array(FALSE, $invoked, TRUE)) {
break;
}
}
// Claim an item from the queue.
$item = $queue
->claimItem($leasetime);
if (!$item) {
drush_log(dt("No items to process on '!queue-name' queue.", array(
'!queue-name' => $queue_name,
)));
break;
}
drush_log(dt("Processing '!queue-name' queue, item id: !item-id.", array(
'!item-id' => $item->item_id,
'!queue-name' => $queue_name,
)));
// Call the item's queue worker function.
$e = NULL;
try {
$function($item->data);
$queue
->deleteItem($item);
} catch (Exception $e) {
// In case of exception log it and leave the item in the queue
// to be processed again later.
watchdog_exception('mob_queue', $e);
}
// Inform other modules that an item has been processed.
module_invoke_all('mob_queue_queue_item_processed', $queue_name, $item, $e);
}
module_invoke_all('mob_queue_queue_processed', $queue_name, $info, $queues);
}
// Restore the user.
$GLOBALS['user'] = $original_user;
drupal_save_session($original_session_saving);
$return = TRUE;
return $return;
}