You are here

function drush_mob_queue_mob_exe_queue in Drush Queue Handling 7

Same name and namespace in other branches
  1. 8 mob_queue.drush.inc \drush_mob_queue_mob_exe_queue()

Process the queued items of every cron queue that is enabled for mob_queue handling.

File

./mob_queue.drush.inc, line 43
Drush commands for Drush Queue Handling.

Code

/**
 * Drush command callback: works through the cron queues flagged for mob_queue.
 *
 * The queue definitions are collected from hook_cron_queue_info(); a queue is
 * flagged for processing when the variable 'mob_queue_<queue name>' is truthy
 * or when a 'mob_queue_cron_queue_info' alter hook sets the 'mob_queue' key.
 * While the queues are processed the global user is switched to the anonymous
 * user and session saving is turned off; both are put back before returning.
 *
 * @param int $time
 *   Number of seconds handed to drupal_set_time_limit(). It is also the time
 *   budget of each queue whose definition has no 'time' entry of its own.
 *
 * @return bool
 *   Always TRUE.
 */
function drush_mob_queue_mob_exe_queue($time = 900) {
  // Keep going even if the request that started us gets canceled.
  @ignore_user_abort(TRUE);

  // Unless the 'no-reset-expired' option is set, clear the expiry of every
  // row in the default queue table whose non-zero expiry lies in the past.
  $skip_reset = drush_get_option('no-reset-expired', FALSE);
  if (!$skip_reset) {
    $reset_query = db_update('queue');
    $reset_query->fields(array('expire' => 0));
    $reset_query->condition('expire', 0, '<>');
    $reset_query->condition('expire', REQUEST_TIME, '<');
    $updated = $reset_query->execute();
    drush_log(dt('!updated expired items reset.', array('!updated' => $updated)));
  }

  // Remember the session-saving flag and switch saving off for the run.
  $session_saving_before = drupal_save_session();
  drupal_save_session(FALSE);

  // Run as the anonymous user so permissions are the same on every run.
  $user_before = $GLOBALS['user'];
  $GLOBALS['user'] = drupal_anonymous_user();

  // Ask for enough execution time for the whole run.
  drupal_set_time_limit($time);

  // Collect the cron queue definitions and flag the ones switched on through
  // their 'mob_queue_<name>' variable.
  $queues = module_invoke_all('cron_queue_info');
  drupal_alter('cron_queue_info', $queues);
  foreach (array_keys($queues) as $name) {
    if (variable_get('mob_queue_' . $name, 0)) {
      $queues[$name]['mob_queue'] = TRUE;
    }
  }
  drupal_alter('mob_queue_cron_queue_info', $queues);

  // Creating a queue that already exists is harmless, so create them all.
  foreach (array_keys($queues) as $name) {
    $backend = DrupalQueue::get($name);
    $backend->createQueue();
  }

  // Walk the list with the array's internal pointer. foreach() stopped using
  // that pointer in PHP 7.0, so it is read and advanced by hand here to behave
  // the same on PHP 5.x and 7.x.
  // @see #2974823 for more info.
  reset($queues);
  while (TRUE) {
    $current_name = key($queues);
    $definition = current($queues);
    if ($current_name === NULL && $definition === FALSE) {
      // Pointer is past the last element: nothing left to process.
      break;
    }
    next($queues);

    // Let other modules change the list, its order or the queue picked next.
    drupal_alter('mob_queue_queue_processing', $current_name, $definition, $queues);
    if (empty($definition['mob_queue'])) {
      continue;
    }

    $worker = $definition['worker callback'];

    // A queue may define its own time budget and lease time.
    $budget = $time;
    if (isset($definition['time'])) {
      $budget = $definition['time'];
    }
    $deadline = time() + $budget;

    $lease = 30;
    if (isset($definition['leasetime'])) {
      $lease = $definition['leasetime'];
    }

    $queue = DrupalQueue::get($current_name);
    while (time() < $deadline) {
      // Any hook implementation returning exactly FALSE stops this queue.
      $votes = module_invoke_all('mob_queue_queue_item_processing', $current_name, $definition);
      if (!empty($votes) && in_array(FALSE, $votes, TRUE)) {
        break;
      }

      // Claim the next item; an empty queue ends the inner loop.
      $item = $queue->claimItem($lease);
      if (!$item) {
        drush_log(dt("No items to process on '!queue-name' queue.", array('!queue-name' => $current_name)));
        break;
      }
      drush_log(dt("Processing '!queue-name' queue, item id: !item-id.", array(
        '!item-id' => $item->item_id,
        '!queue-name' => $current_name,
      )));

      // Run the worker. The item is deleted only when the worker returns
      // without throwing; otherwise the exception is logged and the item is
      // left in the queue to be processed again later.
      $failure = NULL;
      try {
        $worker($item->data);
        $queue->deleteItem($item);
      }
      catch (Exception $failure) {
        watchdog_exception('mob_queue', $failure);
      }

      // Tell other modules about the outcome; $failure is NULL on success.
      module_invoke_all('mob_queue_queue_item_processed', $current_name, $item, $failure);
    }

    module_invoke_all('mob_queue_queue_processed', $current_name, $definition, $queues);
  }

  // Put the original user and session-saving flag back.
  $GLOBALS['user'] = $user_before;
  drupal_save_session($session_saving_before);

  return TRUE;
}