function drush_advancedqueue_process_queue in Advanced Queue 7
Command callback for drush advancedqueue-process-queue.
Parameters
string $queue_args: A comma-separated list of the names of the queues to work with. Optional when the --all or --group option is used.
File
- drush/
advancedqueue.drush.inc, line 40 - Drush worker for Advanced-queue.
Code
/**
 * Command callback for drush advancedqueue-process-queue.
 *
 * Resolves the set of queues to work on (all of them, a single group, or an
 * explicit comma-separated list), cleans up the queue table, and then keeps
 * claiming and processing items until the optional timeout expires.
 *
 * @param string $queue_args
 *   Comma-separated list of queue names to process. Whitespace around each
 *   name and empty entries (e.g. a trailing comma) are ignored. May be omitted
 *   when the --all or --group option is given.
 *
 * @return mixed
 *   The result of drush_set_error() when the arguments cannot be resolved to
 *   a set of known queues; nothing once the processing loop ends.
 */
function drush_advancedqueue_process_queue($queue_args = NULL) {
  // Load information about the registered queues, and sort them by weight.
  if (!($queues_info = advancedqueue_get_queues_info())) {
    return drush_set_error(dt('No queues exist.'));
  }

  $all_option = drush_get_option('all');
  $group_option = drush_get_option('group');

  if (!$all_option && !$group_option && empty($queue_args)) {
    return drush_set_error(dt('You have to specify either a set of queues or the --all or --group parameter.'));
  }

  if ($all_option) {
    $queues = $queues_info;
  }
  elseif ($group_option) {
    // Build a map of group name => list of queue names in that group.
    $groups = array();
    foreach ($queues_info as $queue_name => $queue_info) {
      // Each queue can define one or more groups. Thanks to the defaults, we
      // can be sure this key is an array.
      foreach ($queue_info['groups'] as $group) {
        if (!isset($groups[$group])) {
          // These sub-arrays need to be initialized or we can't append to them.
          $groups[$group] = array();
        }
        $groups[$group][] = $queue_name;
      }
    }

    if (!isset($groups[$group_option])) {
      return drush_set_error(dt('Invalid !group queue group. Aborting.', array(
        '!group' => $group_option,
      )));
    }

    // Intersecting against $queues_info keeps the queues in their original
    // order rather than in the order they were collected.
    $queues = array_intersect_key($queues_info, drupal_map_assoc($groups[$group_option]));
  }
  else {
    // Tolerate "a, b" and stray commas: trim every name and drop the empty
    // ones. 'strlen' is used as the filter so a queue named "0" survives.
    $requested = array_filter(array_map('trim', explode(',', $queue_args)), 'strlen');
    if (!$requested) {
      // Nothing but separators/whitespace was given. Without this guard the
      // loop below would run with no queues and only ever sleep.
      return drush_set_error(dt('You have to specify either a set of queues or the --all or --group parameter.'));
    }

    // Validate queues.
    $queues = drupal_map_assoc($requested);
    if ($invalid_queues = array_diff_key($queues, $queues_info)) {
      return drush_set_error(dt('The following queues are invalid: !queues. Aborting.', array(
        '!queues' => implode(', ', $invalid_queues),
      )));
    }
    $queues = array_intersect_key($queues_info, $queues);
  }

  // Delete older entries and make sure there are no stale items in the table.
  drush_log(dt('Cleanup processed and stale items.'));
  _advancedqueue_cleanup_table();

  // Run the worker for a certain period of time before killing it. The
  // --timeout option wins over the stored variable; an $end of 0 means the
  // loop below has no deadline.
  $timeout = drush_get_option('timeout') ?: variable_get('advancedqueue_processing_timeout_drush', 0);
  $end = $timeout ? time() + $timeout : 0;

  drush_log(dt('Starting processing loop.'));
  while (!$end || time() < $end) {
    foreach ($queues as $queue_name => $queue_info) {
      $queue = DrupalQueue::get($queue_name);
      if ($item = $queue->claimItem($queue_info['lease time'])) {
        advancedqueue_process_item($queue, $queue_name, $queue_info, $item, $end);
        // Restart from the first queue so earlier queues are always tried
        // before later ones.
        continue 2;
      }
    }

    // No item processed in that round, let the CPU rest.
    sleep(1);
  }
  drush_log(dt('Timeout: exiting processing loop.'));
}