You are here

function advancedqueue_process_item in Advanced Queue 7

Process a queue item.

Parameters

$queue: The queue object.

$queue_name: The queue name.

$queue_info: The queue info.

$item: The item to process.

$end_time: (Optional) The time the process should end. It is passed through unchanged to the worker callback. Defaults to FALSE.

2 calls to advancedqueue_process_item()
advancedqueue_cron in ./advancedqueue.module
Implements hook_cron().
drush_advancedqueue_process_queue in drush/advancedqueue.drush.inc
Command callback for drush advancedqueue-process-queue.

File

./advancedqueue.module, line 185
Helper module for advanced queuing.

Code

/**
 * Processes a single queue item.
 *
 * Runs the pre-execute hook, calls the queue's worker callback, stores the
 * outcome on $item, runs the post-execute hook, and then either requeues the
 * item (retryable failure) or hands it to the queue for "deletion".
 *
 * @param object $queue
 *   The queue object. requeueItem() and deleteItem() are called on it.
 * @param string $queue_name
 *   The queue name.
 * @param array $queue_info
 *   The queue info. Keys read here: 'skip hooks', 'worker callback',
 *   'worker include' (optional), 'retry after' (optional), 'max attempts'
 *   (optional) and 'delete when completed'.
 * @param object $item
 *   The item to process. Its status (and, for array results, its result) is
 *   updated in place.
 * @param mixed $end_time
 *   (Optional) The time the process should end. Passed through unchanged to
 *   the worker callback. Defaults to FALSE.
 */
function advancedqueue_process_item($queue, $queue_name, $queue_info, $item, $end_time = FALSE) {

  // First our pre-execute hook.
  if (!$queue_info['skip hooks']) {
    $hook_func = module_exists('rules') ? 'rules_invoke_all' : 'module_invoke_all';
    $hook_func('advancedqueue_pre_execute', $queue_name, $item);
  }
  $function = $queue_info['worker callback'];
  if (!empty($queue_info['worker include'])) {

    // This parameter, if set, is the 3 args to module_load_include, in order.
    // This allows worker callbacks to live outside the .module file.
    call_user_func_array('module_load_include', $queue_info['worker include']);
  }
  $params = array(
    '@queue' => $queue_name,
    '@id' => $item->item_id,
    '@title' => !empty($item->title) ? $item->title : 'untitled',
  );
  advancedqueue_log_message(format_string('[@queue:@id] Starting processing item @title.', $params));

  // Clear Drupal's static caches (including entity controllers) before
  // processing, so that each queue item can have a relatively fresh
  // start.
  drupal_static_reset();

  // Holds whatever the worker callback threw, if anything.
  $failure = NULL;
  try {

    // Pass the claimed item itself and end date along to the worker
    // callback.
    $output = $function($item, $end_time);
    if (is_array($output)) {

      // An array result that carries no status says nothing about success, so
      // record it as a failure instead of raising a notice and storing NULL.
      $item->status = isset($output['status']) ? $output['status'] : ADVANCEDQUEUE_STATUS_FAILURE;
      $item->result = isset($output['result']) ? $output['result'] : NULL;
    }
    else {
      $item->status = $output ? ADVANCEDQUEUE_STATUS_SUCCESS : ADVANCEDQUEUE_STATUS_FAILURE;
    }
  }
  catch (Exception $e) {
    $failure = $e;
  }
  catch (Throwable $e) {

    // PHP 7+ engine errors (TypeError, call to an undefined function, ...)
    // are not Exceptions. Without this branch a single broken worker would
    // abort the whole run. On PHP 5 this branch simply never matches.
    $failure = $e;
  }
  if ($failure !== NULL) {
    $item->status = ADVANCEDQUEUE_STATUS_FAILURE;
    $params['!message'] = (string) $failure;
    advancedqueue_log_message(format_string('[@queue:@id] failed processing: !message', $params), WATCHDOG_ERROR);
  }

  // Once we have a result, run the post-execute hook. Adventurers can use this
  // to override the result of jobs (stored on the $item object.)
  if (!$queue_info['skip hooks']) {
    $hook_func = module_exists('rules') ? 'rules_invoke_all' : 'module_invoke_all';
    $hook_func('advancedqueue_post_execute', $queue_name, $item);
  }
  $params['@status'] = $item->status;
  advancedqueue_log_message(format_string('[@queue:@id] Processing ended with result @status.', $params));

  // Requeue in case of a retryable failure, provided the queue defines a
  // retry delay.
  if ($item->status == ADVANCEDQUEUE_STATUS_FAILURE_RETRY && !empty($queue_info['retry after'])) {
    $item->data['advancedqueue_attempt'] = isset($item->data['advancedqueue_attempt']) ? $item->data['advancedqueue_attempt'] + 1 : 1;
    $item->created = time() + $queue_info['retry after'];

    // "max attempts" is optional, skip the attempts check if it's missing.
    $max_attempts = isset($queue_info['max attempts']) ? $queue_info['max attempts'] : 0;
    if (!$max_attempts || $item->data['advancedqueue_attempt'] <= $max_attempts) {
      $queue
        ->requeueItem($item);
      advancedqueue_log_message(format_string('[@queue:@id] failed processing and has been requeued.', $params), WATCHDOG_WARNING);

      // A requeued item must not fall through to the deletion step below.
      return;
    }
    else {
      $item->status = ADVANCEDQUEUE_STATUS_FAILURE;
      advancedqueue_log_message(format_string('[@queue:@id] The maximum number of attempts has been reached, aborting.', $params), WATCHDOG_ERROR);
    }
  }
  if ($queue_info['delete when completed'] && empty($item->skip_deletion)) {

    // Item was processed, so we can "delete" it. This is not removing the
    // item from the database, but rather updates it with the status. Both
    // overall queues and individual items can skip the deletion process.
    $queue
      ->deleteItem($item);
  }
}