background_process.module in Background Process 8
Same filename and directory in other branches
This module implements a framework for calling functions in the background.
$handle = background_process_start('mymodule_dosomething', $myvar1, $myvar2);
$handle = background_process_start(array(
'myclass',
'mystaticfunction',
), $myvar1, $myvar2);
$handle = background_process_start(array(
$myobject,
'mymethod',
), $myvar1, $myvar2);
$handle = background_process_start_locked('dontrunconcurrently', 'mymodule_dosomething', $myvar1, $myvar2);
For implementation of load balancing functionality:
See also
hook_service_group()
File
background_process.moduleView source
<?php
/**
* @file
* This module implements a framework for calling functions in the background.
*
* @code
* $handle = background_process_start('mymodule_dosomething', $myvar1, $myvar2);
* $handle = background_process_start(array('myclass', 'mystaticfunction'), $myvar1, $myvar2);
* $handle = background_process_start(array($myobject, 'mymethod'), $myvar1, $myvar2);
* $handle = background_process_start_locked('dontrunconcurrently', 'mymodule_dosomething', $myvar1, $myvar2);
* @endcode
*
* For implementation of load balancing functionality:
* @see hook_service_group()
*/
use Drupal\Component\Utility\Timer;
use Drupal\Core\Database\Database;
include 'background_process.class.php';
/**
 * Default values for the module's timeouts and cleanup thresholds.
 *
 * NOTE(review): the names mirror the 'background_process_*' keys read from
 * the 'background_process.settings' config elsewhere in this file, so these
 * are presumably the fallbacks for those settings - confirm against the
 * config schema. The cleanup ages are presumably in seconds, because the
 * matching settings are subtracted from time() in background_process_cron().
 */
const BACKGROUND_PROCESS_SERVICE_TIMEOUT = 0;
const BACKGROUND_PROCESS_CONNECTION_TIMEOUT = 2;
const BACKGROUND_PROCESS_STREAM_TIMEOUT = 2;
const BACKGROUND_PROCESS_CLEANUP_AGE = 120;
const BACKGROUND_PROCESS_CLEANUP_AGE_RUNNING = 8 * 3600;
const BACKGROUND_PROCESS_CLEANUP_AGE_QUEUE = 86400;
const BACKGROUND_PROCESS_REDISPATCH_THRESHOLD = 10;
/**
 * Values stored in the 'exec_status' column of {background_process}.
 *
 * A row is inserted as LOCKED by default (background_process_lock_process())
 * and is moved from LOCKED or QUEUED to RUNNING when
 * background_process_service_execute() claims it.
 */
const BACKGROUND_PROCESS_STATUS_NONE = 0;
const BACKGROUND_PROCESS_STATUS_LOCKED = 1;
const BACKGROUND_PROCESS_STATUS_RUNNING = 2;
const BACKGROUND_PROCESS_STATUS_QUEUED = 3;
/**
 * Implements hook_permission().
 *
 * Declares the 'administer background process' permission.
 *
 * @return array
 *   Permission definitions keyed by permission name.
 */
function background_process_permission() {
  $permissions = [];
  $permissions['administer background process'] = [
    'title' => t('Administer background processes'),
    'description' => t('Perform administration tasks for background processes.'),
  ];
  return $permissions;
}
/**
 * Implements hook_cron().
 *
 * Unlocks process handles that have stayed in the locked, running or queued
 * state for longer than the cleanup age configured for that state. The whole
 * run is capped at 120 seconds, measured from the start of the request.
 */
function background_process_cron() {
  // Don't use more than 120 seconds to unlock.
  $expire = 120;
  @set_time_limit($expire);
  $deadline = $_SERVER['REQUEST_TIME'] + $expire;
  $config = \Drupal::config('background_process.settings');

  // One cleanup pass per status: status, config key, fallback age, message.
  $passes = [
    [
      BACKGROUND_PROCESS_STATUS_LOCKED,
      'background_process_cleanup_age',
      BACKGROUND_PROCESS_CLEANUP_AGE,
      t('Never started (auto unlock due to timeout)'),
    ],
    [
      BACKGROUND_PROCESS_STATUS_RUNNING,
      'background_process_cleanup_age_running',
      BACKGROUND_PROCESS_CLEANUP_AGE_RUNNING,
      t('Never finished (auto unlock due to long run)'),
    ],
    [
      BACKGROUND_PROCESS_STATUS_QUEUED,
      'background_process_cleanup_age_queue',
      BACKGROUND_PROCESS_CLEANUP_AGE_QUEUE,
      t('Never picked up by cron worker (auto unlock due to timeout)'),
    ],
  ];

  foreach ($passes as $pass) {
    list($status, $config_key, $default_age, $msg) = $pass;
    $age = $config->get($config_key);
    if (!isset($age)) {
      // Without a fallback a missing setting would be treated as age 0 and
      // every process of this status would be unlocked immediately.
      $age = $default_age;
    }
    _background_process_cron_unlock_stale($status, $age, $msg, $deadline);
  }
}

/**
 * Unlocks, in batches of 10, all processes of one status older than $age.
 *
 * @param int $status
 *   One of the BACKGROUND_PROCESS_STATUS_* values to match on exec_status.
 * @param int|float $age
 *   Processes whose start_stamp is more than this many seconds in the past
 *   are unlocked.
 * @param string $msg
 *   Message handed to background_process_unlock() and shown to the user.
 * @param int $deadline
 *   Unix timestamp after which no further batch is started.
 */
function _background_process_cron_unlock_stale($status, $age, $msg, $deadline) {
  $threshold = time() - $age;
  do {
    if (time() >= $deadline) {
      break;
    }
    $handles = db_query_range("SELECT handle, start_stamp FROM {background_process} WHERE start_stamp < :start AND exec_status = :status", 0, 10, [
      ':start' => $threshold,
      ':status' => $status,
    ])->fetchAllAssoc('handle', PDO::FETCH_ASSOC);

    $unlocked_any = FALSE;
    foreach ($handles as $handle => $process) {
      // Unlock the process.
      if (background_process_unlock($handle, $msg, $process['start_stamp'])) {
        $unlocked_any = TRUE;
        drupal_set_message(t("%handle unlocked: @msg", [
          '%handle' => $handle,
          '@msg' => $msg,
        ]));
      }
      else {
        drupal_set_message(t("%handle could not be unlocked: @msg", [
          '%handle' => $handle,
          '@msg' => $msg,
        ]), 'error');
      }
    }
    // If nothing in the batch could be removed, the next query would return
    // the very same rows; stop instead of spinning until the deadline.
  } while (!empty($handles) && $unlocked_any);
}
/**
 * Implements hook_cron_alter().
 *
 * Exempts this module's cron job from congestion protection and removes the
 * job's own process lock once it is more than 30 seconds old.
 *
 * @param array $items
 *   Cron job definitions, altered in place.
 */
function background_process_cron_alter(&$items) {
  $items['background_process_cron']['override_congestion_protection'] = TRUE;

  // NOTE(review): this lookup passes what looks like the intended default
  // ('uc:') as the config key, so it yields no value; confirm the correct
  // config object/key for the Ultimate Cron handle prefix. Until then fall
  // back to 'uc:' so the handle lookup below can match.
  $handle_prefix = \Drupal::config('ultimate_cron_handle_prefix')
    ->get('uc:');
  if (!isset($handle_prefix)) {
    $handle_prefix = 'uc:';
  }

  $process = background_process_get_process($handle_prefix . 'background_process_cron');
  if ($process && $process->start + 30 < time()) {
    background_process_unlock($process->handle, t('Self unlocking stale lock'), $process->start);
  }
}
/**
 * Implements hook_cronapi().
 *
 * @param string $op
 *   The requested operation: 'list', 'rule' or 'configure'.
 * @param string|null $job
 *   Unused.
 *
 * @return array|string|null
 *   The job list, the cron rule or the configuration path, depending on $op;
 *   NULL for any other operation.
 */
function background_process_cronapi($op, $job = NULL) {
  if ($op == 'list') {
    $jobs = [];
    $jobs['background_process_cron'] = t('Cleanup old process handles');
    return $jobs;
  }
  if ($op == 'rule') {
    return '* * * * *';
  }
  if ($op == 'configure') {
    return 'admin/config/system/background-process';
  }
}
/**
 * Implements hook_service_group().
 *
 * @return array
 *   An array whose 'methods' key maps load-balancing callback names to their
 *   translated labels.
 */
function background_process_service_group() {
  return [
    'methods' => [
      'background_process_service_group_random' => t('Random'),
      'background_process_service_group_round_robin' => t('Pseudo round-robin'),
    ],
  ];
}
/**
 * Load balancing callback: picks a random host from a service group.
 *
 * @param array $service_group
 *   Service group definition; its 'hosts' entry is a zero-based list.
 *
 * @return mixed
 *   The randomly selected entry of $service_group['hosts'].
 */
function background_process_service_group_random($service_group) {
  $hosts = $service_group['hosts'];
  $last_index = count($hosts) - 1;
  $pick = rand(0, $last_index);
  return $hosts[$pick];
}
/**
 * Load balancing callback: pseudo round-robin over a service group's hosts.
 *
 * The first call in a request starts at a random host; each later call moves
 * on to the next host, wrapping around at the end of the list.
 *
 * @param array $service_group
 *   Service group definition; its 'hosts' entry is a zero-based list.
 *
 * @return mixed
 *   The selected entry of $service_group['hosts'].
 */
function background_process_service_group_round_robin($service_group) {
  static $position = NULL;
  $total = count($service_group['hosts']);
  $position = isset($position) ? ($position + 1) % $total : rand(0, $total - 1);
  return $service_group['hosts'][$position];
}
/**
 * Access handler for a service call.
 *
 * Validates the handle/token pair against the stored process and, on
 * success, switches the current user to the account recorded for the
 * process (or to the anonymous user when no uid is recorded).
 *
 * @param string $handle
 *   URL-encoded process handle.
 * @param string $token
 *   URL-encoded token that must match the token stored for the process.
 *
 * @return bool
 *   TRUE when the call may proceed, FALSE when the process is unknown, the
 *   token does not match or the recorded user cannot be loaded.
 */
function background_process_service_access($handle, $token) {
  // Setup service.
  ignore_user_abort(TRUE);
  // Damn those slashes!
  $handle = rawurldecode($handle);
  $token = rawurldecode($token);
  // Ensure no session!
  drupal_save_session(FALSE);
  unset($_SESSION);

  $logger = \Drupal::logger('bg_process');
  $process = background_process_get_process($handle);
  if (!$process) {
    $logger->notice('Unknown process: %handle', [
      '%handle' => $handle,
    ]);
    return FALSE;
  }

  // Compare in constant time so the token cannot be probed via timing. A
  // non-string stored token can never match, as with the strict comparison.
  if (!is_string($process->token) || !hash_equals($process->token, $token)) {
    $logger->notice('Invalid token: %token for handle: %handle', [
      '%token' => $token,
      '%handle' => $handle,
    ]);
    return FALSE;
  }

  // Login as the user that requested the call.
  if ($process->uid) {
    $account = \Drupal::entityManager()
      ->getStorage('user')
      ->load($process->uid);
    if (!$account) {
      // Invalid user!
      $logger->notice('Invalid user: %uid for handle: %handle', [
        '%uid' => $process->uid,
        '%handle' => $handle,
      ]);
      return FALSE;
    }
  }
  else {
    $account = new \Drupal\Core\Session\AnonymousUserSession();
  }
  // Assigning to a local variable does not change who the request runs as;
  // the account has to be set on the current-user proxy.
  \Drupal::currentUser()->setAccount($account);
  return TRUE;
}
/**
 * Implements hook_init().
 *
 * Determines and stores the default service host, but only when the session
 * flag 'background_process_determine_default_service_host' has been set.
 */
function background_process_init() {
  // Only determine if we're told to do so.
  if (empty($_SESSION['background_process_determine_default_service_host'])) {
    return;
  }
  // Don't determine on check-token page, to avoid infinite loop. The 'q'
  // parameter is not guaranteed to be present, so guard the lookup.
  $path = isset($_GET['q']) ? $_GET['q'] : '';
  if (strpos($path, 'background-process/check-token') === 0) {
    return;
  }
  // NOTE(review): get() without a key returns the whole config array, which
  // never equals 'done', so this check appears to always return early.
  // Confirm where the installer's task state is stored before changing it.
  if (\Drupal::config('install_task')
    ->get() != 'done') {
    return;
  }
  // Determine the default service host.
  background_process_determine_and_save_default_service_host();
  unset($_SESSION['background_process_determine_default_service_host']);
}
/**
 * Implements hook_cron_queue_info().
 *
 * Registers the default 'background_process' queue plus one 'bgp:'-prefixed
 * queue for every name in the 'background_process_queues' setting.
 *
 * @return array
 *   Queue definitions keyed by queue name.
 */
function background_process_cron_queue_info() {
  $queues = [];
  $queues['background_process'] = [
    'worker callback' => '_background_process_queue',
  ];

  $background_process_queues = \Drupal::config('background_process.settings')
    ->get('background_process_queues');
  // The setting is NULL when it has never been saved; treat that as "no
  // additional queues" instead of iterating over a non-array.
  if (!is_array($background_process_queues)) {
    $background_process_queues = [];
  }
  foreach ($background_process_queues as $queue_name) {
    $queues['bgp:' . $queue_name] = [
      'worker callback' => '_background_process_queue',
    ];
  }
  return $queues;
}
/**
 * Queue worker callback: runs one queued function call.
 *
 * The handle that was current before the call is restored afterwards. When
 * the call throws, the process is put back into the queued state and the
 * exception is re-thrown.
 *
 * @param array $item
 *   Two-element list: the URL-encoded handle and the token.
 *
 * @throws Exception
 *   Whatever the executed process throws.
 */
function _background_process_queue($item) {
  $previous_handle = background_process_current_handle();
  list($handle, $token) = $item;

  if (!background_process_service_access($handle, $token)) {
    return;
  }

  try {
    background_process_service_execute(rawurldecode($handle), TRUE);
    background_process_current_handle($previous_handle);
  }
  catch (Exception $exception) {
    background_process_current_handle($previous_handle);
    background_process_update_status(rawurldecode($handle), BACKGROUND_PROCESS_STATUS_QUEUED);
    throw $exception;
  }
}
/**
 * Gets, and optionally sets, the handle of the process currently running.
 *
 * @param string|null $handle
 *   New current handle. Passing NULL leaves the stored handle untouched.
 *
 * @return string|null
 *   The current handle, or NULL when none has been set in this request.
 */
function background_process_current_handle($handle = NULL) {
  static $active = NULL;
  if ($handle !== NULL) {
    $active = $handle;
  }
  return $active;
}
/**
 * Generates a handle for a callback.
 *
 * The handle is the MD5 hash of the serialized callback, the current
 * microtime and a random number between 1 and 5000.
 *
 * @param mixed $callback
 *   The callback the handle is generated for; must be serializable.
 *
 * @return string
 *   A 32-character hexadecimal handle.
 */
function background_process_generate_handle($callback) {
  $parts = [
    serialize($callback),
    microtime(TRUE),
    rand(1, 5000),
  ];
  return md5(implode(':', $parts));
}
/**
 * Starts a background process under a new BackgroundProcess instance.
 *
 * Any arguments after $callback are passed on to the callback.
 *
 * @param mixed $callback
 *   The callback to run in the background.
 *
 * @return mixed
 *   The process handle when starting succeeded, otherwise the falsy result
 *   returned by BackgroundProcess::start().
 */
function background_process_start($callback) {
  $callback_args = array_slice(func_get_args(), 1);
  $process = new BackgroundProcess();
  $started = $process->start($callback, $callback_args);
  if ($started) {
    return $process->handle;
  }
  return $started;
}
/**
 * Starts a background process under a caller-chosen handle.
 *
 * Any arguments after $callback are passed on to the callback.
 *
 * @param string $handle
 *   The handle to create the BackgroundProcess with.
 * @param mixed $callback
 *   The callback to run in the background.
 *
 * @return mixed
 *   The process handle when starting succeeded, otherwise the falsy result
 *   returned by BackgroundProcess::start().
 */
function background_process_start_locked($handle, $callback) {
  $callback_args = array_slice(func_get_args(), 2);
  $process = new BackgroundProcess($handle);
  $started = $process->start($callback, $callback_args);
  if ($started) {
    return $process->handle;
  }
  return $started;
}
/**
 * Queues a function call under a new BackgroundProcess instance.
 *
 * Any arguments after $callback are passed on to the callback.
 *
 * @param mixed $callback
 *   The callback to queue.
 *
 * @return mixed
 *   Whatever BackgroundProcess::queue() returns.
 */
function background_process_queue($callback) {
  $callback_args = array_slice(func_get_args(), 1);
  $process = new BackgroundProcess();
  return $process->queue($callback, $callback_args);
}
/**
 * Queues a function call under a caller-chosen handle.
 *
 * Any arguments after $callback are passed on to the callback.
 *
 * @param string $handle
 *   The handle to create the BackgroundProcess with.
 * @param mixed $callback
 *   The callback to queue.
 *
 * @return mixed
 *   Whatever BackgroundProcess::queue() returns.
 */
function background_process_queue_locked($handle, $callback) {
  $callback_args = array_slice(func_get_args(), 2);
  $process = new BackgroundProcess($handle);
  return $process->queue($callback, $callback_args);
}
/**
 * Shutdown callback: removes one menu cache entry and releases all locks.
 *
 * @param string $cid
 *   Cache ID of the menu cache entry to remove.
 */
function _background_process_cleanup_menu($cid) {
  // drupal_flush_all_caches() takes no arguments and rebuilds every cache,
  // which is far more than clearing one entry; delete just that entry.
  // NOTE(review): assumes the entry lives in the 'menu' cache bin, matching
  // the 'cache_menu' name passed by the previous code - confirm.
  \Drupal::cache('menu')
    ->delete($cid);
  // Release locks in case the cache deletion sets a lock and the locks have
  // already been released once during shutdown.
  \Drupal::lock()
    ->releaseAll();
}
/**
 * Entry point of a service call: prepares the request and runs the process.
 *
 * @param string $handle
 *   URL-encoded process handle.
 * @param bool $return
 *   Passed through to background_process_service_execute(); when FALSE that
 *   function ends the request with exit.
 *
 * @return mixed
 *   The result of background_process_service_execute().
 */
function background_process_service_start($handle, $return = FALSE) {
  // header() expects the complete "Name: value" line as its first argument;
  // the second argument is the boolean $replace flag.
  header('Content-Type: text/plain');
  // Let's clean up the mess the menu-router system leaves behind. The 'q'
  // parameter may be absent, so guard the lookup.
  $path = isset($_GET['q']) ? $_GET['q'] : '';
  $cid = 'menu_item:' . hash('sha256', $path);
  drupal_register_shutdown_function('_background_process_cleanup_menu', $cid);
  // Setup service.
  ignore_user_abort(TRUE);
  @set_time_limit(\Drupal::config('background_process.settings')
    ->get('background_process_service_timeout'));
  $handle = rawurldecode($handle);
  return background_process_service_execute($handle, $return);
}
/**
 * Claims the process stored for a handle and runs its callback.
 *
 * The row is claimed by moving it from LOCKED or QUEUED to RUNNING; if no
 * row was updated the function ends without running anything. After the
 * callback has run the process row is removed and the
 * 'background_process_shutdown' hook is invoked - directly when $return is
 * TRUE, via shutdown functions otherwise.
 *
 * @param string $handle
 *   The (already decoded) process handle.
 * @param bool $return
 *   TRUE to return to the caller when done; FALSE to end the request with
 *   exit.
 *
 * @throws Exception
 *   Re-throws any exception raised while claiming or running the process.
 */
function background_process_service_execute($handle, $return = FALSE) {
  // Access handler.
  $process = background_process_get_process($handle);
  if (!$process) {
    \Drupal::logger('bg_process')
      ->error('Process not found for handle: %handle', [
        '%handle' => $handle,
      ]);
    if ($return) {
      return;
    }
    else {
      exit;
    }
  }
  $process->start_stamp = microtime(TRUE);
  // Switch outside the try block so $old_db is always defined in the catch.
  $old_db = Database::setActiveConnection('background_process');
  try {
    // Only a process that is still locked or queued can be claimed.
    $claimed = db_update('background_process')
      ->fields([
        'start_stamp' => sprintf("%.06f", $process->start_stamp),
        'exec_status' => BACKGROUND_PROCESS_STATUS_RUNNING,
      ])
      ->condition('handle', $handle)
      ->condition('exec_status', [
        BACKGROUND_PROCESS_STATUS_LOCKED,
        BACKGROUND_PROCESS_STATUS_QUEUED,
      ], 'IN')
      ->execute();
    Database::setActiveConnection($old_db);
    if ($claimed) {
      $process->exec_status = BACKGROUND_PROCESS_STATUS_RUNNING;
      $process = BackgroundProcess::load($process);
      $process
        ->sendMessage('claimed');
      background_process_current_handle($handle);
    }
    else {
      if ($return) {
        return;
      }
      else {
        exit;
      }
    }
  }
  catch (Exception $e) {
    Database::setActiveConnection($old_db);
    throw $e;
  }
  // Make sure the process is removed when we're done.
  if (!$return) {
    drupal_register_shutdown_function('background_process_remove_process', $process->handle, $process->start_stamp);
  }
  if (is_callable($process->callback)) {
    $old_db = NULL;
    try {
      if (!$return) {
        // module_invoke_all() is not available; register the module
        // handler's invokeAll() so the shutdown hook actually fires.
        drupal_register_shutdown_function([
          \Drupal::moduleHandler(),
          'invokeAll',
        ], 'background_process_shutdown', [
          $process,
        ]);
      }
      $callback = _background_process_callback_name($process->callback);
      $old_db = Database::setActiveConnection('background_process');
      progress_initialize_progress($handle, "Background process '{$callback}' initialized");
      Database::setActiveConnection($old_db);
      call_user_func_array($process->callback, $process->args);
      $old_db = Database::setActiveConnection('background_process');
      progress_end_progress($handle, "Background process '{$callback}' finished");
      Database::setActiveConnection($old_db);
      if ($return) {
        background_process_remove_process($process->handle, $process->start_stamp);
        \Drupal::moduleHandler()
          ->invokeAll('background_process_shutdown', [
            $process,
          ]);
      }
    }
    catch (Exception $e) {
      // Exception occurred, switch back to proper db if necessary.
      if ($old_db) {
        Database::setActiveConnection($old_db);
      }
      if (!$return) {
        \Drupal::moduleHandler()
          ->invokeAll('background_process_shutdown', [
            $process,
            (string) $e,
          ]);
      }
      throw $e;
    }
  }
  else {
    // Function not found.
    \Drupal::logger('bg_process')
      ->error('Callback: %callback not found', [
        '%callback' => $process->callback,
      ]);
  }
  if ($return) {
    return;
  }
  else {
    exit;
  }
}
/**
 * Restarts the current background process.
 *
 * Forwards all arguments to background_process_keepalive() and then ends the
 * request.
 */
function background_process_restart() {
  call_user_func_array('background_process_keepalive', func_get_args());
  exit;
}
/**
 * Keeps the current background process alive.
 *
 * Registers _background_process_restart() as a shutdown function for the
 * current process, handing it the arguments this function was called with.
 *
 * @throws Exception
 *   When there is no current handle, or no process exists for it.
 */
function background_process_keepalive() {
  $restart_args = func_get_args();
  $current = background_process_current_handle();
  if (!$current) {
    throw new Exception(t('Background process handle %handle not found', [
      '%handle' => $current,
    ]));
  }

  $process = background_process_get_process($current);
  if (!$process) {
    throw new Exception(t('Background process %handle not found', [
      '%handle' => $current,
    ]));
  }

  drupal_register_shutdown_function('_background_process_restart', $process, $restart_args);
}
/**
 * Checks whether a background process has started.
 *
 * @param string $handle
 *   The process handle.
 *
 * @return bool
 *   TRUE when progress information exists for the handle.
 */
function background_process_is_started($handle) {
  $previous_connection = Database::setActiveConnection('background_process');
  $progress = progress_get_progress($handle);
  Database::setActiveConnection($previous_connection);
  if (empty($progress)) {
    return FALSE;
  }
  return TRUE;
}
/**
 * Checks whether a background process has finished.
 *
 * @param string $handle
 *   The process handle.
 *
 * @return bool
 *   TRUE when no progress information exists for the handle or when its
 *   'end' property is set to a truthy value.
 */
function background_process_is_finished($handle) {
  $previous_connection = Database::setActiveConnection('background_process');
  $progress = progress_get_progress($handle);
  Database::setActiveConnection($previous_connection);
  if (empty($progress)) {
    return TRUE;
  }
  return (bool) $progress->end;
}
/**
 * Stores callback, arguments, user and token on an existing process row.
 *
 * @param string $handle
 *   Handle of the row to update.
 * @param mixed $callback
 *   The callback; stored serialized.
 * @param int|null $uid
 *   User ID to run the process as. NULL means the current user.
 * @param array $args
 *   Callback arguments; stored serialized.
 * @param string $token
 *   Token that a service call must present for this process.
 *
 * @return mixed
 *   The result of the update query.
 *
 * @throws Exception
 *   Any exception raised by the database layer.
 */
function background_process_set_process($handle, $callback, $uid, $args, $token) {
  // Setup parameters.
  $args = serialize($args);
  $callback = serialize($callback);
  // Get user. The current-user proxy exposes the ID through id(), not
  // through a public 'uid' property.
  if (!isset($uid)) {
    $uid = (int) \Drupal::currentUser()->id();
  }

  $old_db = Database::setActiveConnection('background_process');
  try {
    return db_update('background_process')
      ->fields([
        'callback' => $callback,
        'args' => $args,
        'uid' => $uid,
        'token' => $token,
      ])
      ->condition('handle', $handle)
      ->execute();
  }
  finally {
    // Restore the previous connection on success and on failure alike.
    Database::setActiveConnection($old_db);
  }
}
/**
 * Locks a process by inserting a row for its handle.
 *
 * @param string $handle
 *   The handle to lock.
 * @param int $status
 *   The exec_status to store; defaults to BACKGROUND_PROCESS_STATUS_LOCKED.
 *
 * @return bool
 *   TRUE when the row was inserted, FALSE when an exception was thrown while
 *   doing so (or by _background_process_ensure_cleanup()).
 */
function background_process_lock_process($handle, $status = BACKGROUND_PROCESS_STATUS_LOCKED) {
  try {
    $previous_connection = Database::setActiveConnection('background_process');
    $insert = db_insert('background_process');
    $insert->fields([
      'handle' => $handle,
      'start_stamp' => sprintf("%.06f", microtime(TRUE)),
      'exec_status' => $status,
    ]);
    $insert->execute();
    Database::setActiveConnection($previous_connection);
    _background_process_ensure_cleanup($handle);
    return TRUE;
  }
  catch (Exception $exception) {
    Database::setActiveConnection($previous_connection);
    return FALSE;
  }
}
/**
 * Sets the exec_status of a background process.
 *
 * @param string $handle
 *   Handle of the process to update.
 * @param int $status
 *   One of the BACKGROUND_PROCESS_STATUS_* values.
 *
 * @throws Exception
 *   Any exception raised by the database layer.
 */
function background_process_update_status($handle, $status) {
  // Use the 'background_process' connection like every other accessor of
  // the {background_process} table in this file.
  $old_db = Database::setActiveConnection('background_process');
  try {
    db_update('background_process')
      ->fields([
        'exec_status' => $status,
      ])
      ->condition('handle', $handle)
      ->execute();
  }
  finally {
    Database::setActiveConnection($old_db);
  }
}
/**
 * Loads one background process by handle.
 *
 * @param string $handle
 *   The process handle.
 *
 * @return object|false
 *   The process row with 'args' and 'callback' unserialized and with 'start'
 *   and 'status' added as copies of 'start_stamp' and 'exec_status'; FALSE
 *   when no row exists for the handle.
 *
 * @throws Exception
 *   Any exception raised by the database layer.
 */
function background_process_get_process($handle) {
  $columns = [
    'handle',
    'callback',
    'args',
    'uid',
    'token',
    'service_host',
    'start_stamp',
    'exec_status',
  ];
  try {
    $previous_connection = Database::setActiveConnection('background_process');
    $query = db_select('background_process', 'bp')
      ->fields('bp', $columns)
      ->condition('handle', $handle);
    $process = $query->execute()->fetchObject();
    Database::setActiveConnection($previous_connection);
  }
  catch (Exception $exception) {
    Database::setActiveConnection($previous_connection);
    throw $exception;
  }

  if (!$process) {
    return FALSE;
  }
  $process->args = unserialize($process->args);
  $process->callback = unserialize($process->callback);
  $process->start = $process->start_stamp;
  $process->status = $process->exec_status;
  return $process;
}
/**
 * Loads all background processes, optionally filtered by status.
 *
 * @param int|null $status
 *   One of the BACKGROUND_PROCESS_STATUS_* values, or NULL for all rows.
 *
 * @return object[]
 *   Process rows with 'args' and 'callback' unserialized and with 'start'
 *   and 'status' added as copies of 'start_stamp' and 'exec_status'.
 *
 * @throws Exception
 *   Any exception raised by the database layer.
 */
function background_process_get_processes($status = NULL) {
  $old_db = Database::setActiveConnection('background_process');
  try {
    $query = db_select('background_process', 'bp')
      ->fields('bp', [
        'handle',
        'callback',
        'args',
        'uid',
        'token',
        'service_host',
        'start_stamp',
        'exec_status',
      ]);
    if (isset($status)) {
      // The status column is called 'exec_status'; there is no 'status'
      // column among the fields of this table.
      $query->condition('bp.exec_status', $status);
    }
    $result = $query->execute();

    $processes = [];
    while ($process = $result->fetchObject()) {
      $process->args = unserialize($process->args);
      $process->callback = unserialize($process->callback);
      $process->start = $process->start_stamp;
      $process->status = $process->exec_status;
      $processes[] = $process;
    }
    return $processes;
  }
  finally {
    // Restore the previous connection even when the query fails.
    Database::setActiveConnection($old_db);
  }
}
/**
 * Removes a background process row.
 *
 * @param string $handle
 *   Handle of the process to remove.
 * @param float|string|null $start
 *   When given, the row is only removed if its start_stamp equals this value
 *   formatted with six decimals.
 *
 * @return mixed
 *   The result of the delete query.
 */
function background_process_remove_process($handle, $start = NULL) {
  $previous_connection = Database::setActiveConnection('background_process');
  $delete = db_delete('background_process')
    ->condition('handle', $handle);
  if (isset($start)) {
    $delete->condition('start_stamp', sprintf("%.06f", $start), '=');
  }
  $deleted = $delete->execute();
  Database::setActiveConnection($previous_connection);
  return $deleted;
}
/**
 * Unlocks a background process by removing it.
 *
 * After a successful removal the 'background_process_shutdown' hook is
 * invoked with the process and a message.
 *
 * @param string $handle
 *   Handle of the process to unlock.
 * @param string|null $msg
 *   Message passed to the shutdown hook. Defaults to a "manually unlocked"
 *   message naming the current user.
 * @param string|null $start
 *   When given, the process is only unlocked if this is identical (===) to
 *   the start value loaded for the process.
 *
 * @return bool
 *   TRUE when the process was removed, FALSE otherwise.
 */
function background_process_unlock($handle, $msg = NULL, $start = NULL) {
  $process = background_process_get_process($handle);
  if (!$process || (isset($start) && $start !== $process->start)) {
    return FALSE;
  }
  // Unlock the process.
  if (!background_process_remove_process($process->handle, $process->start)) {
    return FALSE;
  }

  if (!$msg) {
    // The current-user proxy has no public 'uid'/'name' properties; use its
    // accessor methods.
    $user = \Drupal::currentUser();
    $username = $user->id() ? $user->getAccountName() : t('anonymous');
    // '!' placeholders are no longer supported by t(); use '@'.
    $msg = t('Manually unlocked by @name', [
      '@name' => $username,
    ]);
  }
  \Drupal::moduleHandler()
    ->invokeAll('background_process_shutdown', [
      $process,
      $msg,
    ]);
  return TRUE;
}
/**
 * Stores the service host for a background process.
 *
 * @param string $handle
 *   Handle of the process to update.
 * @param string|null $service_host
 *   The service host; any falsy value is stored as an empty string.
 *
 * @return mixed
 *   The result of the update query.
 *
 * @throws Exception
 *   Any exception raised by the database layer.
 */
function background_process_set_service_host($handle, $service_host) {
  try {
    $previous_connection = Database::setActiveConnection('background_process');
    $host_value = $service_host ? $service_host : '';
    $updated = db_update('background_process')
      ->fields(['service_host' => $host_value])
      ->condition('handle', $handle)
      ->execute();
    Database::setActiveConnection($previous_connection);
    return $updated;
  }
  catch (Exception $exception) {
    Database::setActiveConnection($previous_connection);
    throw $exception;
  }
}
/**
 * Gets the service groups defined in the system.
 *
 * The configured groups are returned together with a 'default' group (added
 * only if none is configured) that contains the default service host. Every
 * group without a 'method' gets the round-robin method.
 *
 * @return array
 *   Service group definitions keyed by group name.
 */
function background_process_get_service_groups() {
  $settings = \Drupal::config('background_process.settings');
  $service_groups = $settings->get('background_process_service_groups');
  if (!is_array($service_groups)) {
    $service_groups = [];
  }
  // Add, rather than assign, the default group so that configured groups
  // are not thrown away.
  $service_groups += [
    'default' => [
      'hosts' => [
        $settings->get('background_process_default_service_host'),
      ],
    ],
  ];
  foreach ($service_groups as &$service_group) {
    $service_group += [
      'method' => 'background_process_service_group_round_robin',
    ];
  }
  // Break the reference left behind by the loop.
  unset($service_group);
  return $service_groups;
}
/**
 * Determines a service host through which this installation can reach itself.
 *
 * A fresh token is saved to the module settings, then the check-token page
 * is requested through a list of candidate base URLs. The first candidate
 * whose response has code 200 and starts with the token is returned.
 *
 * NOTE(review): the token is derived from mt_rand()/uniqid()/md5(); if it is
 * meant to be unguessable, a cryptographically secure source should be
 * considered.
 *
 * @return array|false
 *   The matching candidate ('base_url' and possibly 'http_host'), or FALSE
 *   when no candidate answered with the token.
 */
function background_process_determine_default_service_host() {
  $token = md5(session_id() . md5(uniqid(mt_rand(), TRUE)) . md5(uniqid(mt_rand(), TRUE)));
  \Drupal::configFactory()
    ->getEditable('background_process.settings')
    ->set('background_process_token', $token)
    ->save();

  global $conf;
  global $base_url;

  $auth = isset($_SERVER['PHP_AUTH_USER']) ? $_SERVER['PHP_AUTH_USER'] . ':' . $_SERVER['PHP_AUTH_PW'] . '@' : '';
  $scheme = isset($_SERVER['HTTPS']) && $_SERVER['HTTPS'] == 'on' ? 'https://' : 'http://';
  $base_parts = parse_url($base_url);
  $path = empty($base_parts['path']) ? '' : $base_parts['path'];
  $port = $_SERVER['SERVER_PORT'];
  $address = !array_key_exists('SERVER_ADDR', $_SERVER) ? $_SERVER['LOCAL_ADDR'] : $_SERVER['SERVER_ADDR'];

  // The configured base URL first, then server name, loopback and server
  // address - once without and once with the request's basic-auth prefix.
  $candidates = [
    [
      'base_url' => $base_url,
    ],
  ];
  foreach (['', $auth] as $credentials) {
    foreach ([$_SERVER['SERVER_NAME'], '127.0.0.1', $address] as $host) {
      $candidates[] = [
        'base_url' => $scheme . $credentials . $host . ':' . $port . $path,
        'http_host' => $_SERVER['HTTP_HOST'],
      ];
    }
  }

  // Prepare one postponed request per distinct URL.
  $results = [];
  foreach ($candidates as $i => $candidate) {
    $conf['background_process_service_hosts']['__test'] = $candidate;
    list($request_url, $headers) = background_process_build_request('background-process/check-token', '__test');
    if (!empty($results[$request_url])) {
      continue;
    }
    $results[$request_url] = background_process_http_request($request_url, [
      'headers' => $headers,
      'postpone' => TRUE,
      'candidate' => $i,
      'method' => 'POST',
    ]);
  }
  background_process_http_request_process($results);

  foreach ($results as $result) {
    if ($result->code == 200 && $token === substr($result->data, 0, strlen($token))) {
      return $candidates[$result->options['candidate']];
    }
  }
  return FALSE;
}
/**
 * Builds the URL and headers for an HTTP request to a service host.
 *
 * @param string $url
 *   Path to request, relative to the service host's base URL.
 * @param string|null $service_hostname
 *   Name of the service host to use. Falls back to 'default' when empty or
 *   unknown.
 * @param array $options
 *   Optional overrides; 'base_url' replaces the service host's base URL.
 *
 * @return array
 *   Two-element list: the absolute URL and the header array (key => value).
 */
function background_process_build_request($url, $service_hostname = NULL, $options = []) {
  $service_hosts = background_process_get_service_hosts();
  if (!$service_hostname || empty($service_hosts[$service_hostname])) {
    $service_hostname = 'default';
  }
  // Work with the selected host definition; concatenating the whole host
  // list into the URL would only yield the string "Array".
  $service_host = isset($service_hosts[$service_hostname]) ? $service_hosts[$service_hostname] : [];
  $options += [
    'absolute' => TRUE,
    'base_url' => isset($service_host['base_url']) ? $service_host['base_url'] : '',
  ];
  $url = $options['base_url'] . '/' . $url;
  $parsed = parse_url($url);

  // Prefer the explicitly configured Host header, otherwise derive it from
  // the URL (including the port when one is given).
  if (!empty($service_host['http_host'])) {
    $host = $service_host['http_host'];
  }
  elseif (isset($parsed['host'])) {
    $host = isset($parsed['port']) ? $parsed['host'] . ':' . $parsed['port'] : $parsed['host'];
  }
  else {
    $host = NULL;
  }

  $headers = _background_process_request_headers();
  $headers = _background_process_filter_headers($headers);
  $headers['User-Agent'] = \Drupal::config('background_process.settings')
    ->get('background_process_user_agent');
  $headers['Host'] = $host;
  $headers['Connection'] = 'close';
  if (isset($parsed['user'])) {
    // The password part is optional in a URL.
    $headers['Authorization'] = 'Basic ' . base64_encode($parsed['user'] . ':' . (isset($parsed['pass']) ? $parsed['pass'] : ''));
  }
  return [
    $url,
    $headers,
  ];
}
/**
 * Transforms a key/value header array into a list of "Key: value" strings.
 *
 * @param array $headers
 *   Header values keyed by header name.
 *
 * @return string[]
 *   One "Name: value" string per header, in the original order.
 */
function background_process_build_headers($headers) {
  return array_map(
    function ($name, $value) {
      return "{$name}: {$value}";
    },
    array_keys($headers),
    array_values($headers)
  );
}
/**
 * Prepares, and unless postponed initiates, an HTTP request.
 *
 * @param string $url
 *   Absolute URL using the http, https or feed scheme.
 * @param array $options
 *   Request options. Missing entries default to: 'headers' [], 'method'
 *   'GET', 'data' NULL, 'max_redirects' 3, 'timeout' from the
 *   'background_process_connection_timeout' setting, 'context' an SSL
 *   context built from the 'background_process_ssl_verification' setting,
 *   'blocking' FALSE and 'postpone' FALSE.
 *
 * @return object
 *   The request/result object. Error codes set here are -1001 (URL cannot be
 *   parsed), -1002 (no scheme) and -1003 (unsupported scheme).
 */
function background_process_http_request($url, array $options = []) {
  // Parse the URL and make sure we can handle the schema.
  $result = new stdClass();
  $result->url = $url;
  $result->options = $options;
  $result->code = NULL;
  $uri = @parse_url($url);
  $result->uri = $uri;

  if ($uri == FALSE) {
    $result->error = 'unable to parse URL';
    $result->code = -1001;
    return _background_process_http_request_result($result);
  }
  if (!isset($uri['scheme'])) {
    $result->error = 'missing schema';
    $result->code = -1002;
    return _background_process_http_request_result($result);
  }

  $settings = \Drupal::config('background_process.settings');

  // Set default context to enable/disable SSL verification.
  $default_context = stream_context_create([
    'ssl' => [
      'verify_peer' => $settings->get('background_process_ssl_verification'),
      'verify_peer_name' => $settings->get('background_process_ssl_verification'),
    ],
  ]);

  // Merge the default options.
  $defaults = [
    'headers' => [],
    'method' => 'GET',
    'data' => NULL,
    'max_redirects' => 3,
    'timeout' => $settings->get('background_process_connection_timeout'),
    'context' => $default_context,
    'blocking' => FALSE,
    'postpone' => FALSE,
  ];
  $options += $defaults;
  // Stream_socket_client() requires timeout to be a float.
  $options['timeout'] = (float) $options['timeout'];

  // Transport prefix and default port per supported scheme.
  // Note: 'https' only works when PHP is compiled with OpenSSL support.
  $transports = [
    'http' => ['tcp://', 80],
    'feed' => ['tcp://', 80],
    'https' => ['ssl://', 443],
  ];
  if (!isset($transports[$uri['scheme']])) {
    $result->error = 'invalid schema ' . $uri['scheme'];
    $result->code = -1003;
    return _background_process_http_request_result($result);
  }
  list($transport, $default_port) = $transports[$uri['scheme']];
  $port = isset($uri['port']) ? $uri['port'] : $default_port;
  $socket = $transport . $uri['host'] . ':' . $port;
  // The Host header only carries the port when it is not the default one.
  $host = $uri['host'] . ($port != $default_port ? ':' . $port : '');

  if (!empty($host) && empty($options['headers']['Host'])) {
    $options['headers']['Host'] = $host;
  }

  $result->options = $options;
  $result->socket = $socket;
  $result->postponed = $options['postpone'];
  if (!$result->postponed) {
    return background_process_http_request_initiate($result);
  }
  return $result;
}
/**
 * Initiates a prepared HTTP request: opens the socket and sends the request.
 *
 * @param object $result
 *   The request object prepared by background_process_http_request(); the
 *   socket handle, request text and state fields are added to it.
 *
 * @return object
 *   The request object. When the socket cannot be opened, the outcome of
 *   _background_process_http_request_result() with a negative error code.
 *   When the 'blocking' option is set, the outcome of
 *   background_process_http_request_get_response().
 */
function background_process_http_request_initiate(&$result) {
  // Time the connection setup so it can be deducted from the timeout below.
  Timer::start(__FUNCTION__);
  $options = $result->options;
  $socket = $result->socket;
  $uri = $result->uri;
  $result->start = microtime(TRUE);
  $result->data_ready = TRUE;
  $result->response = '';
  if (empty($options['context'])) {
    $fp = @stream_socket_client($socket, $errno, $errstr, $options['timeout']);
  }
  else {
    // Create a stream with context. Allows verification of a SSL certificate.
    $fp = @stream_socket_client($socket, $errno, $errstr, $options['timeout'], STREAM_CLIENT_CONNECT, $options['context']);
  }
  // Make sure the socket opened properly.
  if (!$fp) {
    // When a network error occurs, we use a negative number so it does not
    // clash with the HTTP status codes.
    $result->code = -$errno;
    $result->error = trim($errstr) ? trim($errstr) : t('Error opening socket @socket', [
      '@socket' => $socket,
    ]);
    \Drupal::state()
      ->set('drupal_http_request_fails', TRUE);
    return _background_process_http_request_result($result);
  }
  $result->fp = $fp;
  // Construct the path to act on.
  $path = isset($uri['path']) ? $uri['path'] : '/';
  if (isset($uri['query'])) {
    $path .= '?' . $uri['query'];
  }
  // Merge the default headers.
  $options['headers'] += [
    'User-Agent' => 'Drupal (+http://drupal.org/)',
  ];
  // Only add Content-Length if we actually have any content or if it is a POST
  // or PUT request. Some non-standard servers get confused by Content-Length in
  // at least HEAD/GET requests, and Squid always requires Content-Length in
  // POST/PUT requests. 'data' defaults to NULL, hence the string cast.
  $content_length = strlen((string) $options['data']);
  if ($content_length > 0 || $options['method'] == 'POST' || $options['method'] == 'PUT') {
    $options['headers']['Content-Length'] = $content_length;
  }
  // If the server URL has a user then attempt to use basic authentication.
  if (isset($uri['user'])) {
    $options['headers']['Authorization'] = 'Basic ' . base64_encode($uri['user'] . (isset($uri['pass']) ? ':' . $uri['pass'] : ''));
  }
  // If the database prefix is being used by SimpleTest to run the tests in a
  // copied database then set the user-agent header to the database prefix so
  // that any calls to other Drupal pages will run the SimpleTest prefixed
  // database. The user-agent is used to ensure that multiple testing sessions
  // running at the same time won't interfere with each other as they would if
  // the database prefix were stored statically in a file or database variable.
  $test_info =& $GLOBALS['drupal_test_info'];
  if (!empty($test_info['test_run_id'])) {
    $options['headers']['User-Agent'] = drupal_generate_test_ua($test_info['test_run_id']);
  }
  $request = $options['method'] . ' ' . $path . " HTTP/1.0\r\n";
  foreach ($options['headers'] as $name => $value) {
    $request .= $name . ': ' . trim($value) . "\r\n";
  }
  $request .= "\r\n" . $options['data'];
  $result->request = $request;
  // Calculate how much time is left of the original timeout value.
  // Timer::read() reports milliseconds.
  $timeout = $options['timeout'] - Timer::read(__FUNCTION__) / 1000;
  if ($timeout > 0) {
    // stream_set_timeout() takes whole seconds and microseconds as integers.
    stream_set_timeout($fp, (int) floor($timeout), (int) floor(1000000 * fmod($timeout, 1)));
    fwrite($fp, $request);
    stream_set_blocking($fp, 0);
  }
  if (!empty($options['blocking'])) {
    return background_process_http_request_get_response($result);
  }
  return $result;
}
/**
 * Get response for an http request.
 *
 * Reads the response from the socket stored on the request object. When the
 * 'blocking' option is set the socket is read until EOF or timeout; otherwise
 * at most one chunk of 1024 bytes is read per call and the caller is expected
 * to call again until $result->code is set.
 *
 * Once the stream has ended, the raw response is split into status line,
 * headers and body, redirects (301/302/307) are followed while
 * 'max_redirects' allows it, and the finished result is passed through
 * _background_process_http_request_result().
 *
 * @param object $result
 *   Request state object, passed by reference. Reads ->postponed, ->fp,
 *   ->options and ->response (assumed to be initialised by the code that
 *   opened the request - TODO confirm); fills in ->code, ->protocol,
 *   ->status_message, ->headers, ->data and possibly ->error, ->redirect_code
 *   and ->redirect_url. On redirect the variable is replaced by the result of
 *   the follow-up request.
 *
 * @return object
 *   The (possibly still unfinished) request state object.
 */
function background_process_http_request_get_response(&$result) {
  // A postponed request has not been sent yet: send it now.
  if (!empty($result->postponed)) {
    $result->postponed = FALSE;
    return background_process_http_request_initiate($result);
  }
  // Already finished; nothing more to read.
  if (isset($result->code)) {
    return $result;
  }

  $fp = $result->fp;
  $options = $result->options;
  // Time this call so the remaining timeout can be derived from it.
  Timer::start(__FUNCTION__);

  if (!empty($options['blocking'])) {
    stream_set_blocking($fp, 1);
  }

  $info = stream_get_meta_data($fp);
  $alive = !$info['eof'] && !$info['timed_out'];
  while ($alive) {
    // Calculate how much time is left of the original timeout value.
    $timeout = $options['timeout'] - Timer::read(__FUNCTION__) / 1000;
    if ($timeout <= 0) {
      $info['timed_out'] = TRUE;
      break;
    }
    stream_set_timeout($fp, floor($timeout), floor(1000000 * fmod($timeout, 1)));
    $chunk = fread($fp, 1024);
    $result->response .= $chunk;
    $result->data_ready = empty($chunk) ? FALSE : TRUE;
    $info = stream_get_meta_data($fp);
    $alive = !$info['eof'] && !$info['timed_out'];
    // Non-blocking callers only want one chunk per call.
    if (empty($options['blocking'])) {
      break;
    }
  }

  // Stream still open and not timed out: caller has to poll again.
  if ($alive) {
    return $result;
  }

  fclose($fp);

  if ($info['timed_out']) {
    // Fall back to -1 (the value the redirect branch below uses for a
    // timeout) when the legacy constant is not defined.
    $result->code = defined('HTTP_REQUEST_TIMEOUT') ? HTTP_REQUEST_TIMEOUT : -1;
    $result->error = 'request timed out';
    return _background_process_http_request_result($result);
  }

  // Split head from body; pad so a response without a body does not trigger
  // an undefined offset.
  list($response, $result->data) = array_pad(preg_split("/\r\n\r\n|\n\n|\r\r/", $result->response, 2), 2, NULL);
  $response = preg_split("/\r\n|\n|\r/", $response);

  // Parse the response status line.
  list($protocol, $code, $status_message) = array_pad(explode(' ', trim((string) array_shift($response)), 3), 3, NULL);
  $result->protocol = $protocol;
  $result->status_message = $status_message;
  $result->headers = [];

  // Parse the response headers.
  while ($line = trim((string) array_shift($response))) {
    list($name, $value) = array_pad(explode(':', $line, 2), 2, '');
    $name = strtolower($name);
    if (isset($result->headers[$name]) && $name == 'set-cookie') {
      // Multiple Set-Cookie headers are folded into a comma separated list.
      $result->headers[$name] .= ',' . trim($value);
    }
    else {
      $result->headers[$name] = trim($value);
    }
  }

  $responses = [
    100 => 'Continue',
    101 => 'Switching Protocols',
    200 => 'OK',
    201 => 'Created',
    202 => 'Accepted',
    203 => 'Non-Authoritative Information',
    204 => 'No Content',
    205 => 'Reset Content',
    206 => 'Partial Content',
    300 => 'Multiple Choices',
    301 => 'Moved Permanently',
    302 => 'Found',
    303 => 'See Other',
    304 => 'Not Modified',
    305 => 'Use Proxy',
    307 => 'Temporary Redirect',
    400 => 'Bad Request',
    401 => 'Unauthorized',
    402 => 'Payment Required',
    403 => 'Forbidden',
    404 => 'Not Found',
    405 => 'Method Not Allowed',
    406 => 'Not Acceptable',
    407 => 'Proxy Authentication Required',
    408 => 'Request Time-out',
    409 => 'Conflict',
    410 => 'Gone',
    411 => 'Length Required',
    412 => 'Precondition Failed',
    413 => 'Request Entity Too Large',
    414 => 'Request-URI Too Large',
    415 => 'Unsupported Media Type',
    416 => 'Requested range not satisfiable',
    417 => 'Expectation Failed',
    500 => 'Internal Server Error',
    501 => 'Not Implemented',
    502 => 'Bad Gateway',
    503 => 'Service Unavailable',
    504 => 'Gateway Time-out',
    505 => 'HTTP Version not supported',
  ];
  // Unknown codes are reduced to the base code of their class (e.g. 499
  // becomes 400).
  if (!isset($responses[$code])) {
    $code = floor((int) $code / 100) * 100;
  }
  $result->code = $code;

  switch ($code) {
    case 200:
    case 304:
      break;

    case 301:
    case 302:
    case 307:
      $location = isset($result->headers['location']) ? $result->headers['location'] : NULL;
      // The redirected request only gets the time that is left.
      $options['timeout'] -= Timer::read(__FUNCTION__) / 1000;
      if ($options['timeout'] <= 0) {
        $result->code = -1;
        $result->error = 'request timed out';
      }
      elseif (!empty($options['max_redirects'])) {
        // Redirect to the new location.
        $options['max_redirects']--;
        $result = background_process_http_request($location, $options);
        if (empty($result->error)) {
          background_process_http_request_get_response($result);
        }
        $result->redirect_code = $code;
      }
      if (!isset($result->redirect_url)) {
        $result->redirect_url = $location;
      }
      break;

    default:
      $result->error = $status_message;
  }

  return _background_process_http_request_result($result);
}
/**
 * Finalise a request result.
 *
 * For a result that already carries a response code this stamps the end
 * time (only once) and invokes the optional 'callback' option with the
 * result. Results without a code are returned untouched.
 *
 * @param object $result
 *   The request state object.
 *
 * @return object
 *   The same object that was passed in.
 */
function _background_process_http_request_result($result) {
  // No response code yet means the request is not finished.
  if (!isset($result->code)) {
    return $result;
  }

  // Keep an end time that was recorded earlier.
  if (empty($result->end)) {
    $result->end = microtime(TRUE);
  }

  $has_callback = !empty($result->options['callback']);
  if ($has_callback && is_callable($result->options['callback'])) {
    call_user_func($result->options['callback'], $result);
  }

  return $result;
}
/**
 * Process multiple http requests.
 *
 * Repeatedly polls every unfinished request in $results via
 * background_process_http_request_get_response() until all of them carry a
 * response code or the overall timeout has passed. When a full pass yields
 * no data, the loop sleeps for 'interval' seconds before polling again.
 *
 * @param array $results
 *   Request state objects, passed by reference; entries are updated in place.
 * @param array $options
 *   Optional settings:
 *   - timeout: maximum number of seconds to keep polling (default 30).
 *   - interval: seconds to sleep after a pass without data (default 0.01).
 *   - limit: maximum number of requests to poll per pass; 0 means no limit
 *     (default 0).
 */
function background_process_http_request_process(&$results, $options = []) {
  $options += [
    'timeout' => 30,
    'interval' => 0.01,
    'limit' => 0,
  ];
  // usleep() takes whole, non-negative microseconds.
  $interval = max(0, (int) round($options['interval'] * 1000000));
  $expire = time() + $options['timeout'];

  while ($results && time() < $expire) {
    $cnt = 0;
    $data_ready = FALSE;
    foreach ($results as &$result) {
      if (isset($result->code)) {
        continue;
      }
      background_process_http_request_get_response($result);
      // data_ready is not guaranteed to be set on every code path of the
      // response reader, so do not read it blindly.
      if (!empty($result->data_ready)) {
        $data_ready = TRUE;
      }
      $cnt++;
      if ($options['limit'] && $cnt >= $options['limit']) {
        break;
      }
    }
    // Break the reference so later writes cannot hit the last element.
    unset($result);

    // Nothing left to poll: every request has finished.
    if (!$cnt) {
      break;
    }
    if (!$data_ready) {
      usleep($interval);
    }
  }
}
/**
 * Determines the default service host and stores it in configuration.
 *
 * When the determined host equals the site's base URL the derived setting is
 * cleared; otherwise the host is saved as the derived default host and a
 * warning is shown. User feedback is given through drupal_set_message().
 *
 * @return bool
 *   TRUE if a host could be determined, FALSE otherwise.
 */
function background_process_determine_and_save_default_service_host() {
  global $base_url;

  $host = background_process_determine_default_service_host();
  if (!$host) {
    drupal_set_message(t('Could not determine default service host. Please configure background process in your settings.php'), 'error');
    return FALSE;
  }

  drupal_set_message(t('Default service host determined at %base_url', [
    '%base_url' => _background_process_secure_url($host['base_url']),
  ]));

  // \Drupal::config() returns an immutable object that cannot be modified or
  // saved, so both branches need the editable configuration.
  $settings = \Drupal::configFactory()
    ->getEditable('background_process.settings');

  if ($host['base_url'] === $base_url) {
    $settings
      ->clear('background_process_derived_default_host')
      ->save();
  }
  else {
    $settings
      ->set('background_process_derived_default_host', [
        'default' => $host,
      ])
      ->save();
    drupal_set_message(t('Default service host differs from base url (%base_url). If migrating database to other sites or environments, you will need to either run "Determine default service host" again, or configure the default service host manually through settings.php', [
      '%base_url' => $base_url,
    ]), 'warning');
  }

  return TRUE;
}
/**
 * Track a handle whose lock must be removed at the end of the request.
 *
 * The first call registers _background_process_cleanup_locks() as a shutdown
 * function. Handles are kept in the 'background_process_handles_locked'
 * static.
 *
 * @param mixed $handle
 *   The handle to track; it is used as an array key.
 * @param bool $remove
 *   TRUE to stop tracking the handle instead of adding it.
 */
function _background_process_ensure_cleanup($handle, $remove = FALSE) {
  $locked =& drupal_static('background_process_handles_locked', NULL);

  // First use in this request: set up the list and the shutdown hook.
  if ($locked === NULL) {
    $locked = [];
    drupal_register_shutdown_function('_background_process_cleanup_locks');
  }

  if ($remove) {
    unset($locked[$handle]);
    return;
  }

  $locked[$handle] = $handle;
}
/**
 * Shutdown handler for removing locks.
 *
 * Calls background_process_remove_process() for every handle that is still
 * tracked in the 'background_process_handles_locked' static.
 */
function _background_process_cleanup_locks() {
  $locked =& drupal_static('background_process_handles_locked', NULL);

  // Nothing was tracked (or everything was already released).
  if (empty($locked)) {
    return;
  }

  foreach ($locked as $tracked_handle) {
    background_process_remove_process($tracked_handle);
  }
}
/**
 * Get string name of callback.
 *
 * @param mixed $callback
 *   A callback; array callbacks are [object-or-class, method].
 *
 * @return mixed
 *   "Class->method" for an object callback, "Class::method" for a class
 *   callback, or the value itself when it is not an array.
 */
function _background_process_callback_name($callback) {
  // Non-array callbacks (e.g. function names) are returned unchanged.
  if (!is_array($callback)) {
    return $callback;
  }

  $is_instance = is_object($callback[0]);
  $owner = $is_instance ? get_class($callback[0]) : $callback[0];
  $separator = $is_instance ? '->' : '::';

  return $owner . $separator . $callback[1];
}
/**
 * Get request headers.
 *
 * Rebuilds the headers of the current request from the HTTP_* entries in
 * $_SERVER, e.g. HTTP_USER_AGENT becomes "User-Agent". Entries with an empty
 * value are skipped. Should two entries map to the same header name, their
 * values are joined with "; ".
 *
 * @return array
 *   Header values keyed by header name; empty when no headers are present.
 */
function _background_process_request_headers() {
  $headers = [];

  foreach ($_SERVER as $key => $value) {
    if (empty($value)) {
      continue;
    }
    if (substr((string) $key, 0, 5) != 'HTTP_') {
      continue;
    }

    // HTTP_X_FORWARDED_FOR -> "x forwarded for" -> "X Forwarded For" ->
    // "X-Forwarded-For".
    $name = str_replace(' ', '-', ucwords(strtolower(str_replace('_', ' ', substr($key, 5)))));
    if ($name === '') {
      // A bare "HTTP_" key carries no header name.
      continue;
    }

    if (empty($headers[$name])) {
      $headers[$name] = $value;
    }
    else {
      $headers[$name] .= "; {$value}";
    }
  }

  return $headers;
}
/**
 * Remove headers we do not wish to pass on to the next request.
 *
 * Header names are matched case-insensitively against a fixed list
 * (connection handling, proxy authentication, transfer/encoding, cookies
 * being set, content length and host).
 *
 * @param array $headers
 *   Header values keyed by header name; may be empty.
 *
 * @return array
 *   The headers that are not on the list, with keys preserved.
 */
function _background_process_filter_headers($headers) {
  $kept = [];

  if (!empty($headers)) {
    $blocked = '/^(Connection|Keep-Alive|Proxy-Authenticate|Proxy-Authorization|TE|Trailers|Transfer-Encoding|Upgrade|Set-Cookie|Content-Length|Host|Accept-Encoding)$/i';
    foreach ($headers as $name => $header_value) {
      if (preg_match($blocked, $name)) {
        continue;
      }
      $kept[$name] = $header_value;
    }
  }

  return $kept;
}
/**
 * Secure a URL by obfuscating the password if present.
 *
 * @param string $url
 *   The URL, possibly containing "user:password@" credentials.
 *
 * @return string
 *   The URL rebuilt by _background_process_unparse_url(), with any non-empty
 *   password replaced by 'XXXXXXXX'.
 */
function _background_process_secure_url($url) {
  $parts = parse_url($url);

  // Compare against '' instead of using empty(): a password such as "0" is
  // falsy but must still be masked.
  if (isset($parts['pass']) && $parts['pass'] !== '') {
    $parts['pass'] = 'XXXXXXXX';
  }

  return _background_process_unparse_url($parts);
}
/**
 * Reverse logic of parse_url().
 *
 * @param array $parsed_url
 *   URL components keyed as returned by parse_url(): scheme, host, port,
 *   user, pass, path, query, fragment. Missing components are left out.
 *
 * @return string
 *   The URL assembled from the given components.
 */
function _background_process_unparse_url($parsed_url) {
  $scheme = isset($parsed_url['scheme']) ? $parsed_url['scheme'] . '://' : '';
  $host = isset($parsed_url['host']) ? $parsed_url['host'] : '';
  $port = isset($parsed_url['port']) ? ':' . $parsed_url['port'] : '';
  $user = isset($parsed_url['user']) ? (string) $parsed_url['user'] : '';
  $pass = isset($parsed_url['pass']) ? ':' . $parsed_url['pass'] : '';
  $path = isset($parsed_url['path']) ? $parsed_url['path'] : '';
  $query = isset($parsed_url['query']) ? '?' . $parsed_url['query'] : '';
  $fragment = isset($parsed_url['fragment']) ? '#' . $parsed_url['fragment'] : '';

  // Test for presence rather than truthiness so that a user name of "0"
  // still gets its "@" separator.
  $credentials = ($user !== '' || $pass !== '') ? "{$user}{$pass}@" : '';

  return "{$scheme}{$credentials}{$host}{$port}{$path}{$query}{$fragment}";
}
/**
 * Shutdown handler for restarting background process.
 *
 * @param object $process
 *   The process to restart; its ->callback and ->args are read.
 * @param array $args
 *   Arguments for the callback. When empty, $process->args is used.
 *
 * @return mixed
 *   Whatever BackgroundProcess::start() returns.
 */
function _background_process_restart($process, $args = []) {
  // Fall back to the arguments the process was originally given.
  if (empty($args)) {
    $args = $process->args;
  }

  return BackgroundProcess::load($process)->start($process->callback, $args);
}
/**
 * Get service hosts defined in the system.
 *
 * Combines the configured service hosts with the derived default host from
 * the 'background_process.settings' configuration. Configured hosts win when
 * both define the same key.
 *
 * @return array
 *   Service host definitions keyed by name; empty when nothing is configured.
 */
function background_process_get_service_hosts() {
  $settings = \Drupal::config('background_process.settings');

  // Either setting may be unset (the derived host is cleared when it equals
  // the base URL); adding a non-array to an array is a fatal error.
  $service_hosts = $settings->get('background_process_service_hosts');
  if (!is_array($service_hosts)) {
    $service_hosts = [];
  }
  $derived_host = $settings->get('background_process_derived_default_host');
  if (!is_array($derived_host)) {
    $derived_host = [];
  }

  return $service_hosts + $derived_host;
}
Functions
Name | Description |
---|---|
background_process_build_headers | Implements to Transform header array from key/value to strings. |
background_process_build_request | Implements to Build url and headers for http request. |
background_process_cron | Implements hook_cron(). |
background_process_cronapi | Implements hook_cronapi(). |
background_process_cron_alter | Implements hook_cron_alter(). |
background_process_cron_queue_info | Implements hook_cron_queue_info(). |
background_process_current_handle | Implements to Get/set current handle. |
background_process_determine_and_save_default_service_host | Implements to Determines the default service host. |
background_process_determine_default_service_host | Implements to Determine host for current installation. |
background_process_generate_handle | Implements to Get a unique handle based on a callback. |
background_process_get_process | Get background process. |
background_process_get_processes | Get background process. |
background_process_get_service_groups | Implements to Get service hosts defined in the system. |
background_process_get_service_hosts | Get service hosts defined in the system. |
background_process_http_request | Implements to Perform an http request. |
background_process_http_request_get_response | Get response for an http request. |
background_process_http_request_initiate | Initiate the http request. |
background_process_http_request_process | Implements Process multiple http requests. |
background_process_init | Implements hook_init(). |
background_process_is_finished | Implements to Check if the background process has finished. |
background_process_is_started | Implements to Check if the background process has started. |
background_process_keepalive | Implements to Keep process alive. |
background_process_lock_process | Implements to Lock process. |
background_process_permission | Implements hook_permission(). |
background_process_queue | Implements Queue the function call passing function arguments. |
background_process_queue_locked | Implements to Queue locked background process. |
background_process_remove_process | Implements to Remove a background process. |
background_process_restart | Implements to Restart the current background process. |
background_process_service_access | Implements to Access handler for service call. |
background_process_service_execute | Implements to Execute the service. |
background_process_service_group | Implements hook_service_group(). |
background_process_service_group_random | Implements to Load balancing based on random pick. |
background_process_service_group_round_robin | Implements for Round-robin load balancing based on random pick. |
background_process_service_start | Implements to Call the function requested by the service call. |
background_process_set_process | Implements to Set background process. |
background_process_set_service_host | Implements to Set a service host for a background process. |
background_process_start | Implements to Start background process. |
background_process_start_locked | Implements to Start locked background process. |
background_process_unlock | Unlock background process. |
background_process_update_status | Implements to Set status for background process. |
_background_process_callback_name | Get string name of callback. |
_background_process_cleanup_locks | Implements to Shutdown handler for removing locks. |
_background_process_cleanup_menu | Implements to Cleanup cache menu and ensure all locks are released (again). |
_background_process_ensure_cleanup | Implements to Ensure lock is removed at end of request. |
_background_process_filter_headers | Remove headers we do not wish to pass on to the next request. |
_background_process_http_request_result | Get request result. |
_background_process_queue | Worker callback for processing queued function call. |
_background_process_request_headers | Get request headers. |
_background_process_restart | Shutdown handler for restarting background process. |
_background_process_secure_url | Secure a URL by obfuscating the password if present. |
_background_process_unparse_url | Reverse logic of parse_url(). |