View source
<?php
include 'background_process.inc';
include 'background_process.http.inc';
// Timeout defaults. None of these three are referenced in this part of the
// file; presumably they are seconds used by the HTTP layer -- TODO confirm.
define('BACKGROUND_PROCESS_SERVICE_TIMEOUT', 0);
define('BACKGROUND_PROCESS_CONNECTION_TIMEOUT', 2);
define('BACKGROUND_PROCESS_STREAM_TIMEOUT', 2);
// Default ages, in seconds (they are subtracted from time() in
// background_process_cron()). Each can be overridden by the matching
// lower-cased Drupal variable, e.g. 'background_process_cleanup_age'.
// Age after which a process still in LOCKED status is unlocked.
define('BACKGROUND_PROCESS_CLEANUP_AGE', 120);
// Age (by start_stamp) after which a RUNNING process is unlocked.
define('BACKGROUND_PROCESS_RUNNING_CLEANUP_AGE', 8 * 3600);
// Age after which a QUEUED process is unlocked.
define('BACKGROUND_PROCESS_QUEUE_CLEANUP_AGE', 86400);
// Age after which rows in background_process_result are deleted.
define('BACKGROUND_PROCESS_RESULT_CLEANUP_AGE', 3600);
// Not referenced in this part of the file; presumably the default path to
// the drush executable for the drush dispatcher -- TODO confirm.
define('BACKGROUND_PROCESS_DRUSH_CMD', '/usr/local/bin/drush');
// Default cron queue name (see background_process_cron_queue_info()).
define('BACKGROUND_PROCESS_QUEUE_NAME', 'background_process');
// Values of the exec_status column. LOCKED, RUNNING and QUEUED are queried
// by background_process_cron(); NONE and DISPATCHED are not used in this
// part of the file.
define('BACKGROUND_PROCESS_STATUS_NONE', 0);
define('BACKGROUND_PROCESS_STATUS_LOCKED', 1);
define('BACKGROUND_PROCESS_STATUS_RUNNING', 2);
define('BACKGROUND_PROCESS_STATUS_QUEUED', 3);
define('BACKGROUND_PROCESS_STATUS_DISPATCHED', 4);
/**
 * Implements hook_menu().
 *
 * Registers the service endpoints (start, unlock, token check) and the
 * administration pages (settings, overview and the dispatcher tabs).
 *
 * @return array
 *   Menu item definitions keyed by path.
 */
function background_process_menu() {
  // Values shared by every administration page.
  $admin_perm = array('administer background process');
  $admin_file = 'background_process.admin.inc';

  $items = array();

  // Service endpoints.
  $items['bgp-start/%'] = array(
    'type' => MENU_CALLBACK,
    'title' => 'Run background process',
    'description' => 'Run background process',
    'page callback' => 'background_process_service_start',
    'page arguments' => array(1),
    'access callback' => 'background_process_service_access',
    'access arguments' => array(1),
  );
  $items['background-process/unlock/%background_process'] = array(
    'type' => MENU_CALLBACK,
    'title' => 'Unlock background process',
    'description' => 'Unlock background process',
    'page callback' => 'background_process_service_unlock',
    'page arguments' => array(2),
    'access arguments' => $admin_perm,
  );
  $items['background-process/check-token'] = array(
    'type' => MENU_CALLBACK,
    'title' => 'Check background process token',
    'description' => 'Check background process token',
    'page callback' => 'background_process_check_token',
    'page arguments' => array(2),
    'access callback' => TRUE,
    'file' => 'background_process.pages.inc',
  );

  // Main administration page and its tabs.
  $items['admin/config/system/background-process/settings'] = array(
    'type' => MENU_DEFAULT_LOCAL_TASK,
    'title' => 'Settings',
    'weight' => 1,
  );
  $items['admin/config/system/background-process'] = array(
    'title' => 'Background process',
    'description' => 'Administer background processes',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('background_process_settings_form'),
    'access arguments' => $admin_perm,
    'file' => $admin_file,
  );
  $items['admin/config/system/background-process/overview'] = array(
    'type' => MENU_LOCAL_TASK,
    'title' => 'Overview',
    'description' => 'Administer background processes',
    'page callback' => 'background_process_overview_page',
    'access arguments' => $admin_perm,
    'file' => $admin_file,
    'weight' => 3,
  );

  // Dispatcher administration.
  $items['admin/config/system/background-process/dispatchers/settings'] = array(
    'type' => MENU_DEFAULT_LOCAL_TASK,
    'title' => 'Settings',
    'weight' => 1,
  );

  // The dispatcher tabs differ only in title, description and form callback.
  $form_tabs = array(
    'admin/config/system/background-process/dispatchers' => array(
      'Dispatchers',
      'Administer dispatchers',
      'background_process_dispatchers_form',
    ),
    'admin/config/system/background-process/dispatchers/http' => array(
      'HTTP',
      'Administer HTTP dispatcher',
      'background_process_http_dispatcher_form',
    ),
    'admin/config/system/background-process/dispatchers/drush' => array(
      'Drush',
      'Administer Drush dispatcher',
      'background_process_drush_dispatcher_form',
    ),
    'admin/config/system/background-process/dispatchers/queue' => array(
      'Queue',
      'Administer queue dispatcher',
      'background_process_queue_dispatcher_form',
    ),
  );
  foreach ($form_tabs as $path => $tab) {
    list($title, $description, $form_id) = $tab;
    $items[$path] = array(
      'type' => MENU_LOCAL_TASK,
      'title' => $title,
      'description' => $description,
      'page callback' => 'drupal_get_form',
      'page arguments' => array($form_id),
      'access arguments' => $admin_perm,
      'file' => $admin_file,
      'weight' => 3,
    );
  }

  return $items;
}
/**
 * Implements hook_background_process_dispatcher_info().
 *
 * Declares the dispatchers shipped with this module: http, drush, queue and
 * foreground. All of them live in background_process.dispatchers.inc.
 *
 * @return array
 *   Dispatcher definitions keyed by dispatcher name.
 */
function background_process_background_process_dispatcher_info() {
  $include = drupal_get_path('module', 'background_process') . '/background_process.dispatchers.inc';

  $items = array();
  $items['http'] = array(
    'callback' => 'background_process_dispatcher_http',
    'title callback' => 'background_process_dispatcher_http_title',
    'options callback' => 'background_process_dispatcher_http_options',
    'file' => $include,
  );
  $items['drush'] = array(
    'callback' => 'background_process_dispatcher_drush',
    'title callback' => 'background_process_dispatcher_drush_title',
    'options callback' => 'background_process_dispatcher_drush_options',
    'file' => $include,
  );
  $items['queue'] = array(
    'callback' => 'background_process_dispatcher_queue',
    'title callback' => 'background_process_dispatcher_queue_title',
    'options callback' => 'background_process_dispatcher_queue_options',
    'file' => $include,
  );
  // The foreground dispatcher declares no options callback.
  $items['foreground'] = array(
    'callback' => 'background_process_dispatcher_foreground',
    'title callback' => 'background_process_dispatcher_foreground_title',
    'file' => $include,
  );
  return $items;
}
/**
 * Returns all dispatcher definitions.
 *
 * The result is kept in a drupal_static() variable for the request and in the
 * 'cache' bin under 'background_process_dispatcher_info' across requests.
 *
 * @return array
 *   Dispatcher definitions keyed by dispatcher name.
 */
function background_process_get_dispatchers() {
  $registry = &drupal_static(__FUNCTION__);
  if (isset($registry)) {
    return $registry;
  }

  $cached = cache_get('background_process_dispatcher_info');
  if ($cached) {
    $registry = $cached->data;
    return $registry;
  }

  // Nothing cached: collect from all modules, let them alter, then store.
  $registry = module_invoke_all('background_process_dispatcher_info');
  drupal_alter('background_process_dispatcher_info', $registry);
  cache_set('background_process_dispatcher_info', $registry, 'cache');
  return $registry;
}
/**
 * Loads a dispatcher's include file and returns one of its callbacks.
 *
 * @param string $dispatcher
 *   Dispatcher name, as keyed in background_process_get_dispatchers().
 * @param string|null $type
 *   Optional callback type prefix, e.g. 'title' or 'options'. When empty the
 *   plain 'callback' entry is looked up.
 *
 * @return mixed
 *   The callback registered under "<type> callback" (or "callback"), or NULL
 *   if the dispatcher does not declare it.
 *
 * @throws Exception
 *   If the dispatcher is unknown.
 */
function background_process_prepare_dispatcher_callback($dispatcher, $type = NULL) {
  $dispatchers = background_process_get_dispatchers();
  if (empty($dispatchers[$dispatcher])) {
    throw new Exception(t('Dispatcher: %dispatcher not found', array(
      '%dispatcher' => $dispatcher,
    )));
  }
  $info = $dispatchers[$dispatcher];

  // Dispatchers declared by other modules may omit 'file'; only include when
  // a file is actually given.
  if (!empty($info['file'])) {
    include_once $info['file'];
  }

  $key = ($type ? $type . ' ' : '') . 'callback';
  return isset($info[$key]) ? $info[$key] : NULL;
}
/**
 * Runs a dispatcher's main callback for a process.
 *
 * @param string $dispatcher
 *   Dispatcher name.
 * @param mixed $process
 *   The process handed to the dispatcher callback.
 *
 * @return mixed
 *   Whatever the dispatcher callback returns.
 */
function background_process_invoke_dispatcher($dispatcher, $process) {
  return call_user_func(background_process_prepare_dispatcher_callback($dispatcher), $process);
}
/**
 * Returns the title of a service host, as produced by its dispatcher.
 *
 * @param array $service_host
 *   Service host definition; its 'dispatcher' key selects the dispatcher.
 *
 * @return mixed
 *   Result of the dispatcher's title callback, or '' if it has none.
 */
function background_process_get_service_host_title($service_host) {
  $title_callback = background_process_prepare_dispatcher_callback($service_host['dispatcher'], 'title');
  if (!$title_callback) {
    return '';
  }
  return call_user_func($title_callback, $service_host);
}
/**
 * Returns the options of a service host, as produced by its dispatcher.
 *
 * @param array $service_host
 *   Service host definition; its 'dispatcher' key selects the dispatcher.
 *
 * @return mixed
 *   Result of the dispatcher's options callback, or an empty array if it has
 *   none.
 */
function background_process_get_service_host_options($service_host) {
  $options_callback = background_process_prepare_dispatcher_callback($service_host['dispatcher'], 'options');
  if (!$options_callback) {
    return array();
  }
  return call_user_func($options_callback, $service_host);
}
/**
 * Implements hook_permission().
 *
 * @return array
 *   The single 'administer background process' permission.
 */
function background_process_permission() {
  $permissions = array();
  $permissions['administer background process'] = array(
    'title' => t('Administer background processes'),
    'description' => t('Perform administration tasks for background processes.'),
  );
  return $permissions;
}
/**
 * Implements hook_cron_queue_info().
 *
 * Declares the default queue plus one queue for every service host that uses
 * the 'queue' dispatcher and names its own queue. All share one worker.
 *
 * @return array
 *   Queue definitions keyed by queue name.
 */
function background_process_cron_queue_info() {
  $default_queue = variable_get('background_process_queue_name', BACKGROUND_PROCESS_QUEUE_NAME);
  $definition = array(
    'worker callback' => 'background_process_cron_queue_worker',
  );
  $queues = array($default_queue => $definition);

  foreach (background_process_get_service_hosts() as $host) {
    // Skip hosts that are not queue-based, name no queue, or whose queue is
    // already registered.
    if ($host['dispatcher'] != 'queue' || !isset($host['queue']) || isset($queues[$host['queue']])) {
      continue;
    }
    $queues[$host['queue']] = $definition;
  }
  return $queues;
}
/**
 * Queue worker: executes the background process with the given pid.
 *
 * Logs a watchdog error when the process cannot be loaded.
 *
 * @param mixed $pid
 *   Process id taken from the queue item.
 */
function background_process_cron_queue_worker($pid) {
  $process = BackgroundProcess::load($pid);
  if (!$process) {
    watchdog('bgprocess', 'Process: @pid not found', array('@pid' => $pid), WATCHDOG_ERROR);
    return;
  }
  $process->execute();
}
/**
 * Implements hook_cron().
 *
 * Housekeeping for the background_process tables, done in five passes:
 *  1. re-dispatch recently created processes still in LOCKED status;
 *  2. unlock LOCKED processes older than the cleanup age;
 *  3. unlock RUNNING processes whose start_stamp is older than the running
 *     cleanup age;
 *  4. unlock QUEUED processes older than the queue cleanup age;
 *  5. delete old rows from background_process_result.
 *
 * Each pass works in small batches and stops once $expire seconds have
 * elapsed since the start of the request.
 */
function background_process_cron() {
// Time budget in seconds, applied both as PHP's time limit and as the
// deadline checked at the top of every batch loop below.
$expire = 120;
set_time_limit($expire);
$time = time();
// Pids already re-dispatched in pass 1, so they are not selected again.
// Seeded with 0 so the NOT IN list is never empty.
$pids = array(
0,
);
// Pass 1: re-dispatch LOCKED processes created more than 10 seconds ago but
// still younger than the cleanup age, 10 at a time.
do {
if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {
break;
}
$results = db_select('background_process', 'bp', array(
'target' => 'background_process',
))
->fields('bp')
->condition('bp.created', time() - 10, '<')
->condition('bp.created', time() - variable_get('background_process_cleanup_age', BACKGROUND_PROCESS_CLEANUP_AGE), '>')
->condition('bp.exec_status', BACKGROUND_PROCESS_STATUS_LOCKED)
->condition('bp.pid', $pids, 'NOT IN')
->range(0, 10)
->orderBy('bp.pid')
->execute()
->fetchAll(PDO::FETCH_OBJ);
foreach ($results as $result) {
$pids[] = $result->pid;
$process = BackgroundProcess::create($result);
$process
->reDispatch();
watchdog('bg_process', 'Redispatched process %handle (%pid)', array(
'%handle' => $process->handle,
'%pid' => $process->pid,
), WATCHDOG_INFO);
}
} while (!empty($results));
// Pass 2: unlock LOCKED processes older than the cleanup age. Unlike pass 1
// there is no pid exclusion list; the loop relies on unlocked rows no longer
// matching the query (NOTE(review): confirm unlock() guarantees that).
$time = time();
$msg = t('Never started (auto unlock due to timeout)');
do {
if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {
break;
}
$results = db_select('background_process', 'bp', array(
'target' => 'background_process',
))
->fields('bp')
->condition('bp.created', time() - variable_get('background_process_cleanup_age', BACKGROUND_PROCESS_CLEANUP_AGE), '<')
->condition('bp.exec_status', BACKGROUND_PROCESS_STATUS_LOCKED)
->range(0, 10)
->orderBy('bp.pid')
->execute()
->fetchAll(PDO::FETCH_OBJ);
foreach ($results as $result) {
$process = BackgroundProcess::create($result);
$process
->log($msg)
->unlock();
watchdog('bg_process', 'Background process %handle (%pid) unlocked: !msg', array(
'%handle' => $process->handle,
'%pid' => $process->pid,
'!msg' => $msg,
), WATCHDOG_INFO);
}
} while (!empty($results));
// Pass 3: unlock RUNNING processes that started before the running cleanup
// age, measured from the $time snapshot taken just below.
$time = time();
$msg = t('Never finished (auto unlock due to long run)');
do {
if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {
break;
}
$results = db_select('background_process', 'bp', array(
'target' => 'background_process',
))
->fields('bp')
->condition('bp.start_stamp', $time - variable_get('background_process_running_cleanup_age', BACKGROUND_PROCESS_RUNNING_CLEANUP_AGE), '<')
->condition('bp.exec_status', BACKGROUND_PROCESS_STATUS_RUNNING)
->range(0, 10)
->execute()
->fetchAll(PDO::FETCH_OBJ);
foreach ($results as $result) {
$process = BackgroundProcess::create($result);
$process
->log($msg)
->unlock();
watchdog('bg_process', 'Background process %handle (%pid) unlocked: !msg', array(
'%handle' => $process->handle,
'%pid' => $process->pid,
'!msg' => $msg,
), WATCHDOG_INFO);
}
} while (!empty($results));
// Pass 4: unlock QUEUED processes created before the queue cleanup age.
// The log message is the same text as in pass 2.
$time = time();
$msg = t('Never started (auto unlock due to timeout)');
do {
if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {
break;
}
$results = db_select('background_process', 'bp', array(
'target' => 'background_process',
))
->fields('bp')
->condition('bp.created', $time - variable_get('background_process_queue_cleanup_age', BACKGROUND_PROCESS_QUEUE_CLEANUP_AGE), '<')
->condition('bp.exec_status', BACKGROUND_PROCESS_STATUS_QUEUED)
->range(0, 10)
->execute()
->fetchAll(PDO::FETCH_OBJ);
foreach ($results as $result) {
$process = BackgroundProcess::create($result);
$process
->log($msg)
->unlock();
watchdog('bg_process', 'Background process %handle (%pid) unlocked: !msg', array(
'%handle' => $process->handle,
'%pid' => $process->pid,
'!msg' => $msg,
), WATCHDOG_INFO);
}
} while (!empty($results));
// Pass 5: delete result rows older than the result cleanup age, 100 pids per
// batch. $pids is reused here and no longer holds the pass 1 list.
$time = time();
do {
if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {
break;
}
$pids = db_select('background_process_result', 'r', array(
'target' => 'background_process',
))
->fields('r', array(
'pid',
))
->condition('r.created', $time - variable_get('background_process_result_cleanup_age', BACKGROUND_PROCESS_RESULT_CLEANUP_AGE), '<')
->range(0, 100)
->execute()
->fetchAllKeyed(0, 0);
if ($pids) {
db_delete('background_process_result', array(
'target' => 'background_process',
))
->condition('pid', $pids, 'IN')
->execute();
}
} while (!empty($pids));
}
/**
 * Alters the cron schedule so the cleanup job is handled first.
 *
 * If background_process_cron is not in the schedule, a process registered for
 * it in the Ultimate Cron hooks that started more than 120 seconds ago is
 * logged and unlocked. If it is in the schedule, it is moved to the front.
 *
 * @param array $items
 *   Scheduled cron items keyed by job name; modified in place.
 */
function background_process_cron_schedule_alter(&$items) {
  if (!isset($items['background_process_cron'])) {
    $hooks = ultimate_cron_get_hooks();
    if (isset($hooks['background_process_cron']['background_process'])) {
      $stale = &$hooks['background_process_cron']['background_process'];
      if ($stale && $stale->getStartTime() + 120 < time()) {
        $stale->log(t('Self unlocking stale cleanup job'))->unlock();
        $stale = NULL;
      }
    }
  }

  if (!isset($items['background_process_cron'])) {
    return;
  }

  // Array union keeps the left-hand key order, which puts the cleanup job
  // first while preserving the order of all other items.
  $items = array(
    'background_process_cron' => $items['background_process_cron'],
  ) + $items;
}
/**
 * Implements hook_watchdog().
 *
 * Copies watchdog entries into the log of the current background process, if
 * there is one.
 *
 * @param array $log
 *   Watchdog entry; 'message', 'variables' and 'severity' are read.
 */
function background_process_watchdog($log = array()) {
  $process = BackgroundProcess::currentProcess();
  if (!$process) {
    return;
  }
  if (!is_array($log['variables'])) {
    $log['variables'] = array();
  }
  $process->log(t($log['message'], $log['variables']), $log['severity']);
}
/**
 * Declares the host selection methods available to service groups.
 *
 * @return array
 *   Array with a 'methods' key mapping selection function names to their
 *   translated labels.
 */
function background_process_service_group() {
  return array(
    'methods' => array(
      'background_process_service_group_round_robin' => t('Pseudo round-robin'),
      'background_process_service_group_random' => t('Random'),
      'background_process_service_group_idle' => t('Idle'),
    ),
  );
}
/**
 * Picks a random host from a service group.
 *
 * @param array $service_group
 *   Service group definition; its 'hosts' entry lists the host names.
 *
 * @return mixed
 *   One of the group's hosts, or NULL if the group has no hosts.
 */
function background_process_service_group_random($service_group) {
  // Re-index so the pick works even when the host list has gaps or
  // non-numeric keys.
  $hosts = isset($service_group['hosts']) ? array_values($service_group['hosts']) : array();
  if (!$hosts) {
    return NULL;
  }
  return $hosts[rand(0, count($hosts) - 1)];
}
/**
 * Picks a host from a service group in pseudo round-robin order.
 *
 * The first call in a request starts at a random position; later calls move
 * on to the next position. The position is shared by all groups because it is
 * held in a single static variable.
 *
 * @param array $service_group
 *   Service group definition; its 'hosts' entry lists the host names.
 *
 * @return mixed
 *   One of the group's hosts, or NULL if the group has no hosts.
 */
function background_process_service_group_round_robin($service_group) {
  static $idx = NULL;

  // Re-index so positions 0..n-1 always exist, whatever the original keys.
  $hosts = isset($service_group['hosts']) ? array_values($service_group['hosts']) : array();
  $count = count($hosts);
  if ($count === 0) {
    // Avoids a modulo by zero and an undefined offset.
    return NULL;
  }

  if (isset($idx)) {
    $idx = ($idx + 1) % $count;
  }
  else {
    $idx = rand(0, $count - 1);
  }
  return $hosts[$idx];
}
/**
 * Picks the host of a service group with the most free client slots.
 *
 * Free slots are a host's 'max_clients' minus its current client count. When
 * no host has a positive number of free slots, the round-robin method is used
 * instead.
 *
 * @param array $service_group
 *   Service group definition; its 'hosts' entry lists the host names.
 *
 * @return mixed
 *   The selected host name.
 */
function background_process_service_group_idle($service_group) {
  $best_host = reset($service_group['hosts']);
  // -1 marks "no host selected yet".
  $best_free = -1;

  $host_info = background_process_get_service_hosts();
  foreach ($service_group['hosts'] as $candidate) {
    $busy = background_process_current_clients($candidate);
    $free = $host_info[$candidate]['max_clients'] - $busy;
    if ($free && $best_free < $free) {
      $best_host = $candidate;
      $best_free = $free;
    }
  }

  if ($best_free == -1) {
    $best_host = background_process_service_group_round_robin($service_group);
  }
  return $best_host;
}
/**
 * Access callback for the process start endpoint.
 *
 * Grants access only when the process exists and the POSTed 'token' matches
 * the token of that process. Session saving is disabled for the request.
 *
 * @param mixed $pid
 *   Process id from the request path.
 *
 * @return bool
 *   TRUE if the request may start the process, FALSE otherwise.
 */
function background_process_service_access($pid) {
  drupal_save_session(FALSE);
  unset($_SESSION);

  $process = BackgroundProcess::load($pid);
  if (!$process) {
    watchdog('bg_process', 'Unknown process: %pid', array(
      '%pid' => $pid,
    ));
    return FALSE;
  }

  // A request without a token must never be granted access, not even when
  // the process itself has no token (NULL === NULL).
  $token = isset($_POST['token']) ? $_POST['token'] : NULL;
  $expected = $process->getToken();

  $valid = FALSE;
  if (is_string($token) && is_string($expected)) {
    // Use a timing-safe comparison where PHP provides one.
    $valid = function_exists('hash_equals') ? hash_equals($expected, $token) : ($token === $expected);
  }

  if (!$valid) {
    watchdog('bg_process', 'Invalid token: %token for handle: %handle', array(
      '%token' => is_string($token) ? $token : '',
      '%handle' => $process->getHandle(),
    ));
    return FALSE;
  }
  return TRUE;
}
/**
 * Counts the background_process rows assigned to a service host.
 *
 * @param string $service_host
 *   Service host name.
 *
 * @return mixed
 *   The row count as returned by the database layer.
 */
function background_process_current_clients($service_host) {
  $args = array(':service_host' => $service_host);
  $options = array('target' => 'background_process');
  return db_query("SELECT COUNT(1) FROM {background_process} WHERE service_host = :service_host", $args, $options)->fetchField();
}
/**
 * Menu loader for %background_process: loads a process by pid or by handle.
 *
 * @param mixed $pid
 *   A numeric process id, or a process handle.
 *
 * @return mixed
 *   The result of BackgroundProcess::load() for numeric input, otherwise the
 *   result of BackgroundProcess::loadByHandle().
 */
function background_process_load($pid) {
  return is_numeric($pid)
    ? BackgroundProcess::load($pid)
    : BackgroundProcess::loadByHandle($pid);
}
/**
 * Page callback: manually unlocks a process and redirects.
 *
 * Records the acting user in the process log, reports success or failure as
 * a message and then calls drupal_goto().
 *
 * @param object $process
 *   The process loaded from the path.
 */
function background_process_service_unlock($process) {
  global $user;

  $note = t('Manually unlocked by @user', array('@user' => $user->name));
  $unlocked = $process->log($note)->unlock();

  $placeholders = array('%handle' => $process->handle);
  if ($unlocked) {
    drupal_set_message(t('Process %handle unlocked', $placeholders));
  }
  else {
    drupal_set_message(t('Process %handle could not be unlocked', $placeholders), 'error');
  }
  drupal_goto();
}
/**
 * Page callback: executes a background process and ends the request.
 *
 * Registers a shutdown function that clears this path's entry from the
 * cache_menu bin, then runs the process. An unknown pid is logged and the
 * request ends without executing anything.
 *
 * @param mixed $pid
 *   Process id from the request path.
 */
function background_process_service_start($pid) {
  $cid = 'menu_item:' . hash('sha256', $_GET['q']);
  drupal_register_shutdown_function('cache_clear_all', $cid, 'cache_menu');

  $process = BackgroundProcess::load($pid);
  if (!$process) {
    watchdog('bg_process', 'Unknown process: %pid', array(
      '%pid' => $pid,
    ), WATCHDOG_ERROR);
    // Nothing to execute; stop here instead of calling a method on a
    // non-object.
    exit;
  }

  $process->execute();
  exit;
}
/**
 * Returns all service hosts with their options filled in.
 *
 * Hosts come from the 'background_process_service_hosts' variable. A
 * 'default' host using the http dispatcher and the site's base URL is added
 * when none is configured. Each host is then completed with its dispatcher's
 * options and a 'max_clients' default of -1.
 *
 * @return array
 *   Service host definitions keyed by host name.
 */
function background_process_get_service_hosts() {
  global $base_url;

  $fallback_options = array('max_clients' => -1);
  $hosts = variable_get('background_process_service_hosts', array());
  $hosts += array(
    'default' => array(
      'dispatcher' => 'http',
      'base_url' => $base_url,
    ),
  );

  // Values already set on a host win over dispatcher options, which in turn
  // win over the fallback.
  foreach (array_keys($hosts) as $name) {
    $hosts[$name] += background_process_get_service_host_options($hosts[$name]) + $fallback_options;
  }
  return $hosts;
}
/**
 * Returns all service groups with their selection method filled in.
 *
 * Groups come from the 'background_process_service_groups' variable. A
 * 'default' group containing the default service host is added when none is
 * configured, and groups without a 'method' get the round-robin method.
 *
 * @return array
 *   Service group definitions keyed by group name.
 */
function background_process_get_service_groups() {
  $groups = variable_get('background_process_service_groups', array());
  $groups += array(
    'default' => array(
      'hosts' => array(
        variable_get('background_process_default_service_host', 'default'),
      ),
    ),
  );

  foreach (array_keys($groups) as $name) {
    $groups[$name] += array(
      'method' => 'background_process_service_group_round_robin',
    );
  }
  return $groups;
}
/**
 * Turns a callback into a printable name.
 *
 * @param mixed $callback
 *   A callback. Array callbacks are formatted; anything else is returned
 *   unchanged.
 *
 * @return mixed
 *   "Class->method" for an object/method pair, "Class::method" for a
 *   class-name/method pair, otherwise the input itself.
 */
function _background_process_callback_name($callback) {
  if (!is_array($callback)) {
    return $callback;
  }
  $target = $callback[0];
  $method = $callback[1];
  return is_object($target)
    ? get_class($target) . '->' . $method
    : $target . '::' . $method;
}
/**
 * Returns the handle of the background process running this request.
 *
 * @param mixed $handle
 *   Unused; kept so existing callers that pass an argument keep working.
 *
 * @return mixed
 *   The current process handle, or NULL when the request is not running
 *   inside a background process.
 */
function background_process_current_handle($handle = NULL) {
  $process = BackgroundProcess::currentProcess();
  // currentProcess() can be empty (see background_process_watchdog()), so do
  // not call a method on it blindly.
  return $process ? $process->getHandle() : NULL;
}
/**
 * Restarts the current background process and ends the request.
 */
function background_process_restart() {
  BackgroundProcess::currentProcess()->restart();
  exit;
}
/**
 * Calls keepAlive() on the current background process.
 */
function background_process_keepalive() {
  BackgroundProcess::currentProcess()->keepAlive();
}
/**
 * Locks a new process for a callback and dispatches it.
 *
 * @param mixed $callback
 *   The callback to run. Any further arguments are passed on to it.
 *
 * @return mixed
 *   The handle of the dispatched process.
 */
function background_process_start($callback) {
  // Everything after the callback is the callback's own argument list.
  $callback_args = array_slice(func_get_args(), 1);
  $process = BackgroundProcess::lock();
  $dispatched = $process->setCallback($callback, $callback_args)->dispatch();
  return $dispatched->getHandle();
}
/**
 * Locks a process under a given handle for a callback and dispatches it.
 *
 * @param mixed $handle
 *   The handle to lock.
 * @param mixed $callback
 *   The callback to run. Any further arguments are passed on to it.
 *
 * @return mixed
 *   The handle of the dispatched process.
 */
function background_process_start_locked($handle, $callback) {
  // Everything after handle and callback is the callback's argument list.
  $callback_args = array_slice(func_get_args(), 2);
  $process = BackgroundProcess::lock($handle);
  $dispatched = $process->setCallback($callback, $callback_args)->dispatch();
  return $dispatched->getHandle();
}
/**
 * Locks a new process for a callback and queues it.
 *
 * @param mixed $callback
 *   The callback to run. Any further arguments are passed on to it.
 *
 * @return mixed
 *   The handle of the queued process.
 */
function background_process_queue($callback) {
  // Everything after the callback is the callback's own argument list.
  $callback_args = array_slice(func_get_args(), 1);
  $process = BackgroundProcess::lock();
  $queued = $process->setCallback($callback, $callback_args)->queue();
  return $queued->getHandle();
}
/**
 * Locks a process under a given handle for a callback and queues it.
 *
 * @param mixed $handle
 *   The handle to lock.
 * @param mixed $callback
 *   The callback to run. Any further arguments are passed on to it.
 *
 * @return mixed
 *   The handle of the queued process.
 */
function background_process_queue_locked($handle, $callback) {
  // Everything after handle and callback is the callback's argument list.
  $callback_args = array_slice(func_get_args(), 2);
  $process = BackgroundProcess::lock($handle);
  $queued = $process->setCallback($callback, $callback_args)->queue();
  return $queued->getHandle();
}
/**
 * Loads a process by its handle.
 *
 * @param mixed $handle
 *   The process handle.
 *
 * @return mixed
 *   The result of BackgroundProcess::loadByHandle().
 */
function background_process_get_process($handle) {
  $process = BackgroundProcess::loadByHandle($handle);
  return $process;
}
/**
 * Sets the status of the process with the given handle and saves it.
 *
 * @param mixed $handle
 *   The process handle.
 * @param mixed $status
 *   The new status value.
 *
 * @return mixed
 *   The updated process, or the empty result of
 *   BackgroundProcess::loadByHandle() when no process has that handle.
 */
function background_process_update_status($handle, $status) {
  $process = BackgroundProcess::loadByHandle($handle);
  if (!$process) {
    // Unknown handle: hand the empty result back instead of calling a method
    // on a non-object.
    return $process;
  }
  $process->setStatus($status)->writeData();
  return $process;
}