You are here

public static function UltimateCronQueueSettings::worker_callback in Ultimate Cron 7.2

Process a cron queue.

This is a wrapper around the cron queue's "worker callback".

Parameters

UltimateCronJob $job: The job being run.

File

plugins/ultimate_cron/settings/queue.class.php, line 94
Queue settings for Ultimate Cron.

Class

UltimateCronQueueSettings
Queue settings plugin class.

Code

/**
 * Process a cron queue.
 *
 * Wrapper around the cron queue's "worker callback": claims items from the
 * queue named in the job's settings and passes each item's data to the
 * configured worker callback. Processing stops when the configured time
 * budget has elapsed, when the queue yields no item and no empty delay is
 * configured, or when the job reports a 'kill' signal. Because the time
 * budget is checked at the end of each pass, at least one pass always runs.
 *
 * Failures raised by the worker callback are logged and do not abort the
 * loop; the failing item is not deleted from the queue.
 *
 * @param UltimateCronJob $job
 *   The job being run.
 */
public static function worker_callback($job) {
  $settings = $job->getPluginSettings('settings');
  $queue = DrupalQueue::get($settings['queue']['name']);
  $function = $settings['queue']['worker callback'];
  // Point in time (seconds, as float) after which no new pass is started.
  $end = microtime(TRUE) + $settings['queue']['time'];
  $items = 0;
  do {
    // Allow the job to be aborted between items.
    if ($job->getSignal('kill')) {
      watchdog('ultimate_cron', 'kill signal received', array(), WATCHDOG_NOTICE);
      break;
    }

    $item = $queue->claimItem($settings['queue']['lease_time']);
    if (!$item) {
      if ($settings['queue']['empty_delay']) {
        // Nothing to claim: wait, then poll again. usleep() expects an
        // integer number of microseconds, so convert explicitly rather than
        // relying on implicit float-to-int conversion. In a do-while loop
        // "continue" jumps to the loop condition, so the deadline still
        // applies while polling.
        usleep((int) ($settings['queue']['empty_delay'] * 1000000));
        continue;
      }
      else {
        break;
      }
    }

    try {
      call_user_func($function, $item->data);
      // Only delete (and count) the item once the callback has returned
      // without throwing.
      $queue->deleteItem($item);
      $items++;

      // Pause between items if an item delay is configured.
      if ($settings['queue']['item_delay']) {
        usleep((int) ($settings['queue']['item_delay'] * 1000000));
      }
    }
    catch (Throwable $e) {
      // Log and move on to the next item. Where Throwable exists (PHP 7+)
      // this branch also receives every Exception.
      ultimate_cron_watchdog_throwable($job->hook['module'], $e, "Queue item @item_id from queue @queue failed with message @message", array(
        '@item_id' => $item->item_id,
        '@queue' => $settings['queue']['name'],
        '@message' => (string) $e,
      ), WATCHDOG_ERROR);
    }
    catch (Exception $e) {
      // Log and move on to the next item. This branch is only reached on PHP
      // versions without Throwable; keep it after the Throwable branch.
      watchdog_exception($job->hook['module'], $e, "Queue item @item_id from queue @queue failed with message @message", array(
        '@item_id' => $item->item_id,
        '@queue' => $settings['queue']['name'],
        '@message' => (string) $e,
      ), WATCHDOG_ERROR);
    }
  } while (microtime(TRUE) < $end);

  // Only report when something was actually processed.
  if ($items) {
    watchdog($job->hook['module'], 'Processed @items items from queue @queue', array(
      '@items' => $items,
      '@queue' => $settings['queue']['name'],
    ), WATCHDOG_INFO);
  }

  // Re-throttle.
  $job->getPlugin('settings', 'queue')->throttle($job);
}