background_process.inc in Background Process 7.2
External API short overview
::load($pid) - load a process ::loadByHandle($handle) - load a process ::currentProcess() - get currently running process ::lock($handle = NULL) - lock new process ->getHandle() - get handle of process ->getProgress() - get current progress of process ->getProgressMessage() - get current progress message of process ->calculateETA() - calculate the ETA of the process (if applicable) ->setServiceHost($service_host) - set the service host to use ->setServiceGroup($service_group) - set the service group to use ->setCallback($callback, $args) - set callback and arguments ->setProgress($progress) - set the current progress of the process (0..1) ->setProgressInterval($interval) - set the interval in seconds (float) to write the status to the database ->setInclude($file) - include specific file before executing (can be called multiple times) ->setPhase($phase) - set the phase for which to run the process at ->setResultStorage(&$result) - return result in variable passed-by-reference. ->keepAlive($keepalive) - restart the process after completion ->setShutdownCallback($callback, $args = array()) - register handler to be run when shutting down the process ->unlock() - unlock the process ->reDispatch($threshold = 10) - reDispatch (or cleanup) the process if applicable
Example of loading node in background: $node = NULL; $processes[] = BackgroundProcess::lock()->setResultStorage($node)->setCallback('node_load', array(1))->dispatch(); BackgroundProcess::waitForFinish($processes); print_r($node);
File
background_process.incView source
<?php
// Error codes passed as the exception code of BackgroundProcessException.
// The handle could not be locked (BackgroundProcess::lock()).
define('BACKGROUND_PROCESS_ERROR_NO_LOCK', 0x1);
// The selected service host has reached its max_clients limit (setServiceHost()).
define('BACKGROUND_PROCESS_ERROR_LIMIT_REACHED', 0x2);
// No usable service group was found (setServiceGroup()).
define('BACKGROUND_PROCESS_ERROR_NO_SERVICE_GROUP', 0x3);
// The service group's balancer method is not callable (setServiceGroup()).
define('BACKGROUND_PROCESS_ERROR_INVALID_BALANCER', 0x4);
// The process object has no pid yet (ensureProcess()).
define('BACKGROUND_PROCESS_ERROR_NO_PID', 0x5);
// The stored callback arguments are not an array (execute()).
define('BACKGROUND_PROCESS_ERROR_ARGUMENTS', 0x6);
// The stored callback is not callable (execute()).
define('BACKGROUND_PROCESS_ERROR_INVALID_CALLBACK', 0x7);
/**
* @file
*
* External API short overview
*
* ::load($pid) - load a process
* ::loadByHandle($handle) - load a process
* ::currentProcess() - get currently running process
* ::lock($handle = NULL) - lock new process
* ->getHandle() - get handle of process
* ->getProgress() - get current progress of process
* ->getProgressMessage() - get current progress message of process
* ->calculateETA() - calculate the ETA of the process (if applicable)
* ->setServiceHost($service_host) - set the service host to use
* ->setServiceGroup($service_group) - set the service group to use
* ->setCallback($callback, $args) - set callback and arguments
* ->setProgress($progress) - set the current progress of the process (0..1)
* ->setProgressInterval($interval) - set the interval in seconds (float) to write the status to the database
* ->setInclude($file) - include specific file before executing (can be called multiple times)
* ->setPhase($phase) - set the phase for which to run the process at
* ->setResultStorage(&$result) - return result in variable passed-by-reference.
* ->keepAlive($keepalive) - restart the process after completion
* ->setShutdownCallback($callback, $args = array()) - register handler to be run when shutting down the process
* ->unlock() - unlock the process
* ->reDispatch($threshold = 10) - reDispatch (or cleanup) the process if applicable
*
* Example of loading node in background:
* $node = NULL;
* $processes[] = BackgroundProcess::lock()->setResultStorage($node)->setCallback('node_load', array(1))->dispatch();
* BackgroundProcess::waitForFinish($processes);
* print_r($node);
*/
class BackgroundProcess {
// When TRUE, logDebug() writes a line to error_log() for every traced call.
public $debug = TRUE;
// Not referenced by any method in this class.
public $connection = FALSE;
// Process id; assigned from the db_insert() result in lock().
public $pid = NULL;
// Handle identifying the process; see __construct().
public $handle = NULL;
// Callback executed by execute(); see setCallback().
public $callback = NULL;
// Callback arguments; set by setCallback() and unserialized in create().
// Declared explicitly so it is not created as a dynamic property.
public $arguments = NULL;
// Option array; always merged with defaultOptions().
public $options = array();
// User id the callback is executed as; see execute().
public $uid = NULL;
// Token generated in lock().
public $token = NULL;
public $service_host = NULL;
public $service_group = NULL;
// Last progress value handed to setProgress().
public $progress = -1;
// Last progress message handed to setProgress(). Declared explicitly so
// getProgressMessage() does not read an undefined property.
public $progress_message = NULL;
// Not referenced by any method in this class.
public $request = NULL;
// Timestamp written by lock(); refreshed by doKeepAlive() in the database.
// Declared explicitly so getCreated()/calculateETA() do not read an undefined
// property.
public $created = NULL;
// Timestamp (with microseconds) written by lock() and claim().
public $start_stamp = NULL;
// Copy of $start_stamp maintained by create() and shutdown(). Declared
// explicitly so it is not created as a dynamic property.
public $start = NULL;
// One of the BACKGROUND_PROCESS_STATUS_* values.
public $exec_status = NULL;
// Callback result; may be bound by reference through setResultStorage().
public $result = NULL;
// microtime(TRUE) of the last progress write performed by setProgress().
public $progress_last_updated = 0;
// Only update progress column once per second
public $progress_interval = 1;
// TRUE once shutdown() has been registered as a shutdown function.
public $registered = FALSE;
// Whether remove() should delete the database row.
public $remove = FALSE;
// Whether shutdown() should run the shutdown hooks and callbacks.
public $shutdown = FALSE;
// Column => value pairs waiting to be written by writeData().
public $dirty = array();
public $keepalive = 0;
// Exception caught for this process by dispatchAll().
public $last_error;
// Process returned by currentProcess().
private static $current_process = NULL;
// Static cache filled by create(), keyed by 'pid' and by 'handle'.
private static $processes = array();
// Accumulated log entries and the aggregate severity; see log().
private $log = array(
  'entries' => array(),
  'severity' => -1,
);
/**
 * Build a process object around a handle.
 *
 * @param string $handle (optional)
 *   Handle to use. When omitted, a handle is generated with
 *   md5(uniqid('bgp', TRUE)).
 */
public function __construct($handle = NULL) {
  if (!isset($handle)) {
    $handle = md5(uniqid('bgp', TRUE));
  }
  $this->handle = $handle;
  $this->options = self::defaultOptions();
}
// ---------- FACTORY METHODS ----------
/**
 * Get or replace the globally registered "current" process.
 *
 * @param BackgroundProcess $process (optional)
 *   Process to register as the current one.
 *
 * @return BackgroundProcess
 *   When $process is given: the process that was registered before the call.
 *   Otherwise: the currently registered process (NULL if none).
 *
 * NOTE(review): passing NULL acts as a plain getter, so this method cannot be
 * used to reset the current process back to NULL.
 */
public static function currentProcess(BackgroundProcess $process = NULL) {
  $previous = self::$current_process;
  if ($process) {
    self::$current_process = $process;
  }
  return $previous;
}
/**
 * Build a BackgroundProcess from a data record and add it to the static cache.
 *
 * Every member of $data is copied onto the object. The callback, arguments
 * and options members are then unserialized, and options is completed with
 * the default options.
 *
 * @param $data
 *   stdClass object (e.g. a background_process row) to populate the
 *   BackgroundProcess object with.
 *
 * @return BackgroundProcess
 */
public static function create($data) {
  $process = new BackgroundProcess();
  foreach ($data as $property => $value) {
    $process->{$property} = $value;
  }

  $process->callback = unserialize($process->callback);
  $process->arguments = unserialize($process->arguments);
  if ($process->options) {
    $process->options = unserialize($process->options);
  }
  else {
    $process->options = array();
  }
  $process->options += self::defaultOptions();
  $process->start = $process->start_stamp;

  $process->logDebug(__FUNCTION__);

  // Register the object in the static cache under both lookup keys.
  self::$processes['handle'][$process->handle] = $process;
  self::$processes['pid'][$process->pid] = $process;
  return $process;
}
/**
 * Load a BackgroundProcess by process id.
 *
 * @param $pid
 *   Process ID
 * @param $reset (optional)
 *   Bypass the static cache and read the row from the database.
 *
 * @return BackgroundProcess
 *   The process, or NULL when no row with that pid exists.
 */
public static function load($pid, $reset = FALSE) {
  // Serve from the static cache unless a reset was requested.
  if (!$reset && isset(self::$processes['pid'][$pid])) {
    return self::$processes['pid'][$pid];
  }

  // Ensure DB availability
  drupal_bootstrap(DRUPAL_BOOTSTRAP_DATABASE);
  $query = db_select('background_process', 'bp', array(
    'target' => 'background_process',
  ));
  $row = $query
    ->fields('bp')
    ->condition('bp.pid', $pid)
    ->execute()
    ->fetchObject();

  if (!$row) {
    return NULL;
  }
  return self::create($row);
}
/**
 * Load a BackgroundProcess by handle.
 *
 * @param $handle
 *   Handle of the process
 * @param $reset (optional)
 *   Bypass the static cache and read the row from the database.
 *
 * @return BackgroundProcess
 *   The process, or NULL when no row with that handle exists.
 */
public static function loadByHandle($handle, $reset = FALSE) {
  // Serve from the static cache unless a reset was requested.
  if (!$reset && isset(self::$processes['handle'][$handle])) {
    return self::$processes['handle'][$handle];
  }

  // Ensure DB availability
  drupal_bootstrap(DRUPAL_BOOTSTRAP_DATABASE);
  $query = db_select('background_process', 'bp', array(
    'target' => 'background_process',
  ));
  $row = $query
    ->fields('bp')
    ->condition('bp.handle', $handle)
    ->execute()
    ->fetchObject();

  if (!$row) {
    return NULL;
  }
  return self::create($row);
}
/**
 * Load every BackgroundProcess row, optionally filtered by status.
 *
 * @param $status (optional)
 *   When set, only rows whose exec_status equals this value are loaded.
 *
 * @return array
 *   BackgroundProcess objects, one per matching row.
 */
public static function loadAll($status = NULL) {
  // Ensure DB availability
  drupal_bootstrap(DRUPAL_BOOTSTRAP_DATABASE);

  $query = db_select('background_process', 'bp', array(
    'target' => 'background_process',
  ))->fields('bp');
  if (isset($status)) {
    $query = $query->condition('bp.exec_status', $status);
  }
  $rows = $query->execute();

  $processes = array();
  while ($row = $rows->fetchObject()) {
    $processes[] = BackgroundProcess::create($row);
  }
  return $processes;
}
/**
 * Create a BackgroundProcess object and lock it.
 *
 * Inserts a row in the background_process table with status
 * BACKGROUND_PROCESS_STATUS_LOCKED and registers the cleanup handler so the
 * row is removed again at shutdown unless the process is dispatched.
 *
 * @param string $handle (optional)
 *   If specified, $handle will be used as key for the lock. Otherwise a unique
 *   handle will be generated.
 *
 * @return BackgroundProcess
 *   The locked process.
 *
 * @throws BackgroundProcessException
 *   With code BACKGROUND_PROCESS_ERROR_NO_LOCK when an exception with code
 *   23000 is raised (presumably a duplicate handle hitting a unique key -
 *   confirm against the table schema).
 * @throws Exception
 *   Any other exception is rethrown unchanged.
 */
public static function lock($handle = NULL) {
  $process = new BackgroundProcess($handle);
  $process->token = uniqid('BGP', TRUE);
  try {
    $process->exec_status = BACKGROUND_PROCESS_STATUS_LOCKED;
    $process->created = $process->start_stamp = microtime(TRUE);
    $process->pid = db_insert('background_process', array(
      'target' => 'background_process',
    ))
      ->fields(array(
        'token' => $process->token,
        'handle' => $process->handle,
        'created' => $process->created,
        'start_stamp' => $process->start_stamp,
        'exec_status' => $process->exec_status,
      ))
      ->execute();
    $process->sendMessage('locked');
    $process->logDebug(__FUNCTION__);
    $process->ensureCleanup(FALSE, TRUE);
    return $process;
  }
  catch (Exception $e) {
    if ($e->getCode() == '23000') {
      // Report the handle actually used: $handle is NULL when the handle was
      // generated by the constructor.
      throw new BackgroundProcessException(t('@handle already locked', array(
        '@handle' => $process->handle,
      )), BACKGROUND_PROCESS_ERROR_NO_LOCK);
    }
    throw $e;
  }
}
// ---------- GETTERS ----------
/**
 * Get the process id.
 *
 * @return
 *   Value of $pid (NULL until the process has been locked or loaded).
 */
public function getPID() {
  return $this->pid;
}
/**
 * Get the id of the user the process is executed as.
 *
 * @return
 *   Value of $uid.
 */
public function getUID() {
  return $this->uid;
}
/**
 * Get the handle of the process.
 *
 * @return string
 *   Value of $handle.
 */
public function getHandle() {
  return $this->handle;
}
/**
 * Get the token of the process.
 *
 * @return string
 *   Value of $token (generated in lock()).
 */
public function getToken() {
  return $this->token;
}
/**
 * Get the phase option.
 *
 * @return
 *   The 'phase' option, or NULL when it is missing or empty.
 */
public function getPhase() {
  if (empty($this->options['phase'])) {
    return NULL;
  }
  return $this->options['phase'];
}
/**
 * Get the registered shutdown callbacks.
 *
 * @return array
 *   The 'shutdown_callbacks' option: a list of array($callback, $args)
 *   entries as stored by setShutdownCallback().
 */
public function getShutdownCallbacks() {
  return $this->options['shutdown_callbacks'];
}
/**
 * Get the current service group.
 *
 * @return
 *   Value of $service_group. Note that setServiceGroup() stores the service
 *   group definition (as returned by background_process_get_service_groups())
 *   in this property rather than the group name.
 */
public function getServiceGroup() {
  return $this->service_group;
}
/**
 * Get the current service host.
 *
 * @return
 *   Value of $service_host (NULL until a service host has been assigned).
 */
public function getServiceHost() {
  return $this->service_host;
}
/**
 * Get the execution status.
 *
 * @return
 *   Value of $exec_status (one of the BACKGROUND_PROCESS_STATUS_* values).
 */
public function getStatus() {
  return $this->exec_status;
}
/**
 * Get a single option.
 *
 * @param $key
 *   Name of the option.
 *
 * @return
 *   The option value, or NULL when the option is not set. Unknown keys no
 *   longer raise an "undefined index" notice.
 */
public function getOption($key) {
  return isset($this->options[$key]) ? $this->options[$key] : NULL;
}
/**
 * Get the created time (launch time).
 *
 * @return
 *   Value of the created property, as written by lock() or loaded from the
 *   database.
 */
public function getCreated() {
  return $this->created;
}
/**
 * Get the start time (considers keepalive).
 *
 * @return
 *   Value of $start_stamp; claim() refreshes it each time the process is
 *   claimed for execution.
 */
public function getStartTime() {
  return $this->start_stamp;
}
/**
 * Get the dispatcher.
 *
 * @return
 *   The 'dispatcher' option ('http' by default, see defaultOptions()).
 */
public function getDispatcher() {
  return $this->options['dispatcher'];
}
/**
 * Get the result of the callback.
 *
 * @return
 *   Value of $result; execute() stores the callback's return value there and
 *   waitForFinish() fills it from the background_process_result table.
 */
public function getResult() {
  return $this->result;
}
// ---------- SETTERS ----------
/**
 * Set the id of the user the process is executed as.
 *
 * The value is also queued in $dirty so the next writeData() persists it.
 *
 * @param $uid
 *   User id.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 */
public function setUID($uid) {
  $this->dirty['uid'] = $uid;
  $this->uid = $uid;
  return $this;
}
/**
 * Set callback and arguments.
 *
 * Both values are serialized into $dirty so the next writeData() persists
 * them.
 *
 * @param $callback
 *   Callback to execute.
 * @param array $arguments (optional)
 *   Arguments for the callback.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 *
 * @throws BackgroundProcessException
 *   From ensureProcess() when the process has no pid.
 */
public function setCallback($callback, $arguments = array()) {
  $this->ensureProcess();
  $this->callback = $callback;
  $this->arguments = $arguments;
  $this->dirty['callback'] = serialize($this->callback);
  $this->dirty['arguments'] = serialize($this->arguments);

  // Build a printable name for the debug line. Interpolating an array
  // callback directly would trigger an "Array to string conversion" error.
  if (is_scalar($callback)) {
    $label = (string) $callback;
  }
  elseif (is_array($callback) && isset($callback[0], $callback[1]) && is_scalar($callback[1]) && (is_object($callback[0]) || is_scalar($callback[0]))) {
    $owner = is_object($callback[0]) ? get_class($callback[0]) : $callback[0];
    $label = $owner . '::' . $callback[1];
  }
  else {
    $label = gettype($callback);
  }
  $this->logDebug('setCallback' . " {$label} (" . strlen($this->dirty['arguments']) . " bytes)");
  return $this;
}
/**
 * Add a file to include before the callback is executed.
 *
 * Can be called multiple times; each file is appended to the 'include'
 * option, which execute() walks with include_once.
 *
 * @param $file
 *   Relative path+file of the file to include.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 */
public function setInclude($file) {
  $this->options['include'][] = $file;
  $this->setOptions($this->options);
  return $this;
}
/**
 * Set the bootstrap phase in which the process should run.
 *
 * The value is stored as the 'phase' option and persisted through
 * setOptions().
 *
 * @param int $phase
 *   Run at bootstrap phase.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 */
public function setPhase($phase) {
  $this->options['phase'] = $phase;
  $this->setOptions($this->options);
  return $this;
}
/**
 * Bind the result to a caller-owned variable.
 *
 * $result is taken by reference, so later assignments to the process'
 * result property are visible in the caller's variable.
 *
 * @param &$result
 *   Variable that should receive the result.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 */
public function setResultStorage(&$result) {
  // Reference binding is the whole point here; do not copy.
  $this->result =& $result;
  return $this;
}
/**
 * Register a function to be run when the process shuts down.
 *
 * The entry is only appended to the in-memory options; this method does not
 * queue the options for writing.
 *
 * @param $callback
 *   Callback to invoke from shutdown().
 * @param $args (optional)
 *   Arguments for the callback; cast to an array. shutdown() appends the
 *   process object as the last argument.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 */
public function setShutdownCallback($callback, $args = array()) {
  $entry = array(
    $callback,
    (array) $args,
  );
  $this->options['shutdown_callbacks'][] = $entry;
  return $this;
}
/**
 * Set the service host and, with it, the dispatcher.
 *
 * @param $service_host (optional)
 *   Service host to use. If invalid or none specified, the service host named
 *   by the 'background_process_default_service_host' variable is used.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 *
 * @throws BackgroundProcessException
 *   With code BACKGROUND_PROCESS_ERROR_LIMIT_REACHED when the host's
 *   max_clients limit is reached; the process is removed first.
 */
public function setServiceHost($service_host = NULL) {
  $this->ensureProcess();
  $service_hosts = background_process_get_service_hosts();

  $is_known = $service_host && !empty($service_hosts[$service_host]);
  if (!$is_known) {
    // Invalid service hosts selected!
    $service_host = variable_get('background_process_default_service_host', 'default');
  }

  // A max_clients of zero or less means "no limit".
  $max_clients = $service_hosts[$service_host]['max_clients'];
  if ($max_clients > 0) {
    $clients = background_process_current_clients($service_host);
    if ($max_clients <= $clients) {
      $this->keepAlive(FALSE)->remove();
      throw new BackgroundProcessException(t('Max clients limit reached'), BACKGROUND_PROCESS_ERROR_LIMIT_REACHED);
    }
  }

  $this->service_host = $service_host;
  $this->dirty['service_host'] = $this->service_host;
  $this->setDispatcher($service_hosts[$service_host]['dispatcher']);
  return $this;
}
/**
 * Set the service group and let its balancer pick the service host.
 *
 * @param $service_group_name (optional)
 *   Name of the service group. If empty, the value already stored in
 *   $service_group is tried; if that is unknown too, the group named by the
 *   'background_process_default_service_group' variable is used.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 *
 * @throws BackgroundProcessException
 *   BACKGROUND_PROCESS_ERROR_NO_SERVICE_GROUP when not even the default
 *   group exists; BACKGROUND_PROCESS_ERROR_INVALID_BALANCER when the group's
 *   'method' is not callable.
 *
 * NOTE(review): $service_group ends up holding the group definition array,
 * while the fallback above reuses it as a group name - confirm this is
 * intended.
 */
public function setServiceGroup($service_group_name = NULL) {
  if (!$service_group_name && isset($this->service_group)) {
    $service_group_name = $this->service_group;
  }

  $service_groups = background_process_get_service_groups();
  if (!$service_group_name || empty($service_groups[$service_group_name])) {
    $service_group_name = variable_get('background_process_default_service_group', 'default');
  }
  if (empty($service_groups[$service_group_name])) {
    // Not even the default service group exists.
    throw new BackgroundProcessException(t('Default service group not found'), BACKGROUND_PROCESS_ERROR_NO_SERVICE_GROUP);
  }

  $this->service_group = $service_groups[$service_group_name];

  // Ask the balancer for an appropriate service host
  $balancer = $this->service_group['method'];
  if (!is_callable($balancer)) {
    throw new BackgroundProcessException(t('Cannot call balancer method'), BACKGROUND_PROCESS_ERROR_INVALID_BALANCER);
  }
  $service_host = call_user_func($balancer, $this->service_group);

  $this->setServiceHost($service_host);
  return $this;
}
/**
 * Set the execution status.
 *
 * The value is also queued in $dirty so the next writeData() persists it.
 *
 * @param $status
 *   One of the BACKGROUND_PROCESS_STATUS_* values.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 *
 * @throws BackgroundProcessException
 *   From ensureProcess() when the process has no pid.
 */
public function setStatus($status) {
  $this->ensureProcess();
  $this->exec_status = $status;
  $this->dirty['exec_status'] = $status;
  return $this;
}
/**
 * Set a single option.
 *
 * The full option array (completed with the defaults) is serialized into
 * $dirty so the next writeData() persists it.
 *
 * @param $key
 *   Name of the option.
 * @param $value
 *   Value of the option.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 *
 * @throws BackgroundProcessException
 *   From ensureProcess() when the process has no pid.
 */
public function setOption($key, $value) {
  $this->logDebug(__FUNCTION__);
  $this->ensureProcess();

  $this->options[$key] = $value;
  $this->options += self::defaultOptions();

  $serialized = serialize($this->options);
  $this->dirty['options'] = $serialized;
  return $this;
}
/**
 * Replace the option array.
 *
 * Missing keys are filled in from defaultOptions(); the result is serialized
 * into $dirty so the next writeData() persists it.
 *
 * @param array $options (optional)
 *   New options.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 *
 * @throws BackgroundProcessException
 *   From ensureProcess() when the process has no pid.
 */
public function setOptions($options = array()) {
  $this->logDebug(__FUNCTION__);
  $this->ensureProcess();

  $merged = $options + self::defaultOptions();
  $this->options = $merged;
  $this->dirty['options'] = serialize($this->options);
  return $this;
}
/**
 * Set the start time.
 *
 * The value is also queued in $dirty so the next writeData() persists it.
 *
 * @param $start_stamp
 *   Unix timestamp (with microseconds) of start time
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 *
 * @throws BackgroundProcessException
 *   From ensureProcess() when the process has no pid.
 */
public function setStartTime($start_stamp) {
  $this->ensureProcess();
  $this->start_stamp = $start_stamp;
  $this->dirty['start_stamp'] = $start_stamp;
  return $this;
}
/**
 * Set the dispatcher used to launch the process.
 *
 * The value is stored as the 'dispatcher' option and persisted through
 * setOptions().
 *
 * @param $dispatcher
 *   Name of the dispatcher, as later handed to
 *   background_process_invoke_dispatcher() by dispatch().
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 */
public function setDispatcher($dispatcher) {
  $this->options['dispatcher'] = $dispatcher;
  return $this->setOptions($this->options);
}
// ---------- API ----------
/**
 * Execute the background process callback function.
 *
 * Claims the process, includes the files listed in the 'include' option,
 * switches the global $user to the process' user (anonymous when no uid is
 * set or the user cannot be loaded) and invokes the callback between the
 * background_process_pre_execute and background_process_post_execute hooks.
 * When the 'store_result' option is set, the result is inserted in the
 * background_process_result table.
 *
 * @return
 *   $this on success, FALSE when the process could not be claimed.
 *
 * @throws BackgroundProcessException
 *   BACKGROUND_PROCESS_ERROR_INVALID_CALLBACK when the callback is not
 *   callable; BACKGROUND_PROCESS_ERROR_ARGUMENTS when the arguments are not an
 *   array.
 * @throws Exception
 *   Anything thrown by the hooks or the callback is rethrown after the user
 *   and current process have been restored.
 *
 * NOTE(review): the global $user is swapped without calling
 * drupal_save_session(FALSE); confirm whether session saving should be
 * disabled while impersonating.
 */
public function execute() {
  $this->logDebug(__FUNCTION__);

  if (!$this->claim()) {
    watchdog('bg_process', 'Could not claim process %pid : %handle', array(
      '%pid' => $this->pid,
      '%handle' => $this->handle,
    ), WATCHDOG_ERROR);
    return FALSE;
  }

  $this->ensureCleanup();
  if ($this->options['detach']) {
    $this->flush();
  }
  if (!empty($this->options['include'])) {
    foreach ($this->options['include'] as $file) {
      include_once $file;
    }
  }

  if (!is_callable($this->callback)) {
    $this->ensureProcess();
    // Never restart a process whose callback cannot be called.
    $this->keepAlive(FALSE)->writeData();
    // The original code read $process->callback here, but no $process
    // variable exists in this scope.
    throw new BackgroundProcessException(t('Callback: %callback not found', array(
      '%callback' => _background_process_callback_name($this->callback),
    )), BACKGROUND_PROCESS_ERROR_INVALID_CALLBACK);
  }

  // Array callbacks cannot be concatenated into the debug lines directly.
  $callback_name = is_string($this->callback) ? $this->callback : _background_process_callback_name($this->callback);
  $this->logDebug($callback_name . ' is callable');

  // Run indefinitly...
  // @todo Make timeout configurable, perhaps through options?
  set_time_limit(0);

  // Run process as specified user
  global $user;
  $old_user = $user;
  if ($this->uid && ($as_user = user_load($this->uid))) {
    $user = $as_user;
  }
  else {
    $user = drupal_anonymous_user();
  }

  // Set current process
  $old_process = self::currentProcess($this);
  try {
    $this->logDebug($callback_name . ' is called now!');

    // Just to avoid endless loops on keepalive, check if arguments really is
    // an array
    if (!is_array($this->arguments)) {
      $this->keepAlive(FALSE);
      throw new BackgroundProcessException(t('Background Process arguments is not an array!'), BACKGROUND_PROCESS_ERROR_ARGUMENTS);
    }

    module_invoke_all('background_process_pre_execute', $this);
    $this->result = call_user_func_array($this->callback, $this->arguments);
    module_invoke_all('background_process_post_execute', $this);

    if ($this->options['store_result']) {
      db_insert('background_process_result', array(
        'target' => 'background_process',
      ))
        ->fields(array(
          'pid' => $this->pid,
          'result' => $this->result,
          'created' => time(),
        ))
        ->execute();
    }

    $this->shutdown();
    self::currentProcess($old_process);

    // Done, restore user.
    if ($old_user) {
      $user = $old_user;
    }
    return $this;
  }
  catch (Exception $e) {
    // Something went wrong. Restore user and rethrow exception.
    if ($old_user) {
      $user = $old_user;
    }
    self::currentProcess($old_process);
    $this->logDebug('exception thrown! ' . (string) $e);
    throw $e;
  }
}
/**
 * Use blocking style request, and instruct the subrequests to detach asap.
 *
 * Stores the flag as the 'detach' option; execute() flushes the output
 * buffer before running the callback when it is set.
 *
 * @param $detach (optional)
 *   TRUE to detach, FALSE to turn detaching off again. The original code
 *   ignored this parameter and always enabled detaching.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 *
 * @todo Bad naming?
 */
public function detach($detach = TRUE) {
  $this->logDebug(__FUNCTION__);
  $this->options['detach'] = (bool) $detach;
  $this->setOptions($this->options);
  return $this;
}
/**
 * Keep the request alive (i.e., restart after finish).
 *
 * The flag is cast to an integer and queued in $dirty. While the
 * 'keepalive_counter' option is still empty, result storing is switched off
 * and the options are queued for writing as well.
 *
 * @param $keepalive (optional)
 *   TRUE to restart the process after completion, FALSE to disable.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 *
 * NOTE(review): 'store_result' is disabled regardless of the value of
 * $keepalive, so keepAlive(FALSE) turns result storing off too - confirm
 * this is intended.
 */
public function keepAlive($keepalive = TRUE) {
  $this->logDebug(__FUNCTION__ . " : {$keepalive}");

  $flag = (int) $keepalive;
  $this->keepalive = $flag;
  $this->dirty['keepalive'] = $flag;

  if (empty($this->options['keepalive_counter'])) {
    $this->options['store_result'] = FALSE;
    $this->setOptions($this->options);
  }
  return $this;
}
/**
 * Do/don't store the result.
 *
 * Sets the 'store_result' option, which execute() checks before inserting
 * the callback result in the background_process_result table.
 *
 * @param $store_result (optional)
 *   TRUE to store the result, FALSE to skip it.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 */
public function storeResult($store_result = TRUE) {
  $this->options['store_result'] = $store_result;
  return $this->setOptions($this->options);
}
/**
 * Shutdown a process (and ensure it doesn't start again by itself, i.e. "unlock")
 *
 * Disables keepalive, arms both the shutdown and the remove flag and then
 * runs shutdown().
 *
 * @return
 *   The return value of shutdown().
 */
public function unlock() {
  $this->logDebug(__FUNCTION__);
  $this->keepAlive(FALSE);
  $this->ensureCleanup();
  return $this->shutdown();
}
/**
 * Restart the request.
 *
 * Resets the status to BACKGROUND_PROCESS_STATUS_LOCKED, writes it to the
 * database and dispatches the process again.
 *
 * @return BackgroundProcess
 *   $this, as returned by dispatch().
 */
public function restart() {
  $this->setStatus(BACKGROUND_PROCESS_STATUS_LOCKED);
  // Not chained: the call must not depend on writeData()'s return value.
  $this->writeData();
  return $this->dispatch();
}
/**
 * Launch the process.
 *
 * Makes sure a service host is assigned, marks the process as
 * BACKGROUND_PROCESS_STATUS_DISPATCHED and hands it to the dispatcher named
 * by the 'dispatcher' option. The background_process_pre_dispatch and
 * background_process_post_dispatch hooks are invoked around the dispatcher.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 *
 * @throws BackgroundProcessException
 *   From ensureProcess() or the service host/group selection.
 */
public function dispatch() {
  $this->logDebug(__FUNCTION__ . ' - ' . $this->options['dispatcher']);

  // Ensure sanity
  $this->ensureProcess();
  $this->ensureServiceHost();
  // From here on the shutdown handler must neither run the shutdown
  // callbacks nor delete the row.
  $this->ensureCleanup(FALSE, FALSE);

  // Update status and send message
  $this->setStatus(BACKGROUND_PROCESS_STATUS_DISPATCHED);
  $this->writeData();
  $this->sendMessage('dispatch');

  module_invoke_all('background_process_pre_dispatch', $this);

  // Write data again, in case pre dispatch hook changed something
  $this->writeData();

  $dispatcher = $this->options['dispatcher'];
  background_process_invoke_dispatcher($dispatcher, $this);

  module_invoke_all('background_process_post_dispatch', $this);
  return $this;
}
/**
 * Re-dispatch (or clean up) a process that is stuck in the locked state.
 *
 * A locked process whose created time is at least $threshold seconds old is
 * dispatched again. Otherwise, a locked process whose start time is older
 * than the 'background_process_cleanup_age' variable is shut down.
 *
 * @param $threshold (optional)
 *   Seconds a process may stay locked before it is dispatched again.
 *
 * @return bool
 *   TRUE when the process was dispatched again, FALSE otherwise.
 */
public function reDispatch($threshold = 10) {
  if ($this->getStatus() != BACKGROUND_PROCESS_STATUS_LOCKED) {
    return FALSE;
  }

  // If process is locked and hasn't started for X seconds, then relaunch
  if ($this->getCreated() + $threshold <= time()) {
    $this->logDebug(__FUNCTION__);
    $this->dispatch();
    return TRUE;
  }

  // This is not re-dispatch?!? ... (re)move it!
  $cleanup_age = variable_get('background_process_cleanup_age', BACKGROUND_PROCESS_CLEANUP_AGE);
  if ($this->getStartTime() + $cleanup_age <= time()) {
    $this->shutdown();
  }
  return FALSE;
}
/**
 * Dispatch multiple prepared processes.
 *
 * A process whose dispatch throws a BackgroundProcessException stays in
 * $processes and gets the exception stored in its last_error property.
 *
 * @param array &$processes
 *   BackgroundProcess objects - dispatched processes will be removed from the array
 *
 * @return array
 *   Dispatched BackgroundProcess objects
 */
public static function dispatchAll(&$processes) {
  $dispatched = array();
  foreach (array_keys($processes) as $idx) {
    $process = $processes[$idx];
    try {
      $process->dispatch();
    }
    catch (BackgroundProcessException $e) {
      // Skip to the next.
      $process->last_error = $e;
      continue;
    }
    $dispatched[] = $process;
    unset($processes[$idx]);
  }
  return $dispatched;
}
/**
 * Wait until the given processes have stored their result.
 *
 * Polls the background_process_result table until a result row exists for
 * every distinct pid or the timeout expires, then copies each stored result
 * into the matching process object. For processes set up with
 * setResultStorage() the value also reaches the caller's variable, because
 * the result property is bound by reference.
 *
 * Unlike the original loop, this returns as soon as all results are present
 * instead of always sleeping one more interval.
 *
 * @param array $processes
 *   BackgroundProcess objects to wait for.
 * @param array $options (optional)
 *   - interval: seconds between polls (default 1).
 *   - timeout: maximum number of seconds to wait (default 10).
 *
 * @return int
 *   Number of result rows found (0 when $processes is empty).
 */
public static function waitForFinish($processes, $options = array()) {
  if (empty($processes)) {
    return 0;
  }

  $keyed = array();
  foreach ($processes as $process) {
    $keyed[$process->pid] = $process;
  }
  $pids = array_keys($keyed);
  // Count distinct pids: the same process listed twice yields one result row.
  $expected = count($keyed);

  $options += array(
    'interval' => 1,
    'timeout' => 10,
  );
  // usleep() takes whole microseconds.
  $interval = (int) ($options['interval'] * 1000000);
  $expire = microtime(TRUE) + $options['timeout'];

  while (TRUE) {
    $results = db_select('background_process_result', 'r', array(
      'target' => 'background_process',
    ))
      ->fields('r')
      ->condition('r.pid', $pids, 'IN')
      ->execute()
      ->fetchAll(PDO::FETCH_OBJ);

    // Check before sleeping so a finished batch returns immediately.
    if (count($results) >= $expected || microtime(TRUE) >= $expire) {
      break;
    }
    usleep($interval);
  }

  foreach ($results as $result) {
    $keyed[$result->pid]->result = $result->result;
  }
  return count($results);
}
// ---------- LEGACY API (1.x) ----------
/**
 * Legacy (1.x) wrapper: lock, set the callback and dispatch in one call.
 *
 * A new process is locked under this object's handle and its identity
 * (handle, token, status, created time and pid) is copied onto $this, which
 * is then dispatched.
 *
 * @param $callback
 *   Callback to execute.
 * @param $args
 *   Arguments for the callback.
 *
 * @return string
 *   Handle of the dispatched process.
 */
public function start($callback, $args) {
  $process = BackgroundProcess::lock($this->handle);
  // The temporary object must not remove the row when the request ends.
  $process->ensureCleanup(FALSE, FALSE);

  foreach (array('handle', 'token', 'exec_status', 'created', 'pid') as $property) {
    $this->{$property} = $process->{$property};
  }
  $this->ensureCleanup(FALSE, TRUE);

  $this->setCallback($callback, $args);
  $this->dispatch();
  return $this->getHandle();
}
// ---------- PROGRESS ----------
/**
 * Get the current progress.
 *
 * @return
 *   Value of $progress (-1 until a progress has been set or loaded).
 */
public function getProgress() {
  return $this->progress;
}
/**
 * Get the current progress message.
 *
 * @return
 *   Value of the progress_message property, as set by setProgress() or
 *   loaded from the database.
 */
public function getProgressMessage() {
  return $this->progress_message;
}
/**
 * Set the progress interval.
 *
 * @param $interval
 *   Minimum number of seconds (float) between two progress writes performed
 *   by setProgress().
 *
 * @return BackgroundProcess
 *   $this, for chaining. The original returned nothing, unlike every other
 *   setter of this class.
 */
public function setProgressInterval($interval) {
  $this->progress_interval = $interval;
  return $this;
}
/**
 * Set the progress and write it to the database.
 *
 * Writes are throttled: while $progress is below 1 and the last write is less
 * than $progress_interval seconds ago, only the object is updated. A progress
 * of 1 or more is always written.
 *
 * @param $progress
 *   Current progress of the process (0..1).
 * @param $progress_message (optional)
 *   Message describing the current progress.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 *
 * @throws BackgroundProcessException
 *   From ensureProcess() when the process has no pid.
 */
public function setProgress($progress, $progress_message = NULL) {
  $this->ensureProcess();

  $fields = array('progress' => $progress);
  $this->progress = $progress;
  if (isset($progress_message)) {
    $fields['progress_message'] = $progress_message;
    $this->progress_message = $progress_message;
  }

  // Don't spam the DB if someone decides to set the progress very rapidly.
  $now = microtime(TRUE);
  $too_soon = $this->progress_last_updated + $this->progress_interval > $now;
  if ($progress < 1 && $too_soon) {
    return $this;
  }
  $this->progress_last_updated = $now;

  db_update('background_process', array(
    'target' => 'background_process',
  ))
    ->fields($fields)
    ->condition('pid', $this->pid)
    ->execute();
  $this->sendMessage('setProgress');
  return $this;
}
/**
 * Calculate the ETA of the process.
 *
 * Extrapolates linearly from the time elapsed since the created time and the
 * current progress.
 *
 * @return
 *   Estimated finish time as a Unix timestamp (float), or NULL when the
 *   progress is not greater than zero.
 */
public function calculateETA() {
  if (!($this->progress > 0)) {
    return NULL;
  }
  $elapsed = microtime(TRUE) - $this->created;
  return $this->created + (1 / $this->progress) * $elapsed;
}
// ---------- LOGGING ----------
/**
 * Log a debug message.
 *
 * Does nothing unless $debug is set. The line written to error_log() is
 * prefixed with the OS process id, the request URI, the pid and the handle.
 *
 * @param $msg
 *   Message to log.
 */
public function logDebug($msg) {
  if (!$this->debug) {
    return;
  }
  $prefix = getmypid() . ' - ' . request_uri() . " - {$this->pid} : {$this->handle} - ";
  error_log($prefix . $msg);
}
/**
 * Log a message for the process.
 *
 * The entry is appended to the in-memory log. The aggregate severity is
 * replaced when none has been recorded yet (it is negative), or when
 * $severity is non-negative and numerically lower than the recorded one.
 *
 * @param $message
 *   Message to log
 * @param $severity
 *   Watchdog severity level (WATCHDOG_ERROR, etc.)
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 */
public function log($message, $severity = -1) {
  $this->log['entries'][] = array(
    'message' => $message,
    'severity' => $severity,
  );

  $recorded = $this->log['severity'];
  if ($recorded < 0 || ($severity >= 0 && $severity < $recorded)) {
    $this->log['severity'] = $severity;
  }
  return $this;
}
/**
 * Get accumulated log for the process.
 *
 * The returned log holds the entries recorded through log(), plus the pending
 * Drupal messages and the last PHP error (if any). The extra entries are not
 * kept: the internal log is restored before returning.
 *
 * Side effect: drupal_get_messages() is called with $clear_queue = TRUE, so
 * the pending Drupal messages are consumed.
 *
 * @return array
 *   Array with the keys 'entries' (list of message/severity pairs) and
 *   'severity' (aggregate severity, see log()).
 */
public function getLog() {
  $original_log = $this->log;

  // Get drupal messages
  $messages = drupal_get_messages(NULL, TRUE);
  $severity_by_type = array(
    // Status messages use log()'s default severity.
    'status' => -1,
    'warning' => WATCHDOG_WARNING,
    'error' => WATCHDOG_ERROR,
  );
  foreach ($severity_by_type as $type => $message_severity) {
    if (empty($messages[$type])) {
      continue;
    }
    foreach ($messages[$type] as $message) {
      $this->log($message, $message_severity);
    }
  }

  // Get error messages
  $error = error_get_last();
  if ($error) {
    $message = $error['message'] . ' (line ' . $error['line'] . ' of ' . $error['file'] . ').' . "\n";

    // The E_* constants are bit flags and must be tested with bitwise
    // operators. The original used && and ||, which made every check true
    // and classified every error as WATCHDOG_ERROR.
    $type = (int) $error['type'];
    if ($type & (E_ERROR | E_CORE_ERROR | E_USER_ERROR | E_RECOVERABLE_ERROR)) {
      $severity = WATCHDOG_ERROR;
    }
    elseif ($type & (E_WARNING | E_CORE_WARNING | E_USER_WARNING)) {
      $severity = WATCHDOG_WARNING;
    }
    elseif ($type & (E_NOTICE | E_USER_NOTICE)) {
      $severity = WATCHDOG_NOTICE;
    }
    else {
      $severity = WATCHDOG_INFO;
    }
    $this->log($message, $severity);
  }

  $log = $this->log;
  $this->log = $original_log;
  return $log;
}
/**
 * Publish a message about the process on the nodejs content channel.
 *
 * Does nothing when the nodejs module is not enabled. The message can be
 * altered through hook_background_process_message_alter().
 *
 * @param $action
 *   Name of the action being reported (e.g. 'locked', 'dispatch').
 */
public function sendMessage($action) {
  if (!module_exists('nodejs')) {
    return;
  }

  // WATCH OUT FOR FUTURE DESTRUCTOR IN BackgroundProcess!!!
  $object = clone $this;

  $data = (object) array(
    'action' => $action,
    'background_process' => $object,
    'timestamp' => microtime(TRUE),
  );
  $message = (object) array(
    'channel' => 'background_process',
    'data' => $data,
    'callback' => 'nodejsBackgroundProcess',
  );

  drupal_alter('background_process_message', $message);
  nodejs_send_content_channel_message($message);
}
// ---------- PROCESS HANDLING ----------
/**
 * Default values for the option array.
 *
 * @return array
 *   Option name => default value.
 */
private static function defaultOptions() {
  $defaults = array();
  $defaults['keepalive_counter'] = 0;
  $defaults['store_result'] = TRUE;
  $defaults['detach'] = FALSE;
  $defaults['shutdown_callbacks'] = array();
  $defaults['dispatcher'] = 'http';
  return $defaults;
}
/**
 * Re-launch the process if necessary.
 *
 * Only applies when keepalive is set and the dispatcher is not 'foreground'.
 * The row is refreshed (options, status, created time) on condition that it
 * is still in BACKGROUND_PROCESS_STATUS_RUNNING; if a row was updated, the
 * process is dispatched again.
 *
 * @return bool
 *   TRUE when the process was dispatched again, FALSE otherwise.
 */
private function doKeepAlive() {
  $this->log(__FUNCTION__);

  if (!$this->keepalive || $this->options['dispatcher'] == 'foreground') {
    return FALSE;
  }

  $this->logDebug(__FUNCTION__);
  $updated = db_update('background_process', array(
    'target' => 'background_process',
  ))
    ->fields(array(
      'options' => serialize($this->options),
      'exec_status' => $this->exec_status,
      'created' => microtime(TRUE),
    ))
    ->condition('pid', $this->pid)
    ->condition('exec_status', BACKGROUND_PROCESS_STATUS_RUNNING)
    ->execute();
  if (!$updated) {
    return FALSE;
  }

  $this->options['keepalive_counter']++;
  $this->exec_status = BACKGROUND_PROCESS_STATUS_LOCKED;
  $this->sendMessage('keepalive');
  $this->dispatch();
  return TRUE;
}
/**
 * Remove the process from the DB (unlock).
 *
 * The row is only deleted when the remove flag is armed (see
 * ensureCleanup()); the flag is cleared before deleting.
 *
 * @return
 *   The result of the delete query, or TRUE when the flag was not armed.
 *
 * @throws BackgroundProcessException
 *   From ensureProcess() when the process has no pid.
 */
private function remove() {
  $this->ensureProcess();
  $this->logDebug(__FUNCTION__);

  if (!$this->remove) {
    return TRUE;
  }

  $this->remove = FALSE;
  $this->sendMessage('remove');
  $query = db_delete('background_process', array(
    'target' => 'background_process',
  ));
  return $query
    ->condition('pid', $this->pid)
    ->execute();
}
/**
 * Flush output buffer.
 *
 * Sends a "Connection: close" header and pushes the buffered output out with
 * PHP's ob_flush() and flush().
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 */
private function flush() {
  header('Connection: close');
  ob_flush();
  // This is PHP's global flush(), not a recursive call to this method.
  flush();
  return $this;
}
/**
 * Claim process.
 *
 * Before executing the callback, make sure no one else is doing it: the row
 * is switched to BACKGROUND_PROCESS_STATUS_RUNNING only while it is still in
 * BACKGROUND_PROCESS_STATUS_DISPATCHED.
 *
 * @return
 *   The result of the update query; falsy when no row was updated, i.e. the
 *   process could not be claimed.
 */
private function claim() {
  $this->logDebug(__FUNCTION__);

  $now = microtime(TRUE);
  $query = db_update('background_process', array(
    'target' => 'background_process',
  ));
  $count = $query
    ->fields(array(
      'start_stamp' => $now,
      'exec_status' => BACKGROUND_PROCESS_STATUS_RUNNING,
    ))
    ->condition('pid', $this->pid)
    ->condition('exec_status', array(
      BACKGROUND_PROCESS_STATUS_DISPATCHED,
    ), 'IN')
    ->execute();

  if (!$count) {
    return $count;
  }

  // Mirror the claimed state on the object.
  $this->start_stamp = $now;
  $this->exec_status = BACKGROUND_PROCESS_STATUS_RUNNING;
  $this->sendMessage('claimed');
  return $count;
}
/**
 * Shutdown.
 *
 * Writes pending data, then - when the shutdown flag is armed (see
 * ensureCleanup()) - invokes hook_background_process_shutdown() and the
 * registered shutdown callbacks, and finally either restarts the process
 * (keepalive) or removes it. ensureCleanup() also registers this method as a
 * Drupal shutdown function.
 *
 * @return
 *   TRUE when the process was restarted through keepalive, otherwise the
 *   return value of remove().
 */
public function shutdown() {
  $this->logDebug(__FUNCTION__ . " : {$this->shutdown}");

  try {
    $this->writeData();
  }
  catch (Exception $e) {
    // During shutdown, let's ignore DB errors, so shutdown handler may
    // pickup errors, etc. But we won't restart the process.
    $this->keepAlive(FALSE);
    $this->log((string) $e, WATCHDOG_ERROR);
  }

  if (!$this->shutdown) {
    return $this->remove();
  }

  // Inform shutdown handlers when done.
  $this->logDebug('shutting down!');

  // @todo Bump bootstrap level?
  if (function_exists('module_invoke_all')) {
    $this->logDebug('invoking shutdown handlers');
    $this->start = $this->start_stamp;
    module_invoke_all('background_process_shutdown', $this);
  }

  $this->logDebug('calling shutdown callbacks');
  foreach ($this->getShutdownCallbacks() as $info) {
    $callback = $info[0];
    $args = $info[1];
    // Every callback receives the process as its last argument.
    $args[] = $this;
    call_user_func_array($callback, $args);
  }

  $this->shutdown = FALSE;
  if ($this->doKeepAlive()) {
    return TRUE;
  }
  return $this->remove();
}
/**
 * Write applicable data (if any) to the database.
 *
 * Updates the process' row with the column => value pairs queued in $dirty
 * and clears the queue. Nothing is written when the queue is empty. When the
 * update throws, the exception propagates and the queue is left untouched.
 *
 * @return BackgroundProcess
 *   $this, for chaining. The original returned nothing, which made chained
 *   calls such as ->writeData()->dispatch() in restart() fail.
 */
public function writeData() {
  if (empty($this->dirty)) {
    return $this;
  }

  $this->logDebug(__FUNCTION__);
  db_update('background_process', array(
    'target' => 'background_process',
  ))
    ->fields($this->dirty)
    ->condition('pid', $this->pid)
    ->execute();
  $this->sendMessage('writeData');
  $this->logDebug(__FUNCTION__ . ' - done');

  $this->dirty = array();
  return $this;
}
// ---------- SANITIZERS ----------
/**
 * Make sure prerequisites are met.
 *
 * Requires the process to have a pid and bootstraps Drupal to the database
 * phase.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 *
 * @throws BackgroundProcessException
 *   With code BACKGROUND_PROCESS_ERROR_NO_PID when the process has no pid.
 */
private function ensureProcess() {
  if ($this->pid) {
    // Ensure DB availability
    drupal_bootstrap(DRUPAL_BOOTSTRAP_DATABASE);
    return $this;
  }
  throw new BackgroundProcessException(t('Background process not initialized'), BACKGROUND_PROCESS_ERROR_NO_PID);
}
/**
 * Make sure we clean up after ourselves.
 *
 * Sets the flags consulted by shutdown() and remove(), and registers
 * shutdown() as a Drupal shutdown function the first time it is called.
 *
 * @param $shutdown (optional)
 *   Whether shutdown() should run the shutdown hooks and callbacks.
 * @param $remove (optional)
 *   Whether remove() should delete the row.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 */
private function ensureCleanup($shutdown = TRUE, $remove = TRUE) {
  $this->shutdown = $shutdown;
  $this->remove = $remove;

  if ($this->registered) {
    return $this;
  }

  $this->logDebug(__FUNCTION__);
  $this->registered = TRUE;
  // Make sure the process is removed when we're done
  $handler = array($this, 'shutdown');
  drupal_register_shutdown_function($handler);
  return $this;
}
/**
 * Make sure that the process has been designated a service host.
 *
 * When no service host is set, one is selected through setServiceGroup() and
 * the change is written to the database.
 *
 * @return BackgroundProcess
 *   $this, for chaining.
 */
public function ensureServiceHost() {
  if ($this->service_host) {
    return $this;
  }
  $this->setServiceGroup();
  $this->writeData();
  return $this;
}
}
/**
 * Exception thrown by BackgroundProcess.
 *
 * The exception code is one of the BACKGROUND_PROCESS_ERROR_* constants.
 */
class BackgroundProcessException extends Exception {}