You are here

function drush_advancedqueue_process_queue in Advanced Queue 7

Command callback for drush advancedqueue-process-queue.

Parameters

string $queue_args: (optional) Comma-separated list of the names of the queues to work with. May be omitted when the --all or --group option is used.

File

drush/advancedqueue.drush.inc, line 40
Drush worker for Advanced-queue.

Code

/**
 * Command callback for drush advancedqueue-process-queue.
 *
 * Resolves the set of queues to work on (every registered queue, the queues
 * of one group, or an explicit comma-separated list), cleans up the queue
 * table, then claims and processes items in a loop until the optional timeout
 * elapses.
 *
 * @param string $queue_args
 *   (optional) Comma-separated list of queue names. Whitespace around each
 *   name is ignored. Only consulted when neither --all nor --group is given.
 *
 * @return mixed
 *   The result of drush_set_error() when validation fails; nothing otherwise.
 */
function drush_advancedqueue_process_queue($queue_args = NULL) {

  // Load information about the registered queues. The original comment states
  // they come back sorted by weight; that ordering is presumably provided by
  // advancedqueue_get_queues_info() itself, as no sorting happens here.
  if (!($queues_info = advancedqueue_get_queues_info())) {
    return drush_set_error(dt('No queues exist.'));
  }
  $all_option = drush_get_option('all');
  $group_option = drush_get_option('group');
  if (!$all_option && !$group_option && empty($queue_args)) {
    return drush_set_error(dt('You have to specify either a set of queues or the --all or --group parameter.'));
  }
  if ($all_option) {
    $queues = $queues_info;
  }
  elseif ($group_option) {

    // Build a map of group name => list of queue names belonging to it.
    $groups = array();
    foreach ($queues_info as $queue_name => $queue_info) {

      // Each queue can define one or more groups. Thanks to the defaults, we
      // can be sure this key is an array.
      foreach ($queue_info['groups'] as $group) {
        if (!isset($groups[$group])) {

          // These sub-arrays need to be initialized or we can't append to them.
          $groups[$group] = array();
        }
        $groups[$group][] = $queue_name;
      }
    }
    if (!isset($groups[$group_option])) {
      return drush_set_error(dt('Invalid !group queue group. Aborting.', array(
        '!group' => $group_option,
      )));
    }

    // Intersecting against $queues_info keeps the registry's own ordering.
    $queues = drupal_map_assoc($groups[$group_option]);
    $queues = array_intersect_key($queues_info, $queues);
  }
  else {

    // Validate queues. Names are trimmed so that a list written with spaces
    // after the commas ("queue_a, queue_b") is not rejected as invalid.
    $queue_names = array_map('trim', explode(',', $queue_args));
    $queues = drupal_map_assoc($queue_names);
    if ($invalid_queues = array_diff_key($queues, $queues_info)) {
      return drush_set_error(dt('The following queues are invalid: !queues. Aborting.', array(
        '!queues' => implode(', ', $invalid_queues),
      )));
    }

    // Intersecting against $queues_info keeps the registry's own ordering
    // rather than the order given on the command line.
    $queues = array_intersect_key($queues_info, $queues);
  }

  // Delete older entries and make sure there are no stale items in the table.
  drush_log(dt('Cleanup processed and stale items.'));
  _advancedqueue_cleanup_table();

  // Run the worker for a certain period of time before killing it. The
  // --timeout option takes precedence over the stored variable; a falsy
  // timeout leaves $end at 0, which means "no deadline".
  $timeout = drush_get_option('timeout') ?: variable_get('advancedqueue_processing_timeout_drush', 0);
  $end = $timeout ? time() + $timeout : 0;
  drush_log(dt('Starting processing loop.'));
  while (!$end || time() < $end) {
    foreach ($queues as $queue_name => $queue_info) {
      $queue = DrupalQueue::get($queue_name);
      if ($item = $queue->claimItem($queue_info['lease time'])) {
        advancedqueue_process_item($queue, $queue_name, $queue_info, $item, $end);

        // Restart the scan from the first queue, so earlier queues are always
        // polled again before later ones.
        continue 2;
      }
    }

    // No item processed in that round, let the CPU rest.
    sleep(1);
  }
  drush_log(dt('Timeout: exiting processing loop.'));
}