View source
<?php
namespace Drupal\ultimate_cron\Plugin\ultimate_cron\Launcher;
use Drupal\Core\Form\FormStateInterface;
use Drupal\ultimate_cron\CronJobInterface;
use Drupal\ultimate_cron\Launcher\LauncherBase;
use Drupal\ultimate_cron\PluginCleanupInterface;
class SerialLauncher extends LauncherBase implements PluginCleanupInterface {
public $currentThread = NULL;
public function cron_alter(&$jobs) {
$lock = \Drupal::service('ultimate_cron.lock');
if (!empty($lock->{$killable})) {
$jobs['ultimate_cron_plugin_launcher_serial_cleanup']->hook['tags'][] = 'killable';
}
}
public function defaultConfiguration() {
return array(
'timeouts' => array(
'lock_timeout' => 3600,
'max_execution_time' => 3600,
),
'launcher' => array(
'max_threads' => 1,
'thread' => 'any',
),
) + parent::defaultConfiguration();
}
public function buildConfigurationForm(array $form, FormStateInterface $form_state) {
$form['timeouts'] = array(
'#type' => 'fieldset',
'#title' => t('Timeouts'),
);
$form['launcher'] = array(
'#type' => 'fieldset',
'#title' => t('Launching options'),
);
$form['timeouts']['lock_timeout'] = array(
'#title' => t("Job lock timeout"),
'#type' => 'textfield',
'#default_value' => $this->configuration['timeouts']['lock_timeout'],
'#description' => t('Number of seconds to keep lock on job.'),
'#fallback' => TRUE,
'#required' => TRUE,
);
if (FALSE) {
$form['timeouts']['max_execution_time'] = array(
'#title' => t("Maximum execution time"),
'#type' => 'textfield',
'#default_value' => $this->configuration['timeouts']['max_execution_time'],
'#description' => t('Maximum execution time for a cron run in seconds.'),
'#fallback' => TRUE,
'#required' => TRUE,
);
$form['launcher']['max_threads'] = array(
'#title' => t("Maximum number of launcher threads"),
'#type' => 'number',
'#default_value' => $this->configuration['launcher']['max_threads'],
'#description' => t('The maximum number of launch threads that can be running at any given time.'),
'#fallback' => TRUE,
'#required' => TRUE,
'#weight' => 1,
);
return $form;
}
else {
$max_threads = isset($this->configuration['launcher']['max_threads']) ? $this->configuration['launcher']['max_threads'] : 1;
}
$options = array(
'any' => t('-- Any -- '),
'fixed' => t('-- Fixed -- '),
);
for ($i = 1; $i <= $max_threads; $i++) {
$options[$i] = $i;
}
$form['launcher']['thread'] = array(
'#title' => t("Run in thread"),
'#type' => 'select',
'#default_value' => isset($this->configuration['launcher']['thread']) ? $this->configuration['launcher']['thread'] : 'any',
'#options' => $options,
'#description' => t('Which thread to run in when invoking with ?thread=N. Note: This setting only has an effect when cron is run through cron.php with an argument ?thread=N or through Drush with --options=thread=N.'),
'#fallback' => TRUE,
'#required' => TRUE,
'#weight' => 2,
);
return $form;
}
public function settingsFormValidate(&$form, &$form_state, $job = NULL) {
$elements =& $form['configuration'][$this->type][$this->name];
$values =& $form_state['values']['configuration'][$this->type][$this->name];
if (!$job) {
if (intval($values['max_threads']) <= 0) {
form_set_error("settings[{$this->type}][{$this->name}", t('%title must be greater than 0', array(
'%title' => $elements['launcher']['max_threads']['#title'],
)));
}
}
}
public function lock(CronJobInterface $job) {
if (array_key_exists('timeouts', $this->configuration)) {
$timeout = $this->configuration['timeouts']['lock_timeout'];
}
else {
$timeout = 0;
}
$lock = \Drupal::service('ultimate_cron.lock');
if ($lock_id = $lock
->lock($job
->id(), $timeout)) {
$lock_id = $this
->getPluginId() . '-' . $lock_id;
return $lock_id;
}
return FALSE;
}
public function unlock($lock_id, $manual = FALSE) {
list($launcher, $lock_id) = explode('-', $lock_id, 2);
$lock = \Drupal::service('ultimate_cron.lock');
return $lock
->unlock($lock_id);
}
public function isLocked(CronJobInterface $job) {
$lock = \Drupal::service('ultimate_cron.lock');
$lock_id = $lock
->isLocked($job
->id());
return $lock_id ? $this->pluginId . '-' . $lock_id : $lock_id;
}
public function isLockedMultiple(array $jobs) {
$names = array();
foreach ($jobs as $job) {
$names[] = $job
->id();
}
$lock = \Drupal::service('ultimate_cron.lock');
$lock_ids = $lock
->isLockedMultiple($names);
foreach ($lock_ids as &$lock_id) {
$lock_id = $lock_id ? $this->pluginId . '-' . $lock_id : $lock_id;
}
return $lock_ids;
}
public function cleanup() {
$lock = \Drupal::service('ultimate_cron.lock');
$lock
->cleanup();
}
public function launch(CronJobInterface $job) {
\Drupal::moduleHandler()
->invokeAll('cron_pre_launch', array(
$this,
));
if ($this->currentThread) {
$init_message = t('Launched in thread @current_thread', array(
'@current_thread' => $this->currentThread,
));
}
else {
$init_message = t('Launched manually');
}
$job_launch = $job
->run($init_message);
\Drupal::moduleHandler()
->invokeAll('cron_post_launch', array(
$this,
));
return $job_launch;
}
public function findFreeThread($lock, $lock_timeout = NULL, $timeout = 3) {
$configuration = $this
->getConfiguration();
$delay = $timeout * 1000000;
$sleep = 25000;
$lock_service = \Drupal::service('ultimate_cron.lock');
do {
for ($thread = 1; $thread <= $configuration['launcher']['max_threads']; $thread++) {
if ($thread != $this->currentThread) {
$lock_name = 'ultimate_cron_serial_launcher_' . $thread;
if (!$lock_service
->isLocked($lock_name)) {
if ($lock) {
if ($lock_id = $lock_service
->lock($lock_name, $lock_timeout)) {
return array(
$thread,
$lock_id,
);
}
}
else {
return array(
$thread,
FALSE,
);
}
}
}
}
if ($delay > 0) {
usleep($sleep);
$delay = $delay - $sleep;
$sleep = min(500000, $sleep + 25000, $delay);
}
} while ($delay > 0);
return array(
FALSE,
FALSE,
);
}
public function launchJobs(array $jobs) {
$lock = \Drupal::service('ultimate_cron.lock');
$configuration = $this
->getConfiguration();
$max_execution_time = ini_get('max_execution_time');
$lock_timeout = max($max_execution_time, $configuration['timeouts']['max_execution_time']);
$lock_timeout = 55;
if (!empty($_GET['thread'])) {
self::setGlobalOption('thread', $_GET['thread']);
}
if ($thread = intval(self::getGlobalOption('thread'))) {
if ($thread < 1 || $thread > $configuration['launcher']['max_threads']) {
\Drupal::logger('serial_launcher')
->warning("Invalid thread available for starting launch thread");
return;
}
$lock_name = 'ultimate_cron_serial_launcher_' . $thread;
$lock_id = NULL;
if (!$lock
->isLocked($lock_name)) {
$lock_id = $lock
->lock($lock_name, $lock_timeout);
}
if (!$lock_id) {
\Drupal::logger('serial_launcher')
->warning("Thread @thread is already running", array(
'@thread' => $thread,
));
}
}
else {
$timeout = 1;
list($thread, $lock_id) = $this
->findFreeThread(TRUE, $lock_timeout, $timeout);
}
$this->currentThread = $thread;
if (!$thread) {
\Drupal::logger('serial_launcher')
->warning("No free threads available for launching jobs");
return;
}
if ($max_execution_time && $max_execution_time < $configuration['timeouts']['max_execution_time']) {
set_time_limit($configuration['timeouts']['max_execution_time']);
}
$this
->runThread($lock_id, $thread, $jobs);
$lock
->unlock($lock_id);
}
public function runThread($lock_id, $thread, $jobs) {
$lock = \Drupal::service('ultimate_cron.lock');
$lock_name = 'ultimate_cron_serial_launcher_' . $thread;
foreach ($jobs as $job) {
$configuration = $job
->getConfiguration('launcher');
$configuration['launcher'] += array(
'thread' => 'any',
);
switch ($configuration['launcher']['thread']) {
case 'any':
$configuration['launcher']['thread'] = $thread;
break;
case 'fixed':
$configuration['launcher']['thread'] = $job
->getUniqueID() % $configuration['launcher']['max_threads'] + 1;
break;
}
if ((!self::getGlobalOption('thread') || $configuration['launcher']['thread'] == $thread) && $job
->isScheduled()) {
$this
->launch($job);
if ($current_lock_id = $lock
->isLocked($lock_name)) {
if ($current_lock_id !== $lock_id) {
return;
}
}
else {
$lock_id = $lock
->lock($lock_name);
if (!$lock_id) {
return;
}
}
}
}
}
}