You are here

background_process.module in Background Process 8

This module implements a framework for calling funtions in the background.

$handle = background_process_start('mymodule_dosomething', $myvar1, $myvar2);
$handle = background_process_start(array(
  'myclass',
  'mystaticfunction',
), $myvar1, $myvar2);
$handle = background_process_start(array(
  $myobject,
  'mymethod',
), $myvar1, $myvar2);
$handle = background_process_start_locked('dontrunconcurrently', 'mymodule_dosomething', $myvar1, $myvar2);

For implementation of load balancing functionality:

See also

hook_service_group()

File

background_process.module
View source
<?php

/**
 * @file
 * This module implements a framework for calling funtions in the background.
 *
 * @code
 * $handle = background_process_start('mymodule_dosomething', $myvar1, $myvar2);
 * $handle = background_process_start(array('myclass', 'mystaticfunction'), $myvar1, $myvar2);
 * $handle = background_process_start(array($myobject, 'mymethod'), $myvar1, $myvar2);
 * $handle = background_process_start_locked('dontrunconcurrently', 'mymodule_dosomething', $myvar1, $myvar2);
 * @endcode
 *
 * For implementation of load balancing functionality:
 * @see hook_service_group()
 */
use Drupal\Component\Utility\Timer;
use Drupal\Core\Database\Database;
include 'background_process.class.php';

/**
 * Define Default Values.
 */
const BACKGROUND_PROCESS_SERVICE_TIMEOUT = 0;
const BACKGROUND_PROCESS_CONNECTION_TIMEOUT = 2;
const BACKGROUND_PROCESS_STREAM_TIMEOUT = 2;
const BACKGROUND_PROCESS_CLEANUP_AGE = 120;
const BACKGROUND_PROCESS_CLEANUP_AGE_RUNNING = 8 * 3600;
const BACKGROUND_PROCESS_CLEANUP_AGE_QUEUE = 86400;
const BACKGROUND_PROCESS_REDISPATCH_THRESHOLD = 10;

/**
 * Define Default Values For Process.
 */
const BACKGROUND_PROCESS_STATUS_NONE = 0;
const BACKGROUND_PROCESS_STATUS_LOCKED = 1;
const BACKGROUND_PROCESS_STATUS_RUNNING = 2;
const BACKGROUND_PROCESS_STATUS_QUEUED = 3;

/**
 * Implements hook_permission().
 */
function background_process_permission() {
  return [
    'administer background process' => [
      'title' => t('Administer background processes'),
      'description' => t('Perform administration tasks for background processes.'),
    ],
  ];
}

/**
 * Implements hook_cron().
 */
function background_process_cron() {

  // Don't use more than 120 seconds to unlock.
  $expire = 120;
  @set_time_limit($expire);

  // Cleanup old handles.
  $time = time();
  $msg = t('Never started (auto unlock due to timeout)');
  do {
    if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {
      break;
    }
    $result = db_query_range("SELECT handle, start_stamp FROM {background_process} WHERE start_stamp < :start AND exec_status = :status", 0, 10, [
      ':start' => $time - \Drupal::config('background_process.settings')
        ->get('background_process_cleanup_age'),
      ':status' => BACKGROUND_PROCESS_STATUS_LOCKED,
    ]);
    $handles = $result
      ->fetchAllAssoc('handle', PDO::FETCH_ASSOC);
    foreach ($handles as $handle => $process) {

      // Unlock the process.
      if (background_process_unlock($handle, $msg, $process['start_stamp'])) {
        drupal_set_message(t("%handle unlocked: @msg", [
          '%handle' => $handle,
          '@msg' => $msg,
        ]));
      }
      else {
        drupal_set_message(t("%handle could not be unlocked: @msg", [
          '%handle' => $handle,
          '@msg' => $msg,
        ]), 'error');
      }
    }
  } while (!empty($handles));

  // Cleanup stale requests.
  $time = time();
  $msg = t('Never finished (auto unlock due to long run)');
  do {
    if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {
      break;
    }
    $results = db_query_range("SELECT handle, start_stamp FROM {background_process} WHERE start_stamp < :start AND exec_status = :status", 0, 10, [
      ':start' => $time - \Drupal::config('background_process.settings')
        ->get('background_process_cleanup_age_running'),
      ':status' => BACKGROUND_PROCESS_STATUS_RUNNING,
    ]);
    $handles = $result
      ->fetchAllAssoc('handle', PDO::FETCH_ASSOC);
    foreach ($handles as $handle => $process) {

      // Unlock the process.
      if (background_process_unlock($handle, $msg, $process['start_stamp'])) {
        drupal_set_message(t("%handle unlocked: @msg", [
          '%handle' => $handle,
          '@msg' => $msg,
        ]));
      }
      else {
        drupal_set_message(t("%handle could not be unlocked: @msg", [
          '%handle' => $handle,
          '@msg' => $msg,
        ]), 'error');
      }
    }
  } while (!empty($results));

  // Cleanup queued requests that were never processed.
  $time = time();
  $msg = t('Never picked up by cron worker (auto unlock due to timeout)');
  do {
    if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {
      break;
    }
    $results = db_query_range("SELECT handle, start_stamp FROM {background_process} WHERE start_stamp < :start AND exec_status = :status", 0, 10, [
      ':start' => $time - \Drupal::config('background_process.settings')
        ->get('background_process_cleanup_age_queue'),
      ':status' => BACKGROUND_PROCESS_STATUS_QUEUED,
    ]);
    $handles = $result
      ->fetchAllAssoc('handle', PDO::FETCH_ASSOC);
    foreach ($handles as $handle => $process) {

      // Unlock the process.
      if (background_process_unlock($handle, $msg, $process['start_stamp'])) {
        drupal_set_message(t("%handle unlocked: @msg", [
          '%handle' => $handle,
          '@msg' => $msg,
        ]));
      }
      else {
        drupal_set_message(t("%handle could not be unlocked: @msg", [
          '%handle' => $handle,
          '@msg' => $msg,
        ]), 'error');
      }
    }
  } while (!empty($results));
}

/**
 * Implements hook_cron_alter().
 */
function background_process_cron_alter(&$items) {
  $items['background_process_cron']['override_congestion_protection'] = TRUE;
  $handle_prefix = \Drupal::config('ultimate_cron_handle_prefix')
    ->get('uc:');
  if ($process = background_process_get_process($handle_prefix . 'background_process_cron')) {
    if ($process->start + 30 < time()) {
      background_process_unlock($process->handle, t('Self unlocking stale lock'), $process->start);
    }
  }
}

/**
 * Implements hook_cronapi().
 */
function background_process_cronapi($op, $job = NULL) {
  switch ($op) {
    case 'list':
      return [
        'background_process_cron' => t('Cleanup old process handles'),
      ];
    case 'rule':
      return '* * * * *';
    case 'configure':
      return 'admin/config/system/background-process';
  }
}

/**
 * Implements hook_service_group().
 */
function background_process_service_group() {
  $info = [];
  $info['methods']['background_process_service_group_random'] = t('Random');
  $info['methods']['background_process_service_group_round_robin'] = t('Pseudo round-robin');
  return $info;
}

/**
 * Implements to Load balancing based on random pick.
 */
function background_process_service_group_random($service_group) {
  return $service_group['hosts'][rand(0, count($service_group['hosts']) - 1)];
}

/**
 * Implements for Round-robin load balancing based on random pick.
 */
function background_process_service_group_round_robin($service_group) {
  static $idx = NULL;
  if (isset($idx)) {
    $idx = ($idx + 1) % count($service_group['hosts']);
  }
  else {
    $idx = rand(0, count($service_group['hosts']) - 1);
  }
  return $service_group['hosts'][$idx];
}

/**
 * Implements to Access handler for service call.
 */
function background_process_service_access($handle, $token) {

  // Setup service.
  ignore_user_abort(TRUE);

  // Damn those slashes!
  $handle = rawurldecode($handle);
  $token = rawurldecode($token);

  // Ensure no session!
  drupal_save_session(FALSE);
  unset($_SESSION);
  $process = background_process_get_process($handle);
  if (!$process) {
    \Drupal::logger('bg_process')
      ->notice('Unknown process: %handle', [
      '%handle' => $handle,
    ]);
    return FALSE;
  }
  if ($token !== $process->token) {
    \Drupal::logger('bg_process')
      ->notice('Invalid token: %token for handle: %handle', [
      '%token' => $token,
      '%handle' => $handle,
    ]);
    return FALSE;
  }

  // Login as the user that requested the call.
  $user = \Drupal::currentUser();
  if ($process->uid) {
    $load_user = \Drupal::entityManager()
      ->getStorage('user')
      ->load($process->uid);
    if (!$load_user) {

      // Invalid user!
      \Drupal::logger('bg_process')
        ->notice('Invalid user: %uid for handle: %handle', [
        '%uid' => $process->uid,
        '%handle' => $handle,
      ]);
      return FALSE;
    }
    $user = $load_user;
  }
  else {
    $user = drupal_anonymous_user();
  }
  return TRUE;
}

/**
 * Implements hook_init().
 */
function background_process_init() {

  // Only determine if we're told to do so.
  if (empty($_SESSION['background_process_determine_default_service_host'])) {
    return;
  }

  // Don't determine on check-token page, to avoid infinite loop.
  if (strpos($_GET['q'], 'background-process/check-token') === 0) {
    return;
  }
  if (\Drupal::config('install_task')
    ->get() != 'done') {
    return;
  }

  // Determine the default service host.
  background_process_determine_and_save_default_service_host();
  unset($_SESSION['background_process_determine_default_service_host']);
}

/**
 * Implements hook_cron_queue_info().
 */
function background_process_cron_queue_info() {
  $queues['background_process'] = [
    'worker callback' => '_background_process_queue',
  ];
  $background_process_queues = \Drupal::config('background_process.settings')
    ->get('background_process_queues');
  foreach ($background_process_queues as $queue_name) {
    $queues['bgp:' . $queue_name] = [
      'worker callback' => '_background_process_queue',
    ];
  }
  return $queues;
}

/**
 * Worker callback for processing queued function call.
 */
function _background_process_queue($item) {
  $oldhandle = background_process_current_handle();
  list($handle, $token) = $item;
  if (background_process_service_access($handle, $token)) {
    try {
      background_process_service_execute(rawurldecode($handle), TRUE);
      background_process_current_handle($oldhandle);
    } catch (Exception $e) {
      background_process_current_handle($oldhandle);
      background_process_update_status(rawurldecode($handle), BACKGROUND_PROCESS_STATUS_QUEUED);
      throw $e;
    }
  }
}

/**
 * Implements to Get/set current handle.
 */
function background_process_current_handle($handle = NULL) {
  static $current_handle = NULL;
  if (isset($handle)) {
    $current_handle = $handle;
  }
  return $current_handle;
}

/**
 * Implements to Get a unique handle based on a callback.
 */
function background_process_generate_handle($callback) {
  return md5(serialize($callback) . ':' . microtime(TRUE) . ':' . rand(1, 5000));
}

/**
 * Implements to Start background process.
 */
function background_process_start($callback) {
  $process = new BackgroundProcess();
  $args = func_get_args();
  array_splice($args, 0, 1);
  $result = $process
    ->start($callback, $args);
  return $result ? $process->handle : $result;
}

/**
 * Implements to Start locked background process.
 */
function background_process_start_locked($handle, $callback) {
  $process = new BackgroundProcess($handle);
  $args = func_get_args();
  array_splice($args, 0, 2);
  $result = $process
    ->start($callback, $args);
  return $result ? $process->handle : $result;
}

/**
 * Implements Queue the function call passing function arguments.
 */
function background_process_queue($callback) {
  $process = new BackgroundProcess();
  $args = func_get_args();
  array_splice($args, 0, 1);
  return $process
    ->queue($callback, $args);
}

/**
 * Implements to Queue locked background process.
 */
function background_process_queue_locked($handle, $callback) {
  $process = new BackgroundProcess($handle);
  $args = func_get_args();
  array_splice($args, 0, 2);
  return $process
    ->queue($callback, $args);
}

/**
 * Implements to Cleanup cache menu and ensure all locks are released (again).
 */
function _background_process_cleanup_menu($cid) {
  drupal_flush_all_caches('cache_menu', $cid);

  // Release locks in case cache_clear_all() set's a lock and lock_release_all()
  // has already been run.
  \Drupal::lock()
    ->releaseAll();
}

/**
 * Implements to Call the function requested by the service call.
 */
function background_process_service_start($handle, $return = FALSE) {
  header('Content-Type', 'text/plain');

  // Let's clean up the mess the menu-router system leaves behind.
  $cid = 'menu_item:' . hash('sha256', $_GET['q']);
  drupal_register_shutdown_function('_background_process_cleanup_menu', $cid);

  // Setup service.
  ignore_user_abort(TRUE);
  @set_time_limit(\Drupal::config('background_process.settings')
    ->get('background_process_service_timeout'));
  $handle = rawurldecode($handle);
  return background_process_service_execute($handle, $return);
}

/**
 * Implements to Execute the service.
 */
function background_process_service_execute($handle, $return = FALSE) {

  // Access handler.
  $process = background_process_get_process($handle);
  if (!$process) {
    \Drupal::logger('bg_process')
      ->error('Process not found for handle: %handle', [
      '%handle' => $handle,
    ]);
    if ($return) {
      return;
    }
    else {
      exit;
    }
  }
  $process->start_stamp = microtime(TRUE);
  try {
    $old_db = Database::setActiveConnection('background_process');
    $claimed = db_update('background_process')
      ->fields([
      'start_stamp' => sprintf("%.06f", $process->start_stamp),
      'exec_status' => BACKGROUND_PROCESS_STATUS_RUNNING,
    ])
      ->condition('handle', $handle)
      ->condition('exec_status', [
      BACKGROUND_PROCESS_STATUS_LOCKED,
      BACKGROUND_PROCESS_STATUS_QUEUED,
    ], 'IN')
      ->execute();
    Database::setActiveConnection($old_db);
    if ($claimed) {
      $process->exec_status = BACKGROUND_PROCESS_STATUS_RUNNING;
      $process = BackgroundProcess::load($process);
      $process
        ->sendMessage('claimed');
      background_process_current_handle($handle);
    }
    else {
      if ($return) {
        return;
      }
      else {
        exit;
      }
    }
  } catch (Exception $e) {
    Database::setActiveConnection($old_db);
    throw $e;
  }

  // Make sure the process is removed when we're done.
  if (!$return) {
    drupal_register_shutdown_function('background_process_remove_process', $process->handle, $process->start_stamp);
  }
  if (is_callable($process->callback)) {
    $old_db = NULL;
    try {
      if (!$return) {
        drupal_register_shutdown_function('module_invoke_all', 'background_process_shutdown', $process);
      }
      $callback = _background_process_callback_name($process->callback);
      $old_db = Database::setActiveConnection('background_process');
      progress_initialize_progress($handle, "Background process '{$callback}' initialized");
      Database::setActiveConnection($old_db);
      call_user_func_array($process->callback, $process->args);
      $old_db = Database::setActiveConnection('background_process');
      progress_end_progress($handle, "Background process '{$callback}' finished");
      Database::setActiveConnection($old_db);
      if ($return) {
        background_process_remove_process($process->handle, $process->start_stamp);
        \Drupal::moduleHandler()
          ->invokeAll('background_process_shutdown', [
          $process,
        ]);
      }
    } catch (Exception $e) {

      // Exception occurred, switch back to proper db if necessary.
      if ($old_db) {
        Database::setActiveConnection($old_db);
      }
      if (!$return) {
        \Drupal::moduleHandler()
          ->invokeAll('background_process_shutdown', [
          $process,
          (string) $e,
        ]);
      }
      throw $e;
    }
  }
  else {

    // Function not found.
    \Drupal::logger('bg_process')
      ->error('Callback: %callback not found', [
      '%callback' => $process->callback,
    ]);
  }
  if ($return) {
    return;
  }
  else {
    exit;
  }
}

/**
 * Implements to Restart the current background process.
 */
function background_process_restart() {
  $args = func_get_args();
  call_user_func_array('background_process_keepalive', $args);
  exit;
}

/**
 * Implements to Keep process alive.
 */
function background_process_keepalive() {
  $args = func_get_args();
  $handle = background_process_current_handle();
  if (!$handle) {
    throw new Exception(t('Background process handle %handle not found', [
      '%handle' => $handle,
    ]));
  }
  $process = background_process_get_process($handle);
  if (!$process) {
    throw new Exception(t('Background process %handle not found', [
      '%handle' => $handle,
    ]));
  }
  drupal_register_shutdown_function('_background_process_restart', $process, $args);
}

/**
 * Implements to Check if the background process has started.
 */
function background_process_is_started($handle) {
  $old_db = Database::setActiveConnection('background_process');
  $progress = progress_get_progress($handle);
  Database::setActiveConnection($old_db);
  return !empty($progress);
}

/**
 * Implements to Check if the background process has finished.
 */
function background_process_is_finished($handle) {
  $old_db = Database::setActiveConnection('background_process');
  $progress = progress_get_progress($handle);
  Database::setActiveConnection($old_db);
  return empty($progress) || $progress->end;
}

/**
 * Implements to Set background process.
 */
function background_process_set_process($handle, $callback, $uid, $args, $token) {

  // Setup parameters.
  $args = serialize($args);
  $callback = serialize($callback);

  // Get user.
  if (!isset($uid)) {
    $user = \Drupal::currentUser();
    $uid = $user->uid;
    if ($uid == '') {
      $uid = '0';
    }
  }
  try {
    $old_db = Database::setActiveConnection('background_process');
    $result = db_update('background_process')
      ->fields([
      'callback' => $callback,
      'args' => $args,
      'uid' => $uid,
      'token' => $token,
    ])
      ->condition('handle', $handle)
      ->execute();
    Database::setActiveConnection($old_db);
    return $result;
  } catch (Exception $e) {
    Database::setActiveConnection($old_db);
    throw $e;
  }
}

/**
 * Implements to Lock process.
 */
function background_process_lock_process($handle, $status = BACKGROUND_PROCESS_STATUS_LOCKED) {
  try {
    $old_db = Database::setActiveConnection('background_process');
    db_insert('background_process')
      ->fields([
      'handle' => $handle,
      'start_stamp' => sprintf("%.06f", microtime(TRUE)),
      'exec_status' => $status,
    ])
      ->execute();
    Database::setActiveConnection($old_db);
    _background_process_ensure_cleanup($handle);
    return TRUE;
  } catch (Exception $e) {
    Database::setActiveConnection($old_db);
    return FALSE;
  }
}

/**
 * Implements to Set status for background process.
 */
function background_process_update_status($handle, $status) {
  db_update('background_process')
    ->fields([
    'exec_status' => $status,
  ])
    ->condition('handle', $handle)
    ->execute();
}

/**
 * Get background process.
 */
function background_process_get_process($handle) {
  try {
    $old_db = Database::setActiveConnection('background_process');
    $result = db_select('background_process', 'bp')
      ->fields('bp', [
      'handle',
      'callback',
      'args',
      'uid',
      'token',
      'service_host',
      'start_stamp',
      'exec_status',
    ])
      ->condition('handle', $handle)
      ->execute()
      ->fetchObject();
    Database::setActiveConnection($old_db);
  } catch (Exception $e) {
    Database::setActiveConnection($old_db);
    throw $e;
  }
  if ($result) {
    $result->args = unserialize($result->args);
    $result->callback = unserialize($result->callback);
    $result->start = $result->start_stamp;
    $result->status = $result->exec_status;
    return $result;
  }
  return FALSE;
}

/**
 * Get background process.
 */
function background_process_get_processes($status = NULL) {
  $old_db = Database::setActiveConnection('background_process');
  $result = db_select('background_process', 'bp')
    ->fields('bp', [
    'handle',
    'callback',
    'args',
    'uid',
    'token',
    'service_host',
    'start_stamp',
    'exec_status',
  ]);
  if (isset($status)) {
    $result = $result
      ->condition('bp.status', $status);
  }
  $result = $result
    ->execute();
  $processes = [];
  while ($process = $result
    ->fetchObject()) {
    $process->args = unserialize($process->args);
    $process->callback = unserialize($process->callback);
    $process->start = $process->start_stamp;
    $process->status = $process->exec_status;
    $processes[] = $process;
  }
  Database::setActiveConnection($old_db);
  return $processes;
}

/**
 * Implements to Remove a background process.
 */
function background_process_remove_process($handle, $start = NULL) {
  $old_db = Database::setActiveConnection('background_process');
  if (isset($start)) {
    $result = db_delete('background_process')
      ->condition('handle', $handle)
      ->condition('start_stamp', sprintf("%.06f", $start), '=')
      ->execute();
  }
  else {
    $result = db_delete('background_process')
      ->condition('handle', $handle)
      ->execute();
  }
  Database::setActiveConnection($old_db);
  return $result;
}

/**
 * Unlock background process.
 */
function background_process_unlock($handle, $msg = NULL, $start = NULL) {
  $process = background_process_get_process($handle);
  if ($process && (!isset($start) || $start === $process->start)) {

    // Unlock the process.
    if (background_process_remove_process($process->handle, $process->start)) {
      $user = \Drupal::currentUser();
      $username = $user->uid ? $user->name : t('anonymous');
      \Drupal::moduleHandler()
        ->invokeAll('background_process_shutdown', [
        $process,
        $msg ? $msg : t('Manually unlocked by !name', [
          '!name' => $username,
        ]),
      ]);
      return TRUE;
    }
  }
  return FALSE;
}

/**
 * Implements to Set a service host for a background process.
 */
function background_process_set_service_host($handle, $service_host) {
  try {
    $old_db = Database::setActiveConnection('background_process');
    $result = db_update('background_process')
      ->fields([
      'service_host' => $service_host ? $service_host : '',
    ])
      ->condition('handle', $handle)
      ->execute();
    Database::setActiveConnection($old_db);
    return $result;
  } catch (Exception $e) {
    Database::setActiveConnection($old_db);
    throw $e;
  }
}

/**
 * Implements to Get service hosts defined in the system.
 */
function background_process_get_service_groups() {
  $service_groups = \Drupal::config('background_process.settings')
    ->get('background_process_service_groups');
  $service_groups = [
    'default' => [
      'hosts' => [
        \Drupal::config('background_process.settings')
          ->get('background_process_default_service_host'),
      ],
    ],
  ];
  foreach ($service_groups as &$service_group) {
    $service_group += [
      'method' => 'background_process_service_group_round_robin',
    ];
  }
  return $service_groups;
}

/**
 * Implements to Determine host for current installation.
 */
function background_process_determine_default_service_host() {
  $token = md5(session_id() . md5(uniqid(mt_rand(), TRUE)) . md5(uniqid(mt_rand(), TRUE)));
  \Drupal::configFactory()
    ->getEditable('background_process.settings')
    ->set('background_process_token', $token)
    ->save();
  global $conf;
  $auth = isset($_SERVER['PHP_AUTH_USER']) ? $_SERVER['PHP_AUTH_USER'] . ':' . $_SERVER['PHP_AUTH_PW'] . '@' : '';
  $scheme = isset($_SERVER['HTTPS']) && $_SERVER['HTTPS'] == 'on' ? 'https://' : 'http://';
  global $base_url;
  $url = parse_url($base_url);
  $path = empty($url['path']) ? '' : $url['path'];
  $candidates = [
    [
      'base_url' => $base_url,
    ],
    [
      'base_url' => $scheme . $_SERVER['SERVER_NAME'] . ':' . $_SERVER['SERVER_PORT'] . $path,
      'http_host' => $_SERVER['HTTP_HOST'],
    ],
    [
      'base_url' => $scheme . '127.0.0.1:' . $_SERVER['SERVER_PORT'] . $path,
      'http_host' => $_SERVER['HTTP_HOST'],
    ],
    [
      'base_url' => $scheme . (!array_key_exists('SERVER_ADDR', $_SERVER) ? $_SERVER['LOCAL_ADDR'] : $_SERVER['SERVER_ADDR']) . ':' . $_SERVER['SERVER_PORT'] . $path,
      'http_host' => $_SERVER['HTTP_HOST'],
    ],
    [
      'base_url' => $scheme . $auth . $_SERVER['SERVER_NAME'] . ':' . $_SERVER['SERVER_PORT'] . $path,
      'http_host' => $_SERVER['HTTP_HOST'],
    ],
    [
      'base_url' => $scheme . $auth . '127.0.0.1:' . $_SERVER['SERVER_PORT'] . $path,
      'http_host' => $_SERVER['HTTP_HOST'],
    ],
    [
      'base_url' => $scheme . $auth . (!array_key_exists('SERVER_ADDR', $_SERVER) ? $_SERVER['LOCAL_ADDR'] : $_SERVER['SERVER_ADDR']) . ':' . $_SERVER['SERVER_PORT'] . $path,
      'http_host' => $_SERVER['HTTP_HOST'],
    ],
  ];
  $found = NULL;
  foreach ($candidates as $i => $candidate) {
    $conf['background_process_service_hosts']['__test'] = $candidate;
    list($url, $headers) = background_process_build_request('background-process/check-token', '__test');
    if (empty($results[$url])) {
      $results[$url] = background_process_http_request($url, [
        'headers' => $headers,
        'postpone' => TRUE,
        'candidate' => $i,
        'method' => 'POST',
      ]);
    }
  }
  background_process_http_request_process($results);
  foreach ($results as $result) {
    if ($result->code == 200) {
      if ($token === substr($result->data, 0, strlen($token))) {
        $found = $candidates[$result->options['candidate']];
        break;
      }
    }
  }
  if ($found) {
    return $found;
  }
  return FALSE;
}

/**
 * Implements to Build url and headers for http request.
 */
function background_process_build_request($url, $service_hostname = NULL, $options = []) {
  $service_hosts = background_process_get_service_hosts();
  if (!$service_hostname || empty($service_hosts[$service_hostname])) {
    $service_hostname = 'default';
  }
  $options += [
    'absolute' => TRUE,
    'base_url' => $service_hosts,
  ];
  $url = $service_hosts . '/' . $url;
  $parsed = parse_url($url);
  $host = !empty($service_hosts['http_host']) ? $service_hosts['http_host'] : (isset($parsed['host']) ? isset($parsed['port']) ? $parsed['host'] . ':' . $parsed['port'] : $parsed['host'] : NULL);
  $headers = _background_process_request_headers();
  $headers = _background_process_filter_headers($headers);
  $headers['User-Agent'] = \Drupal::config('background_process.settings')
    ->get('background_process_user_agent');
  $headers['Host'] = $host;
  $headers['Connection'] = 'close';
  if (isset($parsed['user'])) {
    $headers['Authorization'] = 'Basic ' . base64_encode($parsed['user'] . ':' . $parsed['pass']);
  }
  return [
    $url,
    $headers,
  ];
}

/**
 * Implements to Transform header array from key/value to strings.
 */
function background_process_build_headers($headers) {
  $header = [];
  foreach ($headers as $key => $value) {
    $header[] = "{$key}: {$value}";
  }
  return $header;
}

/**
 * Implements to Perform an http request.
 */
function background_process_http_request($url, array $options = []) {

  // Parse the URL and make sure we can handle the schema.
  $result = new stdClass();
  $result->url = $url;
  $result->options = $options;
  $result->code = NULL;
  $uri = @parse_url($url);
  $result->uri = $uri;
  if ($uri == FALSE) {
    $result->error = 'unable to parse URL';
    $result->code = -1001;
    return _background_process_http_request_result($result);
  }
  if (!isset($uri['scheme'])) {
    $result->error = 'missing schema';
    $result->code = -1002;
    return _background_process_http_request_result($result);
  }

  // Set default context to enable/disable SSL verification.
  $default_context = stream_context_create([
    'ssl' => [
      'verify_peer' => \Drupal::config('background_process.settings')
        ->get('background_process_ssl_verification'),
      'verify_peer_name' => \Drupal::config('background_process.settings')
        ->get('background_process_ssl_verification'),
    ],
  ]);

  // Merge the default options.
  $options += [
    'headers' => [],
    'method' => 'GET',
    'data' => NULL,
    'max_redirects' => 3,
    'timeout' => \Drupal::config('background_process.settings')
      ->get('background_process_connection_timeout'),
    'context' => $default_context,
    'blocking' => FALSE,
    'postpone' => FALSE,
  ];

  // Stream_socket_client() requires timeout to be a float.
  $options['timeout'] = (double) $options['timeout'];
  $host = NULL;
  switch ($uri['scheme']) {
    case 'http':
    case 'feed':
      $port = isset($uri['port']) ? $uri['port'] : 80;
      $socket = 'tcp://' . $uri['host'] . ':' . $port;

      // Checking the host that do not take into account the port number.
      $host = $uri['host'] . ($port != 80 ? ':' . $port : '');
      break;
    case 'https':

      // Note: Only works when PHP is compiled with OpenSSL support.
      $port = isset($uri['port']) ? $uri['port'] : 443;
      $socket = 'ssl://' . $uri['host'] . ':' . $port;
      $host = $uri['host'] . ($port != 443 ? ':' . $port : '');
      break;
    default:
      $result->error = 'invalid schema ' . $uri['scheme'];
      $result->code = -1003;
      return _background_process_http_request_result($result);
  }
  if (!empty($host) && empty($options['headers']['Host'])) {
    $options['headers']['Host'] = $host;
  }
  $result->options = $options;
  $result->socket = $socket;
  $result->postponed = $options['postpone'];
  if ($result->postponed) {
    return $result;
  }
  else {
    return background_process_http_request_initiate($result);
  }
}

/**
 * Initiate the http request.
 */
function background_process_http_request_initiate(&$result) {
  Timer[$result]['start'];
  $options = $result->options;
  $socket = $result->socket;
  $uri = $result->uri;
  $result->start = microtime(TRUE);
  $result->data_ready = TRUE;
  $result->response = '';
  if (empty($options['context'])) {
    $fp = @stream_socket_client($socket, $errno, $errstr, $options['timeout']);
  }
  else {

    // Create a stream with context. Allows verification of a SSL certificate.
    $fp = @stream_socket_client($socket, $errno, $errstr, $options['timeout'], STREAM_CLIENT_CONNECT, $options['context']);
  }

  // Make sure the socket opened properly.
  if (!$fp) {

    // When a network error occurs, we use a negative number so it does not
    // clash with the HTTP status codes.
    $result->code = -$errno;
    $result->error = trim($errstr) ? trim($errstr) : t('Error opening socket @socket', [
      '@socket' => $socket,
    ]);
    \Drupal::state()
      ->set('drupal_http_request_fails', TRUE);
    return _background_process_http_request_result($result);
  }
  $result->fp = $fp;

  // Construct the path to act on.
  $path = isset($uri['path']) ? $uri['path'] : '/';
  if (isset($uri['query'])) {
    $path .= '?' . $uri['query'];
  }

  // Merge the default headers.
  $options['headers'] += [
    'User-Agent' => 'Drupal (+http://drupal.org/)',
  ];

  // Only add Content-Length if we actually have any content or if it is a POST
  // or PUT request. Some non-standard servers get confused by Content-Length in
  // at least HEAD/GET requests, and Squid always requires Content-Length in
  // POST/PUT requests.
  $content_length = strlen($options['data']);
  if ($content_length > 0 || $options['method'] == 'POST' || $options['method'] == 'PUT') {
    $options['headers']['Content-Length'] = $content_length;
  }

  // If the server URL has a user then attempt to use basic authentication.
  if (isset($uri['user'])) {
    $options['headers']['Authorization'] = 'Basic ' . base64_encode($uri['user'] . (isset($uri['pass']) ? ':' . $uri['pass'] : ''));
  }

  // If the database prefix is being used
  // by SimpleTest to run the tests in a copied.
  // database then set the user-agent header to the database prefix so that any
  // calls to other Drupal pages will run the SimpleTest prefixed database. The
  // user-agent is used to ensure that multiple testing sessions running at the
  // same time won't interfere with each other as they would if the database
  // prefix were stored statically in a file or database variable.
  $test_info =& $GLOBALS['drupal_test_info'];
  if (!empty($test_info['test_run_id'])) {
    $options['headers']['User-Agent'] = drupal_generate_test_ua($test_info['test_run_id']);
  }
  $request = $options['method'] . ' ' . $path . " HTTP/1.0\r\n";
  foreach ($options['headers'] as $name => $value) {
    $request .= $name . ': ' . trim($value) . "\r\n";
  }
  $request .= "\r\n" . $options['data'];
  $result->request = $request;

  // Calculate how much time is left of the original timeout value.
  $timeout = $options['timeout'] - Timer[$result]['read'] / 1000;
  if ($timeout > 0) {
    stream_set_timeout($fp, floor($timeout), floor(1000000 * fmod($timeout, 1)));
    fwrite($fp, $request);
    stream_set_blocking($fp, 0);
  }
  if (!empty($options['blocking'])) {
    return background_process_http_request_get_response($result);
  }
  return $result;
}

/**
 * Get response for an http request.
 */
function background_process_http_request_get_response(&$result) {
  if ($result->postponed) {
    $result->postponed = FALSE;
    return background_process_http_request_initiate($result);
  }
  if (isset($result->code)) {
    return $result;
  }
  $fp = $result->fp;
  $options = $result->options;
  Timer[$result]['start'];
  if (!empty($options['blocking'])) {
    stream_set_blocking($fp, 1);
  }
  $info = stream_get_meta_data($fp);
  $alive = !$info['eof'] && !$info['timed_out'];
  while ($alive) {

    // Calculate how much time is left of the original timeout value.
    $timeout = $options['timeout'] - Timer[$result]['read'] / 1000;
    if ($timeout <= 0) {
      $info['timed_out'] = TRUE;
      break;
    }
    stream_set_timeout($fp, floor($timeout), floor(1000000 * fmod($timeout, 1)));
    $chunk = fread($fp, 1024);
    $result->response .= $chunk;
    $result->data_ready = empty($chunk) ? FALSE : TRUE;
    $info = stream_get_meta_data($fp);
    $alive = !$info['eof'] && !$info['timed_out'];
    if (empty($options['blocking'])) {
      break;
    }
  }
  if ($alive) {
    return $result;
  }
  fclose($fp);
  if ($info['timed_out']) {
    $result->code = HTTP_REQUEST_TIMEOUT;
    $result->error = 'request timed out';
    return _background_process_http_request_result($result);
  }
  list($response, $result->data) = preg_split("/\r\n\r\n|\n\n|\r\r/", $result->response, 2);
  $response = preg_split("/\r\n|\n|\r/", $response);

  // Parse the response status line.
  list($protocol, $code, $status_message) = explode(' ', trim(array_shift($response)), 3);
  $result->protocol = $protocol;
  $result->status_message = $status_message;
  $result->headers = [];

  // Parse the response headers.
  while ($line = trim(array_shift($response))) {
    list($name, $value) = explode(':', $line, 2);
    $name = strtolower($name);
    if (isset($result->headers[$name]) && $name == 'set-cookie') {
      $result->headers[$name] .= ',' . trim($value);
    }
    else {
      $result->headers[$name] = trim($value);
    }
  }
  $responses = [
    100 => 'Continue',
    101 => 'Switching Protocols',
    200 => 'OK',
    201 => 'Created',
    202 => 'Accepted',
    203 => 'Non-Authoritative Information',
    204 => 'No Content',
    205 => 'Reset Content',
    206 => 'Partial Content',
    300 => 'Multiple Choices',
    301 => 'Moved Permanently',
    302 => 'Found',
    303 => 'See Other',
    304 => 'Not Modified',
    305 => 'Use Proxy',
    307 => 'Temporary Redirect',
    400 => 'Bad Request',
    401 => 'Unauthorized',
    402 => 'Payment Required',
    403 => 'Forbidden',
    404 => 'Not Found',
    405 => 'Method Not Allowed',
    406 => 'Not Acceptable',
    407 => 'Proxy Authentication Required',
    408 => 'Request Time-out',
    409 => 'Conflict',
    410 => 'Gone',
    411 => 'Length Required',
    412 => 'Precondition Failed',
    413 => 'Request Entity Too Large',
    414 => 'Request-URI Too Large',
    415 => 'Unsupported Media Type',
    416 => 'Requested range not satisfiable',
    417 => 'Expectation Failed',
    500 => 'Internal Server Error',
    501 => 'Not Implemented',
    502 => 'Bad Gateway',
    503 => 'Service Unavailable',
    504 => 'Gateway Time-out',
    505 => 'HTTP Version not supported',
  ];
  if (!isset($responses[$code])) {
    $code = floor($code / 100) * 100;
  }
  $result->code = $code;
  switch ($code) {
    case 200:
    case 304:
      break;
    case 301:
    case 302:
    case 307:
      $location = $result->headers['location'];
      $options['timeout'] -= Timer[$result]['read'] / 1000;
      if ($options['timeout'] <= 0) {
        $result->code = -1;
        $result->error = 'request timed out';
      }
      elseif ($options['max_redirects']) {

        // Redirect to the new location.
        $options['max_redirects']--;
        $result = background_process_http_request($location, $options);
        if (empty($result->error)) {
          background_process_http_request_get_response($result);
        }
        $result->redirect_code = $code;
      }
      if (!isset($result->redirect_url)) {
        $result->redirect_url = $location;
      }
      break;
    default:
      $result->error = $status_message;
  }
  return _background_process_http_request_result($result);
}

/**
 * Get request result.
 */
function _background_process_http_request_result($result) {
  if (isset($result->code)) {
    if (empty($result->end)) {
      $result->end = microtime(TRUE);
    }
    if (!empty($result->options['callback']) && is_callable($result->options['callback'])) {
      call_user_func($result->options['callback'], $result);
    }
  }
  return $result;
}

/**
 * Implements Process multiple http requests.
 */
function background_process_http_request_process(&$results, $options = []) {
  $options += [
    'timeout' => 30,
    'interval' => 0.01,
    'limit' => 0,
  ];
  $interval = $options['interval'] * 1000000;
  $expire = time() + $options['timeout'];
  while ($results && time() < $expire) {
    $cnt = 0;
    $data_ready = FALSE;
    foreach ($results as &$result) {
      if (isset($result->code)) {
        continue;
      }
      background_process_http_request_get_response($result);
      $data_ready = $data_ready || $result->data_ready ? TRUE : FALSE;
      $cnt++;
      if ($options['limit'] && $cnt >= $options['limit']) {
        break;
      }
    }
    if (!$cnt) {
      break;
    }
    if (!$data_ready) {
      usleep($interval);
    }
  }
}

/**
 * Implements to Determines the default service host.
 */
function background_process_determine_and_save_default_service_host() {
  $host = background_process_determine_default_service_host();
  if ($host) {
    global $base_url;
    drupal_set_message(t('Default service host determined at %base_url', [
      '%base_url' => _background_process_secure_url($host['base_url']),
    ]));
    if ($host['base_url'] === $base_url) {
      \Drupal::config('background_process.settings')
        ->clear('background_process_derived_default_host')
        ->save();
    }
    else {
      \Drupal::configFactory()
        ->getEditable('background_process.settings')
        ->set('background_process_derived_default_host', [
        'default' => $host,
      ])
        ->save();
      drupal_set_message(t('Default service host differs from base url (%base_url). If migrating database to other sites or environments, you will need to either run "Determine default service host" again, or configure the default service host manually through settings.php', [
        '%base_url' => $base_url,
      ]), 'warning');
    }
    return TRUE;
  }
  else {
    drupal_set_message(t('Could not determine default service host. Please configure background process in your settings.php'), 'error');
    return FALSE;
  }
}

/**
 * Implements to Ensure lock is removed at end of request.
 */
function _background_process_ensure_cleanup($handle, $remove = FALSE) {
  $handles =& drupal_static('background_process_handles_locked', NULL);
  if (!isset($handles)) {
    $handles = [];
    drupal_register_shutdown_function('_background_process_cleanup_locks');
  }
  if ($remove) {
    unset($handles[$handle]);
  }
  else {
    $handles[$handle] = $handle;
  }
}

/**
 * Implements to Shutdown handler for removing locks.
 */
function _background_process_cleanup_locks() {
  $handles =& drupal_static('background_process_handles_locked', NULL);
  if (!empty($handles)) {
    foreach ($handles as $handle) {
      background_process_remove_process($handle);
    }
  }
}

/**
 * Get string name of callback.
 */
function _background_process_callback_name($callback) {
  if (is_array($callback)) {
    if (is_object($callback[0])) {
      $callback = get_class($callback[0]) . '->' . $callback[1];
    }
    else {
      $callback = $callback[0] . '::' . $callback[1];
    }
  }
  return $callback;
}

/**
 * Get request headers.
 */
function _background_process_request_headers() {
  foreach ($_SERVER as $key => $value) {
    if (empty($value)) {
      continue;
    }
    if (substr($key, 0, 5) == 'HTTP_') {
      $key = str_replace(' ', '-', ucwords(strtolower(str_replace('_', ' ', substr($key, 5)))));
      if (empty($key)) {
        $headers[$key] = $value;
      }
      else {
        $headers[$key] .= "; {$value}";
      }
    }
  }
  return $headers;
}

/**
 * Remove headers we do not wish to pass on to the next request.
 */
function _background_process_filter_headers($headers) {
  $result = [];
  if (empty($headers)) {
    return $result;
  }
  foreach ($headers as $key => $value) {
    if (!preg_match('/^(Connection|Keep-Alive|Proxy-Authenticate|Proxy-Authorization|TE|Trailers|Transfer-Encoding|Upgrade|Set-Cookie|Content-Length|Host|Accept-Encoding)$/i', $key)) {
      $result[$key] = $value;
    }
  }
  return $result;
}

/**
 * Secure a URL by obfuscating the password if present.
 */
function _background_process_secure_url($url) {
  $url = parse_url($url);
  if (!empty($url['pass'])) {
    $url['pass'] = 'XXXXXXXX';
  }
  return _background_process_unparse_url($url);
}

/**
 * Reverse logic of parse_url().
 */
function _background_process_unparse_url($parsed_url) {
  $scheme = isset($parsed_url['scheme']) ? $parsed_url['scheme'] . '://' : '';
  $host = isset($parsed_url['host']) ? $parsed_url['host'] : '';
  $port = isset($parsed_url['port']) ? ':' . $parsed_url['port'] : '';
  $user = isset($parsed_url['user']) ? $parsed_url['user'] : '';
  $pass = isset($parsed_url['pass']) ? ':' . $parsed_url['pass'] : '';
  $pass = $user || $pass ? "{$pass}@" : '';
  $path = isset($parsed_url['path']) ? $parsed_url['path'] : '';
  $query = isset($parsed_url['query']) ? '?' . $parsed_url['query'] : '';
  $fragment = isset($parsed_url['fragment']) ? '#' . $parsed_url['fragment'] : '';
  return "{$scheme}{$user}{$pass}{$host}{$port}{$path}{$query}{$fragment}";
}

/**
 * Shutdown handler for restarting background process.
 */
function _background_process_restart($process, $args = []) {
  $args = empty($args) ? $process->args : $args;
  $new = BackgroundProcess::load($process);
  $result = $new
    ->start($process->callback, $args);
  return $result;
}

/**
 * Get service hosts defined in the system.
 */
function background_process_get_service_hosts() {
  global $base_url;
  $service_hosts = \Drupal::config('background_process.settings')
    ->get('background_process_service_hosts');
  $service_hosts += \Drupal::config('background_process.settings')
    ->get('background_process_derived_default_host');
  return $service_hosts;
}

Functions

Namesort descending Description
background_process_build_headers Implements to Transform header array from key/value to strings.
background_process_build_request Implements to Build url and headers for http request.
background_process_cron Implements hook_cron().
background_process_cronapi Implements hook_cronapi().
background_process_cron_alter Implements hook_cron_alter().
background_process_cron_queue_info Implements hook_cron_queue_info().
background_process_current_handle Implements to Get/set current handle.
background_process_determine_and_save_default_service_host Implements to Determines the default service host.
background_process_determine_default_service_host Implements to Determine host for current installation.
background_process_generate_handle Implements to Get a unique handle based on a callback.
background_process_get_process Get background process.
background_process_get_processes Get background process.
background_process_get_service_groups Implements to Get service hosts defined in the system.
background_process_get_service_hosts Get service hosts defined in the system.
background_process_http_request Implements to Perform an http request.
background_process_http_request_get_response Get response for an http request.
background_process_http_request_initiate Initiate the http request.
background_process_http_request_process Implements Process multiple http requests.
background_process_init Implements hook_init().
background_process_is_finished Implements to Check if the background process has finished.
background_process_is_started Implements to Check if the background process has started.
background_process_keepalive Implements to Keep process alive.
background_process_lock_process Implements to Lock process.
background_process_permission Implements hook_permission().
background_process_queue Implements Queue the function call passing function arguments.
background_process_queue_locked Implements to Queue locked background process.
background_process_remove_process Implements to Remove a background process.
background_process_restart Implements to Restart the current background process.
background_process_service_access Implements to Access handler for service call.
background_process_service_execute Implements to Execute the service.
background_process_service_group Implements hook_service_group().
background_process_service_group_random Implements to Load balancing based on random pick.
background_process_service_group_round_robin Implements for Round-robin load balancing based on random pick.
background_process_service_start Implements to Call the function requested by the service call.
background_process_set_process Implements to Set background process.
background_process_set_service_host Implements to Set a service host for a background process.
background_process_start Implements to Start background process.
background_process_start_locked Implements to Start locked background process.
background_process_unlock Unlock background process.
background_process_update_status Implements to Set status for background process.
_background_process_callback_name Get string name of callback.
_background_process_cleanup_locks Implements to Shutdown handler for removing locks.
_background_process_cleanup_menu Implements to Cleanup cache menu and ensure all locks are released (again).
_background_process_ensure_cleanup Implements to Ensure lock is removed at end of request.
_background_process_filter_headers Remove headers we do not wish to pass on to the next request.
_background_process_http_request_result Get request result.
_background_process_queue Worker callback for processing queued function call.
_background_process_request_headers Get request headers.
_background_process_restart Shutdown handler for restarting background process.
_background_process_secure_url Secure a URL by obfuscating the password if present.
_background_process_unparse_url Reverse logic of parse_url().

Constants