You are here

mob_queue.drush.inc in Drush Queue Handling 7

Same filename and directory in other branches
  1. 8 mob_queue.drush.inc

Drush commands for Drush Queue Handling.

File

mob_queue.drush.inc
View source
<?php

/**
 * @file
 * Drush commands for Drush Queue Handling.
 */

/**
 * Implements hook_drush_command().
 *
 * Declares the 'mob-exe-queue' command (alias 'meq'). It accepts one optional
 * 'time' argument and the 'no-reset-expired' option.
 *
 * @return array
 *   Command definitions keyed by command name.
 */
function mob_queue_drush_command() {
  $items = array();
  $items['mob-exe-queue'] = array(
    'description' => "Execute mob_queue queued tasks.",
    'examples' => array(
      // The example has to use the command name registered above; the
      // previous 'drush mob-queue' text did not correspond to any command.
      'drush mob-exe-queue' => 'Go a sprint to finish the tasks in the queues for mob-queue',
    ),
    'arguments' => array(
      'time' => dt('Total execution time for this command.'),
    ),
    'options' => array(
      'no-reset-expired' => "Do not reset expired items from the queue table.",
    ),
    'aliases' => array(
      'meq',
    ),
  );
  return $items;
}

/**
 * Implements hook_drush_help().
 *
 * @param string $section
 *   The help section being requested, e.g. 'drush:mob-exe-queue'.
 *
 * @return string|null
 *   The help text for the mob-exe-queue command, or NULL (implicitly) for any
 *   other section.
 */
function mob_queue_drush_help($section) {
  switch ($section) {
    // The command is registered as 'mob-exe-queue', so that is the section
    // that must be matched. 'drush:mob-queue' is the key this function used
    // to check; it is kept so that any existing lookup keeps working.
    case 'drush:mob-exe-queue':
    case 'drush:mob-queue':
      return dt("Execute queue tasks.");
  }
}

/**
 * Command callback: processes the cron queues flagged for mob_queue.
 *
 * Optionally resets expired claims in the {queue} table, switches to the
 * anonymous user with session saving disabled, then walks every queue from
 * hook_cron_queue_info() that carries a truthy 'mob_queue' flag and runs its
 * worker callback on claimed items until the queue is empty, its time window
 * elapses, or a hook implementation vetoes further processing.
 *
 * @param int $time
 *   Number of seconds handed to drupal_set_time_limit(). It is also the
 *   processing window of every queue whose info array has no 'time' key.
 *
 * @return bool
 *   Always TRUE.
 */
function drush_mob_queue_mob_exe_queue($time = 900) {

  // Keep going even if the caller aborts the request.
  @ignore_user_abort(TRUE);

  $keep_expired = drush_get_option('no-reset-expired', FALSE);
  if (!$keep_expired) {

    // Clear the expiry of every item in the default queue table whose claim
    // has already run out, so that it can be claimed again.
    $reset_query = db_update('queue');
    $reset_query->fields(array(
      'expire' => 0,
    ));
    $reset_query->condition('expire', 0, '<>');
    $reset_query->condition('expire', REQUEST_TIME, '<');
    $reset_count = $reset_query->execute();

    $reset_message = dt('!updated expired items reset.', array(
      '!updated' => $reset_count,
    ));
    drush_log($reset_message);
  }

  // Remember the session-saving state, then switch it off for this run.
  $session_saving_before = drupal_save_session();
  drupal_save_session(FALSE);

  // Run as the anonymous user so permissions are the same on every run; the
  // real user is put back at the end of the function.
  $user_before = $GLOBALS['user'];
  $GLOBALS['user'] = drupal_anonymous_user();

  // Ask for enough execution time to get through the queues.
  drupal_set_time_limit($time);

  // Collect the cron queue definitions and flag the ones enabled through the
  // 'mob_queue_<queue name>' variables.
  $queues = module_invoke_all('cron_queue_info');
  drupal_alter('cron_queue_info', $queues);
  foreach (array_keys($queues) as $candidate) {
    if (variable_get('mob_queue_' . $candidate, 0)) {
      $queues[$candidate]['mob_queue'] = TRUE;
    }
  }
  drupal_alter('mob_queue_cron_queue_info', $queues);

  // Creating a queue that already exists is harmless, so simply create all
  // of them up front.
  foreach (array_keys($queues) as $known_queue) {
    $backend = DrupalQueue::get($known_queue);
    $backend->createQueue();
  }

  reset($queues);
  while (TRUE) {

    // The array pointer is read and advanced by hand on purpose: array
    // handling changed incompatibly between PHP 5.x and 7.x, and foreach()
    // no longer moves the internal pointer as of 7.0, so this loop has to
    // stay simple.
    // @see #2974823 for more info.
    $queue_name = key($queues);
    $info = current($queues);
    if ($queue_name === NULL && $info === FALSE) {
      break;
    }
    next($queues);

    // Other modules may change the queue list, its order, or which queue is
    // handled next.
    drupal_alter('mob_queue_queue_processing', $queue_name, $info, $queues);
    if (empty($info['mob_queue'])) {
      continue;
    }

    $worker = $info['worker callback'];

    // A queue may bring its own time window; otherwise use the command's.
    if (isset($info['time'])) {
      $window = $info['time'];
    }
    else {
      $window = $time;
    }
    $deadline = time() + $window;

    // Lease time defaults to 30 when the queue info does not specify one.
    $lease = 30;
    if (isset($info['leasetime'])) {
      $lease = $info['leasetime'];
    }

    $queue = DrupalQueue::get($queue_name);
    while (time() < $deadline) {

      // Any implementation returning exactly FALSE stops work on this queue.
      $votes = module_invoke_all('mob_queue_queue_item_processing', $queue_name, $info);
      if (!empty($votes) && in_array(FALSE, $votes, TRUE)) {
        break;
      }

      // Try to claim the next item.
      $item = $queue->claimItem($lease);
      if (!$item) {
        $empty_message = dt("No items to process on '!queue-name' queue.", array(
          '!queue-name' => $queue_name,
        ));
        drush_log($empty_message);
        break;
      }

      $progress_message = dt("Processing '!queue-name' queue, item id: !item-id.", array(
        '!item-id' => $item->item_id,
        '!queue-name' => $queue_name,
      ));
      drush_log($progress_message);

      // Hand the item to the worker. The item is only deleted when the
      // worker returns without throwing.
      $failure = NULL;
      try {
        $worker($item->data);
        $queue->deleteItem($item);
      }
      catch (Exception $failure) {

        // Log the exception and leave the item in the queue so it can be
        // processed again later.
        watchdog_exception('mob_queue', $failure);
      }

      // Tell other modules the item was handled; the last argument is the
      // caught exception, or NULL on success.
      module_invoke_all('mob_queue_queue_item_processed', $queue_name, $item, $failure);
    }

    module_invoke_all('mob_queue_queue_processed', $queue_name, $info, $queues);
  }

  // Put the original user and session-saving state back.
  $GLOBALS['user'] = $user_before;
  drupal_save_session($session_saving_before);

  return TRUE;
}

Functions

Name (sort descending) | Description
drush_mob_queue_mob_exe_queue Run the queued job.
mob_queue_drush_command Implements hook_drush_command().
mob_queue_drush_help Implements hook_drush_help().