You are here

class SerialLauncher in Ultimate Cron 8.2

Ultimate Cron launcher plugin class.

Plugin annotation


@LauncherPlugin(
  id = "serial",
  title = @Translation("Serial"),
  description = @Translation("Launches scheduled jobs in the same thread and runs them consecutively."),
)

Hierarchy

Expanded class hierarchy of SerialLauncher

1 file declares its use of SerialLauncher
LauncherPluginTest.php in tests/src/Kernel/LauncherPluginTest.php

File

src/Plugin/ultimate_cron/Launcher/SerialLauncher.php, line 19

Namespace

Drupal\ultimate_cron\Plugin\ultimate_cron\Launcher
View source
class SerialLauncher extends LauncherBase implements PluginCleanupInterface {
  public $currentThread = NULL;

  /**
   * Implements hook_cron_alter().
   */
  public function cron_alter(&$jobs) {
    $lock = \Drupal::service('ultimate_cron.lock');
    if (!empty($lock->{$killable})) {
      $jobs['ultimate_cron_plugin_launcher_serial_cleanup']->hook['tags'][] = 'killable';
    }
  }

  /**
   * {@inheritdoc}
   */
  public function defaultConfiguration() {
    return array(
      'timeouts' => array(
        'lock_timeout' => 3600,
        'max_execution_time' => 3600,
      ),
      'launcher' => array(
        'max_threads' => 1,
        'thread' => 'any',
      ),
    ) + parent::defaultConfiguration();
  }

  /**
   * {@inheritdoc}
   */
  public function buildConfigurationForm(array $form, FormStateInterface $form_state) {
    $form['timeouts'] = array(
      '#type' => 'fieldset',
      '#title' => t('Timeouts'),
    );
    $form['launcher'] = array(
      '#type' => 'fieldset',
      '#title' => t('Launching options'),
    );
    $form['timeouts']['lock_timeout'] = array(
      '#title' => t("Job lock timeout"),
      '#type' => 'textfield',
      '#default_value' => $this->configuration['timeouts']['lock_timeout'],
      '#description' => t('Number of seconds to keep lock on job.'),
      '#fallback' => TRUE,
      '#required' => TRUE,
    );

    // @todo: Figure this out when converting global settings to use plugin
    // classes.
    if (FALSE) {
      $form['timeouts']['max_execution_time'] = array(
        '#title' => t("Maximum execution time"),
        '#type' => 'textfield',
        '#default_value' => $this->configuration['timeouts']['max_execution_time'],
        '#description' => t('Maximum execution time for a cron run in seconds.'),
        '#fallback' => TRUE,
        '#required' => TRUE,
      );
      $form['launcher']['max_threads'] = array(
        '#title' => t("Maximum number of launcher threads"),
        '#type' => 'number',
        '#default_value' => $this->configuration['launcher']['max_threads'],
        '#description' => t('The maximum number of launch threads that can be running at any given time.'),
        '#fallback' => TRUE,
        '#required' => TRUE,
        '#weight' => 1,
      );
      return $form;
    }
    else {
      $max_threads = isset($this->configuration['launcher']['max_threads']) ? $this->configuration['launcher']['max_threads'] : 1;
    }
    $options = array(
      'any' => t('-- Any -- '),
      'fixed' => t('-- Fixed -- '),
    );
    for ($i = 1; $i <= $max_threads; $i++) {
      $options[$i] = $i;
    }
    $form['launcher']['thread'] = array(
      '#title' => t("Run in thread"),
      '#type' => 'select',
      '#default_value' => isset($this->configuration['launcher']['thread']) ? $this->configuration['launcher']['thread'] : 'any',
      '#options' => $options,
      '#description' => t('Which thread to run in when invoking with ?thread=N. Note: This setting only has an effect when cron is run through cron.php with an argument ?thread=N or through Drush with --options=thread=N.'),
      '#fallback' => TRUE,
      '#required' => TRUE,
      '#weight' => 2,
    );
    return $form;
  }

  /**
   * {@inheritdoc}
   */
  public function settingsFormValidate(&$form, &$form_state, $job = NULL) {
    $elements =& $form['configuration'][$this->type][$this->name];
    $values =& $form_state['values']['configuration'][$this->type][$this->name];
    if (!$job) {
      if (intval($values['max_threads']) <= 0) {
        form_set_error("settings[{$this->type}][{$this->name}", t('%title must be greater than 0', array(
          '%title' => $elements['launcher']['max_threads']['#title'],
        )));
      }
    }
  }

  /**
   * {@inheritdoc}
   */
  public function lock(CronJobInterface $job) {
    if (array_key_exists('timeouts', $this->configuration)) {
      $timeout = $this->configuration['timeouts']['lock_timeout'];
    }
    else {
      $timeout = 0;
    }
    $lock = \Drupal::service('ultimate_cron.lock');
    if ($lock_id = $lock
      ->lock($job
      ->id(), $timeout)) {
      $lock_id = $this
        ->getPluginId() . '-' . $lock_id;
      return $lock_id;
    }
    return FALSE;
  }

  /**
   * {@inheritdoc}
   */
  public function unlock($lock_id, $manual = FALSE) {
    list($launcher, $lock_id) = explode('-', $lock_id, 2);
    $lock = \Drupal::service('ultimate_cron.lock');
    return $lock
      ->unlock($lock_id);
  }

  /**
   * {@inheritdoc}
   */
  public function isLocked(CronJobInterface $job) {
    $lock = \Drupal::service('ultimate_cron.lock');
    $lock_id = $lock
      ->isLocked($job
      ->id());
    return $lock_id ? $this->pluginId . '-' . $lock_id : $lock_id;
  }

  /**
   * {@inheritdoc}
   */
  public function isLockedMultiple(array $jobs) {
    $names = array();
    foreach ($jobs as $job) {
      $names[] = $job
        ->id();
    }
    $lock = \Drupal::service('ultimate_cron.lock');
    $lock_ids = $lock
      ->isLockedMultiple($names);
    foreach ($lock_ids as &$lock_id) {
      $lock_id = $lock_id ? $this->pluginId . '-' . $lock_id : $lock_id;
    }
    return $lock_ids;
  }

  /**
   * {@inheritdoc}
   */
  public function cleanup() {
    $lock = \Drupal::service('ultimate_cron.lock');
    $lock
      ->cleanup();
  }

  /**
   * {@inheritdoc}
   */
  public function launch(CronJobInterface $job) {
    \Drupal::moduleHandler()
      ->invokeAll('cron_pre_launch', array(
      $this,
    ));
    if ($this->currentThread) {
      $init_message = t('Launched in thread @current_thread', array(
        '@current_thread' => $this->currentThread,
      ));
    }
    else {
      $init_message = t('Launched manually');
    }

    // Run job.
    $job_launch = $job
      ->run($init_message);
    \Drupal::moduleHandler()
      ->invokeAll('cron_post_launch', array(
      $this,
    ));
    return $job_launch;
  }

  /**
   * {@inheritdoc}
   */
  public function findFreeThread($lock, $lock_timeout = NULL, $timeout = 3) {
    $configuration = $this
      ->getConfiguration();

    // Find a free thread, try for 3 seconds.
    $delay = $timeout * 1000000;
    $sleep = 25000;
    $lock_service = \Drupal::service('ultimate_cron.lock');
    do {
      for ($thread = 1; $thread <= $configuration['launcher']['max_threads']; $thread++) {
        if ($thread != $this->currentThread) {
          $lock_name = 'ultimate_cron_serial_launcher_' . $thread;
          if (!$lock_service
            ->isLocked($lock_name)) {
            if ($lock) {
              if ($lock_id = $lock_service
                ->lock($lock_name, $lock_timeout)) {
                return array(
                  $thread,
                  $lock_id,
                );
              }
            }
            else {
              return array(
                $thread,
                FALSE,
              );
            }
          }
        }
      }
      if ($delay > 0) {
        usleep($sleep);

        // After each sleep, increase the value of $sleep until it reaches
        // 500ms, to reduce the potential for a lock stampede.
        $delay = $delay - $sleep;
        $sleep = min(500000, $sleep + 25000, $delay);
      }
    } while ($delay > 0);
    return array(
      FALSE,
      FALSE,
    );
  }

  /**
   * {@inheritdoc}
   */
  public function launchJobs(array $jobs) {
    $lock = \Drupal::service('ultimate_cron.lock');
    $configuration = $this
      ->getConfiguration();

    // Set proper max execution time.
    $max_execution_time = ini_get('max_execution_time');
    $lock_timeout = max($max_execution_time, $configuration['timeouts']['max_execution_time']);

    // We only lock for 55 seconds at a time, to give room for other cron
    // runs.
    // @todo: Why hard-code this?
    $lock_timeout = 55;
    if (!empty($_GET['thread'])) {
      self::setGlobalOption('thread', $_GET['thread']);
    }
    if ($thread = intval(self::getGlobalOption('thread'))) {
      if ($thread < 1 || $thread > $configuration['launcher']['max_threads']) {
        \Drupal::logger('serial_launcher')
          ->warning("Invalid thread available for starting launch thread");
        return;
      }
      $lock_name = 'ultimate_cron_serial_launcher_' . $thread;
      $lock_id = NULL;
      if (!$lock
        ->isLocked($lock_name)) {
        $lock_id = $lock
          ->lock($lock_name, $lock_timeout);
      }
      if (!$lock_id) {
        \Drupal::logger('serial_launcher')
          ->warning("Thread @thread is already running", array(
          '@thread' => $thread,
        ));
      }
    }
    else {
      $timeout = 1;
      list($thread, $lock_id) = $this
        ->findFreeThread(TRUE, $lock_timeout, $timeout);
    }
    $this->currentThread = $thread;
    if (!$thread) {
      \Drupal::logger('serial_launcher')
        ->warning("No free threads available for launching jobs");
      return;
    }
    if ($max_execution_time && $max_execution_time < $configuration['timeouts']['max_execution_time']) {
      set_time_limit($configuration['timeouts']['max_execution_time']);
    }
    $this
      ->runThread($lock_id, $thread, $jobs);
    $lock
      ->unlock($lock_id);
  }

  /**
   * {@inheritdoc}
   */
  public function runThread($lock_id, $thread, $jobs) {
    $lock = \Drupal::service('ultimate_cron.lock');
    $lock_name = 'ultimate_cron_serial_launcher_' . $thread;
    foreach ($jobs as $job) {
      $configuration = $job
        ->getConfiguration('launcher');
      $configuration['launcher'] += array(
        'thread' => 'any',
      );
      switch ($configuration['launcher']['thread']) {
        case 'any':
          $configuration['launcher']['thread'] = $thread;
          break;
        case 'fixed':
          $configuration['launcher']['thread'] = $job
            ->getUniqueID() % $configuration['launcher']['max_threads'] + 1;
          break;
      }
      if ((!self::getGlobalOption('thread') || $configuration['launcher']['thread'] == $thread) && $job
        ->isScheduled()) {
        $this
          ->launch($job);

        // Be friendly, and check if someone else has taken the lock.
        // If they have, bail out, since someone else is now handling
        // this thread.
        if ($current_lock_id = $lock
          ->isLocked($lock_name)) {
          if ($current_lock_id !== $lock_id) {
            return;
          }
        }
        else {

          // If lock is free, then take the lock again.
          $lock_id = $lock
            ->lock($lock_name);
          if (!$lock_id) {

            // Race-condition, someone beat us to it.
            return;
          }
        }
      }
    }
  }

}

Members

Namesort descending Modifiers Type Description Overrides
CronPlugin::$globalOptions public static property
CronPlugin::$instances public static property
CronPlugin::$multiple public static property 1
CronPlugin::$weight public property
CronPlugin::calculateDependencies public function Calculates dependencies for the configured plugin. Overrides DependentPluginInterface::calculateDependencies
CronPlugin::cleanForm public function Clean form of empty fallback values.
CronPlugin::drupal_array_remove_nested_value public function Modified version drupal_array_get_nested_value().
CronPlugin::fallbackalize public function Process fallback form parameters.
CronPlugin::getConfiguration public function Gets this plugin's configuration. Overrides ConfigurableInterface::getConfiguration
CronPlugin::getGlobalOption public static function Get global plugin option.
CronPlugin::getGlobalOptions public static function Get all global plugin options.
CronPlugin::getPluginTypes public static function Returns a list of plugin types.
CronPlugin::isValid public function Default plugin valid for all jobs. 1
CronPlugin::setConfiguration public function Sets the configuration for this plugin instance. Overrides ConfigurableInterface::setConfiguration
CronPlugin::setGlobalOption public static function Set global plugin option.
CronPlugin::settingsLabel public function Get label for a specific setting. 1
CronPlugin::submitConfigurationForm public function Form submission handler. Overrides PluginFormInterface::submitConfigurationForm
CronPlugin::validateConfigurationForm public function Form validation handler. Overrides PluginFormInterface::validateConfigurationForm 1
CronPlugin::__construct public function Constructs a \Drupal\Component\Plugin\PluginBase object. Overrides PluginBase::__construct 2
DependencySerializationTrait::$_entityStorages protected property An array of entity type IDs keyed by the property name of their storages.
DependencySerializationTrait::$_serviceIds protected property An array of service IDs keyed by property name used for serialization.
DependencySerializationTrait::__sleep public function 1
DependencySerializationTrait::__wakeup public function 2
LauncherBase::finishProgress public function Default implementation of finishProgress(). Overrides LauncherInterface::finishProgress
LauncherBase::formatProgress public function Default implementation of formatProgress(). Overrides LauncherInterface::formatProgress
LauncherBase::formatRunning public function Format running state. Overrides LauncherInterface::formatRunning
LauncherBase::formatUnfinished public function Format unfinished state. Overrides LauncherInterface::formatUnfinished
LauncherBase::getProgress public function Default implementation of getProgress(). Overrides LauncherInterface::getProgress
LauncherBase::getProgressMultiple public function Default implementation of getProgressMultiple(). Overrides LauncherInterface::getProgressMultiple
LauncherBase::initializeProgress public function Default implementation of initializeProgress(). Overrides LauncherInterface::initializeProgress
LauncherBase::setProgress public function Default implementation of setProgress(). Overrides LauncherInterface::setProgress
MessengerTrait::$messenger protected property The messenger. 29
MessengerTrait::messenger public function Gets the messenger. 29
MessengerTrait::setMessenger public function Sets the messenger.
PluginBase::$configuration protected property Configuration information passed into the plugin. 1
PluginBase::$pluginDefinition protected property The plugin implementation definition. 1
PluginBase::$pluginId protected property The plugin_id.
PluginBase::DERIVATIVE_SEPARATOR constant A string which is used to separate base plugin IDs from the derivative ID.
PluginBase::getBaseId public function Gets the base_plugin_id of the plugin instance. Overrides DerivativeInspectionInterface::getBaseId
PluginBase::getDerivativeId public function Gets the derivative_id of the plugin instance. Overrides DerivativeInspectionInterface::getDerivativeId
PluginBase::getPluginDefinition public function Gets the definition of the plugin implementation. Overrides PluginInspectionInterface::getPluginDefinition 3
PluginBase::getPluginId public function Gets the plugin_id of the plugin instance. Overrides PluginInspectionInterface::getPluginId
PluginBase::isConfigurable public function Determines if the plugin is configurable.
SerialLauncher::$currentThread public property
SerialLauncher::buildConfigurationForm public function Form constructor. Overrides CronPlugin::buildConfigurationForm
SerialLauncher::cleanup public function Cleans and purges data stored by this plugin. Overrides PluginCleanupInterface::cleanup
SerialLauncher::cron_alter public function Implements hook_cron_alter().
SerialLauncher::defaultConfiguration public function Gets default configuration for this plugin. Overrides CronPlugin::defaultConfiguration
SerialLauncher::findFreeThread public function
SerialLauncher::isLocked public function Check if a job is locked. Overrides LauncherInterface::isLocked
SerialLauncher::isLockedMultiple public function Fallback implementation of multiple lock check. Overrides LauncherBase::isLockedMultiple
SerialLauncher::launch public function Launch job. Overrides LauncherInterface::launch
SerialLauncher::launchJobs public function Default implementation of jobs launcher. Overrides LauncherBase::launchJobs
SerialLauncher::lock public function Lock job. Overrides LauncherInterface::lock
SerialLauncher::runThread public function
SerialLauncher::settingsFormValidate public function
SerialLauncher::unlock public function Unlock a lock. Overrides LauncherInterface::unlock
StringTranslationTrait::$stringTranslation protected property The string translation service. 1
StringTranslationTrait::formatPlural protected function Formats a string containing a count of items.
StringTranslationTrait::getNumberOfPlurals protected function Returns the number of plurals supported by a given language.
StringTranslationTrait::getStringTranslation protected function Gets the string translation service.
StringTranslationTrait::setStringTranslation public function Sets the string translation service to use. 2
StringTranslationTrait::t protected function Translates a string to the current language or to a given language.