View source
<?php
namespace Elasticsearch\Connections;
use Elasticsearch\Connections\GuzzleConnection;
use Elasticsearch\Common\Exceptions\AlreadyExpiredException;
use Elasticsearch\Common\Exceptions\Conflict409Exception;
use Elasticsearch\Common\Exceptions\Forbidden403Exception;
use Elasticsearch\Common\Exceptions\InvalidArgumentException;
use Elasticsearch\Common\Exceptions\Missing404Exception;
use Elasticsearch\Common\Exceptions\NoDocumentsToGetException;
use Elasticsearch\Common\Exceptions\NoShardAvailableException;
use Elasticsearch\Common\Exceptions\RoutingMissingException;
use Elasticsearch\Common\Exceptions\ScriptLangNotSupportedException;
use Elasticsearch\Common\Exceptions\TransportException;
use Guzzle\Http\Client;
use Guzzle\Http\Exception\ClientErrorResponseException;
use Guzzle\Http\Exception\CurlException;
use Guzzle\Http\Exception\ServerErrorResponseException;
use Guzzle\Http\Message\Header\HeaderCollection;
use Guzzle\Http\Message\Request;
use Guzzle\Http\Message\Response;
use Psr\Log\LoggerInterface;
use Elasticsearch\Common\DICBuilder;
class GuzzleConnectionDebug extends AbstractConnection implements ConnectionInterface {
public static $debugOutput;
private $guzzle;
private $connectionOpts = array();
public function __construct($hostDetails, $connectionParams, LoggerInterface $log, LoggerInterface $trace) {
if (isset($connectionParams['guzzleClient']) !== true) {
$guzzleOptions = array();
$guzzleOptions['curl.options']['body_as_string'] = true;
if (isset($connectionParams['auth']) === true) {
$guzzleOptions['request.options']['auth'] = $connectionParams['auth'];
}
if (!isset($connectionParams['guzzleOptions'])) {
$connectionParams['guzzleOptions'] = array();
}
$connectionParams = array_merge($connectionParams, $connectionParams['guzzleOptions']);
$this->guzzle = new \Guzzle\Http\Client(null, $guzzleOptions);
}
if (isset($hostDetails['port']) !== true) {
$hostDetails['port'] = 9200;
}
if (isset($connectionParams)) {
$this->connectionOpts = $connectionParams;
}
return parent::__construct($hostDetails, $connectionParams, $log, $trace);
}
public function getTransportSchema() {
return $this->transportSchema;
}
public function getLastRequestInfo() {
return $this->lastRequest;
}
public function performRequest($method, $uri, $params = null, $body = null, $options = array()) {
$uri = $this
->getURI($uri, $params);
$options += $this->connectionOpts;
$request = $this
->buildGuzzleRequest($method, $uri, $body, $options);
self::$debugOutput[] = array(
'request' => array(
'method' => $method,
'uri' => $uri,
'params' => $params,
'body' => $body,
'options' => $options,
),
);
$time = time();
$response = $this
->sendRequest($request, $body);
self::$debugOutput[] = array(
'response' => array(
'status' => $response
->getStatusCode(),
'text' => $response
->getBody(true),
'info' => $response
->getInfo(),
),
);
return array(
'status' => $response
->getStatusCode(),
'text' => $response
->getBody(true),
'info' => $response
->getInfo(),
);
}
private function getURI($uri, $params) {
$uri = $this->host . $uri;
if (isset($params) === true) {
$uri .= '?' . http_build_query($params);
}
return $uri;
}
private function buildGuzzleRequest($method, $uri, $body, $options = array()) {
if ($method === 'GET' && isset($body) === true) {
$method = 'POST';
}
if (isset($body) === true) {
$request = $this->guzzle
->{$method}($uri, array(), $body, $options);
}
else {
$request = $this->guzzle
->{$method}($uri, array(), $options);
}
return $request;
}
private function sendRequest(Request $request, $body) {
try {
$request
->send();
} catch (ServerErrorResponseException $exception) {
$this
->process5xxError($request, $exception, $body);
} catch (ClientErrorResponseException $exception) {
$this
->process4xxError($request, $exception, $body);
} catch (CurlException $exception) {
$this
->processCurlError($exception);
} catch (\Exception $exception) {
$error = 'Unexpected error: ' . $exception
->getMessage();
$this->log
->critical($error);
throw new TransportException($error);
}
$this
->processSuccessfulRequest($request, $body);
return $request
->getResponse();
}
private function process5xxError(Request $request, ServerErrorResponseException $exception, $body) {
$this
->logErrorDueToFailure($request, $exception, $body);
$statusCode = $request
->getResponse()
->getStatusCode();
$exceptionText = $exception
->getMessage();
$responseBody = $request
->getResponse()
->getBody(true);
$exceptionText = "{$statusCode} Server Exception: {$exceptionText}\n{$responseBody}";
$this->log
->error($exceptionText);
if ($statusCode === 500 && strpos($responseBody, "RoutingMissingException") !== false) {
throw new RoutingMissingException($exceptionText, $statusCode, $exception);
}
elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/', $responseBody) === 1) {
throw new NoDocumentsToGetException($exceptionText, $statusCode, $exception);
}
elseif ($statusCode === 500 && strpos($responseBody, 'NoShardAvailableActionException') !== false) {
throw new NoShardAvailableException($exceptionText, $statusCode, $exception);
}
else {
throw new \Elasticsearch\Common\Exceptions\ServerErrorResponseException($exceptionText, $statusCode, $exception);
}
}
private function process4xxError(Request $request, ClientErrorResponseException $exception, $body) {
$this
->logErrorDueToFailure($request, $exception, $body);
$statusCode = $request
->getResponse()
->getStatusCode();
$exceptionText = $exception
->getMessage();
$responseBody = $request
->getResponse()
->getBody(true);
$exceptionText = "{$statusCode} Server Exception: {$exceptionText}\n{$responseBody}";
if ($statusCode === 400 && strpos($responseBody, "AlreadyExpiredException") !== false) {
throw new AlreadyExpiredException($exceptionText, $statusCode, $exception);
}
elseif ($statusCode === 403) {
throw new Forbidden403Exception($exceptionText, $statusCode, $exception);
}
elseif ($statusCode === 404) {
throw new Missing404Exception($exceptionText, $statusCode, $exception);
}
elseif ($statusCode === 409) {
throw new Conflict409Exception($exceptionText, $statusCode, $exception);
}
elseif ($statusCode === 400 && strpos($responseBody, 'script_lang not supported') !== false) {
throw new ScriptLangNotSupportedException($exceptionText . $statusCode);
}
}
private function logErrorDueToFailure(Request $request, \Exception $exception, $body) {
$response = $request
->getResponse();
$headers = $request
->getHeaders()
->getAll();
$this
->logRequestFail($request
->getMethod(), $request
->getUrl(), $response
->getInfo('total_time'), $headers, $response
->getStatusCode(), $body, $exception
->getMessage());
}
private function processCurlError(CurlException $exception) {
$error = 'Curl error: ' . $exception
->getMessage();
$this->log
->error($error);
$this
->throwCurlException($exception
->getErrorNo(), $exception
->getError());
}
private function processSuccessfulRequest(Request $request, $body) {
$response = $request
->getResponse();
$headers = $request
->getHeaders()
->getAll();
$this
->logRequestSuccess($request
->getMethod(), $request
->getUrl(), $body, $headers, $response
->getStatusCode(), $response
->getBody(true), $response
->getInfo('total_time'));
}
}