You are here

background_process.inc in Background Process 7.2

External API short overview

::load($pid) - load a process ::loadByHandle($handle) - load a process ::currentProcess() - get currently running process ::lock($handle = NULL) - lock new process ->getHandle() - get handle of process ->getProgress() - get current progress of process ->getProgressMessage() - get current progress message of process ->calculateETA() - calculate the ETA of the process (if applicable) ->setServiceHost($service_host) - set the service host to use ->setServiceGroup($service_group) - set the service group to use ->setCallback($callback, $args) - set callback and arguments ->setProgress($progress) - set the current progress of the process (0..1) ->setProgressInterval($interval) - set the interval in seconds (float) to write the status to the database ->setInclude($file) - include specific file before executing (can be called multiple times) ->setPhase($phase) - set the phase for which to run the process at ->setResultStorage(&$result) - return result in variable passed-by-reference. ->keepAlive($keepalive) - restart the process after completion ->setShutdownCallback($callback, $args = array()) - register handler to be run when shutting down the process ->unlock() - unlock the process ->reDispatch($threshold = 10) - reDispatch (or cleanup) the process if applicable

Example of loading node in background: $node = NULL; $processes[] = BackgroundProcess::lock()->setResultStorage($node)->setCallback('node_load', array(1))->dispatch(); BackgroundProcess::waitForFinish($processes); print_r($node);

File

background_process.inc
View source
<?php

// Error codes passed as the exception code of BackgroundProcessException.
// Thrown by BackgroundProcess::lock() when the insert fails with SQLSTATE 23000.
define('BACKGROUND_PROCESS_ERROR_NO_LOCK', 0x1);
// Thrown by setServiceHost() when the host already has max_clients clients.
define('BACKGROUND_PROCESS_ERROR_LIMIT_REACHED', 0x2);
// Thrown by setServiceGroup() when not even the default group can be found.
define('BACKGROUND_PROCESS_ERROR_NO_SERVICE_GROUP', 0x3);
// Thrown by setServiceGroup() when the group's balancer method is not callable.
define('BACKGROUND_PROCESS_ERROR_INVALID_BALANCER', 0x4);
// Thrown by ensureProcess() when the object has no pid yet.
define('BACKGROUND_PROCESS_ERROR_NO_PID', 0x5);
// Thrown by execute() when the stored arguments are not an array.
define('BACKGROUND_PROCESS_ERROR_ARGUMENTS', 0x6);
// Thrown by execute() when the stored callback is not callable.
define('BACKGROUND_PROCESS_ERROR_INVALID_CALLBACK', 0x7);

/**
 * @file
 *
 * External API short overview
 *
 * ::load($pid) - load a process
 * ::loadByHandle($handle) - load a process
 * ::currentProcess() - get currently running process
 * ::lock($handle = NULL) - lock new process
 * ->getHandle() - get handle of process
 * ->getProgress() - get current progress of process
 * ->getProgressMessage() - get current progress message of process
 * ->calculateETA() - calculate the ETA of the process (if applicable)
 * ->setServiceHost($service_host) - set the service host to use
 * ->setServiceGroup($service_group) - set the service group to use
 * ->setCallback($callback, $args) - set callback and arguments
 * ->setProgress($progress) - set the current progress of the process (0..1)
 * ->setProgressInterval($interval) - set the interval in seconds (float) to write the status to the database
 * ->setInclude($file) - include specific file before executing (can be called multiple times)
 * ->setPhase($phase) - set the phase for which to run the process at
 * ->setResultStorage(&$result) - return result in variable passed-by-reference.
 * ->keepAlive($keepalive) - restart the process after completion
 * ->setShutdownCallback($callback, $args = array())  - register handler to be run when shutting down the process
 * ->unlock() - unlock the process
 * ->reDispatch($threshold = 10) - reDispatch (or cleanup) the process if applicable
 *
 * Example of loading node in background:
 * $node = NULL;
 * $processes[] = BackgroundProcess::lock()->setResultStorage($node)->setCallback('node_load', array(1))->dispatch();
 * BackgroundProcess::waitForFinish($processes);
 * print_r($node);
 */
class BackgroundProcess {
  // NOTE(review): debug logging is enabled by default, so logDebug() writes
  // to the PHP error log on most method calls - confirm this is intended.
  public $debug = TRUE;
  public $connection = FALSE;
  public $pid = NULL;
  public $handle = NULL;
  public $callback = NULL;

  // Callback arguments; filled in by setCallback() or create().
  public $arguments = NULL;
  public $options = array();
  public $uid = NULL;
  public $token = NULL;
  public $service_host = NULL;
  public $service_group = NULL;
  public $progress = -1;

  // Last progress message passed to setProgress(), if any.
  public $progress_message = NULL;
  public $request = NULL;

  // Timestamp (with microseconds) set by lock().
  public $created = NULL;
  public $start_stamp = NULL;

  // Copy of $start_stamp, assigned by create() and shutdown().
  public $start = NULL;
  public $exec_status = NULL;
  public $result = NULL;
  public $progress_last_updated = 0;

  // Only update progress column once per second
  public $progress_interval = 1;
  public $registered = FALSE;
  public $remove = FALSE;
  public $shutdown = FALSE;

  // Column => value pairs that writeData() still has to persist.
  public $dirty = array();
  public $keepalive = 0;
  public $last_error;
  private static $current_process = NULL;

  // Static cache of created processes, keyed by 'pid' and by 'handle'.
  private static $processes = array();
  private $log = array(
    'entries' => array(),
    'severity' => -1,
  );

  /**
   * Constructor.
   *
   * @param $handle
   *   (optional) Handle for the process. When omitted, a "unique" handle is
   *   generated from uniqid().
   */
  public function __construct($handle = NULL) {
    if (!isset($handle)) {
      $handle = md5(uniqid('bgp', TRUE));
    }
    $this->handle = $handle;
    $this->options = self::defaultOptions();
  }

  // ---------- FACTORY METHODS ----------

  /**
   * Get/set the current global process.
   *
   * @param BackgroundProcess $process
   *   (optional) Process to install as the current one. Passing nothing (or
   *   NULL) only reads the current value.
   *
   * @return BackgroundProcess
   *   When setting: the process that was current before the call. When
   *   getting: the current process (NULL if none).
   */
  public static function currentProcess(BackgroundProcess $process = NULL) {
    $previous = self::$current_process;
    if ($process) {
      self::$current_process = $process;
    }
    return $previous;
  }

  /**
   * Build a BackgroundProcess object from a database row.
   *
   * Every property of $data is copied onto the new object; callback,
   * arguments and options are then unserialized, and the object is added to
   * the static cache under both its pid and its handle.
   *
   * @param $data
   *   stdClass object with data to populate BackgroundProcess object with
   *
   * @return BackgroundProcess
   */
  public static function create($data) {
    $process = new BackgroundProcess();
    foreach ($data as $property => $value) {
      $process->{$property} = $value;
    }
    $process->callback = unserialize($process->callback);
    $process->arguments = unserialize($process->arguments);
    if ($process->options) {
      $process->options = unserialize($process->options);
    }
    else {
      $process->options = array();
    }

    // Fill in any option the stored row does not carry.
    $process->options += self::defaultOptions();
    $process->start = $process->start_stamp;
    $process->logDebug(__FUNCTION__);
    self::$processes['pid'][$process->pid] = $process;
    self::$processes['handle'][$process->handle] = $process;
    return $process;
  }

  /**
   * Load a BackgroundProcess by process ID.
   *
   * @param $pid
   *   Process ID
   * @param $reset
   *   (optional) Bypass the static cache and query the database.
   *
   * @return BackgroundProcess
   *   The process, or NULL when no row with that pid exists.
   */
  public static function load($pid, $reset = FALSE) {
    if (!$reset && isset(self::$processes['pid'][$pid])) {
      return self::$processes['pid'][$pid];
    }

    // Ensure DB availability
    drupal_bootstrap(DRUPAL_BOOTSTRAP_DATABASE);
    $row = db_select('background_process', 'bp', array('target' => 'background_process'))
      ->fields('bp')
      ->condition('bp.pid', $pid)
      ->execute()
      ->fetchObject();
    if (!$row) {
      return NULL;
    }
    return self::create($row);
  }

  /**
   * Load a BackgroundProcess by handle.
   *
   * @param $handle
   *   Handle of the process
   * @param $reset
   *   (optional) Bypass the static cache and query the database.
   *
   * @return BackgroundProcess
   *   The process, or NULL when no row with that handle exists.
   */
  public static function loadByHandle($handle, $reset = FALSE) {
    if (!$reset && isset(self::$processes['handle'][$handle])) {
      return self::$processes['handle'][$handle];
    }

    // Ensure DB availability
    drupal_bootstrap(DRUPAL_BOOTSTRAP_DATABASE);
    $row = db_select('background_process', 'bp', array('target' => 'background_process'))
      ->fields('bp')
      ->condition('bp.handle', $handle)
      ->execute()
      ->fetchObject();
    if (!$row) {
      return NULL;
    }
    return self::create($row);
  }

  /**
   * Load all BackgroundProcess objects from the DB.
   *
   * @param $status
   *   (optional) Only load processes whose exec_status equals this value.
   *
   * @return array
   *   BackgroundProcess objects
   */
  public static function loadAll($status = NULL) {
    // Ensure DB availability
    drupal_bootstrap(DRUPAL_BOOTSTRAP_DATABASE);
    $query = db_select('background_process', 'bp', array('target' => 'background_process'))
      ->fields('bp');
    if (isset($status)) {
      $query = $query->condition('bp.exec_status', $status);
    }
    $statement = $query->execute();
    $loaded = array();
    while ($row = $statement->fetchObject()) {
      $loaded[] = BackgroundProcess::create($row);
    }
    return $loaded;
  }

  /**
   * Create a BackgroundProcess object and lock it.
   *
   * Locking means inserting a row into the background_process table; the
   * insert id becomes the pid of the returned object.
   *
   * @param string $handle
   *   (optional) If specified, $handle will be used as key for the lock.
   *   Otherwise a unique handle will be generated.
   *
   * @return BackgroundProcess
   *   The locked process.
   *
   * @throws BackgroundProcessException
   *   With BACKGROUND_PROCESS_ERROR_NO_LOCK when the insert fails with
   *   exception code 23000. Any other exception is rethrown unchanged.
   */
  public static function lock($handle = NULL) {
    $process = new BackgroundProcess($handle);
    $process->token = uniqid('BGP', TRUE);
    try {
      $process->exec_status = BACKGROUND_PROCESS_STATUS_LOCKED;
      $now = microtime(TRUE);
      $process->start_stamp = $now;
      $process->created = $now;
      $row = array(
        'token' => $process->token,
        'handle' => $process->handle,
        'created' => $process->created,
        'start_stamp' => $process->start_stamp,
        'exec_status' => $process->exec_status,
      );
      $process->pid = db_insert('background_process', array('target' => 'background_process'))
        ->fields($row)
        ->execute();
      $process->sendMessage('locked');
      $process->logDebug(__FUNCTION__);

      // Register the shutdown handler: no shutdown hooks, but do remove.
      $process->ensureCleanup(FALSE, TRUE);
      return $process;
    }
    catch (Exception $e) {
      if ($e->getCode() != '23000') {
        throw $e;
      }
      throw new BackgroundProcessException(t('@handle already locked', array(
        '@handle' => $handle,
      )), BACKGROUND_PROCESS_ERROR_NO_LOCK);
    }
  }

  // ---------- GETTERS ----------

  /**
   * Get the process id.
   *
   * @return
   *   Value of the pid property (NULL until the process has been locked or
   *   loaded).
   */
  public function getPID() {
    return $this->pid;
  }

  /**
   * Get the user id stored on the process.
   *
   * @return
   *   Value of the uid property; execute() runs the callback as this user.
   */
  public function getUID() {
    return $this->uid;
  }

  /**
   * Get current handle.
   *
   * @return
   *   Value of the handle property.
   */
  public function getHandle() {
    return $this->handle;
  }

  /**
   * Get current token.
   *
   * @return
   *   Value of the token property (generated by lock()).
   */
  public function getToken() {
    return $this->token;
  }

  /**
   * Get the bootstrap phase option.
   *
   * @return
   *   The 'phase' option, or NULL when it is unset or empty.
   */
  public function getPhase() {
    if (empty($this->options['phase'])) {
      return NULL;
    }
    return $this->options['phase'];
  }

  /**
   * Get registered shutdown callbacks.
   *
   * @return
   *   The 'shutdown_callbacks' option: a list of array(callback, arguments)
   *   pairs as stored by setShutdownCallback().
   */
  public function getShutdownCallbacks() {
    return $this->options['shutdown_callbacks'];
  }

  /**
   * Get current service group.
   *
   * NOTE(review): setServiceGroup() stores the whole service group definition
   * (an array) in this property, so the value is not always a name - confirm
   * what callers expect.
   *
   * @return string
   *   Name of service group
   */
  public function getServiceGroup() {
    return $this->service_group;
  }

  /**
   * Get current service host.
   *
   * @return
   *   Value of the service_host property (NULL until a host is assigned).
   */
  public function getServiceHost() {
    return $this->service_host;
  }

  /**
   * Get status
   *
   * @return
   *   Value of the exec_status property (one of the
   *   BACKGROUND_PROCESS_STATUS_* values used in this class).
   */
  public function getStatus() {
    return $this->exec_status;
  }

  /**
   * Get a single option.
   *
   * @param $key
   *   Name of the option.
   *
   * @return
   *   The option value, or NULL when the option is not set. Unknown keys no
   *   longer raise an undefined index notice.
   */
  public function getOption($key) {
    return isset($this->options[$key]) ? $this->options[$key] : NULL;
  }

  /**
   * Get created time (launch time)
   *
   * @return
   *   The created timestamp, or NULL when it has not been set (the property
   *   is only assigned by lock(), start() and create()).
   */
  public function getCreated() {
    return isset($this->created) ? $this->created : NULL;
  }

  /**
   * Get start time (considers keepalive)
   *
   * @return
   *   Value of the start_stamp property, which claim() refreshes each time the
   *   process is claimed for execution.
   */
  public function getStartTime() {
    return $this->start_stamp;
  }

  /**
   * Get dispatcher
   *
   * @return
   *   The 'dispatcher' option ('http' unless changed, see defaultOptions()).
   */
  public function getDispatcher() {
    return $this->options['dispatcher'];
  }

  /**
   * Get result
   *
   * @return
   *   Value of the result property, as assigned by execute() or
   *   waitForFinish().
   */
  public function getResult() {
    return $this->result;
  }

  // ---------- SETTERS ----------

  /**
   * Set the user id of the process and mark it for writing.
   *
   * @param $uid
   *   User id.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function setUID($uid) {
    $this->uid = $uid;
    $this->dirty['uid'] = $uid;
    return $this;
  }

  /**
   * Set callback and arguments
   *
   * Both values are serialized and marked dirty so that writeData() stores
   * them.
   *
   * @param $callback
   *   Callback to run in the background.
   * @param $arguments
   *   (optional) Arguments for the callback.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   *
   * @throws BackgroundProcessException
   *   If the process has no pid yet (see ensureProcess()).
   */
  public function setCallback($callback, $arguments = array()) {
    $this->ensureProcess();
    $this->callback = $callback;
    $this->arguments = $arguments;
    $this->dirty['callback'] = serialize($this->callback);
    $this->dirty['arguments'] = serialize($this->arguments);

    // Ask PHP for a printable name (syntax check only). Interpolating an
    // array(class, method) callback directly would trigger an array-to-string
    // conversion; for plain function names the name is the string itself.
    $callable_name = '';
    is_callable($callback, TRUE, $callable_name);
    $this->logDebug('setCallback' . " {$callable_name} (" . strlen($this->dirty['arguments']) . " bytes)");
    return $this;
  }

  /**
   * Include specific file for the callback execution.
   *
   * May be called several times; each file is appended to the 'include'
   * option and included by execute() before the callback runs.
   *
   * @param $file
   *   Relative path+file of the file to include.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function setInclude($file) {
    $this->options['include'][] = $file;
    $this->setOptions($this->options);
    return $this;
  }

  /**
   * Set the bootstrap phase in which the process should run.
   *
   * @param int $phase
   *   Run at bootstrap phase.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function setPhase($phase) {
    $this->options['phase'] = $phase;
    $this->setOptions($this->options);
    return $this;
  }

  /**
   * Reference to variable where the result should be stored.
   *
   * @param $result
   *   Variable (passed by reference) that the result property is bound to, so
   *   later assignments to the result property also change this variable.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function setResultStorage(&$result) {
    // Bind by reference; a plain assignment would only copy the current value.
    $this->result =& $result;
    return $this;
  }

  /**
   * Register a function to be run at shutdown
   *
   * NOTE(review): the options are changed in memory only; nothing is marked
   * dirty here, so the callback is persisted only if a later call rewrites
   * the options - confirm this is intended.
   *
   * @param $callback
   *   Callback to invoke from shutdown(); the process is appended to its
   *   arguments.
   * @param $args
   *   (optional) Arguments for the callback; cast to an array.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function setShutdownCallback($callback, $args = array()) {
    $entry = array($callback, (array) $args);
    $this->options['shutdown_callbacks'][] = $entry;
    return $this;
  }

  /**
   * Set current service host and the dispatcher configured for it.
   *
   * @param $service_host
   *   (optional) Service host to use. If invalid or none specified, the host
   *   named by the 'background_process_default_service_host' variable is used.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   *
   * @throws BackgroundProcessException
   *   With BACKGROUND_PROCESS_ERROR_LIMIT_REACHED when the host already has
   *   'max_clients' clients (keepalive is switched off and remove() is called
   *   first), or with BACKGROUND_PROCESS_ERROR_NO_PID from ensureProcess().
   */
  public function setServiceHost($service_host = NULL) {
    $this->ensureProcess();
    $hosts = background_process_get_service_hosts();
    $known = $service_host && !empty($hosts[$service_host]);
    if (!$known) {
      // Invalid service hosts selected!
      $service_host = variable_get('background_process_default_service_host', 'default');
    }

    // A limit of 0 (or less) means "unlimited": the client count is not
    // even looked up in that case.
    $limit = $hosts[$service_host]['max_clients'];
    if ($limit > 0 && $limit <= background_process_current_clients($service_host)) {
      $this->keepAlive(FALSE)->remove();
      throw new BackgroundProcessException(t('Max clients limit reached'), BACKGROUND_PROCESS_ERROR_LIMIT_REACHED);
    }

    $this->service_host = $service_host;
    $this->dirty['service_host'] = $service_host;
    $this->setDispatcher($hosts[$service_host]['dispatcher']);
    return $this;
  }

  /**
   * Set the service group. This method sets the service host based on the service group.
   *
   * The group's balancer ('method') is called with the group definition and
   * its return value is handed to setServiceHost().
   *
   * @param $service_group_name
   *   (optional) Name of the service group. If invalid or none specified, the
   *   group named by the 'background_process_default_service_group' variable
   *   is used.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   *
   * @throws BackgroundProcessException
   *   With BACKGROUND_PROCESS_ERROR_NO_SERVICE_GROUP when not even the default
   *   group exists, with BACKGROUND_PROCESS_ERROR_INVALID_BALANCER when the
   *   group's method is not callable, or whatever setServiceHost() throws.
   */
  public function setServiceGroup($service_group_name = NULL) {
    // Reuse the stored group only when it is a name. This method itself
    // stores the full group definition (an array) in the property, and an
    // array must never be used as a lookup key below.
    if (!$service_group_name && isset($this->service_group) && is_string($this->service_group)) {
      $service_group_name = $this->service_group;
    }
    $service_groups = background_process_get_service_groups();
    if (!$service_group_name || empty($service_groups[$service_group_name])) {
      $service_group_name = variable_get('background_process_default_service_group', 'default');
    }
    if (empty($service_groups[$service_group_name])) {
      // Not even the default service group exists: give up.
      throw new BackgroundProcessException(t('Default service group not found'), BACKGROUND_PROCESS_ERROR_NO_SERVICE_GROUP);
    }
    $this->service_group = $service_groups[$service_group_name];

    // Ask the balancer for an appropriate service host
    if (!is_callable($this->service_group['method'])) {
      throw new BackgroundProcessException(t('Cannot call balancer method'), BACKGROUND_PROCESS_ERROR_INVALID_BALANCER);
    }
    $service_host = call_user_func($this->service_group['method'], $this->service_group);
    $this->setServiceHost($service_host);
    return $this;
  }

  /**
   * Set the execution status and mark it for writing.
   *
   * @param $status
   *   New exec_status value.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function setStatus($status) {
    $this->ensureProcess();
    $this->exec_status = $status;
    $this->dirty['exec_status'] = $status;
    return $this;
  }

  /**
   * Set a single option and mark the options for writing.
   *
   * @param $key
   *   Name of the option.
   * @param $value
   *   Value of the option.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function setOption($key, $value) {
    $this->logDebug(__FUNCTION__);
    $this->ensureProcess();
    $this->options[$key] = $value;

    // Make sure every default option is present before serializing.
    $this->options += self::defaultOptions();
    $this->dirty['options'] = serialize($this->options);
    return $this;
  }

  /**
   * Replace all options and mark them for writing.
   *
   * @param $options
   *   (optional) New options; missing keys are filled from defaultOptions().
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function setOptions($options = array()) {
    $this->logDebug(__FUNCTION__);
    $this->ensureProcess();
    $merged = $options + self::defaultOptions();
    $this->options = $merged;
    $this->dirty['options'] = serialize($merged);
    return $this;
  }

  /**
   * Set start time and mark it for writing.
   *
   * @param $start_stamp
   *   Unix timestamp (with microseconds) of start time
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function setStartTime($start_stamp) {
    $this->ensureProcess();
    $this->start_stamp = $start_stamp;
    $this->dirty['start_stamp'] = $start_stamp;
    return $this;
  }
  /**
   * Set the dispatcher used to launch the process.
   *
   * @param $dispatcher
   *   Dispatcher name; stored in the 'dispatcher' option.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function setDispatcher($dispatcher) {
    $this->options['dispatcher'] = $dispatcher;
    return $this->setOptions($this->options);
  }

  // ---------- API ----------

  /**
   * Execute the background process callback function.
   *
   * The process is claimed first, so that only one request runs it. The
   * callback then runs as the stored user (anonymous when no valid uid is
   * set) with this object installed as the current process.
   *
   * @return
   *   FALSE when the process could not be claimed, otherwise this process.
   *
   * @throws BackgroundProcessException
   *   With BACKGROUND_PROCESS_ERROR_INVALID_CALLBACK when the callback is not
   *   callable, or BACKGROUND_PROCESS_ERROR_ARGUMENTS when the arguments are
   *   not an array. Exceptions thrown by the callback are rethrown after the
   *   user and the current process have been restored.
   */
  public function execute() {
    $this->logDebug(__FUNCTION__);
    if (!$this->claim()) {
      watchdog('bg_process', 'Could not claim process %pid : %handle', array(
        '%pid' => $this->pid,
        '%handle' => $this->handle,
      ), WATCHDOG_ERROR);
      return FALSE;
    }
    $this->ensureCleanup();
    if ($this->options['detach']) {
      $this->flush();
    }
    if (!empty($this->options['include'])) {
      foreach ($this->options['include'] as $file) {
        include_once $file;
      }
    }

    // $callable_name receives a printable name ("function" or
    // "Class::method"), so array callbacks can be logged without an
    // array-to-string conversion.
    $callable_name = '';
    if (!is_callable($this->callback, FALSE, $callable_name)) {
      // Make sure a broken callback is not restarted over and over.
      $this->ensureProcess();
      $this->keepAlive(FALSE)->writeData();
      throw new BackgroundProcessException(t('Callback: %callback not found', array(
        '%callback' => _background_process_callback_name($this->callback),
      )), BACKGROUND_PROCESS_ERROR_INVALID_CALLBACK);
    }
    $this->logDebug($callable_name . ' is callable');

    // Run indefinitly...
    // @todo Make timeout configurable, perhaps through options?
    set_time_limit(0);

    // Run process as specified user
    global $user;
    $old_user = $user;
    if ($this->uid && ($as_user = user_load($this->uid))) {
      $user = $as_user;
    }
    else {
      $user = drupal_anonymous_user();
    }

    // Set current process
    $old_process = self::currentProcess($this);
    try {
      $this->logDebug($callable_name . ' is called now!');

      // Just to avoid endless loops on keepalive, check if arguments really is an array
      if (!is_array($this->arguments)) {
        $this->keepAlive(FALSE);
        throw new BackgroundProcessException(t('Background Process arguments is not an array!'), BACKGROUND_PROCESS_ERROR_ARGUMENTS);
      }
      module_invoke_all('background_process_pre_execute', $this);
      $this->result = call_user_func_array($this->callback, $this->arguments);
      module_invoke_all('background_process_post_execute', $this);
      if ($this->options['store_result']) {
        // NOTE(review): the result is stored exactly as returned - confirm
        // the column can hold non-scalar callback results.
        db_insert('background_process_result', array(
          'target' => 'background_process',
        ))
          ->fields(array(
            'pid' => $this->pid,
            'result' => $this->result,
            'created' => time(),
          ))
          ->execute();
      }
      $this->shutdown();

      // NOTE(review): currentProcess() treats NULL as "get", so when there
      // was no previous process this call leaves $this as current.
      self::currentProcess($old_process);

      // Done, restore user.
      if ($old_user) {
        $user = $old_user;
      }
      return $this;
    }
    catch (Exception $e) {
      // Something went wrong. Restore user and rethrow exception.
      if ($old_user) {
        $user = $old_user;
      }
      self::currentProcess($old_process);
      $this->logDebug('exception thrown! ' . (string) $e);
      throw $e;
    }
  }

  /**
   * Use blocking style request, and instruct the subrequests to detach asap.
   *
   * @param $detach
   *   (optional) TRUE to detach (execute() then flushes the output and closes
   *   the connection before running the callback), FALSE to switch it off
   *   again. The argument used to be ignored and TRUE was always stored.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   *
   * @todo Bad naming?
   */
  public function detach($detach = TRUE) {
    $this->logDebug(__FUNCTION__);
    $this->options['detach'] = (bool) $detach;
    $this->setOptions($this->options);
    return $this;
  }

  /**
   * Keep the request alive (i.e., restart after finish).
   *
   * While the 'keepalive_counter' option is still empty, the call also turns
   * off result storage and rewrites the options - for TRUE and FALSE alike.
   *
   * @param $keepalive
   *   (optional) TRUE to restart the process after completion, FALSE to stop
   *   restarting it.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function keepAlive($keepalive = TRUE) {
    $this->logDebug(__FUNCTION__ . " : {$keepalive}");
    $flag = (int) $keepalive;
    $this->keepalive = $flag;
    $this->dirty['keepalive'] = $flag;
    if (!empty($this->options['keepalive_counter'])) {
      return $this;
    }
    $this->options['store_result'] = FALSE;
    $this->setOptions($this->options);
    return $this;
  }

  /**
   * Do/don't store the result
   *
   * @param $store_result
   *   (optional) Whether execute() should insert the callback result into the
   *   background_process_result table.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function storeResult($store_result = TRUE) {
    $this->options['store_result'] = $store_result;
    return $this->setOptions($this->options);
  }

  /**
   * Shutdown a process (and ensure it doesn't start again by itself, i.e. "unlock")
   *
   * @return
   *   Whatever shutdown() returns.
   */
  public function unlock() {
    $this->logDebug(__FUNCTION__);
    $this->keepAlive(FALSE);

    // Arm both the shutdown hooks and the removal before shutting down.
    $this->ensureCleanup();
    return $this->shutdown();
  }

  /**
   * Restart the request
   *
   * Puts the process back into the locked state, writes that to the database
   * and dispatches it again.
   *
   * @return BackgroundProcess
   *   This process, as returned by dispatch().
   */
  public function restart() {
    $this->setStatus(BACKGROUND_PROCESS_STATUS_LOCKED);

    // Deliberately not chained: chaining dispatch() onto writeData() failed
    // because writeData() did not return the process.
    $this->writeData();
    return $this->dispatch();
  }

  /**
   * Launch the process
   *
   * Marks the process as dispatched, writes it to the database and hands it
   * to the dispatcher named in the 'dispatcher' option. The pre/post dispatch
   * hooks are invoked around the dispatcher call.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function dispatch() {
    $this->logDebug(__FUNCTION__ . ' - ' . $this->options['dispatcher']);

    // Ensure sanity
    $this->ensureProcess();
    $this->ensureServiceHost();
    $this->ensureCleanup(FALSE, FALSE);

    // Update status and send message
    $this->setStatus(BACKGROUND_PROCESS_STATUS_DISPATCHED);
    $this->writeData();
    $this->sendMessage('dispatch');
    module_invoke_all('background_process_pre_dispatch', $this);

    // Write data again, in case pre dispatch hook changed something
    $this->writeData();
    $dispatcher = $this->options['dispatcher'];
    background_process_invoke_dispatcher($dispatcher, $this);
    module_invoke_all('background_process_post_dispatch', $this);
    return $this;
  }

  /**
   * Dispatch a locked process again, or clean it up.
   *
   * @param $threshold
   *   (optional) Seconds a process may stay locked after creation before it
   *   is dispatched again.
   *
   * @return
   *   TRUE when the process was dispatched again, FALSE otherwise (including
   *   the case where a stale locked process was shut down instead).
   */
  public function reDispatch($threshold = 10) {
    $locked = $this->getStatus() == BACKGROUND_PROCESS_STATUS_LOCKED;

    // If process is locked and hasn't started for X seconds, then relaunch
    if ($locked && $this->getCreated() + $threshold <= time()) {
      $this->logDebug(__FUNCTION__);
      $this->dispatch();
      return TRUE;
    }

    // This is not re-dispatch?!? ... (re)move it!
    if ($locked && $this->getStartTime() + variable_get('background_process_cleanup_age', BACKGROUND_PROCESS_CLEANUP_AGE) <= time()) {
      $this->shutdown();
    }
    return FALSE;
  }

  /**
   * Dispatch multiple prepared processes
   *
   * A process whose dispatch throws a BackgroundProcessException stays in
   * $processes and gets the exception stored in its last_error property.
   *
   * @param array &$processes
   *   BackgroundProcess objects - dispatched processes will be removed from the array
   *
   * @return array
   *   Dispatched BackgroundProcess objects
   */
  public static function dispatchAll(&$processes) {
    $launched = array();
    foreach ($processes as $key => $candidate) {
      try {
        $candidate->dispatch();
      }
      catch (BackgroundProcessException $e) {
        // Skip to the next.
        $candidate->last_error = $e;
        continue;
      }
      $launched[] = $candidate;
      unset($processes[$key]);
    }
    return $launched;
  }

  /**
   * Wait until the given processes have stored their results.
   *
   * Polls the background_process_result table until a result exists for
   * every process or the timeout expires, then copies the results found onto
   * the matching process objects.
   *
   * @param $processes
   *   BackgroundProcess objects to wait for.
   * @param $options
   *   (optional) Array with:
   *   - interval: seconds to sleep between polls (default 1).
   *   - timeout: maximum seconds to wait (default 10).
   *
   * @return
   *   Number of result rows found (0 when $processes is empty).
   */
  public static function waitForFinish($processes, $options = array()) {
    if (empty($processes)) {
      return 0;
    }
    $keyed = array();
    foreach ($processes as $process) {
      $keyed[$process->pid] = $process;
    }
    $options += array(
      'interval' => 1,
      'timeout' => 10,
    );
    $interval = (int) ($options['interval'] * 1000000);
    $expire = microtime(TRUE) + $options['timeout'];

    // Count distinct pids: the same process listed twice can only ever
    // produce one result.
    $expected = count($keyed);
    while (TRUE) {
      $results = db_select('background_process_result', 'r', array(
        'target' => 'background_process',
      ))
        ->fields('r')
        ->condition('r.pid', array_keys($keyed), 'IN')
        ->execute()
        ->fetchAll(PDO::FETCH_OBJ);

      // Check before sleeping, so a finished batch returns immediately
      // instead of always paying one full interval.
      if (count($results) >= $expected || microtime(TRUE) >= $expire) {
        break;
      }
      usleep($interval);
    }
    foreach ($results as $result) {
      $keyed[$result->pid]->result = $result->result;
    }
    return count($results);
  }

  // ---------- LEGACY API (1.x) ----------
  // Legacy wrapper
  /**
   * Legacy (1.x) wrapper: lock, set callback and dispatch in one call.
   *
   * @param $callback
   *   Callback to run in the background.
   * @param $args
   *   Arguments for the callback.
   *
   * @return
   *   The handle of the dispatched process.
   */
  public function start($callback, $args) {
    // Lock through a temporary object, then take over its identity.
    $process = BackgroundProcess::lock($this->handle);
    // lock() armed removal on the temporary object; disarm it so that only
    // $this cleans up the row.
    $process
      ->ensureCleanup(FALSE, FALSE);
    $this->handle = $process->handle;
    $this->token = $process->token;
    $this->exec_status = $process->exec_status;
    $this->created = $process->created;
    $this->pid = $process->pid;
    $this
      ->ensureCleanup(FALSE, TRUE);
    return $this
      ->setCallback($callback, $args)
      ->dispatch()
      ->getHandle();
  }

  // ---------- PROGRESS ----------

  /**
   * Get progress
   *
   * @return
   *   Value of the progress property; -1 until a progress has been set.
   */
  public function getProgress() {
    return $this->progress;
  }

  /**
   * Get progress message
   *
   * @return
   *   The progress message, or NULL when none has been set (the property is
   *   only assigned by setProgress() and create()).
   */
  public function getProgressMessage() {
    return isset($this->progress_message) ? $this->progress_message : NULL;
  }

  /**
   * Set progress interval
   *
   * @param $interval
   *   Minimum number of seconds (float) between two progress writes to the
   *   database, see setProgress().
   *
   * @return BackgroundProcess
   *   This process, for chaining like the other setters.
   */
  public function setProgressInterval($interval) {
    $this->progress_interval = $interval;
    return $this;
  }

  /**
   * Set progress
   *
   * The in-memory values are always updated. The database is only written
   * when the progress interval has elapsed since the last write, or when
   * $progress is 1 or more.
   *
   * @param $progress
   *   Current progress of the process (0..1).
   * @param $progress_message
   *   (optional) Message describing the current progress.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function setProgress($progress, $progress_message = NULL) {
    $this->ensureProcess();
    $this->progress = $progress;
    if (isset($progress_message)) {
      $this->progress_message = $progress_message;
    }

    // Don't spam the DB if someone decides to set the progress very rapidly.
    $time = microtime(TRUE);
    if ($progress < 1 && $this->progress_last_updated + $this->progress_interval > $time) {
      return $this;
    }
    $this->progress_last_updated = $time;

    // Write the message held in memory, not just the one passed to this
    // call; otherwise a message given during a throttled call is never stored.
    $fields = array('progress' => $this->progress);
    if (isset($this->progress_message)) {
      $fields['progress_message'] = $this->progress_message;
    }
    db_update('background_process', array(
      'target' => 'background_process',
    ))
      ->fields($fields)
      ->condition('pid', $this->pid)
      ->execute();
    $this->sendMessage('setProgress');
    return $this;
  }

  /**
   * Calculate ETA of the process
   *
   * Extrapolates linearly from the time elapsed since creation and the
   * current progress.
   *
   * @return
   *   Estimated finish time as a timestamp, or NULL when the progress is not
   *   greater than 0.
   */
  public function calculateETA() {
    if (!($this->progress > 0)) {
      return NULL;
    }
    $elapsed = microtime(TRUE) - $this->created;
    return $this->created + 1 / $this->progress * $elapsed;
  }

  // ---------- LOGGING ----------

  /**
   * Log a debug message.
   *
   * Does nothing unless the debug property is set. The message is written
   * with error_log(), prefixed with the OS process id, the request URI, and
   * the pid and handle of this process.
   *
   * @param $msg
   *   Message to log.
   */
  public function logDebug($msg) {
    if (!$this->debug) {
      return;
    }
    $prefix = getmypid() . ' - ' . request_uri() . " - {$this->pid} : {$this->handle} - ";
    error_log($prefix . $msg);
  }

  /**
   * Log a message for the process.
   *
   * Besides appending the entry, the overall log severity is updated: it
   * takes the new severity when none was recorded yet (negative value), or
   * when the new one is non-negative and numerically lower.
   *
   * @param $message
   *   Message to log
   * @param $severity
   *   Watchdog severity level (WATCHDOG_ERROR, etc.)
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function log($message, $severity = -1) {
    $this->log['entries'][] = array(
      'message' => $message,
      'severity' => $severity,
    );
    $current = $this->log['severity'];
    if ($current < 0 || ($severity >= 0 && $severity < $current)) {
      $this->log['severity'] = $severity;
    }
    return $this;
  }

  /**
   * Get accumulated log for the process.
   *
   * The returned log contains the entries added with log(), the pending
   * Drupal messages (which are cleared by this call) and the last PHP error,
   * if any. The log stored on the object itself is left unchanged.
   *
   * @return array
   *   Array with 'entries' (list of message/severity pairs) and 'severity'.
   */
  public function getLog() {
    $original_log = $this->log;

    // Get drupal messages
    $messages = drupal_get_messages(NULL, TRUE);
    $messages['status'] = empty($messages['status']) ? array() : $messages['status'];
    $messages['warning'] = empty($messages['warning']) ? array() : $messages['warning'];
    $messages['error'] = empty($messages['error']) ? array() : $messages['error'];
    foreach ($messages['status'] as $message) {
      $this->log($message);
    }
    foreach ($messages['warning'] as $message) {
      $this->log($message, WATCHDOG_WARNING);
    }
    foreach ($messages['error'] as $message) {
      $this->log($message, WATCHDOG_ERROR);
    }

    // Get error messages
    $error = error_get_last();
    if ($error) {
      $message = $error['message'] . ' (line ' . $error['line'] . ' of ' . $error['file'] . ').' . "\n";

      // The E_* constants are bit flags and must be tested with bitwise
      // operators. The previous logical &&/|| test was true for every error
      // type, so everything ended up as WATCHDOG_ERROR.
      $type = $error['type'];
      $severity = WATCHDOG_INFO;
      if ($type & (E_ERROR | E_PARSE | E_CORE_ERROR | E_COMPILE_ERROR | E_USER_ERROR | E_RECOVERABLE_ERROR)) {
        $severity = WATCHDOG_ERROR;
      }
      elseif ($type & (E_WARNING | E_CORE_WARNING | E_COMPILE_WARNING | E_USER_WARNING)) {
        $severity = WATCHDOG_WARNING;
      }
      elseif ($type & (E_NOTICE | E_USER_NOTICE)) {
        $severity = WATCHDOG_NOTICE;
      }
      $this->log($message, $severity);
    }
    $log = $this->log;
    $this->log = $original_log;
    return $log;
  }
  /**
   * Publish a state change of this process through the nodejs module.
   *
   * Does nothing when the nodejs module is not enabled. Other modules can
   * change the message through the 'background_process_message' alter hook.
   *
   * @param $action
   *   Name of the action that happened (e.g. 'locked', 'dispatch').
   */
  public function sendMessage($action) {
    if (!module_exists('nodejs')) {
      return;
    }

    // WATCH OUT FOR FUTURE DESTRUCTOR IN BackgroundProcess!!!
    $snapshot = clone $this;
    $payload = (object) array(
      'action' => $action,
      'background_process' => $snapshot,
      'timestamp' => microtime(TRUE),
    );
    $message = (object) array(
      'channel' => 'background_process',
      'data' => $payload,
      'callback' => 'nodejsBackgroundProcess',
    );
    drupal_alter('background_process_message', $message);
    nodejs_send_content_channel_message($message);
  }

  // ---------- PROCESS HANDLING ----------
  /**
   * Default values for the options of a process.
   *
   * @return array
   *   Option name => default value. These are merged into the options by the
   *   constructor, create(), setOption() and setOptions().
   */
  private static function defaultOptions() {
    return array(
      // Incremented by doKeepAlive() on each restart.
      'keepalive_counter' => 0,
      // Whether execute() stores the callback result in the database.
      'store_result' => TRUE,
      // Whether execute() flushes the output before running the callback.
      'detach' => FALSE,
      // List of array(callback, arguments) pairs, see setShutdownCallback().
      'shutdown_callbacks' => array(),
      // Name handed to background_process_invoke_dispatcher() by dispatch().
      'dispatcher' => 'http',
    );
  }

  /**
   * Re-launch the process if necessary
   *
   * Only acts when keepalive is set and the dispatcher is not 'foreground'.
   * The database row is updated on condition that it is still in the running
   * state; if that succeeds the process is dispatched again.
   *
   * @return
   *   TRUE when the process was dispatched again, FALSE otherwise.
   */
  private function doKeepAlive() {
    // NOTE(review): this adds an entry to the process log (log()), not to the
    // debug log - confirm that logDebug() was not intended.
    $this
      ->log(__FUNCTION__);
    if ($this->keepalive && $this->options['dispatcher'] != 'foreground') {
      $this
        ->logDebug(__FUNCTION__);
      // NOTE(review): the options are serialized before keepalive_counter is
      // incremented below, so the stored counter lags behind - verify.
      $updated = db_update('background_process', array(
        'target' => 'background_process',
      ))
        ->fields(array(
        'options' => serialize($this->options),
        'exec_status' => $this->exec_status,
        'created' => microtime(TRUE),
      ))
        ->condition('pid', $this->pid)
        ->condition('exec_status', BACKGROUND_PROCESS_STATUS_RUNNING)
        ->execute();
      if ($updated) {
        $this->options['keepalive_counter']++;
        $this->exec_status = BACKGROUND_PROCESS_STATUS_LOCKED;
        $this
          ->sendMessage('keepalive');
        $this
          ->dispatch();
        return TRUE;
      }
    }
    return FALSE;
  }

  /**
   * Remove the process from the DB (unlock).
   *
   * The row is only deleted when removal has been armed through
   * ensureCleanup(); the flag is cleared so the delete happens once.
   *
   * @return
   *   The result of the delete query, or TRUE when nothing had to be removed.
   */
  private function remove() {
    $this->ensureProcess();
    $this->logDebug(__FUNCTION__);
    if (!$this->remove) {
      return TRUE;
    }
    $this->remove = FALSE;
    $this->sendMessage('remove');
    $query = db_delete('background_process', array('target' => 'background_process'));
    $query->condition('pid', $this->pid);
    return $query->execute();
  }

  /**
   * Flush output buffer.
   *
   * Asks the client to close the connection and pushes out whatever output
   * has been produced so far.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  private function flush() {
    // header() warns once the headers are out.
    if (!headers_sent()) {
      header("Connection: close");
    }

    // ob_flush() raises a notice when there is no active output buffer.
    if (ob_get_level() > 0) {
      ob_flush();
    }
    flush();
    return $this;
  }

  /**
   * Claim process.
   *
   * Before executing the callback, make sure no one else is doing it: the row
   * is switched from dispatched to running in a single conditional update.
   *
   * @return
   *   Number of rows updated; 0 means the process could not be claimed.
   */
  private function claim() {
    $this->logDebug(__FUNCTION__);
    $claimed_at = microtime(TRUE);
    $query = db_update('background_process', array('target' => 'background_process'));
    $query->fields(array(
      'start_stamp' => $claimed_at,
      'exec_status' => BACKGROUND_PROCESS_STATUS_RUNNING,
    ));
    $query->condition('pid', $this->pid);
    $query->condition('exec_status', array(BACKGROUND_PROCESS_STATUS_DISPATCHED), 'IN');
    $count = $query->execute();
    if (!$count) {
      return $count;
    }
    $this->start_stamp = $claimed_at;
    $this->exec_status = BACKGROUND_PROCESS_STATUS_RUNNING;
    $this->sendMessage('claimed');
    return $count;
  }

  /**
   * Shutdown
   *
   * Writes pending data, then - when shutdown handling has been armed through
   * ensureCleanup() - invokes the shutdown hook and the registered shutdown
   * callbacks and tries to keep the process alive. Otherwise, or when the
   * process is not restarted, remove() is called.
   *
   * @return
   *   TRUE when the process was restarted, otherwise the result of remove().
   */
  public function shutdown() {
    $this->logDebug(__FUNCTION__ . " : {$this->shutdown}");
    try {
      $this->writeData();
    }
    catch (Exception $e) {
      // During shutdown, let's ignore DB errors, so shutdown handler may
      // pickup errors, etc. But we won't restart the process.
      $this->keepAlive(FALSE);
      $this->log((string) $e, WATCHDOG_ERROR);
    }

    if (!$this->shutdown) {
      return $this->remove();
    }

    // Inform shutdown handlers when done.
    $this->logDebug('shutting down!');

    // @todo Bump bootstrap level?
    if (function_exists('module_invoke_all')) {
      $this->logDebug('invoking shutdown handlers');
      $this->start = $this->start_stamp;
      module_invoke_all('background_process_shutdown', $this);
    }
    $this->logDebug('calling shutdown callbacks');
    foreach ($this->getShutdownCallbacks() as $info) {
      list($callback, $args) = $info;

      // Each callback receives the process as its last argument.
      $args[] = $this;
      call_user_func_array($callback, $args);
    }

    // Disarm, so a second shutdown() call does not run the handlers again.
    $this->shutdown = FALSE;
    if ($this->doKeepAlive()) {
      return TRUE;
    }
    return $this->remove();
  }

  /**
   * Write applicable data (if any) to the database
   *
   * Persists the columns collected in the dirty property and clears it.
   * Exceptions from the update propagate to the caller, in which case the
   * dirty data is kept.
   *
   * @return BackgroundProcess
   *   This process, so that calls such as writeData()->dispatch() can be
   *   chained.
   */
  public function writeData() {
    if (empty($this->dirty)) {
      return $this;
    }
    $this->logDebug(__FUNCTION__);
    db_update('background_process', array(
      'target' => 'background_process',
    ))
      ->fields($this->dirty)
      ->condition('pid', $this->pid)
      ->execute();
    $this->sendMessage('writeData');
    $this->logDebug(__FUNCTION__ . ' - done');
    $this->dirty = array();
    return $this;
  }

  // ---------- SANITIZERS ----------

  /**
   * Make sure prerequisites are met.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   *
   * @throws BackgroundProcessException
   *   With BACKGROUND_PROCESS_ERROR_NO_PID when the process has no pid.
   */
  private function ensureProcess() {
    if ($this->pid) {
      // Ensure DB availability
      drupal_bootstrap(DRUPAL_BOOTSTRAP_DATABASE);
      return $this;
    }
    throw new BackgroundProcessException(t('Background process not initialized'), BACKGROUND_PROCESS_ERROR_NO_PID);
  }

  /**
   * Make sure we clean up after ourselves.
   *
   * Stores the two flags read by shutdown() and remove(), and registers
   * shutdown() as a Drupal shutdown function the first time it is called.
   *
   * @param $shutdown
   *   (optional) Whether shutdown() should run the shutdown handlers.
   * @param $remove
   *   (optional) Whether remove() should delete the process row.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  private function ensureCleanup($shutdown = TRUE, $remove = TRUE) {
    $this->shutdown = $shutdown;
    $this->remove = $remove;
    if ($this->registered) {
      return $this;
    }
    $this->logDebug(__FUNCTION__);
    $this->registered = TRUE;

    // Make sure the process is removed when we're done
    drupal_register_shutdown_function(array($this, 'shutdown'));
    return $this;
  }

  /**
   * Make sure that the process has been designated a service host.
   *
   * When no host is set yet, one is picked through setServiceGroup() and the
   * change is written to the database.
   *
   * @return BackgroundProcess
   *   This process, for chaining.
   */
  public function ensureServiceHost() {
    if ($this->service_host) {
      return $this;
    }
    $this->setServiceGroup();
    $this->writeData();
    return $this;
  }

}
/**
 * Exception thrown by BackgroundProcess.
 *
 * The exception code is one of the BACKGROUND_PROCESS_ERROR_* constants.
 */
class BackgroundProcessException extends Exception {}