You are here

class UltimateCronSerialLauncher in Ultimate Cron 7.2

Ultimate Cron launcher plugin class.

Hierarchy

Expanded class hierarchy of UltimateCronSerialLauncher

1 string reference to 'UltimateCronSerialLauncher'
serial.inc in plugins/ultimate_cron/launcher/serial.inc

File

plugins/ultimate_cron/launcher/serial.class.php, line 10
Serial cron job launcher for Ultimate Cron.

View source
class UltimateCronSerialLauncher extends UltimateCronLauncher {
  public $currentThread = NULL;
  public $currentThreadLockId = NULL;

  /**
   * Implements hook_cron_alter().
   */
  public function cron_alter(&$jobs) {
    $class = _ultimate_cron_get_class('lock');
    if (isset($jobs['ultimate_cron_plugin_launcher_serial_cleanup']) && !empty($class::$killable)) {
      $jobs['ultimate_cron_plugin_launcher_serial_cleanup']->hook['tags'][] = 'killable';
    }
  }

  /**
   * Default settings.
   */
  public function defaultSettings() {
    return array(
      'max_threads' => 1,
      'thread' => 'any',
      'lock_timeout' => 3600,
      'poorman_keepalive' => FALSE,
    ) + parent::defaultSettings();
  }

  /**
   * Settings form for the crontab scheduler.
   */
  public function settingsForm(&$form, &$form_state, $job = NULL) {
    $elements =& $form['settings'][$this->type][$this->name];
    $values =& $form_state['values']['settings'][$this->type][$this->name];
    $elements['timeouts'] = array(
      '#type' => 'fieldset',
      '#title' => t('Timeouts'),
    );
    $elements['launcher'] = array(
      '#type' => 'fieldset',
      '#title' => t('Launching options'),
    );
    $elements['timeouts']['lock_timeout'] = array(
      '#parents' => array(
        'settings',
        $this->type,
        $this->name,
        'lock_timeout',
      ),
      '#title' => t("Job lock timeout"),
      '#type' => 'textfield',
      '#default_value' => $values['lock_timeout'],
      '#description' => t('Number of seconds to keep lock on job.'),
      '#fallback' => TRUE,
      '#required' => TRUE,
    );
    if (!$job) {
      $max_threads = $values['max_threads'];
      $elements['launcher']['max_threads'] = array(
        '#parents' => array(
          'settings',
          $this->type,
          $this->name,
          'max_threads',
        ),
        '#title' => t("Maximum number of launcher threads"),
        '#type' => 'textfield',
        '#default_value' => $max_threads,
        '#description' => t('The maximum number of launch threads that can be running at any given time.'),
        '#fallback' => TRUE,
        '#required' => TRUE,
        '#element_validate' => array(
          'element_validate_number',
        ),
        '#weight' => 1,
      );
      $elements['launcher']['poorman_keepalive'] = array(
        '#parents' => array(
          'settings',
          $this->type,
          $this->name,
          'poorman_keepalive',
        ),
        '#title' => t("Poormans cron keepalive"),
        '#type' => 'checkbox',
        '#default_value' => $values['poorman_keepalive'],
        '#description' => t('Retrigger poormans cron after it has finished. Requires $base_url to be accessible from the webserver.'),
        '#fallback' => TRUE,
        '#weight' => 3,
      );
    }
    else {
      $settings = $this
        ->getDefaultSettings();
      $max_threads = $settings['max_threads'];
    }
    $options = array(
      'any' => '-- ' . t('Any') . ' --',
      'fixed' => '-- ' . t('Fixed') . ' --',
    );
    for ($i = 1; $i <= $max_threads; $i++) {
      $options[$i] = $i;
    }
    $elements['launcher']['thread'] = array(
      '#parents' => array(
        'settings',
        $this->type,
        $this->name,
        'thread',
      ),
      '#title' => t("Run in thread"),
      '#type' => 'select',
      '#default_value' => $values['thread'],
      '#options' => $options,
      '#description' => t('Which thread to run jobs in.') . "<br/>" . t('<strong>Any</strong>: Just use any available thread') . "<br/>" . t('<strong>Fixed</strong>: Only run in one specific thread. The maximum number of threads is spread across the jobs.') . "<br/>" . t('<strong>1-?</strong>: Only run when a specific thread is invoked. This setting only has an effect when cron is run through cron.php with an argument ?thread=N or through Drush with --options=thread=N.'),
      '#fallback' => TRUE,
      '#required' => TRUE,
      '#weight' => 2,
    );
  }

  /**
   * Settings form validator.
   */
  public function settingsFormValidate(&$form, &$form_state, $job = NULL) {
    $elements =& $form['settings'][$this->type][$this->name];
    $values =& $form_state['values']['settings'][$this->type][$this->name];
    if (!$job) {
      if (intval($values['max_threads']) <= 0) {
        form_set_error("settings[{$this->type}][{$this->name}", t('%title must be greater than 0', array(
          '%title' => $elements['launcher']['max_threads']['#title'],
        )));
      }
    }
  }

  /**
   * Lock job.
   */
  public function lock($job) {
    $settings = $job
      ->getSettings($this->type);
    $timeout = $settings['lock_timeout'];
    $class = _ultimate_cron_get_class('lock');
    if ($lock_id = $class::lock($job->name, $timeout)) {
      $lock_id = $this->name . '-' . $lock_id;
      return $lock_id;
    }
    return FALSE;
  }

  /**
   * Unlock job.
   */
  public function unlock($lock_id, $manual = FALSE) {
    list($launcher, $lock_id) = explode('-', $lock_id, 2);
    $class = _ultimate_cron_get_class('lock');
    return $class::unlock($lock_id);
  }

  /**
   * Check if job is locked.
   */
  public function isLocked($job) {
    $class = _ultimate_cron_get_class('lock');
    $lock_id = $class::isLocked($job->name);
    return $lock_id ? $this->name . '-' . $lock_id : $lock_id;
  }

  /**
   * Check lock for multiple jobs.
   */
  public function isLockedMultiple($jobs) {
    $names = array();
    foreach ($jobs as $job) {
      $names[] = $job->name;
    }
    $class = _ultimate_cron_get_class('lock');
    $lock_ids = $class::isLockedMultiple($names);
    foreach ($lock_ids as &$lock_id) {
      $lock_id = $lock_id ? $this->name . '-' . $lock_id : $lock_id;
    }
    return $lock_ids;
  }

  /**
   * Cleanup.
   */
  public function cleanup() {
    $class = _ultimate_cron_get_class('lock');
    $class::cleanup();
  }

  /**
   * Launcher.
   */
  public function launch($job) {
    $lock_id = $job
      ->lock();
    if (!$lock_id) {
      return FALSE;
    }
    if ($this->currentThread) {
      $init_message = t('Launched in thread @current_thread', array(
        '@current_thread' => $this->currentThread,
      ));
    }
    else {
      $init_message = t('Launched manually');
    }
    $log_entry = $job
      ->startLog($lock_id, $init_message);
    drupal_set_message(t('@name: @init_message', array(
      '@name' => $job->name,
      '@init_message' => $init_message,
    )));
    $class = _ultimate_cron_get_class('lock');
    try {

      // Allocate time for the job's lock if necessary.
      $settings = $job
        ->getSettings($this->type);
      $lock_timeout = drupal_set_time_limit($settings['lock_timeout']);

      // Relock cron thread with proper timeout.
      if ($this->currentThreadLockId) {
        $class::reLock($this->currentThreadLockId, $settings['lock_timeout']);
      }

      // Run job.
      $job
        ->run();
    } catch (Throwable $e) {
      ultimate_cron_watchdog_throwable('serial_launcher', $e, 'Error executing %job: @error', array(
        '%job' => $job->name,
        '@error' => (string) $e,
      ), WATCHDOG_ERROR);
      $log_entry
        ->finish();
      $job
        ->unlock($lock_id);
      return FALSE;
    } catch (Exception $e) {
      watchdog_exception('serial_launcher', $e, 'Error executing %job: @error', array(
        '%job' => $job->name,
        '@error' => (string) $e,
      ), WATCHDOG_ERROR);
      $log_entry
        ->finish();
      $job
        ->unlock($lock_id);
      return FALSE;
    }
    $log_entry
      ->finish();
    $job
      ->unlock($lock_id);
    return TRUE;
  }

  /**
   * Find a free thread for running cron jobs.
   */
  public function findFreeThread($lock, $lock_timeout = NULL, $timeout = 3) {
    $settings = $this
      ->getDefaultSettings();

    // Find a free thread, try for 3 seconds.
    $delay = $timeout * 1000000;
    $sleep = 25000;
    $class = _ultimate_cron_get_class('lock');
    do {
      for ($thread = 1; $thread <= $settings['max_threads']; $thread++) {
        if ($thread != $this->currentThread) {
          $lock_name = 'ultimate_cron_serial_launcher_' . $thread;
          if (!$class::isLocked($lock_name)) {
            if ($lock) {
              if ($lock_id = $class::lock($lock_name, $lock_timeout)) {
                return array(
                  $thread,
                  $lock_id,
                );
              }
            }
            else {
              return array(
                $thread,
                FALSE,
              );
            }
          }
        }
      }
      if ($delay > 0) {
        usleep($sleep);

        // After each sleep, increase the value of $sleep until it reaches
        // 500ms, to reduce the potential for a lock stampede.
        $delay = $delay - $sleep;
        $sleep = min(500000, $sleep + 25000, $delay);
      }
    } while ($delay > 0);
    return array(
      FALSE,
      FALSE,
    );
  }

  /**
   * Launch manager.
   */
  public function launchJobs($jobs) {
    $class = _ultimate_cron_get_class('lock');
    $settings = $this
      ->getDefaultSettings();

    // We only lock for 55 seconds at a time, to give room for other cron
    // runs.
    $lock_timeout = 55;
    if (!empty($_GET['thread'])) {
      self::setGlobalOption('thread', $_GET['thread']);
    }
    if ($thread = intval(self::getGlobalOption('thread'))) {
      if ($thread < 1 || $thread > $settings['max_threads']) {
        watchdog('serial_launcher', "Invalid thread available for starting launch thread", array(), WATCHDOG_ERROR);
        return;
      }
      $lock_name = 'ultimate_cron_serial_launcher_' . $thread;
      $lock_id = NULL;
      if (!$class::isLocked($lock_name)) {
        $lock_id = $class::lock($lock_name, $lock_timeout);
      }
      if (!$lock_id) {
        watchdog('serial_launcher', "Thread @thread is already running", array(
          '@thread' => $thread,
        ), WATCHDOG_WARNING);
      }
    }
    else {
      $timeout = 1;
      list($thread, $lock_id) = $this
        ->findFreeThread(TRUE, $lock_timeout, $timeout);
    }
    if (!$thread) {
      watchdog('serial_launcher', "No free threads available for launching jobs", array(), WATCHDOG_WARNING);
      return;
    }
    watchdog('serial_launcher', "Cron thread %thread started", array(
      '%thread' => $thread,
    ), WATCHDOG_DEBUG);
    $this
      ->runThread($lock_id, $thread, $jobs);
    $class::unlock($lock_id);
  }

  /**
   * Run jobs in thread.
   *
   * @param string $lock_id
   *   The lock id.
   * @param string $thread
   *   The tread number.
   * @param array $jobs
   *   The UltimateCronJobs to run.
   */
  public function runThread($lock_id, $thread, $jobs) {
    $this->currentThread = $thread;
    $this->currentThreadLockId = $lock_id;
    $class = _ultimate_cron_get_class('lock');
    $lock_name = 'ultimate_cron_serial_launcher_' . $thread;
    foreach ($jobs as $job) {
      $settings = $job
        ->getSettings($this->type);
      switch ($settings['thread']) {
        case 'any':
          $settings['thread'] = $thread;
          break;
        case 'fixed':
          $settings['thread'] = $job
            ->getUniqueID() % $settings['max_threads'] + 1;
          break;
      }
      if ((!self::getGlobalOption('thread') || $settings['thread'] == $thread) && $job
        ->isScheduled()) {
        $job
          ->launch();

        // Be friendly, and check if someone else has taken the lock.
        // If they have, bail out, since someone else is now handling
        // this thread.
        if ($current_lock_id = $class::isLocked($lock_name)) {
          if ($current_lock_id !== $lock_id) {
            return;
          }
        }
        else {

          // If lock is free, then take the lock again.
          $lock_id = $class::lock($lock_name);
          if (!$lock_id) {

            // Race-condition, someone beat us to it.
            return;
          }
        }
      }
    }
  }

  /**
   * Poormans cron launcher.
   */
  public function launchPoorman() {
    $class = _ultimate_cron_get_class('lock');
    $settings = $this
      ->getDefaultSettings();

    // Is it time to run cron?
    $cron_last = variable_get('cron_last', 0);
    $cron_next = floor(($cron_last + 60) / 60) * 60;
    $time = time();
    if ($time < $cron_next) {
      if ($settings['poorman_keepalive'] && ($lock_id = $class::lock('ultimate_cron_poorman_serial', 60))) {
        ultimate_cron_poorman_page_flush();
        $sleep = $cron_next - $time;
        sleep($sleep);
        ultimate_cron_poorman_trigger();
        $class::unLock($lock_id);
      }
      return;
    }
    unset($_GET['thread']);
    ultimate_cron_poorman_page_flush();
    ultimate_cron_run_launchers();

    // Check poorman settings. If launcher has changed, we don't want
    // to keepalive.
    $poorman = _ultimate_cron_plugin_load('settings', 'poorman');
    if (!$poorman) {
      return;
    }
    $settings = $poorman
      ->getDefaultSettings();
    if (!$settings['launcher'] || $settings['launcher'] !== $this->name) {
      return;
    }
    $settings = $this
      ->getDefaultSettings();
    if ($settings['poorman_keepalive'] && ($lock_id = $class::lock('ultimate_cron_poorman_serial', 60))) {

      // Is it time to run cron? If not wait before re-launching.
      $cron_last = variable_get('cron_last', 0);
      $cron_next = floor(($cron_last + 60) / 60) * 60;
      $time = time();
      if ($time < $cron_next) {
        $sleep = $cron_next - $time;
        sleep($sleep);
      }
      $class::unLock($lock_id);
      ultimate_cron_poorman_trigger();
    }
  }

}

Members

Namesort descending Modifiers Type Description Overrides
UltimateCronLauncher::finishProgress public function Default implementation of finishProgress(). 1
UltimateCronLauncher::formatProgress public function Default implementation of formatProgress().
UltimateCronLauncher::formatRunning public function Format running state. 1
UltimateCronLauncher::formatUnfinished public function Format unfinished state.
UltimateCronLauncher::getProgress public function Default implementation of getProgress(). 1
UltimateCronLauncher::getProgressMultiple public function Default implementation of getProgressMultiple(). 1
UltimateCronLauncher::initializeProgress public function Default implementation of initializeProgress(). 1
UltimateCronLauncher::run public function Run the job.
UltimateCronLauncher::setProgress public function Default implementation of setProgress(). 1
UltimateCronPlugin::$description public property
UltimateCronPlugin::$globalOptions public static property
UltimateCronPlugin::$instances public static property
UltimateCronPlugin::$multiple public static property 1
UltimateCronPlugin::$name public property
UltimateCronPlugin::$plugin public property
UltimateCronPlugin::$settings public property
UltimateCronPlugin::$title public property
UltimateCronPlugin::$weight public property 1
UltimateCronPlugin::build_operations_alter public function Allow plugins to alter the allowed operations for a job. 2
UltimateCronPlugin::cleanForm public function Clean form of empty fallback values.
UltimateCronPlugin::cronapi public function A hook_cronapi() for plugins. 1
UltimateCronPlugin::cron_post_invoke public function A hook_cron_post_invoke() for plugins.
UltimateCronPlugin::cron_post_launch public function A hook_cron_post_launch() for plugins.
UltimateCronPlugin::cron_post_run public function A hook_cron_post_run() for plugins.
UltimateCronPlugin::cron_post_schedule public function A hook_cron_post_schedule() for plugins. 1
UltimateCronPlugin::cron_pre_invoke public function A hook_cron_pre_invoke() for plugins.
UltimateCronPlugin::cron_pre_launch public function A hook_cron_pre_launch() for plugins.
UltimateCronPlugin::cron_pre_run public function A hook_cron_pre_run() for plugins.
UltimateCronPlugin::cron_pre_schedule public function A hook_cron_pre_schedule() for plugins. 2
UltimateCronPlugin::defaultSettingsForm public static function Default settings form. 1
UltimateCronPlugin::drupal_array_remove_nested_value public function Modified version drupal_array_get_nested_value().
UltimateCronPlugin::factory public static function Singleton factoryLogEntry.
UltimateCronPlugin::fallbackalize public function Process fallback form parameters.
UltimateCronPlugin::formatLabel public function Format label for the plugin. 1
UltimateCronPlugin::formatLabelVerbose public function Format verbose label for the plugin. 1
UltimateCronPlugin::getDefaultSettings public function Get default settings. 1
UltimateCronPlugin::getGlobalOption public static function Get global plugin option.
UltimateCronPlugin::getGlobalOptions public static function Get all global plugin options.
UltimateCronPlugin::hook_cron_alter final public static function Invoke hook_cron_alter() on plugins.
UltimateCronPlugin::hook_cron_post_invoke final public static function Invoke hook_cron_post_invoke() on plugins.
UltimateCronPlugin::hook_cron_post_launch final public static function Invoke hook_cron_post_launch() on plugins.
UltimateCronPlugin::hook_cron_post_run final public static function Invoke hook_cron_post_run() on plugins.
UltimateCronPlugin::hook_cron_post_schedule final public static function Invoke hook_cron_post_schedule() on plugins.
UltimateCronPlugin::hook_cron_pre_invoke final public static function Invoke hook_cron_pre_invoke() on plugins.
UltimateCronPlugin::hook_cron_pre_launch final public static function Invoke hook_cron_pre_launch() on plugins.
UltimateCronPlugin::hook_cron_pre_run final public static function Invoke hook_cron_pre_run() on plugins.
UltimateCronPlugin::hook_cron_pre_schedule final public static function Invoke hook_cron_pre_schedule() on plugins.
UltimateCronPlugin::isValid public function Default plugin valid for all jobs. 2
UltimateCronPlugin::jobSettingsForm public static function Job settings form. 1
UltimateCronPlugin::jobSettingsFormSubmit public static function Job settings form submit handler. 1
UltimateCronPlugin::jobSettingsFormValidate public static function Job settings form validate handler. 1
UltimateCronPlugin::setGlobalOption public static function Set global plugin option.
UltimateCronPlugin::setSettings public function Save settings to db.
UltimateCronPlugin::settingsFormSubmit public function Settings form submit handler. 3
UltimateCronPlugin::settingsLabel public function Get label for a specific setting. 3
UltimateCronPlugin::signal public function Signal page for plugins. 2
UltimateCronPlugin::unsetGlobalOption public static function Remove a global plugin option.
UltimateCronPlugin::unsetGlobalOptions public static function Remove all global plugin options.
UltimateCronPlugin::__construct public function Constructor. 1
UltimateCronSerialLauncher::$currentThread public property
UltimateCronSerialLauncher::$currentThreadLockId public property
UltimateCronSerialLauncher::cleanup public function Cleanup.
UltimateCronSerialLauncher::cron_alter public function Implements hook_cron_alter(). Overrides UltimateCronPlugin::cron_alter
UltimateCronSerialLauncher::defaultSettings public function Default settings. Overrides UltimateCronLauncher::defaultSettings
UltimateCronSerialLauncher::findFreeThread public function Find a free thread for running cron jobs.
UltimateCronSerialLauncher::isLocked public function Check if job is locked. Overrides UltimateCronLauncher::isLocked
UltimateCronSerialLauncher::isLockedMultiple public function Check lock for multiple jobs. Overrides UltimateCronLauncher::isLockedMultiple
UltimateCronSerialLauncher::launch public function Launcher. Overrides UltimateCronLauncher::launch
UltimateCronSerialLauncher::launchJobs public function Launch manager. Overrides UltimateCronLauncher::launchJobs
UltimateCronSerialLauncher::launchPoorman public function Poormans cron launcher.
UltimateCronSerialLauncher::lock public function Lock job. Overrides UltimateCronLauncher::lock
UltimateCronSerialLauncher::runThread public function Run jobs in thread.
UltimateCronSerialLauncher::settingsForm public function Settings form for the crontab scheduler. Overrides UltimateCronPlugin::settingsForm
UltimateCronSerialLauncher::settingsFormValidate public function Settings form validator. Overrides UltimateCronPlugin::settingsFormValidate
UltimateCronSerialLauncher::unlock public function Unlock job. Overrides UltimateCronLauncher::unlock