View source
<?php
include 'BackgroundProcess.class.php';
define('BACKGROUND_PROCESS_SERVICE_TIMEOUT', 0);
define('BACKGROUND_PROCESS_CONNECTION_TIMEOUT', 2);
define('BACKGROUND_PROCESS_STREAM_TIMEOUT', 2);
define('BACKGROUND_PROCESS_CLEANUP_AGE', 120);
define('BACKGROUND_PROCESS_CLEANUP_AGE_RUNNING', 8 * 3600);
define('BACKGROUND_PROCESS_CLEANUP_AGE_QUEUE', 86400);
define('BACKGROUND_PROCESS_REDISPATCH_THRESHOLD', 10);
define('BACKGROUND_PROCESS_USE_DOUBLE_ENCODING', TRUE);
define('BACKGROUND_PROCESS_STATUS_NONE', 0);
define('BACKGROUND_PROCESS_STATUS_LOCKED', 1);
define('BACKGROUND_PROCESS_STATUS_RUNNING', 2);
define('BACKGROUND_PROCESS_STATUS_QUEUED', 3);
if (version_compare(VERSION, '6.23', '<')) {
define('HTTP_REQUEST_TIMEOUT', -1);
}
if (!defined('DRUPAL_ROOT')) {
define('DRUPAL_ROOT', getcwd());
}
register_shutdown_function('_background_process_cwdfix');
function background_process_menu() {
$items = array();
$items['bgp-start/%/%'] = array(
'type' => MENU_CALLBACK,
'title' => 'Run background process',
'description' => 'Run background process',
'page callback' => 'background_process_service_start',
'page arguments' => array(
1,
),
'access callback' => 'background_process_service_access',
'access arguments' => array(
1,
2,
),
);
$items['background-process/unlock/%'] = array(
'type' => MENU_CALLBACK,
'title' => 'Unlock background process',
'description' => 'Unlock background process',
'page callback' => 'background_process_service_unlock',
'page arguments' => array(
2,
),
'access arguments' => array(
'administer background process',
),
'file' => 'background_process.admin.inc',
);
$items['background-process/check-token'] = array(
'type' => MENU_CALLBACK,
'title' => 'Check background process token',
'description' => 'Check background process token',
'page callback' => 'background_process_check_token',
'page arguments' => array(
2,
),
'access callback' => TRUE,
'file' => 'background_process.pages.inc',
);
$items['admin/settings/background-process/settings'] = array(
'type' => MENU_DEFAULT_LOCAL_TASK,
'title' => 'Settings',
'weight' => 1,
);
$items['admin/settings/background-process'] = array(
'title' => 'Background process',
'description' => 'Administer background processes',
'page callback' => 'drupal_get_form',
'page arguments' => array(
'background_process_settings_form',
),
'access arguments' => array(
'administer background process',
),
'file' => 'background_process.admin.inc',
);
$items['admin/settings/background-process/overview'] = array(
'type' => MENU_LOCAL_TASK,
'title' => 'Overview',
'description' => 'Administer background processes',
'page callback' => 'background_process_overview_page',
'access arguments' => array(
'administer background process',
),
'file' => 'background_process.admin.inc',
'weight' => 3,
);
return $items;
}
function background_process_perm() {
return array(
'administer background process',
);
}
function background_process_cron() {
$expire = 120;
@set_time_limit($expire);
$time = time();
$msg = t('Never started (auto unlock due to timeout)');
do {
if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {
break;
}
$result = db_query_range("SELECT handle, start_stamp \n FROM {background_process}\n WHERE start_stamp < '%d'\n AND exec_status = %d", $time - variable_get('background_process_cleanup_age', BACKGROUND_PROCESS_CLEANUP_AGE), BACKGROUND_PROCESS_STATUS_LOCKED, 0, 10);
$handles = array();
while ($row = db_fetch_array($result)) {
$handles[$row['handle']] = $row;
}
foreach ($handles as $handle => $process) {
if (background_process_unlock($handle, $msg, $process['start_stamp'])) {
drupal_set_message(t("%handle unlocked: !msg", array(
'%handle' => $handle,
'!msg' => $msg,
)));
}
else {
drupal_set_message(t("%handle could not be unlocked: !msg", array(
'%handle' => $handle,
'!msg' => $msg,
)), 'error');
}
}
} while (!empty($handles));
$time = time();
$msg = t('Never finished (auto unlock due to long run)');
do {
if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {
break;
}
$result = db_query_range("SELECT handle, start_stamp\n FROM {background_process}\n WHERE start_stamp < '%d'\n AND exec_status = %d", $time - variable_get('background_process_cleanup_age_running', BACKGROUND_PROCESS_CLEANUP_AGE_RUNNING), BACKGROUND_PROCESS_STATUS_RUNNING, 0, 10);
$handles = array();
while ($row = db_fetch_array($result)) {
$handles[$row['handle']] = $row;
}
foreach ($handles as $handle => $process) {
if (background_process_unlock($handle, $msg, $process['start_stamp'])) {
drupal_set_message(t("%handle unlocked: !msg", array(
'%handle' => $handle,
'!msg' => $msg,
)));
}
else {
drupal_set_message(t("%handle could not be unlocked: !msg", array(
'%handle' => $handle,
'!msg' => $msg,
)), 'error');
}
}
} while (!empty($results));
$time = time();
$msg = t('Never picked up by cron worker (auto unlock due to timeout)');
do {
if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {
break;
}
$result = db_query_range("SELECT handle, start_stamp\n FROM {background_process}\n WHERE start_stamp < '%d'\n AND exec_status = %d", $time - variable_get('background_process_cleanup_age_queue', BACKGROUND_PROCESS_CLEANUP_AGE_QUEUE), BACKGROUND_PROCESS_STATUS_QUEUED, 0, 10);
$handles = array();
while ($row = db_fetch_array($result)) {
$handles[$row['handle']] = $row;
}
foreach ($handles as $handle => $process) {
if (background_process_unlock($handle, $msg, $process['start_stamp'])) {
drupal_set_message(t("%handle unlocked: !msg", array(
'%handle' => $handle,
'!msg' => $msg,
)));
}
else {
drupal_set_message(t("%handle could not be unlocked: !msg", array(
'%handle' => $handle,
'!msg' => $msg,
)), 'error');
}
}
} while (!empty($results));
}
function background_process_cron_alter(&$items) {
$items['background_process_cron']['override_congestion_protection'] = TRUE;
if ($process = background_process_get_process('uc:background_process_cron')) {
if ($process->start + 30 < time()) {
background_process_unlock($process->handle, t('Self unlocking stale lock'), $process->start);
}
}
}
function background_process_cronapi($op, $job = NULL) {
switch ($op) {
case 'list':
return array(
'background_process_cron' => t('Cleanup old process handles'),
);
case 'rule':
return '* * * * *';
case 'configure':
return 'admin/settings/background-process';
}
}
function background_process_service_group() {
$info = array();
$info['methods']['background_process_service_group_random'] = t('Random');
$info['methods']['background_process_service_group_round_robin'] = t('Pseudo round-robin');
return $info;
}
function background_process_service_group_random($service_group) {
return $service_group['hosts'][rand(0, count($service_group['hosts']) - 1)];
}
function background_process_service_group_round_robin($service_group) {
static $idx = NULL;
if (isset($idx)) {
$idx = ($idx + 1) % count($service_group['hosts']);
}
else {
$idx = rand(0, count($service_group['hosts']) - 1);
}
return $service_group['hosts'][$idx];
}
function background_process_service_access($handle, $token) {
$handle = rawurldecode($handle);
$token = rawurldecode($token);
session_save_session(FALSE);
unset($_SESSION);
$process = background_process_get_process($handle);
if (!$process) {
watchdog('bg_process', 'Unknown process: %handle', array(
'%handle' => $handle,
));
return FALSE;
}
if ($token !== $process->token) {
watchdog('bg_process', 'Invalid token: %token for handle: %handle', array(
'%token' => $token,
'%handle' => $handle,
));
return FALSE;
}
if ($process->uid) {
global $user;
$user = user_load($process->uid);
if (!$user) {
return FALSE;
}
}
else {
$user = drupal_anonymous_user();
}
return TRUE;
}
function background_process_init() {
if (empty($_SESSION['background_process_determine_default_service_host'])) {
return;
}
if (strpos($_GET['q'], 'background-process/check-token') === 0) {
return;
}
if (variable_get('install_task', FALSE) != 'done') {
return;
}
background_process_determine_and_save_default_service_host();
unset($_SESSION['background_process_determine_default_service_host']);
}
function background_process_cron_queue_info() {
$queues['background_process'] = array(
'worker callback' => '_background_process_queue',
);
$background_process_queues = variable_get('background_process_queues', array());
foreach ($background_process_queues as $queue_name) {
$queues['bgp:' . $queue_name] = array(
'worker callback' => '_background_process_queue',
);
}
return $queues;
}
function _background_process_queue($item) {
$oldhandle = background_process_current_handle();
list($handle, $token) = $item;
if (background_process_service_access($handle, $token)) {
try {
background_process_service_execute(rawurldecode($handle), TRUE);
background_process_current_handle($oldhandle);
} catch (Exception $e) {
background_process_current_handle($oldhandle);
background_process_update_status(rawurldecode($handle), BACKGROUND_PROCESS_STATUS_QUEUED);
throw $e;
}
}
}
function background_process_current_handle($handle = NULL) {
static $current_handle = NULL;
if (isset($handle)) {
$current_handle = $handle;
}
return $current_handle;
}
function background_process_generate_handle($callback) {
return md5(serialize($callback) . ':' . microtime(TRUE) . ':' . rand(1, 5000));
}
function background_process_start($callback) {
$process = new BackgroundProcess();
$args = func_get_args();
array_splice($args, 0, 1);
$result = $process
->start($callback, $args);
return $result ? $process->handle : $result;
}
function background_process_start_locked($handle, $callback) {
$process = new BackgroundProcess($handle);
$args = func_get_args();
array_splice($args, 0, 2);
$result = $process
->start($callback, $args);
return $result ? $process->handle : $result;
}
function background_process_queue($callback) {
$process = new BackgroundProcess();
$args = func_get_args();
array_splice($args, 0, 1);
return $process
->queue($callback, $args);
}
function background_process_queue_locked($handle, $callback) {
$process = new BackgroundProcess($handle);
$args = func_get_args();
array_splice($args, 0, 2);
return $process
->queue($callback, $args);
}
function background_process_service_start($handle, $return = FALSE) {
drupal_set_header('Content-Type: text/plain');
ignore_user_abort(TRUE);
@set_time_limit(variable_get('background_process_service_timeout', BACKGROUND_PROCESS_SERVICE_TIMEOUT));
$handle = rawurldecode($handle);
return background_process_service_execute($handle, $return);
}
function background_process_service_execute($handle, $return = FALSE) {
$process = background_process_get_process($handle);
if (!$process) {
watchdog('bg_process', 'Process not found for handle: %handle', array(
'%handle' => $handle,
), WATCHDOG_ERROR);
if ($return) {
return;
}
else {
exit;
}
}
$process->start_stamp = microtime(TRUE);
db_query("UPDATE {background_process} SET start_stamp = '%s', exec_status = %d WHERE handle = '%s' AND exec_status IN (%d, %d)", sprintf("%.06f", $process->start_stamp), BACKGROUND_PROCESS_STATUS_RUNNING, $handle, BACKGROUND_PROCESS_STATUS_LOCKED, BACKGROUND_PROCESS_STATUS_QUEUED);
$claimed = db_affected_rows();
if ($claimed) {
$process->exec_status = BACKGROUND_PROCESS_STATUS_RUNNING;
$process = BackgroundProcess::load($process);
background_process_current_handle($handle);
}
else {
if ($return) {
return;
}
else {
exit;
}
}
if (!$return) {
register_shutdown_function('background_process_remove_process', $process->handle, $process->start_stamp);
}
if (is_callable($process->callback)) {
try {
if (!$return) {
register_shutdown_function('module_invoke_all', 'background_process_shutdown', $process);
}
$callback = _background_process_callback_name($process->callback);
progress_initialize_progress($handle, "Background process '{$callback}' initialized");
call_user_func_array($process->callback, $process->args);
progress_end_progress($handle, "Background process '{$callback}' finished");
if ($return) {
background_process_remove_process($process->handle, $process->start_stamp);
module_invoke_all('background_process_shutdown', $process);
}
} catch (Exception $e) {
if (!$return) {
module_invoke_all('background_process_shutdown', $process, (string) $e);
}
throw $e;
}
}
else {
watchdog('bg_process', 'Callback: %callback not found', array(
'%callback' => $process->callback,
), WATCHDOG_ERROR);
}
if ($return) {
return;
}
else {
exit;
}
}
function background_process_restart() {
$args = func_get_args();
call_user_func_array('background_process_keepalive', $args);
exit;
}
function background_process_keepalive() {
$args = func_get_args();
$handle = background_process_current_handle();
if (!$handle) {
throw new Exception(t('Background process handle %handle not found', array(
'%handle' => $handle,
)));
}
$process = background_process_get_process($handle);
if (!$process) {
throw new Exception(t('Background process %handle not found', array(
'%handle' => $handle,
)));
}
register_shutdown_function('_background_process_restart', $process, $args);
}
function background_process_is_started($handle) {
$progress = progress_get_progress($handle);
return !empty($progress);
}
function background_process_is_finished($handle) {
$progress = progress_get_progress($handle);
return empty($progress) || $progress->end;
}
function background_process_set_process($handle, $callback, $uid, $args, $token) {
$args = serialize($args);
$callback = serialize($callback);
if (!isset($uid)) {
global $user;
$uid = $user->uid;
}
db_query("UPDATE {background_process} SET callback = '%s', args = '%s', uid = %d, token = '%s' WHERE handle = '%s'", $callback, $args, $uid, $token, $handle);
$result = db_affected_rows();
return $result;
}
function background_process_lock_process($handle, $status = BACKGROUND_PROCESS_STATUS_LOCKED) {
if (!@db_query("INSERT INTO {background_process} (handle, start_stamp, exec_status) VALUES('%s', '%s', %d)", $handle, sprintf("%.06f", microtime(TRUE)), $status)) {
return FALSE;
}
else {
_background_process_ensure_cleanup($handle);
return TRUE;
}
}
function background_process_update_status($handle, $status) {
return db_query("UPDATE {background_process} SET exec_status = %d WHERE handle = '%s'", $status, $handle);
}
function background_process_get_process($handle) {
if ($result = db_fetch_object(db_query("SELECT handle, callback, args, uid, token, service_host, start_stamp, exec_status FROM {background_process} WHERE handle = '%s'", $handle))) {
$result->args = unserialize($result->args);
$result->callback = unserialize($result->callback);
$result->start = $result->start_stamp;
$result->status = $result->exec_status;
}
return $result;
}
function background_process_get_processes($status = NULL) {
if (isset($status)) {
$result = db_query("SELECT handle, callback, args, uid, token, service_host, start_stamp, exec_status FROM {background_process} WHERE exec_status = %d", $status);
}
else {
$result = db_query("SELECT handle, callback, args, uid, token, service_host, start_stamp, exec_status FROM {background_process}");
}
$processes = array();
while ($process = db_fetch_object($result)) {
$process->args = unserialize($process->args);
$process->callback = unserialize($process->callback);
$process->start = $process->start_stamp;
$process->status = $process->exec_status;
$processes[] = $process;
}
return $processes;
}
function background_process_remove_process($handle, $start = NULL) {
if (isset($start)) {
$result = db_query("DELETE FROM {background_process} WHERE handle = '%s' AND start_stamp LIKE '%s'", $handle, sprintf("%.06f", $start));
}
else {
$result = db_query("DELETE FROM {background_process} WHERE handle = '%s'", $handle);
}
if ($result) {
$result = db_affected_rows();
}
return $result;
}
function background_process_unlock($handle, $msg = NULL, $start = NULL) {
$process = background_process_get_process($handle);
if ($process && (!isset($start) || $start === $process->start)) {
if (background_process_remove_process($process->handle, $process->start)) {
global $user;
module_invoke_all('background_process_shutdown', $process, $msg ? $msg : t('Manually unlocked by !name', array(
'!name' => $user->name,
)));
return TRUE;
}
}
return FALSE;
}
function background_process_set_service_host($handle, $service_host) {
$result = db_query("UPDATE {background_process} SET service_host = '%s' WHERE handle = '%s'", $service_host ? $service_host : '', $handle);
return $result;
}
function background_process_get_service_hosts() {
global $base_url;
$service_hosts = variable_get('background_process_service_hosts', array());
$service_hosts += variable_get('background_process_derived_default_host', array(
'default' => array(
'base_url' => $base_url,
),
));
return $service_hosts;
}
function background_process_get_service_groups() {
$service_groups = variable_get('background_process_service_groups', array());
$service_groups += array(
'default' => array(
'hosts' => array(
variable_get('background_process_default_service_host', 'default'),
),
),
);
foreach ($service_groups as &$service_group) {
$service_group += array(
'method' => 'background_process_service_group_round_robin',
);
}
return $service_groups;
}
function background_process_determine_default_service_host() {
$token = md5(session_id() . md5(uniqid(mt_rand(), TRUE)) . md5(uniqid(mt_rand(), TRUE)));
variable_set('background_process_token', $token);
global $conf;
$auth = isset($_SERVER['PHP_AUTH_USER']) ? $_SERVER['PHP_AUTH_USER'] . ':' . $_SERVER['PHP_AUTH_PW'] . '@' : '';
$scheme = isset($_SERVER['HTTPS']) && $_SERVER['HTTPS'] == 'on' ? 'https://' : 'http://';
global $base_url;
$url = parse_url($base_url);
$path = empty($url['path']) ? '' : $url['path'];
$candidates = array(
array(
'base_url' => $base_url,
),
array(
'base_url' => $scheme . $_SERVER['SERVER_NAME'] . ':' . $_SERVER['SERVER_PORT'] . $path,
'http_host' => $_SERVER['HTTP_HOST'],
),
array(
'base_url' => $scheme . '127.0.0.1:' . $_SERVER['SERVER_PORT'] . $path,
'http_host' => $_SERVER['HTTP_HOST'],
),
array(
'base_url' => $scheme . (!array_key_exists('SERVER_ADDR', $_SERVER) ? $_SERVER['LOCAL_ADDR'] : $_SERVER['SERVER_ADDR']) . ':' . $_SERVER['SERVER_PORT'] . $path,
'http_host' => $_SERVER['HTTP_HOST'],
),
array(
'base_url' => $scheme . $auth . $_SERVER['SERVER_NAME'] . ':' . $_SERVER['SERVER_PORT'] . $path,
'http_host' => $_SERVER['HTTP_HOST'],
),
array(
'base_url' => $scheme . $auth . '127.0.0.1:' . $_SERVER['SERVER_PORT'] . $path,
'http_host' => $_SERVER['HTTP_HOST'],
),
array(
'base_url' => $scheme . $auth . (!array_key_exists('SERVER_ADDR', $_SERVER) ? $_SERVER['LOCAL_ADDR'] : $_SERVER['SERVER_ADDR']) . ':' . $_SERVER['SERVER_PORT'] . $path,
'http_host' => $_SERVER['HTTP_HOST'],
),
);
$found = NULL;
foreach ($candidates as $i => $candidate) {
$conf['background_process_service_hosts']['__test'] = $candidate;
list($url, $headers) = background_process_build_request('background-process/check-token/' . rawurlencode(rawurlencode(':encode_detector')), '__test');
if (empty($results[$url])) {
$results[$url] = background_process_http_request($url, array(
'headers' => $headers,
'postpone' => TRUE,
'candidate' => $i,
'method' => 'POST',
));
}
}
background_process_http_request_process($results);
foreach ($results as $result) {
if ($result->code == 200) {
if ($result->data && ($data = unserialize($result->data))) {
if ($token === $data['token']) {
$found = $candidates[$result->options['candidate']];
$use_double_encoding = rawurldecode($data['encode_detector']) == ':encode_detector' ? TRUE : FALSE;
variable_set('background_process_use_double_encoding', $use_double_encoding);
break;
}
}
}
}
if ($found) {
return $found;
}
return FALSE;
}
function background_process_build_request($url, $service_hostname = NULL, $options = array()) {
$service_hosts = background_process_get_service_hosts();
if (!$service_hostname || empty($service_hosts[$service_hostname])) {
$service_hostname = 'default';
}
$service_host = $service_hosts[$service_hostname];
$options += array(
'absolute' => TRUE,
'base_url' => $service_host['base_url'],
);
$url = url($url, $options);
$parsed = parse_url($url);
$host = !empty($service_host['http_host']) ? $service_host['http_host'] : (isset($parsed['host']) ? $parsed['host'] : NULL);
$headers = _background_process_request_headers();
$headers = _background_process_filter_headers($headers);
$headers['Host'] = $host;
$headers['Connection'] = 'close';
if (isset($parsed['user'])) {
$headers['Authorization'] = 'Basic ' . base64_encode($parsed['user'] . ':' . $parsed['pass']);
}
return array(
$url,
$headers,
);
}
function background_process_build_headers($headers) {
$header = array();
foreach ($headers as $key => $value) {
$header[] = "{$key}: {$value}";
}
return $header;
}
function background_process_http_request($url, array $options = array()) {
$result = new stdClass();
$result->url = $url;
$result->options = $options;
$result->code = NULL;
$uri = @parse_url($url);
$result->uri = $uri;
if ($uri == FALSE) {
$result->error = 'unable to parse URL';
$result->code = -1001;
return _background_process_http_request_result($result);
}
if (!isset($uri['scheme'])) {
$result->error = 'missing schema';
$result->code = -1002;
return _background_process_http_request_result($result);
}
$options += array(
'headers' => array(),
'method' => 'GET',
'data' => NULL,
'max_redirects' => 3,
'timeout' => variable_get('background_process_connection_timeout', BACKGROUND_PROCESS_CONNECTION_TIMEOUT),
'context' => NULL,
'blocking' => FALSE,
'postpone' => FALSE,
);
$options['timeout'] = (double) $options['timeout'];
$host = NULL;
switch ($uri['scheme']) {
case 'http':
case 'feed':
$port = isset($uri['port']) ? $uri['port'] : 80;
$socket = 'tcp://' . $uri['host'] . ':' . $port;
$host = $uri['host'] . ($port != 80 ? ':' . $port : '');
break;
case 'https':
$port = isset($uri['port']) ? $uri['port'] : 443;
$socket = 'ssl://' . $uri['host'] . ':' . $port;
$host = $uri['host'] . ($port != 443 ? ':' . $port : '');
break;
default:
$result->error = 'invalid schema ' . $uri['scheme'];
$result->code = -1003;
return _background_process_http_request_result($result);
}
if (!empty($host) && empty($options['headers']['Host'])) {
$options['headers']['Host'] = $host;
}
$result->options = $options;
$result->socket = $socket;
$result->postponed = $options['postpone'];
if ($result->postponed) {
return $result;
}
else {
return background_process_http_request_initiate($result);
}
}
function background_process_http_request_initiate(&$result) {
timer_start(__FUNCTION__);
$options = $result->options;
$socket = $result->socket;
$uri = $result->uri;
$result->start = microtime(TRUE);
$result->data_ready = TRUE;
if (empty($options['context'])) {
$fp = @stream_socket_client($socket, $errno, $errstr, $options['timeout']);
}
else {
$fp = @stream_socket_client($socket, $errno, $errstr, $options['timeout'], STREAM_CLIENT_CONNECT, $options['context']);
}
if (!$fp) {
$result->code = -$errno;
$result->error = trim($errstr) ? trim($errstr) : t('Error opening socket @socket', array(
'@socket' => $socket,
));
return _background_process_http_request_result($result);
}
$result->fp = $fp;
$path = isset($uri['path']) ? $uri['path'] : '/';
if (isset($uri['query'])) {
$path .= '?' . $uri['query'];
}
$options['headers'] += array(
'User-Agent' => 'Drupal (+http://drupal.org/)',
);
$content_length = strlen($options['data']);
if ($content_length > 0 || $options['method'] == 'POST' || $options['method'] == 'PUT') {
$options['headers']['Content-Length'] = $content_length;
}
if (isset($uri['user'])) {
$options['headers']['Authorization'] = 'Basic ' . base64_encode($uri['user'] . (isset($uri['pass']) ? ':' . $uri['pass'] : ''));
}
$request = $options['method'] . ' ' . $path . " HTTP/1.0\r\n";
foreach ($options['headers'] as $name => $value) {
$request .= $name . ': ' . trim($value) . "\r\n";
}
$request .= "\r\n" . $options['data'];
$result->request = $request;
$timeout = $options['timeout'] - timer_read(__FUNCTION__) / 1000;
if ($timeout > 0) {
stream_set_timeout($fp, floor($timeout), floor(1000000 * fmod($timeout, 1)));
fwrite($fp, $request);
stream_set_blocking($fp, 0);
}
if (!empty($options['blocking'])) {
return background_process_http_request_get_response($result);
}
return $result;
}
function background_process_http_request_get_response(&$result) {
if ($result->postponed) {
$result->postponed = FALSE;
return background_process_http_request_initiate($result);
}
if (isset($result->code)) {
return $result;
}
$fp = $result->fp;
$options = $result->options;
timer_start(__FUNCTION__);
if (!empty($options['blocking'])) {
stream_set_blocking($fp, 1);
}
$info = stream_get_meta_data($fp);
$alive = !$info['eof'] && !$info['timed_out'];
while ($alive) {
$timeout = $options['timeout'] - timer_read(__FUNCTION__) / 1000;
if ($timeout <= 0) {
$info['timed_out'] = TRUE;
break;
}
stream_set_timeout($fp, floor($timeout), floor(1000000 * fmod($timeout, 1)));
$chunk = fread($fp, 1024);
$result->response .= $chunk;
$result->data_ready = empty($chunk) ? FALSE : TRUE;
$info = stream_get_meta_data($fp);
$alive = !$info['eof'] && !$info['timed_out'];
if (empty($options['blocking'])) {
break;
}
}
if ($alive) {
return $result;
}
fclose($fp);
if ($info['timed_out']) {
$result->code = HTTP_REQUEST_TIMEOUT;
$result->error = 'request timed out';
return _background_process_http_request_result($result);
}
list($response, $result->data) = preg_split("/\r\n\r\n|\n\n|\r\r/", $result->response, 2);
$response = preg_split("/\r\n|\n|\r/", $response);
list($protocol, $code, $status_message) = explode(' ', trim(array_shift($response)), 3);
$result->protocol = $protocol;
$result->status_message = $status_message;
$result->headers = array();
while ($line = trim(array_shift($response))) {
list($name, $value) = explode(':', $line, 2);
$name = strtolower($name);
if (isset($result->headers[$name]) && $name == 'set-cookie') {
$result->headers[$name] .= ',' . trim($value);
}
else {
$result->headers[$name] = trim($value);
}
}
$responses = array(
100 => 'Continue',
101 => 'Switching Protocols',
200 => 'OK',
201 => 'Created',
202 => 'Accepted',
203 => 'Non-Authoritative Information',
204 => 'No Content',
205 => 'Reset Content',
206 => 'Partial Content',
300 => 'Multiple Choices',
301 => 'Moved Permanently',
302 => 'Found',
303 => 'See Other',
304 => 'Not Modified',
305 => 'Use Proxy',
307 => 'Temporary Redirect',
400 => 'Bad Request',
401 => 'Unauthorized',
402 => 'Payment Required',
403 => 'Forbidden',
404 => 'Not Found',
405 => 'Method Not Allowed',
406 => 'Not Acceptable',
407 => 'Proxy Authentication Required',
408 => 'Request Time-out',
409 => 'Conflict',
410 => 'Gone',
411 => 'Length Required',
412 => 'Precondition Failed',
413 => 'Request Entity Too Large',
414 => 'Request-URI Too Large',
415 => 'Unsupported Media Type',
416 => 'Requested range not satisfiable',
417 => 'Expectation Failed',
500 => 'Internal Server Error',
501 => 'Not Implemented',
502 => 'Bad Gateway',
503 => 'Service Unavailable',
504 => 'Gateway Time-out',
505 => 'HTTP Version not supported',
);
if (!isset($responses[$code])) {
$code = floor($code / 100) * 100;
}
$result->code = $code;
switch ($code) {
case 200:
case 304:
break;
case 301:
case 302:
case 307:
$location = $result->headers['location'];
$options['timeout'] -= timer_read(__FUNCTION__) / 1000;
if ($options['timeout'] <= 0) {
$result->code = -1;
$result->error = 'request timed out';
}
elseif ($options['max_redirects']) {
$options['max_redirects']--;
$result = background_process_http_request($location, $options);
if (empty($result->error)) {
background_process_http_request_get_response($result);
}
$result->redirect_code = $code;
}
if (!isset($result->redirect_url)) {
$result->redirect_url = $location;
}
break;
default:
$result->error = $status_message;
}
return _background_process_http_request_result($result);
}
function _background_process_http_request_result($result) {
if (isset($result->code)) {
if (empty($result->end)) {
$result->end = microtime(TRUE);
}
if (!empty($result->options['callback']) && is_callable($result->options['callback'])) {
call_user_func($result->options['callback'], $result);
}
}
return $result;
}
function background_process_http_request_process(&$results, $options = array()) {
$options += array(
'timeout' => 30,
'interval' => 0.01,
'limit' => 0,
);
$interval = $options['interval'] * 1000000;
$expire = time() + $options['timeout'];
while ($results && time() < $expire) {
$cnt = 0;
$data_ready = FALSE;
foreach ($results as $i => &$result) {
if (isset($result->code)) {
continue;
}
background_process_http_request_get_response($result);
$data_ready = $data_ready || $result->data_ready ? TRUE : FALSE;
$cnt++;
if ($options['limit'] && $cnt >= $options['limit']) {
break;
}
}
if (!$cnt) {
break;
}
if (!$data_ready) {
usleep($interval);
}
}
}
function background_process_determine_and_save_default_service_host() {
$host = background_process_determine_default_service_host();
if ($host) {
global $base_url;
drupal_set_message(t('Default service host determined at %base_url', array(
'%base_url' => _background_process_secure_url($host['base_url']),
)));
if ($host['base_url'] === $base_url) {
variable_del('background_process_derived_default_host');
}
else {
variable_set('background_process_derived_default_host', array(
'default' => $host,
));
drupal_set_message(t('Default service host differs from base url (%base_url). If migrating database to other sites or environments, you will need to either run "Determine default service host" again, or configure the default service host manually through settings.php', array(
'%base_url' => $base_url,
)), 'warning');
}
return TRUE;
}
else {
drupal_set_message(t('Could not determine default service host. Please configure background process in your settings.php'), 'error');
return FALSE;
}
}
function _background_process_ensure_cleanup($handle, $remove = FALSE) {
$handles =& background_process_static('background_process_handles_locked', NULL);
if (!isset($handles)) {
$handles = array();
register_shutdown_function('_background_process_cleanup_locks');
}
if ($remove) {
unset($handles[$handle]);
}
else {
$handles[$handle] = $handle;
}
}
function _background_process_cleanup_locks() {
$handles =& background_process_static('background_process_handles_locked', NULL);
if (!empty($handles)) {
foreach ($handles as $handle) {
background_process_remove_process($handle);
}
}
}
function _background_process_callback_name($callback) {
if (is_array($callback)) {
if (is_object($callback[0])) {
$callback = get_class($callback[0]) . '->' . $callback[1];
}
else {
$callback = $callback[0] . '::' . $callback[1];
}
}
return $callback;
}
function _background_process_request_headers() {
foreach ($_SERVER as $key => $value) {
if (substr($key, 0, 5) == 'HTTP_') {
$key = str_replace(' ', '-', ucwords(strtolower(str_replace('_', ' ', substr($key, 5)))));
if (empty($headers[$key])) {
$headers[$key] = $value;
}
else {
$headers[$key] .= "; {$value}";
}
}
}
return $headers;
}
function _background_process_filter_headers($headers) {
$result = array();
if (empty($headers)) {
return $result;
}
foreach ($headers as $key => $value) {
if (!preg_match('/^(Connection|Keep-Alive|Proxy-Authenticate|Proxy-Authorization|TE|Trailers|Transfer-Encoding|Upgrade|Set-Cookie|Content-Length|Host|Accept-Encoding)$/i', $key)) {
$result[$key] = $value;
}
}
return $result;
}
function _background_process_secure_url($url) {
$url = parse_url($url);
if (!empty($url['pass'])) {
$url['pass'] = 'XXXXXXXX';
}
return _background_process_unparse_url($url);
}
function _background_process_unparse_url($parsed_url) {
$scheme = isset($parsed_url['scheme']) ? $parsed_url['scheme'] . '://' : '';
$host = isset($parsed_url['host']) ? $parsed_url['host'] : '';
$port = isset($parsed_url['port']) ? ':' . $parsed_url['port'] : '';
$user = isset($parsed_url['user']) ? $parsed_url['user'] : '';
$pass = isset($parsed_url['pass']) ? ':' . $parsed_url['pass'] : '';
$pass = $user || $pass ? "{$pass}@" : '';
$path = isset($parsed_url['path']) ? $parsed_url['path'] : '';
$query = isset($parsed_url['query']) ? '?' . $parsed_url['query'] : '';
$fragment = isset($parsed_url['fragment']) ? '#' . $parsed_url['fragment'] : '';
return "{$scheme}{$user}{$pass}{$host}{$port}{$path}{$query}{$fragment}";
}
function _background_process_restart($process, $args = array()) {
$args = empty($args) ? $process->args : $args;
$new = BackgroundProcess::load($process);
$result = $new
->start($process->callback, $args);
}
function _background_process_cwdfix() {
chdir(DRUPAL_ROOT);
}
function &background_process_static($name, $default_value = NULL, $reset = FALSE) {
static $data = array(), $default = array();
if (isset($data[$name]) || array_key_exists($name, $data)) {
if ($reset) {
$data[$name] = $default[$name];
}
return $data[$name];
}
if (isset($name)) {
if ($reset) {
return $data;
}
$default[$name] = $data[$name] = $default_value;
return $data[$name];
}
foreach ($default as $name => $value) {
$data[$name] = $value;
}
return $data;
}