You are here

function httprl_send_request in HTTP Parallel Request & Threading Library 6

Same name and namespace in other branches
  1. 7 httprl.module \httprl_send_request()

Perform many HTTP requests.

This is a flexible and powerful HTTP client implementation. Correctly handles GET, POST, PUT or any other HTTP requests.

Parameters

$results: (optional) Array of results.

Return value

bool TRUE if the function worked as planned.

See also

drupal_http_request()

6 calls to httprl_send_request()
httprl.examples.php in examples/httprl.examples.php
HTTP Parallel Request Library code examples.
httprl_batch_callback in ./httprl.module
Given an array of data, use multiple processes to crunch it.
httprl_call_user_func_array_async in ./httprl.module
Run the callback with the given params in the background.
httprl_install_http_test in ./httprl.install
Issue a HTTP request to admin/httprl-test, verifying that the server got it.
httprl_override_core in ./httprl.module
Queue and send off http request.

... See full list

File

./httprl.module, line 1264
HTTP Parallel Request Library module.

Code

/**
 * Queue HTTP requests, or send off every queued request in parallel.
 *
 * The function has two modes, selected by the argument:
 * - Called with an array: each element is appended to an internal static
 *   queue and nothing is sent yet.
 * - Called with no argument (NULL): every queued stream is driven through
 *   connect, write and read using stream_select() until all of them are
 *   done, errored or timed out.
 *
 * State is kept in static variables between calls, so requests queued by
 * earlier calls are processed by the next argument-less call.
 *
 * @param $results
 *   (optional) Array of result objects holding the connection information
 *   to queue. Pass nothing to process the queue.
 *
 * @return
 *   - TRUE when $results was given and queued.
 *   - FALSE when $results was an empty array, or when there was nothing
 *     queued to process.
 *   - The array of collected output once the queue has been processed.
 *   - NULL when a queued request asked for 'stall_fread' and all writes are
 *     finished; the static state is kept so a later call can continue with
 *     the reads.
 *
 * @see drupal_http_request()
 */
function httprl_send_request($results = NULL) {
  static $responses = array();
  static $counter = 0;
  static $output = array();
  static $static_stall_freads = FALSE;
  if (!is_null($results)) {
    if (empty($results)) {
      return FALSE;
    }

    // Put the connection information into the responses array.
    foreach ($results as $result) {
      $responses[$counter] = $result;
      $counter++;
    }
    return TRUE;
  }

  // Exit if there is nothing to do.
  if (empty($responses)) {
    return FALSE;
  }

  // If the t function is not available use httprl_pr.
  $t = function_exists('t') ? 't' : 'httprl_pr';

  // Remove errors from responses array and set the global timeout.
  $global_timeout = 1;
  $global_connection_limit = 1;
  // Per host connection limits, keyed by the Host header value.
  $domain_connection_limit = array();
  $stall_freads = FALSE;
  foreach ($responses as $id => &$result) {
    if (!empty($result->error)) {
      $result->status = 'Connection not made.';

      // Do post processing on the stream.
      httprl_post_processing($id, $responses, $output);
      continue;
    }
    if (!empty($result->cached)) {

      // Used the cached data.
      $output[$result->cached->url] = $result->cached;
      unset($responses[$id]);
      continue;
    }

    // Get connection limits.
    $global_connection_limit = max($global_connection_limit, $result->options['global_connections']);
    if (!isset($domain_connection_limit[$result->options['headers']['Host']])) {
      $domain_connection_limit[$result->options['headers']['Host']] = max(1, $result->options['domain_connections']);
    }
    else {
      $domain_connection_limit[$result->options['headers']['Host']] = max($domain_connection_limit[$result->options['headers']['Host']], $result->options['domain_connections']);
    }

    // Set global timeout.
    $global_timeout = max($global_timeout, $result->options['global_timeout']);

    // Issue fwrite, return. Run fread later on in the script.
    if (!empty($result->options['stall_fread']) && !$static_stall_freads) {
      $static_stall_freads = TRUE;
      $stall_freads = TRUE;
    }
  }

  // Record start time.
  $start_time_this_run = $start_time_global = microtime(TRUE);

  // Record the number of db pings done.
  $ping_db_counts = array();
  $full_bootstrap = httprl_drupal_full_bootstrap();

  // Run the loop as long as we have a stream to read/write to.
  $stream_select_timeout = 1;
  $stream_write_count = 0;
  while (!empty($responses)) {

    // Initialize connection limits.
    $this_run = array();
    $global_connection_count = 0;
    $domain_connection_count = array();
    $restart_timers = FALSE;

    // Get time.
    $now = microtime(TRUE);

    // Calculate times.
    $elapsed_time = $now - $start_time_this_run;
    $start_time_this_run = $now;
    $global_time = $global_timeout - ($start_time_this_run - $start_time_global);

    // See if the DB needs to be pinged.
    // NOTE(review): $result here is whatever the previous by-reference
    // foreach left behind (the last stream it visited), so the ping_db option
    // of that stream is the one used. Confirm this is intended.
    $rounded_time = floor($elapsed_time);
    if ($full_bootstrap && !empty($result->options['ping_db']) && $rounded_time >= $result->options['ping_db'] && $rounded_time % $result->options['ping_db'] == 0 && empty($ping_db_counts[$rounded_time])) {
      $empty_array = array();
      system_get_files_database($empty_array, 'ping_db');
      $ping_db_counts[$rounded_time] = 1;
    }

    // Inspect each stream, checking for timeouts and connection limits.
    foreach ($responses as $id => &$result) {

      // See if function timed out.
      if ($global_time <= 0) {

        // Function timed out & the request is not done.
        if ($result->status == 'Connecting.') {
          $result->error = $t('Function timed out. TCP.');

          // If stream is not done writing, then remove one from the write count.
          if (isset($result->fp)) {
            $stream_write_count--;
          }
        }
        elseif ($result->status == 'Writing to server.') {
          $result->error = $t('Function timed out. Write.');

          // If stream is not done writing, then remove one from the write count.
          if (isset($result->fp)) {
            $stream_write_count--;
          }
        }
        else {
          $result->error = $t('Function timed out. Read');
        }
        $result->code = HTTPRL_FUNCTION_TIMEOUT;
        $result->status = 'Done.';

        // Do post processing on the stream and close it.
        httprl_post_processing($id, $responses, $output, $global_time);
        continue;
      }

      // Do not calculate local timeout if a file pointer doesn't exist.
      if (isset($result->fp)) {

        // Add the elapsed time to this stream.
        $result->running_time += $elapsed_time;

        // Calculate how much time is left of the original timeout value.
        $timeout = $result->options['timeout'] - $result->running_time;

        // Connection was dropped or connection timed out.
        if ($timeout <= 0) {
          $result->error = $t('Connection timed out.');

          // Stream timed out & the request is not done.
          if ($result->status == 'Writing to server.') {
            $result->error .= ' ' . $t('Write.');

            // If stream is not done writing, then remove one from the write count.
            $stream_write_count--;
          }
          else {
            $result->error .= ' ' . $t('Read.');
          }
          $result->code = HTTPRL_REQUEST_TIMEOUT;
          $result->status = 'Done.';

          // Do post processing on the stream.
          httprl_post_processing($id, $responses, $output, $timeout);
          continue;
        }

        // Connection was dropped or connection timed out.
        if ($result->status == 'Connecting.' && $result->running_time > $result->options['connect_timeout']) {
          $socket_name = stream_socket_get_name($result->fp, TRUE);
          if (empty($socket_name) || $result->running_time > $result->options['connect_timeout'] * 1.5) {
            $result->error = $t('Connection timed out.');

            // Stream timed out & the request is not done.
            if ($result->status == 'Connecting.') {
              $result->error .= ' ' . $t('TCP Connect Timeout.');

              // If stream is not done writing, then remove one from the write count.
              $stream_write_count--;
            }
            $result->code = HTTPRL_REQUEST_TIMEOUT;
            $result->status = 'Done.';

            // Do post processing on the stream.
            httprl_post_processing($id, $responses, $output, $timeout);
            continue;
          }
        }
        if (!isset($responses[$id]->time_to_first_byte) && $result->running_time > $result->options['ttfb_timeout']) {
          $result->error = $t('Connection timed out. Time to First Byte Timeout.');
          $result->code = HTTPRL_REQUEST_ABORTED;
          $result->status = 'Done.';

          // Do post processing on the stream.
          httprl_post_processing($id, $responses, $output, $timeout);
          continue;
        }
      }

      // Connection was handled elsewhere.
      if (!isset($result->fp) && $result->status != 'Connecting.') {

        // Do post processing on the stream.
        httprl_post_processing($id, $responses, $output);
        continue;
      }

      // Set the connection limits for this run.
      // Get the host name.
      $host = $result->options['headers']['Host'];

      // Set the domain connection limit if none has been defined yet.
      if (!isset($domain_connection_limit[$host])) {
        $domain_connection_limit[$host] = max(1, $result->options['domain_connections']);
      }

      // Count up the number of connections.
      $global_connection_count++;
      if (empty($domain_connection_count[$host])) {
        $domain_connection_count[$host] = 1;
      }
      else {
        $domain_connection_count[$host]++;
      }

      // If the conditions are correct, let the stream be ran in this loop.
      if ($global_connection_limit >= $global_connection_count && $domain_connection_limit[$host] >= $domain_connection_count[$host]) {

        // Establish a new connection.
        if (!isset($result->fp) && $result->status == 'Connecting.') {

          // Establish a connection to the server.
          httprl_establish_stream_connection($result);

          // Reset timer.
          $restart_timers = TRUE;

          // Get lock if needed.
          if (!empty($result->options['lock_name'])) {
            httprl_acquire_lock($result);
          }

          // If connection can not be established bail out here.
          if (!$result->fp) {

            // Do post processing on the stream.
            httprl_post_processing($id, $responses, $output);
            $domain_connection_count[$host]--;
            $global_connection_count--;
            continue;
          }
          $stream_write_count++;
        }
        if (!empty($result->fp)) {
          $this_run[$id] = $result->fp;
        }
      }
    }

    // All streams removed; exit loop.
    if (empty($responses)) {
      break;
    }

    // Restart timers.
    if ($restart_timers) {
      $start_time_this_run = microtime(TRUE);
    }

    // No streams selected; restart loop from the top.
    if (empty($this_run)) {
      continue;
    }

    // Set the read and write vars to the streams var.
    $read = $write = $this_run;
    $except = array();

    // Do some voodoo and open all streams at once. Wait 25ms for streams to
    // respond.
    if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {

      // If on windows, use error suppression http://drupal.org/node/1869026
      $n = @stream_select($read, $write, $except, $stream_select_timeout, 25000);
    }
    else {
      $n = stream_select($read, $write, $except, $stream_select_timeout, 25000);
    }
    $stream_select_timeout = 0;

    // An error occurred with the streams. Remove bad ones.
    if ($n === FALSE) {
      $merged = array_unique(array_merge($read, $write, $except));
      foreach ($merged as $m) {
        $id = array_search($m, $this_run);
        @fclose($m);
        if ($id !== FALSE && isset($responses[$id])) {
          watchdog('httprl', 'The following url had a stream_select error and had to be terminated: %info', array(
            '%info' => $responses[$id]->url,
          ), WATCHDOG_ERROR);
          unset($responses[$id]);
        }
      }
    }

    // We have some streams to read/write to.
    $rw_done = FALSE;
    if (!empty($n)) {
      if (!empty($read) && is_array($read)) {

        // Readable sockets either have data for us, or are failed connection
        // attempts.
        foreach ($read as $r) {
          $id = array_search($r, $this_run);

          // Make sure ID is in the streams.
          if ($id === FALSE) {
            @fclose($r);
            continue;
          }

          // Close the stream if ID not found in $responses.
          if (!array_key_exists($id, $responses)) {
            @fclose($r);
            continue;
          }

          // Do not read from the non blocking sockets.
          if (empty($responses[$id]->options['blocking'])) {

            // Do post processing on the stream and close it.
            httprl_post_processing($id, $responses, $output);
            continue;
          }

          // Read socket.
          $chunk = fread($r, $responses[$id]->chunk_size);
          if (strlen($chunk) > 0) {
            $rw_done = TRUE;
            if (!isset($responses[$id]->time_to_first_byte)) {

              // Calculate Time to First Byte. Use the running time of the
              // stream being read ($responses[$id]); $result is a leftover
              // reference from the inspection loop and may point at a
              // different stream.
              $responses[$id]->time_to_first_byte = $responses[$id]->running_time + microtime(TRUE) - $start_time_this_run;
            }
          }
          $responses[$id]->data .= $chunk;

          // Process the headers if we have some data.
          if (!empty($responses[$id]->data) && empty($responses[$id]->headers) && (strpos($responses[$id]->data, "\r\n\r\n") || strpos($responses[$id]->data, "\n\n") || strpos($responses[$id]->data, "\r\r"))) {

            // See if the headers are in the data stream.
            httprl_parse_data($responses[$id]);
            if (!empty($responses[$id]->headers)) {

              // Stream was a redirect, kill & close this connection; redirect is
              // being followed now.
              if (!empty($responses[$id]->options['internal_states']['kill'])) {
                fclose($r);
                unset($responses[$id]);
                continue;
              }

              // Now that we have the headers, increase the chunk size.
              $responses[$id]->chunk_size = $responses[$id]->options['chunk_size_read'];

              // If a range header is set, 200 was returned, method is GET,
              // calculate how many bytes need to be downloaded.
              if (!empty($responses[$id]->options['headers']['Range']) && $responses[$id]->code == 200 && $responses[$id]->options['method'] == 'GET') {
                $responses[$id]->ranges = httprl_get_ranges($responses[$id]->options['headers']['Range']);
                $responses[$id]->options['max_data_size'] = httprl_get_last_byte_from_range($responses[$id]->ranges);
              }
            }
          }

          // Close the connection if Transfer-Encoding & Content-Encoding are not
          // used, a Range request was made and the currently downloaded data size
          // is larger than the Range request. The headers inspected must be
          // those of the stream being read, hence $responses[$id].
          if (!empty($responses[$id]->options['max_data_size']) && is_numeric($responses[$id]->options['max_data_size']) && (!isset($responses[$id]->headers['transfer-encoding']) || $responses[$id]->headers['transfer-encoding'] != 'chunked') && (!isset($responses[$id]->headers['content-encoding']) || $responses[$id]->headers['content-encoding'] != 'gzip' && $responses[$id]->headers['content-encoding'] != 'deflate') && $responses[$id]->options['max_data_size'] < httprl_strlen($responses[$id]->data)) {
            $responses[$id]->status = 'Done.';

            // Do post processing on the stream.
            httprl_post_processing($id, $responses, $output);
            continue;
          }

          // Get stream data.
          $info = stream_get_meta_data($r);
          $alive = !$info['eof'] && !feof($r) && !$info['timed_out'] && strlen($chunk);
          if (!$alive && $responses[$id]->status !== 'Request sent, waiting for response.') {
            if ($responses[$id]->status == 'Connecting.') {
              $responses[$id]->error = $t('Connection refused by destination. TCP.');
              $responses[$id]->code = HTTPRL_CONNECTION_REFUSED;
            }
            if ($responses[$id]->status == 'Writing to server.') {
              $responses[$id]->error = $t('Connection refused by destination. Write.');
              $responses[$id]->code = HTTPRL_CONNECTION_REFUSED;
            }
            $responses[$id]->status = 'Done.';

            // Do post processing on the stream.
            httprl_post_processing($id, $responses, $output);
            continue;
          }
          else {
            $responses[$id]->status = 'Reading data';
          }
        }
      }

      // Write to each stream if it is available.
      if ($stream_write_count > 0 && !empty($write) && is_array($write)) {
        foreach ($write as $w) {
          $id = array_search($w, $this_run);

          // Make sure ID is in the streams & status is for writing.
          if ($id === FALSE || empty($responses[$id]->status) || $responses[$id]->status != 'Connecting.' && $responses[$id]->status != 'Writing to server.') {
            continue;
          }

          // Keep track of how many bytes are sent.
          if (!isset($responses[$id]->bytes_sent)) {
            $responses[$id]->bytes_sent = 0;
          }

          // Have twice the number of bytes available for fwrite.
          $data_to_send = substr($responses[$id]->request, $responses[$id]->bytes_sent, 2 * $responses[$id]->options['chunk_size_write']);

          // Calculate the number of bytes we need to write to the stream.
          $len = httprl_strlen($data_to_send);
          if ($len > 0) {

            // Write to the stream.
            $bytes = fwrite($w, $data_to_send, min($responses[$id]->options['chunk_size_write'], $len));
          }
          else {

            // Nothing to write.
            $bytes = $len;
          }

          // See if we are done with writing.
          if ($bytes === FALSE) {

            // fwrite failed.
            $responses[$id]->error = $t('fwrite() failed.');
            $responses[$id]->code = HTTPRL_REQUEST_FWRITE_FAIL;
            $responses[$id]->status = 'Done.';
            $stream_write_count--;

            // Do post processing on the stream.
            httprl_post_processing($id, $responses, $output);
            continue;
          }
          elseif ($bytes >= $len) {

            // fwrite is done.
            $stream_write_count--;

            // If this is a non blocking request then close the connection and
            // destroy the stream.
            if (empty($responses[$id]->options['blocking'])) {
              $responses[$id]->status = 'Non-Blocking request sent out. Not waiting for the response.';

              // Do post processing on the stream.
              httprl_post_processing($id, $responses, $output);
              continue;
            }
            else {

              // All data has been written to the socket. We are read only from
              // here on out.
              $responses[$id]->status = "Request sent, waiting for response.";
            }

            // Record how many bytes were sent.
            $responses[$id]->bytes_sent += $bytes;
            $rw_done = TRUE;
          }
          else {

            // Change status to 'Writing to server.' This is a comparison; the
            // status is only ever 'Connecting.' or 'Writing to server.' at
            // this point because of the filter at the top of this loop.
            if ($responses[$id]->status == 'Connecting.') {
              $responses[$id]->status = 'Writing to server.';
            }

            // There is more data to write to this socket. Cut what was sent
            // across the stream and resend whats left next time in the loop.
            $responses[$id]->bytes_sent += $bytes;
            $rw_done = TRUE;
          }
        }
      }
      elseif ($stall_freads) {

        // All writes are done and a request asked to stall the reads; leave
        // the static state in place so a later call can pick the reads up.
        return;
      }
    }
    if (!$rw_done) {

      // Wait 5ms for data buffers.
      usleep(5000);
    }
  }

  // Copy output.
  $return = $output;

  // Free memory/reset static variables.
  $responses = array();
  $counter = 0;
  $output = array();
  $static_stall_freads = FALSE;
  return $return;
}