class UltimateCronSerialLauncher in Ultimate Cron 7.2

Ultimate Cron launcher plugin class.


Expanded class hierarchy of UltimateCronSerialLauncher

1 string reference to 'UltimateCronSerialLauncher' in plugins/ultimate_cron/launcher/


plugins/ultimate_cron/launcher/serial.class.php, line 10
Serial cron job launcher for Ultimate Cron.

class UltimateCronSerialLauncher extends UltimateCronLauncher {
  public $currentThread = NULL;
  public $currentThreadLockId = NULL;

   * Implements hook_cron_alter().
  public function cron_alter(&$jobs) {
    $class = _ultimate_cron_get_class('lock');
    if (isset($jobs['ultimate_cron_plugin_launcher_serial_cleanup']) && !empty($class::$killable)) {
      $jobs['ultimate_cron_plugin_launcher_serial_cleanup']->hook['tags'][] = 'killable';

   * Default settings.
  public function defaultSettings() {
    return array(
      'max_threads' => 1,
      'thread' => 'any',
      'lock_timeout' => 3600,
      'poorman_keepalive' => FALSE,
    ) + parent::defaultSettings();

   * Settings form for the crontab scheduler.
  public function settingsForm(&$form, &$form_state, $job = NULL) {
    $elements =& $form['settings'][$this->type][$this->name];
    $values =& $form_state['values']['settings'][$this->type][$this->name];
    $elements['timeouts'] = array(
      '#type' => 'fieldset',
      '#title' => t('Timeouts'),
    $elements['launcher'] = array(
      '#type' => 'fieldset',
      '#title' => t('Launching options'),
    $elements['timeouts']['lock_timeout'] = array(
      '#parents' => array(
      '#title' => t("Job lock timeout"),
      '#type' => 'textfield',
      '#default_value' => $values['lock_timeout'],
      '#description' => t('Number of seconds to keep lock on job.'),
      '#fallback' => TRUE,
      '#required' => TRUE,
    if (!$job) {
      $max_threads = $values['max_threads'];
      $elements['launcher']['max_threads'] = array(
        '#parents' => array(
        '#title' => t("Maximum number of launcher threads"),
        '#type' => 'textfield',
        '#default_value' => $max_threads,
        '#description' => t('The maximum number of launch threads that can be running at any given time.'),
        '#fallback' => TRUE,
        '#required' => TRUE,
        '#element_validate' => array(
        '#weight' => 1,
      $elements['launcher']['poorman_keepalive'] = array(
        '#parents' => array(
        '#title' => t("Poormans cron keepalive"),
        '#type' => 'checkbox',
        '#default_value' => $values['poorman_keepalive'],
        '#description' => t('Retrigger poormans cron after it has finished. Requires $base_url to be accessible from the webserver.'),
        '#fallback' => TRUE,
        '#weight' => 3,
    else {
      $settings = $this
      $max_threads = $settings['max_threads'];
    $options = array(
      'any' => '-- ' . t('Any') . ' --',
      'fixed' => '-- ' . t('Fixed') . ' --',
    for ($i = 1; $i <= $max_threads; $i++) {
      $options[$i] = $i;
    $elements['launcher']['thread'] = array(
      '#parents' => array(
      '#title' => t("Run in thread"),
      '#type' => 'select',
      '#default_value' => $values['thread'],
      '#options' => $options,
      '#description' => t('Which thread to run jobs in.') . "<br/>" . t('<strong>Any</strong>: Just use any available thread') . "<br/>" . t('<strong>Fixed</strong>: Only run in one specific thread. The maximum number of threads is spread across the jobs.') . "<br/>" . t('<strong>1-?</strong>: Only run when a specific thread is invoked. This setting only has an effect when cron is run through cron.php with an argument ?thread=N or through Drush with --options=thread=N.'),
      '#fallback' => TRUE,
      '#required' => TRUE,
      '#weight' => 2,

   * Settings form validator.
  public function settingsFormValidate(&$form, &$form_state, $job = NULL) {
    $elements =& $form['settings'][$this->type][$this->name];
    $values =& $form_state['values']['settings'][$this->type][$this->name];
    if (!$job) {
      if (intval($values['max_threads']) <= 0) {
        form_set_error("settings[{$this->type}][{$this->name}", t('%title must be greater than 0', array(
          '%title' => $elements['launcher']['max_threads']['#title'],

   * Lock job.
  public function lock($job) {
    $settings = $job
    $timeout = $settings['lock_timeout'];
    $class = _ultimate_cron_get_class('lock');
    if ($lock_id = $class::lock($job->name, $timeout)) {
      $lock_id = $this->name . '-' . $lock_id;
      return $lock_id;
    return FALSE;

   * Unlock job.
  public function unlock($lock_id, $manual = FALSE) {
    list($launcher, $lock_id) = explode('-', $lock_id, 2);
    $class = _ultimate_cron_get_class('lock');
    return $class::unlock($lock_id);

   * Check if job is locked.
  public function isLocked($job) {
    $class = _ultimate_cron_get_class('lock');
    $lock_id = $class::isLocked($job->name);
    return $lock_id ? $this->name . '-' . $lock_id : $lock_id;

   * Check lock for multiple jobs.
  public function isLockedMultiple($jobs) {
    $names = array();
    foreach ($jobs as $job) {
      $names[] = $job->name;
    $class = _ultimate_cron_get_class('lock');
    $lock_ids = $class::isLockedMultiple($names);
    foreach ($lock_ids as &$lock_id) {
      $lock_id = $lock_id ? $this->name . '-' . $lock_id : $lock_id;
    return $lock_ids;

   * Cleanup.
  public function cleanup() {
    $class = _ultimate_cron_get_class('lock');

   * Launcher.
  public function launch($job) {
    $lock_id = $job
    if (!$lock_id) {
      return FALSE;
    if ($this->currentThread) {
      $init_message = t('Launched in thread @current_thread', array(
        '@current_thread' => $this->currentThread,
    else {
      $init_message = t('Launched manually');
    $log_entry = $job
      ->startLog($lock_id, $init_message);
    drupal_set_message(t('@name: @init_message', array(
      '@name' => $job->name,
      '@init_message' => $init_message,
    $class = _ultimate_cron_get_class('lock');
    try {

      // Allocate time for the job's lock if necessary.
      $settings = $job
      $lock_timeout = drupal_set_time_limit($settings['lock_timeout']);

      // Relock cron thread with proper timeout.
      if ($this->currentThreadLockId) {
        $class::reLock($this->currentThreadLockId, $settings['lock_timeout']);

      // Run job.
    } catch (Throwable $e) {
      ultimate_cron_watchdog_throwable('serial_launcher', $e, 'Error executing %job: @error', array(
        '%job' => $job->name,
        '@error' => (string) $e,
      return FALSE;
    } catch (Exception $e) {
      watchdog_exception('serial_launcher', $e, 'Error executing %job: @error', array(
        '%job' => $job->name,
        '@error' => (string) $e,
      return FALSE;
    return TRUE;

   * Find a free thread for running cron jobs.
  public function findFreeThread($lock, $lock_timeout = NULL, $timeout = 3) {
    $settings = $this

    // Find a free thread, try for 3 seconds.
    $delay = $timeout * 1000000;
    $sleep = 25000;
    $class = _ultimate_cron_get_class('lock');
    do {
      for ($thread = 1; $thread <= $settings['max_threads']; $thread++) {
        if ($thread != $this->currentThread) {
          $lock_name = 'ultimate_cron_serial_launcher_' . $thread;
          if (!$class::isLocked($lock_name)) {
            if ($lock) {
              if ($lock_id = $class::lock($lock_name, $lock_timeout)) {
                return array(
            else {
              return array(
      if ($delay > 0) {

        // After each sleep, increase the value of $sleep until it reaches
        // 500ms, to reduce the potential for a lock stampede.
        $delay = $delay - $sleep;
        $sleep = min(500000, $sleep + 25000, $delay);
    } while ($delay > 0);
    return array(

   * Launch manager.
  public function launchJobs($jobs) {
    $class = _ultimate_cron_get_class('lock');
    $settings = $this

    // We only lock for 55 seconds at a time, to give room for other cron
    // runs.
    $lock_timeout = 55;
    if (!empty($_GET['thread'])) {
      self::setGlobalOption('thread', $_GET['thread']);
    if ($thread = intval(self::getGlobalOption('thread'))) {
      if ($thread < 1 || $thread > $settings['max_threads']) {
        watchdog('serial_launcher', "Invalid thread available for starting launch thread", array(), WATCHDOG_ERROR);
      $lock_name = 'ultimate_cron_serial_launcher_' . $thread;
      $lock_id = NULL;
      if (!$class::isLocked($lock_name)) {
        $lock_id = $class::lock($lock_name, $lock_timeout);
      if (!$lock_id) {
        watchdog('serial_launcher', "Thread @thread is already running", array(
          '@thread' => $thread,
    else {
      $timeout = 1;
      list($thread, $lock_id) = $this
        ->findFreeThread(TRUE, $lock_timeout, $timeout);
    if (!$thread) {
      watchdog('serial_launcher', "No free threads available for launching jobs", array(), WATCHDOG_WARNING);
    watchdog('serial_launcher', "Cron thread %thread started", array(
      '%thread' => $thread,
      ->runThread($lock_id, $thread, $jobs);

   * Run jobs in thread.
   * @param string $lock_id
   *   The lock id.
   * @param string $thread
   *   The tread number.
   * @param array $jobs
   *   The UltimateCronJobs to run.
  public function runThread($lock_id, $thread, $jobs) {
    $this->currentThread = $thread;
    $this->currentThreadLockId = $lock_id;
    $class = _ultimate_cron_get_class('lock');
    $lock_name = 'ultimate_cron_serial_launcher_' . $thread;
    foreach ($jobs as $job) {
      $settings = $job
      switch ($settings['thread']) {
        case 'any':
          $settings['thread'] = $thread;
        case 'fixed':
          $settings['thread'] = $job
            ->getUniqueID() % $settings['max_threads'] + 1;
      if ((!self::getGlobalOption('thread') || $settings['thread'] == $thread) && $job
        ->isScheduled()) {

        // Be friendly, and check if someone else has taken the lock.
        // If they have, bail out, since someone else is now handling
        // this thread.
        if ($current_lock_id = $class::isLocked($lock_name)) {
          if ($current_lock_id !== $lock_id) {
        else {

          // If lock is free, then take the lock again.
          $lock_id = $class::lock($lock_name);
          if (!$lock_id) {

            // Race-condition, someone beat us to it.

   * Poormans cron launcher.
  public function launchPoorman() {
    $class = _ultimate_cron_get_class('lock');
    $settings = $this

    // Is it time to run cron?
    $cron_last = variable_get('cron_last', 0);
    $cron_next = floor(($cron_last + 60) / 60) * 60;
    $time = time();
    if ($time < $cron_next) {
      if ($settings['poorman_keepalive'] && ($lock_id = $class::lock('ultimate_cron_poorman_serial', 60))) {
        $sleep = $cron_next - $time;

    // Check poorman settings. If launcher has changed, we don't want
    // to keepalive.
    $poorman = _ultimate_cron_plugin_load('settings', 'poorman');
    if (!$poorman) {
    $settings = $poorman
    if (!$settings['launcher'] || $settings['launcher'] !== $this->name) {
    $settings = $this
    if ($settings['poorman_keepalive'] && ($lock_id = $class::lock('ultimate_cron_poorman_serial', 60))) {

      // Is it time to run cron? If not wait before re-launching.
      $cron_last = variable_get('cron_last', 0);
      $cron_next = floor(($cron_last + 60) / 60) * 60;
      $time = time();
      if ($time < $cron_next) {
        $sleep = $cron_next - $time;



Namesort descending Modifiers Type Description Overrides
UltimateCronLauncher::finishProgress public function Default implementation of finishProgress(). 1
UltimateCronLauncher::formatProgress public function Default implementation of formatProgress().
UltimateCronLauncher::formatRunning public function Format running state. 1
UltimateCronLauncher::formatUnfinished public function Format unfinished state.
UltimateCronLauncher::getProgress public function Default implementation of getProgress(). 1
UltimateCronLauncher::getProgressMultiple public function Default implementation of getProgressMultiple(). 1
UltimateCronLauncher::initializeProgress public function Default implementation of initializeProgress(). 1
UltimateCronLauncher::run public function Run the job.
UltimateCronLauncher::setProgress public function Default implementation of setProgress(). 1
UltimateCronPlugin::$description public property
UltimateCronPlugin::$globalOptions public static property
UltimateCronPlugin::$instances public static property
UltimateCronPlugin::$multiple public static property 1
UltimateCronPlugin::$name public property
UltimateCronPlugin::$plugin public property
UltimateCronPlugin::$settings public property
UltimateCronPlugin::$title public property
UltimateCronPlugin::$weight public property 1
UltimateCronPlugin::build_operations_alter public function Allow plugins to alter the allowed operations for a job. 2
UltimateCronPlugin::cleanForm public function Clean form of empty fallback values.
UltimateCronPlugin::cronapi public function A hook_cronapi() for plugins. 1
UltimateCronPlugin::cron_post_invoke public function A hook_cron_post_invoke() for plugins.
UltimateCronPlugin::cron_post_launch public function A hook_cron_post_launch() for plugins.
UltimateCronPlugin::cron_post_run public function A hook_cron_post_run() for plugins.
UltimateCronPlugin::cron_post_schedule public function A hook_cron_post_schedule() for plugins. 1
UltimateCronPlugin::cron_pre_invoke public function A hook_cron_pre_invoke() for plugins.
UltimateCronPlugin::cron_pre_launch public function A hook_cron_pre_launch() for plugins.
UltimateCronPlugin::cron_pre_run public function A hook_cron_pre_run() for plugins.
UltimateCronPlugin::cron_pre_schedule public function A hook_cron_pre_schedule() for plugins. 2
UltimateCronPlugin::defaultSettingsForm public static function Default settings form. 1
UltimateCronPlugin::drupal_array_remove_nested_value public function Modified version drupal_array_get_nested_value().
UltimateCronPlugin::factory public static function Singleton factoryLogEntry.
UltimateCronPlugin::fallbackalize public function Process fallback form parameters.
UltimateCronPlugin::formatLabel public function Format label for the plugin. 1
UltimateCronPlugin::formatLabelVerbose public function Format verbose label for the plugin. 1
UltimateCronPlugin::getDefaultSettings public function Get default settings. 1
UltimateCronPlugin::getGlobalOption public static function Get global plugin option.
UltimateCronPlugin::getGlobalOptions public static function Get all global plugin options.
UltimateCronPlugin::hook_cron_alter final public static function Invoke hook_cron_alter() on plugins.
UltimateCronPlugin::hook_cron_post_invoke final public static function Invoke hook_cron_post_invoke() on plugins.
UltimateCronPlugin::hook_cron_post_launch final public static function Invoke hook_cron_post_launch() on plugins.
UltimateCronPlugin::hook_cron_post_run final public static function Invoke hook_cron_post_run() on plugins.
UltimateCronPlugin::hook_cron_post_schedule final public static function Invoke hook_cron_post_schedule() on plugins.
UltimateCronPlugin::hook_cron_pre_invoke final public static function Invoke hook_cron_pre_invoke() on plugins.
UltimateCronPlugin::hook_cron_pre_launch final public static function Invoke hook_cron_pre_launch() on plugins.
UltimateCronPlugin::hook_cron_pre_run final public static function Invoke hook_cron_pre_run() on plugins.
UltimateCronPlugin::hook_cron_pre_schedule final public static function Invoke hook_cron_pre_schedule() on plugins.
UltimateCronPlugin::isValid public function Default plugin valid for all jobs. 2
UltimateCronPlugin::jobSettingsForm public static function Job settings form. 1
UltimateCronPlugin::jobSettingsFormSubmit public static function Job settings form submit handler. 1
UltimateCronPlugin::jobSettingsFormValidate public static function Job settings form validate handler. 1
UltimateCronPlugin::setGlobalOption public static function Set global plugin option.
UltimateCronPlugin::setSettings public function Save settings to db.
UltimateCronPlugin::settingsFormSubmit public function Settings form submit handler. 3
UltimateCronPlugin::settingsLabel public function Get label for a specific setting. 3
UltimateCronPlugin::signal public function Signal page for plugins. 2
UltimateCronPlugin::unsetGlobalOption public static function Remove a global plugin option.
UltimateCronPlugin::unsetGlobalOptions public static function Remove all global plugin options.
UltimateCronPlugin::__construct public function Constructor. 1
UltimateCronSerialLauncher::$currentThread public property
UltimateCronSerialLauncher::$currentThreadLockId public property
UltimateCronSerialLauncher::cleanup public function Cleanup.
UltimateCronSerialLauncher::cron_alter public function Implements hook_cron_alter(). Overrides UltimateCronPlugin::cron_alter
UltimateCronSerialLauncher::defaultSettings public function Default settings. Overrides UltimateCronLauncher::defaultSettings
UltimateCronSerialLauncher::findFreeThread public function Find a free thread for running cron jobs.
UltimateCronSerialLauncher::isLocked public function Check if job is locked. Overrides UltimateCronLauncher::isLocked
UltimateCronSerialLauncher::isLockedMultiple public function Check lock for multiple jobs. Overrides UltimateCronLauncher::isLockedMultiple
UltimateCronSerialLauncher::launch public function Launcher. Overrides UltimateCronLauncher::launch
UltimateCronSerialLauncher::launchJobs public function Launch manager. Overrides UltimateCronLauncher::launchJobs
UltimateCronSerialLauncher::launchPoorman public function Poormans cron launcher.
UltimateCronSerialLauncher::lock public function Lock job. Overrides UltimateCronLauncher::lock
UltimateCronSerialLauncher::runThread public function Run jobs in thread.
UltimateCronSerialLauncher::settingsForm public function Settings form for the crontab scheduler. Overrides UltimateCronPlugin::settingsForm
UltimateCronSerialLauncher::settingsFormValidate public function Settings form validator. Overrides UltimateCronPlugin::settingsFormValidate
UltimateCronSerialLauncher::unlock public function Unlock job. Overrides UltimateCronLauncher::unlock