function httprl_send_request in HTTP Parallel Request & Threading Library 6
Same name and namespace in other branches
- 7 httprl.module \httprl_send_request()
Perform many HTTP requests.
This is a flexible and powerful HTTP client implementation. Correctly handles GET, POST, PUT or any other HTTP requests.
Parameters
$results: (optional) Array of results.
Return value
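bool|array TRUE if the given results were queued, FALSE if there was nothing to do. When called without arguments, the collected output array once all queued requests have been processed.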
See also
6 calls to httprl_send_request()
- httprl.examples.php in examples/
httprl.examples.php - HTTP Parallel Request Library code examples.
- httprl_batch_callback in ./
httprl.module - Given an array of data, use multiple processes to crunch it.
- httprl_call_user_func_array_async in ./
httprl.module - Run the callback with the given params in the background.
- httprl_install_http_test in ./
httprl.install - Issue a HTTP request to admin/httprl-test, verifying that the server got it.
- httprl_override_core in ./
httprl.module - Queue and send off http request.
File
- ./
httprl.module, line 1264 - HTTP Parallel Request Library module.
Code
/**
 * Queue HTTP requests, or run every queued request in parallel.
 *
 * The function has two modes, selected by the argument:
 * - Called with $results: each entry is appended to an internal static queue
 *   and nothing is sent yet.
 * - Called without arguments: every queued entry is processed in a
 *   stream_select() loop (connect, write the request, read the response)
 *   until all streams are finished or have timed out.
 *
 * @param $results
 *   (optional) Array of result objects to queue. When NULL, the queued
 *   requests are processed instead.
 *
 * @return
 *   - When $results is given: TRUE if the entries were queued, FALSE if
 *     $results was empty.
 *   - When $results is NULL: FALSE if nothing was queued; otherwise the
 *     collected output array. NULL is returned (and the queue is kept) when a
 *     request asked for 'stall_fread' and all writes have been issued, so the
 *     reads can be performed by a later call.
 */
function httprl_send_request($results = NULL) {
  static $responses = array();
  static $counter = 0;
  static $output = array();
  static $static_stall_freads = FALSE;

  if (!is_null($results)) {
    if (empty($results)) {
      return FALSE;
    }
    // Put the connection information into the responses array.
    foreach ($results as $result) {
      $responses[$counter] = $result;
      $counter++;
    }
    return TRUE;
  }

  // Exit if there is nothing to do.
  if (empty($responses)) {
    return FALSE;
  }

  // If the t function is not available use httprl_pr.
  $t = function_exists('t') ? 't' : 'httprl_pr';

  // Remove errors from responses array and set the global timeout.
  $global_timeout = 1;
  $global_connection_limit = 1;
  $domain_connection_limit = array();
  $stall_freads = FALSE;
  foreach ($responses as $id => &$result) {
    if (!empty($result->error)) {
      $result->status = 'Connection not made.';
      // Do post processing on the stream.
      httprl_post_processing($id, $responses, $output);
      continue;
    }
    if (!empty($result->cached)) {
      // Used the cached data.
      $output[$result->cached->url] = $result->cached;
      unset($responses[$id]);
      continue;
    }

    // Get connection limits.
    $global_connection_limit = max($global_connection_limit, $result->options['global_connections']);
    if (!isset($domain_connection_limit[$result->options['headers']['Host']])) {
      $domain_connection_limit[$result->options['headers']['Host']] = max(1, $result->options['domain_connections']);
    }
    else {
      $domain_connection_limit[$result->options['headers']['Host']] = max($domain_connection_limit[$result->options['headers']['Host']], $result->options['domain_connections']);
    }

    // Set global timeout.
    $global_timeout = max($global_timeout, $result->options['global_timeout']);

    // Issue fwrite, return. Run fread later on in the script. The static flag
    // makes sure the early return only happens on the first run.
    if (!empty($result->options['stall_fread']) && !$static_stall_freads) {
      $static_stall_freads = TRUE;
      $stall_freads = TRUE;
    }
  }
  // NOTE: $result intentionally stays bound to the last inspected response;
  // the DB ping check below reads its 'ping_db' option.

  // Record start time.
  $start_time_this_run = $start_time_global = microtime(TRUE);

  // Record the number of db pings done.
  $ping_db_counts = array();
  $full_bootstrap = httprl_drupal_full_bootstrap();

  // Run the loop as long as we have a stream to read/write to.
  $stream_select_timeout = 1;
  $stream_write_count = 0;
  while (!empty($responses)) {
    // Initialize connection limits.
    $this_run = array();
    $global_connection_count = 0;
    $domain_connection_count = array();
    $restart_timers = FALSE;

    // Get time.
    $now = microtime(TRUE);

    // Calculate times.
    $elapsed_time = $now - $start_time_this_run;
    $start_time_this_run = $now;
    $global_time = $global_timeout - ($start_time_this_run - $start_time_global);

    // See if the DB needs to be pinged.
    $rounded_time = floor($elapsed_time);
    if ($full_bootstrap && !empty($result->options['ping_db']) && $rounded_time >= $result->options['ping_db'] && $rounded_time % $result->options['ping_db'] == 0 && empty($ping_db_counts[$rounded_time])) {
      $empty_array = array();
      system_get_files_database($empty_array, 'ping_db');
      $ping_db_counts[$rounded_time] = 1;
    }

    // Inspect each stream, checking for timeouts and connection limits.
    foreach ($responses as $id => &$result) {
      // See if function timed out.
      if ($global_time <= 0) {
        // Function timed out & the request is not done.
        if ($result->status == 'Connecting.') {
          $result->error = $t('Function timed out. TCP.');
          // If stream is not done writing, then remove one from the write count.
          if (isset($result->fp)) {
            $stream_write_count--;
          }
        }
        elseif ($result->status == 'Writing to server.') {
          $result->error = $t('Function timed out. Write.');
          // If stream is not done writing, then remove one from the write count.
          if (isset($result->fp)) {
            $stream_write_count--;
          }
        }
        else {
          $result->error = $t('Function timed out. Read');
        }
        $result->code = HTTPRL_FUNCTION_TIMEOUT;
        $result->status = 'Done.';
        // Do post processing on the stream and close it.
        httprl_post_processing($id, $responses, $output, $global_time);
        continue;
      }

      // Do not calculate local timeout if a file pointer doesn't exist.
      if (isset($result->fp)) {
        // Add the elapsed time to this stream.
        $result->running_time += $elapsed_time;
        // Calculate how much time is left of the original timeout value.
        $timeout = $result->options['timeout'] - $result->running_time;

        // Connection was dropped or connection timed out.
        if ($timeout <= 0) {
          $result->error = $t('Connection timed out.');
          // Stream timed out & the request is not done.
          if ($result->status == 'Writing to server.') {
            $result->error .= ' ' . $t('Write.');
            // If stream is not done writing, then remove one from the write count.
            $stream_write_count--;
          }
          else {
            $result->error .= ' ' . $t('Read.');
          }
          $result->code = HTTPRL_REQUEST_TIMEOUT;
          $result->status = 'Done.';
          // Do post processing on the stream.
          httprl_post_processing($id, $responses, $output, $timeout);
          continue;
        }

        // Connection was dropped or connection timed out.
        if ($result->status == 'Connecting.' && $result->running_time > $result->options['connect_timeout']) {
          $socket_name = stream_socket_get_name($result->fp, TRUE);
          if (empty($socket_name) || $result->running_time > $result->options['connect_timeout'] * 1.5) {
            $result->error = $t('Connection timed out.');
            // Stream timed out & the request is not done.
            if ($result->status == 'Connecting.') {
              $result->error .= ' ' . $t('TCP Connect Timeout.');
              // If stream is not done writing, then remove one from the write count.
              $stream_write_count--;
            }
            $result->code = HTTPRL_REQUEST_TIMEOUT;
            $result->status = 'Done.';
            // Do post processing on the stream.
            httprl_post_processing($id, $responses, $output, $timeout);
            continue;
          }
        }

        // No byte received yet and the time to first byte limit was exceeded.
        if (!isset($responses[$id]->time_to_first_byte) && $result->running_time > $result->options['ttfb_timeout']) {
          $result->error = $t('Connection timed out. Time to First Byte Timeout.');
          $result->code = HTTPRL_REQUEST_ABORTED;
          $result->status = 'Done.';
          // Do post processing on the stream.
          httprl_post_processing($id, $responses, $output, $timeout);
          continue;
        }
      }

      // Connection was handled elsewhere.
      if (!isset($result->fp) && $result->status != 'Connecting.') {
        // Do post processing on the stream.
        httprl_post_processing($id, $responses, $output);
        continue;
      }

      // Set the connection limits for this run.
      // Get the host name.
      $host = $result->options['headers']['Host'];

      // Set the domain connection limit if none has been defined yet.
      if (!isset($domain_connection_limit[$host])) {
        $domain_connection_limit[$host] = max(1, $result->options['domain_connections']);
      }

      // Count up the number of connections.
      $global_connection_count++;
      if (empty($domain_connection_count[$host])) {
        $domain_connection_count[$host] = 1;
      }
      else {
        $domain_connection_count[$host]++;
      }

      // If the conditions are correct, let the stream be ran in this loop.
      if ($global_connection_limit >= $global_connection_count && $domain_connection_limit[$host] >= $domain_connection_count[$host]) {
        // Establish a new connection.
        if (!isset($result->fp) && $result->status == 'Connecting.') {
          // Establish a connection to the server.
          httprl_establish_stream_connection($result);
          // Reset timer.
          $restart_timers = TRUE;
          // Get lock if needed.
          if (!empty($result->options['lock_name'])) {
            httprl_acquire_lock($result);
          }
          // If connection can not be established bail out here.
          if (!$result->fp) {
            // Do post processing on the stream.
            httprl_post_processing($id, $responses, $output);
            $domain_connection_count[$host]--;
            $global_connection_count--;
            continue;
          }
          $stream_write_count++;
        }
        if (!empty($result->fp)) {
          $this_run[$id] = $result->fp;
        }
      }
    }

    // All streams removed; exit loop.
    if (empty($responses)) {
      break;
    }

    // Restart timers.
    if ($restart_timers) {
      $start_time_this_run = microtime(TRUE);
    }

    // No streams selected; restart loop from the top.
    if (empty($this_run)) {
      continue;
    }

    // Set the read and write vars to the streams var.
    $read = $write = $this_run;
    $except = array();

    // Do some voodoo and open all streams at once. Wait 25ms for streams to
    // respond.
    if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
      // If on windows, use error suppression http://drupal.org/node/1869026
      $n = @stream_select($read, $write, $except, $stream_select_timeout, 25000);
    }
    else {
      $n = stream_select($read, $write, $except, $stream_select_timeout, 25000);
    }
    // Only the very first select call is allowed to wait a full second.
    $stream_select_timeout = 0;

    // An error occurred with the streams. Remove bad ones.
    if ($n === FALSE) {
      $merged = array_unique(array_merge($read, $write, $except));
      foreach ($merged as $m) {
        $id = array_search($m, $this_run);
        @fclose($m);
        if ($id !== FALSE && isset($responses[$id])) {
          watchdog('httprl', 'The following url had a stream_select error and had to be terminated: %info', array(
            '%info' => $responses[$id]->url,
          ), WATCHDOG_ERROR);
          unset($responses[$id]);
        }
      }
    }

    // We have some streams to read/write to.
    $rw_done = FALSE;
    if (!empty($n)) {
      if (!empty($read) && is_array($read)) {
        // Readable sockets either have data for us, or are failed connection
        // attempts.
        foreach ($read as $r) {
          $id = array_search($r, $this_run);
          // Make sure ID is in the streams.
          if ($id === FALSE) {
            @fclose($r);
            continue;
          }

          // Close the stream if ID not found in $responses.
          if (!array_key_exists($id, $responses)) {
            @fclose($r);
            continue;
          }

          // Do not read from the non blocking sockets.
          if (empty($responses[$id]->options['blocking'])) {
            // Do post processing on the stream and close it.
            httprl_post_processing($id, $responses, $output);
            continue;
          }

          // Read socket.
          $chunk = fread($r, $responses[$id]->chunk_size);
          if (strlen($chunk) > 0) {
            $rw_done = TRUE;
            if (!isset($responses[$id]->time_to_first_byte)) {
              // Calculate Time to First Byte. Use the stream being read, not
              // the leftover $result reference from the inspection loop.
              $responses[$id]->time_to_first_byte = $responses[$id]->running_time + microtime(TRUE) - $start_time_this_run;
            }
          }
          $responses[$id]->data .= $chunk;

          // Process the headers if we have some data.
          if (!empty($responses[$id]->data) && empty($responses[$id]->headers) && (strpos($responses[$id]->data, "\r\n\r\n") || strpos($responses[$id]->data, "\n\n") || strpos($responses[$id]->data, "\r\r"))) {
            // See if the headers are in the data stream.
            httprl_parse_data($responses[$id]);
            if (!empty($responses[$id]->headers)) {
              // Stream was a redirect, kill & close this connection; redirect is
              // being followed now.
              if (!empty($responses[$id]->options['internal_states']['kill'])) {
                fclose($r);
                unset($responses[$id]);
                continue;
              }

              // Now that we have the headers, increase the chunk size.
              $responses[$id]->chunk_size = $responses[$id]->options['chunk_size_read'];

              // If a range header is set, 200 was returned, method is GET,
              // calculate how many bytes need to be downloaded.
              if (!empty($responses[$id]->options['headers']['Range']) && $responses[$id]->code == 200 && $responses[$id]->options['method'] == 'GET') {
                $responses[$id]->ranges = httprl_get_ranges($responses[$id]->options['headers']['Range']);
                $responses[$id]->options['max_data_size'] = httprl_get_last_byte_from_range($responses[$id]->ranges);
              }
            }
          }

          // Close the connection if Transfer-Encoding & Content-Encoding are not
          // used, a Range request was made and the currently downloaded data size
          // is larger than the Range request. The headers inspected here must
          // be the ones of the stream being read.
          if (!empty($responses[$id]->options['max_data_size'])
            && is_numeric($responses[$id]->options['max_data_size'])
            && (!isset($responses[$id]->headers['transfer-encoding']) || $responses[$id]->headers['transfer-encoding'] != 'chunked')
            && (!isset($responses[$id]->headers['content-encoding']) || ($responses[$id]->headers['content-encoding'] != 'gzip' && $responses[$id]->headers['content-encoding'] != 'deflate'))
            && $responses[$id]->options['max_data_size'] < httprl_strlen($responses[$id]->data)) {
            $responses[$id]->status = 'Done.';
            // Do post processing on the stream.
            httprl_post_processing($id, $responses, $output);
            continue;
          }

          // Get stream data.
          $info = stream_get_meta_data($r);
          $alive = !$info['eof'] && !feof($r) && !$info['timed_out'] && strlen($chunk);
          if (!$alive && $responses[$id]->status !== 'Request sent, waiting for response.') {
            if ($responses[$id]->status == 'Connecting.') {
              $responses[$id]->error = $t('Connection refused by destination. TCP.');
              $responses[$id]->code = HTTPRL_CONNECTION_REFUSED;
            }
            if ($responses[$id]->status == 'Writing to server.') {
              $responses[$id]->error = $t('Connection refused by destination. Write.');
              $responses[$id]->code = HTTPRL_CONNECTION_REFUSED;
            }
            $responses[$id]->status = 'Done.';
            // Do post processing on the stream.
            httprl_post_processing($id, $responses, $output);
            continue;
          }
          else {
            $responses[$id]->status = 'Reading data';
          }
        }
      }

      // Write to each stream if it is available.
      if ($stream_write_count > 0 && !empty($write) && is_array($write)) {
        foreach ($write as $w) {
          $id = array_search($w, $this_run);
          // Make sure ID is in the streams & status is for writing.
          if ($id === FALSE || empty($responses[$id]->status) || ($responses[$id]->status != 'Connecting.' && $responses[$id]->status != 'Writing to server.')) {
            continue;
          }

          // Keep track of how many bytes are sent.
          if (!isset($responses[$id]->bytes_sent)) {
            $responses[$id]->bytes_sent = 0;
          }

          // Have twice the number of bytes available for fwrite.
          $data_to_send = substr($responses[$id]->request, $responses[$id]->bytes_sent, 2 * $responses[$id]->options['chunk_size_write']);

          // Calculate the number of bytes we need to write to the stream.
          $len = httprl_strlen($data_to_send);
          if ($len > 0) {
            // Write to the stream.
            $bytes = fwrite($w, $data_to_send, min($responses[$id]->options['chunk_size_write'], $len));
          }
          else {
            // Nothing to write.
            $bytes = $len;
          }

          // See if we are done with writing.
          if ($bytes === FALSE) {
            // fwrite failed.
            $responses[$id]->error = $t('fwrite() failed.');
            $responses[$id]->code = HTTPRL_REQUEST_FWRITE_FAIL;
            $responses[$id]->status = 'Done.';
            $stream_write_count--;
            // Do post processing on the stream.
            httprl_post_processing($id, $responses, $output);
            continue;
          }
          elseif ($bytes >= $len) {
            // fwrite is done.
            $stream_write_count--;

            // If this is a non blocking request then close the connection and
            // destroy the stream.
            if (empty($responses[$id]->options['blocking'])) {
              $responses[$id]->status = 'Non-Blocking request sent out. Not waiting for the response.';
              // Do post processing on the stream.
              httprl_post_processing($id, $responses, $output);
              continue;
            }
            else {
              // All data has been written to the socket. We are read only from
              // here on out.
              $responses[$id]->status = "Request sent, waiting for response.";
            }

            // Record how many bytes were sent.
            $responses[$id]->bytes_sent += $bytes;
            $rw_done = TRUE;
          }
          else {
            // Change status to 'Writing to server.'
            if ($responses[$id]->status == 'Connecting.') {
              $responses[$id]->status = 'Writing to server.';
            }
            // There is more data to write to this socket. Cut what was sent
            // across the stream and resend whats left next time in the loop.
            $responses[$id]->bytes_sent += $bytes;
            $rw_done = TRUE;
          }
        }
      }
      elseif ($stall_freads) {
        // All writes have been issued and the caller asked to stall the reads:
        // return now (NULL) and keep the queue so a later call can read.
        return;
      }
    }

    if (!$rw_done) {
      // Wait 5ms for data buffers.
      usleep(5000);
    }
  }

  // Copy output.
  $return = $output;

  // Free memory/reset static variables.
  $responses = array();
  $counter = 0;
  $output = array();
  $static_stall_freads = FALSE;
  return $return;
}