function advancedqueue_process_item in Advanced Queue 7
Process a queue item.
Parameters
$queue: The queue object.
$queue_name: The queue name.
$queue_info: The queue info.
$item: The item to process.
$end_time: (Optional) The time the process should end.
2 calls to advancedqueue_process_item()
- advancedqueue_cron in ./
advancedqueue.module - Implements hook_cron().
- drush_advancedqueue_process_queue in drush/
advancedqueue.drush.inc - Command callback for drush advancedqueue-process-queue.
File
- ./
advancedqueue.module, line 185 - Helper module for advanced queuing.
Code
function advancedqueue_process_item($queue, $queue_name, $queue_info, $item, $end_time = FALSE) {
// First our pre-execute hook.
if (!$queue_info['skip hooks']) {
$hook_func = module_exists('rules') ? 'rules_invoke_all' : 'module_invoke_all';
$hook_func('advancedqueue_pre_execute', $queue_name, $item);
}
$function = $queue_info['worker callback'];
if (!empty($queue_info['worker include'])) {
// This parameter, if set, is the 3 args to module_load_include, in order.
// This allows worker callbacks to live outside the .module file.
call_user_func_array('module_load_include', $queue_info['worker include']);
}
$params = array(
'@queue' => $queue_name,
'@id' => $item->item_id,
'@title' => !empty($item->title) ? $item->title : 'untitled',
);
advancedqueue_log_message(format_string('[@queue:@id] Starting processing item @title.', $params));
// Clear Drupal's static caches (including entity controllers) before
// processing, so that each queue item can have a relatively fresh
// start.
drupal_static_reset();
try {
// Pass the claimed item itself and end date along to the worker
// callback.
$output = $function($item, $end_time);
if (is_array($output)) {
$item->status = $output['status'];
$item->result = $output['result'];
}
else {
$item->status = $output ? ADVANCEDQUEUE_STATUS_SUCCESS : ADVANCEDQUEUE_STATUS_FAILURE;
}
} catch (Exception $e) {
$item->status = ADVANCEDQUEUE_STATUS_FAILURE;
$params['!message'] = (string) $e;
advancedqueue_log_message(format_string('[@queue:@id] failed processing: !message', $params), WATCHDOG_ERROR);
}
// Once we have a result, run the post-execute hook. Adventurers can use this
// to override the result of jobs (stored on the $item object.)
if (!$queue_info['skip hooks']) {
$hook_func = module_exists('rules') ? 'rules_invoke_all' : 'module_invoke_all';
$hook_func('advancedqueue_post_execute', $queue_name, $item);
}
$params['@status'] = $item->status;
advancedqueue_log_message(format_string('[@queue:@id] Processing ended with result @status.', $params));
// Requeue in case of failure.
if ($item->status == ADVANCEDQUEUE_STATUS_FAILURE_RETRY && !empty($queue_info['retry after'])) {
$item->data['advancedqueue_attempt'] = isset($item->data['advancedqueue_attempt']) ? $item->data['advancedqueue_attempt'] + 1 : 1;
$item->created = time() + $queue_info['retry after'];
// "max attempts" is optional, skip the attempts check if it's missing.
$max_attempts = isset($queue_info['max attempts']) ? $queue_info['max attempts'] : 0;
if (!$max_attempts || $item->data['advancedqueue_attempt'] <= $max_attempts) {
$queue
->requeueItem($item);
advancedqueue_log_message(format_string('[@queue:@id] failed processing and has been requeued.', $params), WATCHDOG_WARNING);
return;
}
else {
$item->status = ADVANCEDQUEUE_STATUS_FAILURE;
advancedqueue_log_message(format_string('[@queue:@id] The maximum number of attempts has been reached, aborting.', $params), WATCHDOG_ERROR);
}
}
if ($queue_info['delete when completed'] && empty($item->skip_deletion)) {
// Item was processed, so we can "delete" it. This is not removing the
// item from the database, but rather updates it with the status. Both
// overall queues and individual items can skip the deletion process.
$queue
->deleteItem($item);
}
}