View source
<?php
namespace Drupal\elasticsearch_connector\Plugin\search_api\backend;
use Drupal\Component\Utility\Unicode;
use Drupal\Core\Config\Config;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Form\FormBuilderInterface;
use Drupal\Core\Form\FormStateInterface;
use Drupal\Core\Link;
use Drupal\Core\Url;
use Drupal\elasticsearch_connector\ClusterManager;
use Drupal\elasticsearch_connector\ElasticSearch\ClientManagerInterface;
use Drupal\elasticsearch_connector\ElasticSearch\Parameters\Factory\IndexFactory;
use Drupal\elasticsearch_connector\ElasticSearch\Parameters\Factory\SearchFactory;
use Drupal\elasticsearch_connector\Entity\Cluster;
use Drupal\elasticsearch_connector\Plugin\search_api\backend\SearchApiElasticsearchBackendInterface;
use Drupal\search_api\Backend\BackendPluginBase;
use Drupal\search_api\IndexInterface;
use Drupal\search_api\Query\QueryInterface;
use Drupal\search_api\Query\ResultSet;
use Drupal\search_api\Query\ResultSetInterface;
use Drupal\search_api\SearchApiException;
use Drupal\search_api_autocomplete\SearchApiAutocompleteSearchInterface;
use Drupal\search_api_autocomplete\SearchInterface;
use Drupal\search_api_autocomplete\Suggestion\SuggestionFactory;
use Elasticsearch\Common\Exceptions\ElasticsearchException;
use nodespark\DESConnector\Elasticsearch\Aggregations\Bucket\Terms;
use nodespark\DESConnector\Elasticsearch\Aggregations\Metrics\Stats;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Core\Plugin\PluginFormInterface;
use Drupal\search_api\Plugin\PluginFormTrait;
class SearchApiElasticsearchBackend extends BackendPluginBase implements PluginFormInterface, SearchApiElasticsearchBackendInterface {
use PluginFormTrait;
/**
 * Terms-aggregation size used by addFacets() when a facet sets no limit.
 */
const FACET_NO_LIMIT_SIZE = 10000;
/**
 * Default value of the 'fuzziness' setting, also offered as a form option.
 */
const FUZZINESS_AUTO = 'auto';
/**
 * Config object injected by create() from 'elasticsearch.settings'.
 *
 * @var \Drupal\Core\Config\Config
 */
protected $elasticsearchSettings;
/**
 * Cluster ID.
 *
 * NOTE(review): never assigned in the code visible in this file; the
 * configured cluster ID is read from $configuration instead.
 *
 * @var mixed
 */
protected $clusterId;
/**
 * Cluster entity loaded in the constructor from 'elasticsearch_cluster'.
 *
 * @var \Drupal\elasticsearch_connector\Entity\Cluster
 */
protected $cluster;
/**
 * Client returned by the client manager for $cluster.
 *
 * @var object
 */
protected $client;
/**
 * The injected form builder service.
 *
 * @var \Drupal\Core\Form\FormBuilderInterface
 */
protected $formBuilder;
/**
 * The injected module handler, used to invoke alter hooks.
 *
 * @var \Drupal\Core\Extension\ModuleHandlerInterface
 */
protected $moduleHandler;
/**
 * The injected client manager that creates clients for clusters.
 *
 * @var \Drupal\elasticsearch_connector\ElasticSearch\ClientManagerInterface
 */
protected $clientManager;
/**
 * The injected cluster manager (default cluster, list of clusters).
 *
 * @var \Drupal\elasticsearch_connector\ClusterManager
 */
protected $clusterManager;
/**
 * The injected entity type manager, used to load the cluster entity.
 *
 * @var \Drupal\Core\Entity\EntityTypeManagerInterface
 */
protected $entityTypeManager;
/**
 * The injected logger channel.
 *
 * @var \Psr\Log\LoggerInterface
 */
protected $logger;
/**
 * The injected factory that builds index/mapping/bulk request parameters.
 *
 * @var \Drupal\elasticsearch_connector\ElasticSearch\Parameters\Factory\IndexFactory
 */
protected $indexFactory;
/**
 * Stores the injected services and resolves the cluster and its client.
 *
 * When no cluster is configured, the cluster manager's default cluster is
 * written into the configuration before the cluster entity is loaded.
 *
 * @param array $configuration
 *   Plugin configuration.
 * @param string $plugin_id
 *   The plugin ID.
 * @param array $plugin_definition
 *   The plugin definition.
 * @param \Drupal\Core\Form\FormBuilderInterface $form_builder
 *   The form builder.
 * @param \Drupal\Core\Extension\ModuleHandlerInterface $module_handler
 *   The module handler.
 * @param \Drupal\elasticsearch_connector\ElasticSearch\ClientManagerInterface $client_manager
 *   The client manager.
 * @param \Drupal\Core\Config\Config $elasticsearch_settings
 *   The Elasticsearch settings config object.
 * @param \Psr\Log\LoggerInterface $logger
 *   The logger channel.
 * @param \Drupal\elasticsearch_connector\ClusterManager $cluster_manager
 *   The cluster manager.
 * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
 *   The entity type manager.
 * @param \Drupal\elasticsearch_connector\ElasticSearch\Parameters\Factory\IndexFactory $indexFactory
 *   The index parameter factory.
 *
 * @throws \Drupal\search_api\SearchApiException
 *   Thrown when the configured cluster entity cannot be loaded.
 */
public function __construct(array $configuration, $plugin_id, array $plugin_definition, FormBuilderInterface $form_builder, ModuleHandlerInterface $module_handler, ClientManagerInterface $client_manager, Config $elasticsearch_settings, LoggerInterface $logger, ClusterManager $cluster_manager, EntityTypeManagerInterface $entity_type_manager, IndexFactory $indexFactory) {
  parent::__construct($configuration, $plugin_id, $plugin_definition);

  // Injected services.
  $this->formBuilder = $form_builder;
  $this->moduleHandler = $module_handler;
  $this->clientManager = $client_manager;
  $this->logger = $logger;
  $this->elasticsearchSettings = $elasticsearch_settings;
  $this->clusterManager = $cluster_manager;
  $this->entityTypeManager = $entity_type_manager;
  $this->indexFactory = $indexFactory;

  // Fall back to the default cluster when none has been configured.
  if (empty($this->configuration['cluster_settings']['cluster'])) {
    $fallback_cluster = $this->clusterManager->getDefaultCluster();
    $this->configuration['cluster_settings']['cluster'] = $fallback_cluster;
  }

  $cluster_storage = $this->entityTypeManager->getStorage('elasticsearch_cluster');
  $this->cluster = $cluster_storage->load($this->configuration['cluster_settings']['cluster']);
  if ($this->cluster === NULL) {
    throw new SearchApiException($this->t('Cannot load the Elasticsearch cluster for your index.'));
  }

  $this->client = $this->clientManager->getClientForCluster($this->cluster);
}
/**
 * Builds the backend plugin with its services pulled from the container.
 *
 * @param \Symfony\Component\DependencyInjection\ContainerInterface $container
 *   The service container.
 * @param array $configuration
 *   Plugin configuration.
 * @param string $plugin_id
 *   The plugin ID.
 * @param mixed $plugin_definition
 *   The plugin definition.
 *
 * @return static
 *   A new backend instance.
 */
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
  // Services are fetched in constructor-argument order.
  $form_builder = $container->get('form_builder');
  $module_handler = $container->get('module_handler');
  $client_manager = $container->get('elasticsearch_connector.client_manager');
  $settings = $container->get('config.factory')->get('elasticsearch.settings');
  $logger = $container->get('logger.factory')->get('elasticconnector_sapi');
  $cluster_manager = $container->get('elasticsearch_connector.cluster_manager');
  $entity_type_manager = $container->get('entity_type.manager');
  $index_factory = $container->get('elasticsearch_connector.index_factory');

  return new static(
    $configuration,
    $plugin_id,
    $plugin_definition,
    $form_builder,
    $module_handler,
    $client_manager,
    $settings,
    $logger,
    $cluster_manager,
    $entity_type_manager,
    $index_factory
  );
}
/**
 * Returns the default settings of this backend.
 *
 * @return array
 *   Defaults keyed by setting name, in the order: cluster selection,
 *   connection values, result handling flags, spelling and fuzziness.
 */
public function defaultConfiguration() {
  // Built group by group; the union operator keeps the key order stable.
  $defaults = [
    'cluster_settings' => [
      'cluster' => '',
    ],
  ];
  $defaults += [
    'scheme' => 'http',
    'host' => 'localhost',
    'port' => '9200',
    'path' => '',
  ];
  $defaults += [
    'excerpt' => FALSE,
    'retrieve_data' => FALSE,
    'highlight_data' => FALSE,
    'http_method' => 'AUTO',
  ];
  $defaults += [
    'autocorrect_spell' => TRUE,
    'autocorrect_suggest_words' => TRUE,
    'fuzziness' => self::FUZZINESS_AUTO,
  ];
  return $defaults;
}
/**
 * Builds the backend settings form (cluster selection and fuzziness).
 *
 * @param array $form
 *   The form array to extend.
 * @param \Drupal\Core\Form\FormStateInterface $form_state
 *   The current form state.
 *
 * @return array
 *   The form array including the backend elements.
 */
public function buildConfigurationForm(array $form, FormStateInterface $form_state) {
  // For an existing server, show which cluster URL it talks to.
  if (!$this->server->isNew()) {
    $server_link = $this->cluster->getSafeUrl();
    $form['server_description'] = [
      '#type' => 'item',
      '#title' => $this->t('Elasticsearch Cluster'),
      '#description' => Link::fromTextAndUrl($server_link, Url::fromUri($server_link)),
    ];
  }

  $form['cluster_settings'] = [
    '#type' => 'fieldset',
    '#title' => $this->t('Elasticsearch settings'),
  ];

  $options = [];
  foreach ($this->clusterManager->loadAllClusters(FALSE) as $key => $cluster) {
    $options[$key] = $cluster->cluster_id;
  }
  // Look the default cluster up once and only advertise it when one is
  // actually set; otherwise an option with an empty key and the label
  // "Default cluster: " would be added.
  $default_cluster = $this->clusterManager->getDefaultCluster();
  if (!empty($default_cluster)) {
    $options[$default_cluster] = $this->t('Default cluster: @name', [
      '@name' => $default_cluster,
    ]);
  }

  $configured_cluster = $this->configuration['cluster_settings']['cluster'];
  $form['cluster_settings']['cluster'] = [
    '#type' => 'select',
    '#title' => $this->t('Cluster'),
    '#required' => TRUE,
    '#options' => $options,
    '#default_value' => $configured_cluster ? $configured_cluster : '',
    '#description' => $this->t('Select the cluster you want to handle the connections.'),
  ];

  $fuzziness_options = [
    '' => $this->t('- Disabled -'),
    self::FUZZINESS_AUTO => self::FUZZINESS_AUTO,
  ];
  $fuzziness_options += array_combine(range(0, 5), range(0, 5));
  $form['fuzziness'] = [
    '#type' => 'select',
    '#title' => $this->t('Fuzziness'),
    // Not required: the "- Disabled -" choice submits an empty string, which
    // required-field validation rejects, so it could never be saved.
    '#required' => FALSE,
    '#options' => $fuzziness_options,
    '#default_value' => $this->configuration['fuzziness'],
    '#description' => $this->t('Some queries and APIs support parameters to allow inexact fuzzy matching, using the fuzziness parameter. See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/5.6/common-options.html#fuzziness">Fuzziness</a> for more information.'),
  ];

  return $form;
}
/**
 * Lists the Search API feature identifiers this backend declares.
 *
 * @return string[]
 *   The feature identifiers.
 */
public function getSupportedFeatures() {
  $features = [];
  $features[] = 'search_api_autocomplete';
  $features[] = 'search_api_facets';
  $features[] = 'search_api_facets_operator_or';
  $features[] = 'search_api_grouping';
  $features[] = 'search_api_mlt';
  return $features;
}
/**
 * Lists the additional data types this backend declares as supported.
 *
 * The base list ('object', 'location') is passed through the
 * 'elasticsearch_connector_supported_data_types' alter hook.
 *
 * @return string[]
 *   The data type identifiers after altering.
 */
public function getSupportedDataTypes() {
  $supported = ['object', 'location'];
  $this->moduleHandler->alter('elasticsearch_connector_supported_data_types', $supported);
  return $supported;
}
/**
 * Returns the rows shown on the server's status page.
 *
 * @return array
 *   A list of rows with 'label' and 'info' keys; the connection row, only
 *   present when the server is enabled, also carries a 'status' key.
 */
public function viewSettings() {
  $server_link = $this->cluster->getSafeUrl();
  $rows = [
    [
      'label' => $this->t('Elasticsearch server URI'),
      'info' => Link::fromTextAndUrl($server_link, Url::fromUri($server_link)),
    ],
  ];

  // A disabled server gets no connectivity row.
  if (!$this->server->status()) {
    return $rows;
  }

  $reachable = $this->client->isClusterOk();
  if ($reachable) {
    $message = $this->t('The Elasticsearch server could be reached');
  }
  else {
    $message = $this->t('The Elasticsearch server could not be reached. Further data is therefore unavailable.');
  }
  $rows[] = [
    'label' => $this->t('Connection'),
    'info' => $message,
    'status' => $reachable ? 'ok' : 'error',
  ];

  return $rows;
}
/**
 * Returns the configured cluster identifier.
 *
 * @return mixed
 *   The value stored under cluster_settings.cluster in the configuration.
 */
public function getCluster() {
  $cluster_settings = $this->configuration['cluster_settings'];
  return $cluster_settings['cluster'];
}
/**
 * Returns the configured fuzziness setting.
 *
 * @return mixed
 *   The value stored under 'fuzziness' in the configuration.
 */
public function getFuzziness() {
  $fuzziness = $this->configuration['fuzziness'];
  return $fuzziness;
}
/**
 * Adds an index to the server by delegating to updateIndex().
 *
 * @param \Drupal\search_api\IndexInterface $index
 *   The index being added.
 */
public function addIndex(IndexInterface $index) {
  // Adding and updating share one code path.
  $this->updateIndex($index);
}
/**
 * Creates the Elasticsearch index if it is missing, then syncs its mapping.
 *
 * Elasticsearch exceptions are shown as error messages instead of being
 * propagated.
 *
 * @param \Drupal\search_api\IndexInterface $index
 *   The index being updated.
 */
public function updateIndex(IndexInterface $index) {
  try {
    $already_exists = $this->client->indices()->exists($this->indexFactory->index($index));
    if (!$already_exists) {
      $response = $this->client->indices()->create($this->indexFactory->create($index));
      $acknowledged = $this->client->CheckResponseAck($response);
      if (!$acknowledged) {
        drupal_set_message($this->t('The elasticsearch client was not able to create index'), 'error');
      }
    }

    // The mapping is refreshed even when the index already existed.
    $this->fieldsUpdated($index);
  }
  catch (ElasticsearchException $failure) {
    drupal_set_message($failure->getMessage(), 'error');
  }
}
/**
 * Replaces the Elasticsearch mapping of an index with a freshly built one.
 *
 * If a mapping already exists it is deleted first; when that deletion throws,
 * the whole index is removed and re-added instead. Note that addIndex() leads
 * back into this method via updateIndex(), so in that fallback path the
 * mapping is put once by the nested call and once more below.
 *
 * @param \Drupal\search_api\IndexInterface $index
 *   The index whose fields changed.
 *
 * @return bool
 *   FALSE when an Elasticsearch exception was caught, TRUE otherwise (also
 *   when the put-mapping response was not acknowledged).
 */
public function fieldsUpdated(IndexInterface $index) {
$params = $this->indexFactory
->index($index, TRUE);
try {
// Only try to drop the old mapping when the type exists on the cluster.
if ($this->client
->indices()
->existsType($params)) {
$current_mapping = $this->client
->indices()
->getMapping($params);
if (!empty($current_mapping)) {
try {
$this->client
->indices()
->deleteMapping($params);
} catch (ElasticsearchException $e) {
// Deleting the mapping failed: fall back to recreating the index.
$this
->removeIndex($index);
$this
->addIndex($index);
}
}
}
// Install the mapping generated from the index's current fields.
$response = $this->client
->indices()
->putMapping($this->indexFactory
->mapping($index));
if (!$this->client
->CheckResponseAck($response)) {
drupal_set_message(t('Cannot create the mapping of the fields!'), 'error');
}
} catch (ElasticsearchException $e) {
drupal_set_message($e
->getMessage(), 'error');
return FALSE;
}
return TRUE;
}
/**
 * Deletes the Elasticsearch index backing a Search API index, if present.
 *
 * Elasticsearch exceptions are shown as error messages instead of being
 * propagated.
 *
 * @param mixed $index
 *   The index (passed on to the index parameter factory).
 */
public function removeIndex($index) {
  $index_params = $this->indexFactory->index($index);
  try {
    $present = $this->client->indices()->exists($index_params);
    if ($present) {
      $this->client->indices()->delete($index_params);
    }
  }
  catch (ElasticsearchException $failure) {
    drupal_set_message($failure->getMessage(), 'error');
  }
}
/**
 * Sends the given items to Elasticsearch in one bulk request.
 *
 * @param \Drupal\search_api\IndexInterface $index
 *   The index the items belong to.
 * @param array $items
 *   The items to index, keyed by item ID.
 *
 * @return array
 *   The IDs of the indexed items. Empty when the type does not exist, when
 *   there is nothing to index, or when the bulk request itself failed.
 *
 * @throws \Drupal\search_api\SearchApiException
 *   Thrown when the bulk response reports item-level errors.
 */
public function indexItems(IndexInterface $index, array $items) {
  $elastic_type_exists = $this->doesTypeExists($index);
  if (empty($elastic_type_exists) || empty($items)) {
    return [];
  }

  try {
    $response = $this->client->bulk($this->indexFactory->bulkIndex($index, $items));
  }
  catch (ElasticsearchException $e) {
    drupal_set_message($e->getMessage(), 'error');
    // The request failed, so nothing may be reported as indexed; returning
    // the item IDs here would mark unindexed items as done.
    return [];
  }

  if (!empty($response['errors'])) {
    $response_items = !empty($response['items']) && is_array($response['items']) ? $response['items'] : [];
    foreach ($response_items as $item) {
      // Log every failed item, not only HTTP 400 ones, since the exception
      // below points the user to the log for details.
      if (empty($item['index']['error'])) {
        continue;
      }
      $error = $item['index']['error'];
      // The error may be a plain string or lack a 'caused_by' entry.
      if (is_array($error)) {
        $reason = isset($error['reason']) ? $error['reason'] : '';
        $caused_by = isset($error['caused_by']['reason']) ? $error['caused_by']['reason'] : '';
      }
      else {
        $reason = (string) $error;
        $caused_by = '';
      }
      $this->logger->error('%reason. %caused_by for id: %id', [
        '%reason' => $reason,
        '%caused_by' => $caused_by,
        '%id' => isset($item['index']['_id']) ? $item['index']['_id'] : '',
      ]);
    }
    throw new SearchApiException($this->t('An error occurred during indexing. Check your watchdog for more information.'));
  }

  return array_keys($items);
}
/**
 * Clears an index by dropping and recreating it.
 *
 * @param \Drupal\search_api\IndexInterface $index
 *   The index to clear.
 * @param string|null $datasource_id
 *   Not used by this implementation: the whole index is always recreated.
 */
public function deleteAllIndexItems(IndexInterface $index, $datasource_id = NULL) {
  // Drop the index, then build it again from scratch.
  $this->removeIndex($index);
  $this->addIndex($index);
}
/**
 * Deletes the given items from Elasticsearch with one bulk request.
 *
 * Elasticsearch exceptions are shown as error messages instead of being
 * propagated.
 *
 * @param \Drupal\search_api\IndexInterface|null $index
 *   The index the items belong to.
 * @param array $ids
 *   The IDs of the items to delete; nothing happens when empty.
 */
public function deleteItems(IndexInterface $index = NULL, array $ids) {
  if (count($ids) === 0) {
    return;
  }

  try {
    $bulk_params = $this->indexFactory->bulkDelete($index, $ids);
    $this->client->bulk($bulk_params);
  }
  catch (ElasticsearchException $failure) {
    drupal_set_message($failure->getMessage(), 'error');
  }
}
/**
 * Builds autocomplete suggestions from a terms aggregation on the query.
 *
 * The query is run through search() with the 'autocomplete' options set; each
 * bucket of the resulting 'autocomplete' aggregation becomes one suggestion.
 *
 * @param \Drupal\search_api\Query\QueryInterface $query
 *   The query to run.
 * @param \Drupal\search_api_autocomplete\SearchInterface $search
 *   The autocomplete search configuration (not read here).
 * @param string $incomplete_key
 *   The unfinished keyword that should be completed.
 * @param string $user_input
 *   The complete user input so far.
 *
 * @return array
 *   The suggestions; empty when anything goes wrong (the error is logged).
 */
public function getAutocompleteSuggestions(QueryInterface $query, SearchInterface $search, $incomplete_key, $user_input) {
  try {
    $fields = $this->getQueryFulltextFields($query);
    // Exactly one field is needed; with none, reset() would hand FALSE to the
    // aggregation as its field name.
    if (count($fields) !== 1) {
      throw new \LogicException('Elasticsearch requires a single fulltext field for use with autocompletion! Please adjust your configuration.');
    }

    $query->setOption('autocomplete', $incomplete_key);
    $query->setOption('autocomplete_field', reset($fields));
    $query->setOption('search_api_facets', FALSE);

    $result = $this->search($query);
    $query->postExecute($result);

    $suggestions = [];
    $factory = new SuggestionFactory($user_input);
    $response = $result->getExtraData('elasticsearch_response');
    if (isset($response['aggregations']['autocomplete']['buckets'])) {
      // Bucket keys are completions of the incomplete keyword, not of the
      // whole input, so the suffix starts after the keyword. Using the length
      // of the full input cut multi-word input down to an empty suffix.
      $suffix_start = strlen($incomplete_key);
      foreach ($response['aggregations']['autocomplete']['buckets'] as $bucket) {
        $suffix = (string) substr($bucket['key'], $suffix_start);
        $suggestions[] = $factory->createFromSuggestionSuffix($suffix, $bucket['doc_count']);
      }
    }
    return $suggestions;
  }
  catch (\Exception $e) {
    $this->logger->error($e->getMessage());
    return [];
  }
}
/**
 * Executes a search query against Elasticsearch.
 *
 * @param \Drupal\search_api\Query\QueryInterface $query
 *   The query to execute.
 *
 * @return \Drupal\search_api\Query\ResultSetInterface
 *   The parsed results, or the query's own (unfilled) result set when the
 *   type does not exist or the search throws.
 */
public function search(QueryInterface $query) {
  // Result set returned unchanged whenever the search cannot be completed.
  $search_result = $query->getResults();

  $index = $query->getIndex();
  $params = $this->indexFactory->index($index, TRUE);
  if (!$this->client->indices()->existsType($params)) {
    return $search_result;
  }

  if ($query->getOption('search_api_facets')) {
    $this->addFacets($query);
  }

  $params = SearchFactory::search($query);

  $incomplete_key = $query->getOption('autocomplete');
  if ($incomplete_key) {
    // Multibyte-aware lowercasing, as used elsewhere in this class.
    $incomplete_key = Unicode::strtolower($incomplete_key);
    // The key is user input used inside a regular expression: escape the
    // reserved characters so that e.g. "c++" or "(" is matched literally
    // instead of changing or breaking the pattern.
    $escaped_key = addcslashes($incomplete_key, '.?+*|{}[]()"#@&<>~\\');
    $params['body']['aggs']['autocomplete']['terms'] = [
      'field' => $query->getOption('autocomplete_field'),
      'include' => $escaped_key . '.*',
    ];
  }

  try {
    $this->preQuery($query);

    $response = $this->client->search($params)->getRawResponse();
    $results = SearchFactory::parseResult($query, $response);

    if ($query->getOption('search_api_facets')) {
      $this->parseFacets($results, $query);
    }

    $this->postQuery($results, $query, $response);
    return $results;
  }
  catch (\Exception $e) {
    watchdog_exception('Elasticsearch API', $e);
    return $search_result;
  }
}
/**
 * Reports whether the cluster behind this backend is reachable.
 *
 * @return mixed
 *   Whatever the client's isClusterOk() check returns.
 */
public function isAvailable() {
  $cluster_ok = $this->client->isClusterOk();
  return $cluster_ok;
}
/**
 * Registers one aggregation per requested facet on the client.
 *
 * Facets of type 'stats' become stats aggregations; all others become terms
 * aggregations, sized by the facet limit and given global scope for 'or'
 * facets.
 *
 * @param \Drupal\search_api\Query\QueryInterface $query
 *   The query whose 'search_api_facets' option lists the facets.
 */
protected function addFacets(QueryInterface $query) {
  foreach ($query->getOption('search_api_facets') as $key => $facet) {
    // Default every key read below, not only 'type', so that a sparse facet
    // definition does not raise undefined-index notices. The defaults give
    // the same outcome as the missing keys did: no limit, non-global scope.
    $facet += [
      'type' => NULL,
      'limit' => 0,
      'operator' => 'and',
    ];

    if ($facet['type'] == 'stats') {
      $aggregation = new Stats($key, $key);
    }
    else {
      $aggregation = new Terms($key, $key);
      $aggregation->setSize($facet['limit'] ? $facet['limit'] : self::FACET_NO_LIMIT_SIZE);
      if ($facet['operator'] == 'or') {
        $aggregation->setGlobalScope(TRUE);
      }
    }

    $this->client->aggregations()->setAggregation($aggregation);
  }
}
/**
 * Converts the aggregations of a response into Search API facet data.
 *
 * The outcome is stored on the result set as the 'search_api_facets' extra
 * data, keyed by facet key. A facet whose aggregation is missing from the
 * response gets an empty list.
 *
 * @param \Drupal\search_api\Query\ResultSet $results
 *   The result set carrying the raw 'elasticsearch_response'.
 * @param \Drupal\search_api\Query\QueryInterface $query
 *   The query whose 'search_api_facets' option lists the facets.
 */
protected function parseFacets(ResultSet $results, QueryInterface $query) {
  $response = $results->getExtraData('elasticsearch_response');
  $facets = $query->getOption('search_api_facets');
  $aggregations = isset($response['aggregations']) ? $response['aggregations'] : [];

  // Turns aggregation buckets into facet terms (count plus quoted filter).
  $buckets_to_terms = function ($buckets) {
    $terms = [];
    if (is_array($buckets)) {
      foreach ($buckets as $bucket) {
        $terms[] = [
          'count' => $bucket['doc_count'],
          'filter' => '"' . $bucket['key'] . '"',
        ];
      }
    }
    return $terms;
  };

  $attach = [];
  foreach ($facets as $key => $facet) {
    $operator = isset($facet['operator']) ? $facet['operator'] : NULL;
    $is_stats = !empty($facet['type']) && $facet['type'] == 'stats';
    $global_key = $key . '_global';
    $has_global = isset($aggregations[$global_key]);

    $terms = [];
    // 'and' facets, and 'or' facets without a global aggregation, read the
    // aggregation stored directly under the facet key.
    if ($operator == 'and' || ($operator == 'or' && !$has_global)) {
      if (isset($aggregations[$key])) {
        $terms = $is_stats
          ? $aggregations[$key]
          : $buckets_to_terms(isset($aggregations[$key]['buckets']) ? $aggregations[$key]['buckets'] : NULL);
      }
    }
    elseif ($operator == 'or') {
      // Here the global aggregation is known to exist. Stats facets take the
      // global aggregation itself, terms facets the one nested under it.
      if ($is_stats) {
        $terms = $aggregations[$global_key];
      }
      else {
        $terms = $buckets_to_terms(isset($aggregations[$global_key][$key]['buckets']) ? $aggregations[$global_key][$key]['buckets'] : NULL);
      }
    }

    $attach[$key] = $terms;
  }

  $results->setExtraData('search_api_facets', $attach);
}
/**
 * Checks whether the Elasticsearch type for an index exists.
 *
 * @param \Drupal\search_api\IndexInterface $index
 *   The index to check.
 *
 * @return mixed
 *   The client's existsType() result, or FALSE when an Elasticsearch
 *   exception was caught (its message is shown as an error).
 */
protected function doesTypeExists(IndexInterface $index) {
  $type_params = $this->indexFactory->index($index, TRUE);
  try {
    $exists = $this->client->indices()->existsType($type_params);
    return $exists;
  }
  catch (ElasticsearchException $failure) {
    drupal_set_message($failure->getMessage(), 'error');
    return FALSE;
  }
}
/**
 * Builds the prefixed index ID for a machine name.
 *
 * @param string $machine_name
 *   The index machine name.
 *
 * @return string
 *   The 'index_prefix' setting, followed by the 'index_prefix_<machine_name>'
 *   setting, followed by the machine name.
 */
protected function getIndexId($machine_name) {
  $specific_prefix = $this->elasticsearchSettings->get('index_prefix_' . $machine_name);
  $global_prefix = $this->elasticsearchSettings->get('index_prefix');
  return $global_prefix . $specific_prefix . $machine_name;
}
/**
 * Classifies the span between two dates into a gap keyword.
 *
 * @param mixed $min
 *   The lower bound: a timestamp, or a date string if $timestamp is not TRUE.
 * @param mixed $max
 *   The upper bound, in the same form as $min.
 * @param bool $timestamp
 *   TRUE when the bounds are timestamps; otherwise they go through
 *   strtotime() first.
 *
 * @return string
 *   'NONE' for spans over 365 days, 'YEAR' for spans longer than the month
 *   of $min, 'MONTH' for spans over one day, and 'DAY' otherwise or when a
 *   bound is empty.
 */
protected static function getDateGap($min, $max, $timestamp = TRUE) {
  if ($timestamp !== TRUE) {
    $min = strtotime($min);
    $max = strtotime($max);
  }

  if (empty($min) || empty($max)) {
    return 'DAY';
  }

  $span = $max - $min;
  if ($span > 86400 * 365) {
    return 'NONE';
  }
  // gmdate('t') is the number of days in the month of the lower bound.
  if ($span > 86400 * gmdate('t', $min)) {
    return 'YEAR';
  }
  if ($span > 86400) {
    return 'MONTH';
  }
  return 'DAY';
}
/**
 * Adds a 'facets' section to the search parameters for each requested facet.
 *
 * NOTE(review): relies on getIndexFields() and
 * search_api_extract_inner_type(), neither of which is defined in the code
 * visible here; confirm they exist before using this method.
 *
 * @param array $params
 *   The search parameters, modified in place.
 * @param \Drupal\search_api\Query\QueryInterface $query
 *   The query whose 'search_api_facets' option lists the facets.
 */
protected function addSearchFacets(array &$params, QueryInterface $query) {
  $facets = $query->getOption('search_api_facets');
  $index_fields = $this->getIndexFields($query);
  if (empty($facets)) {
    return;
  }

  foreach ($facets as $facet_id => $facet_info) {
    $field_id = $facet_info['field'];
    // Facets on fields the index does not know are skipped.
    if (!isset($index_fields[$field_id])) {
      continue;
    }

    $definition = [];
    $inner_type = search_api_extract_inner_type($index_fields[$field_id]['type']);
    if ($inner_type === 'date') {
      $facet_type = 'date_histogram';
      $definition = $this->createDateFieldFacet($field_id, [$field_id => $definition]);
    }
    else {
      $facet_type = 'terms';
      $definition[$facet_type]['all_terms'] = (bool) $facet_info['missing'];
    }

    if (!empty($definition)) {
      $facet_info['facet_type'] = $facet_type;
      $definition = $this->addFacetOptions($definition, $query, $facet_info);
    }

    $params['body']['facets'][$field_id] = $definition;
  }
}
/**
 * Completes a facet definition with field, filter and size options.
 *
 * @param array $facet
 *   The facet definition, modified in place and also returned.
 * @param \Drupal\search_api\Query\QueryInterface $query
 *   The query the facet belongs to.
 * @param array $facet_info
 *   The facet settings, including 'facet_type' and 'field'.
 *
 * @return array
 *   The completed facet definition.
 */
protected function addFacetOptions(array &$facet, QueryInterface $query, $facet_info) {
  $limit = $this->getFacetLimit($facet_info);
  $filter = $this->getFacetSearchFilter($query, $facet_info);
  $type = $facet_info['facet_type'];

  $facet[$type]['field'] = $facet_info['field'];

  if (!empty($filter)) {
    $facet['facet_filter'] = $filter;
  }

  // Only terms facets take a size.
  if ($limit > 0 && $type == 'terms') {
    $facet[$type]['size'] = $limit;
  }

  return $facet;
}
/**
 * Builds the filter applied to a facet from the query's condition group.
 *
 * For 'or' facets the facet's own field is passed to parseConditionGroup()
 * as an extra argument.
 *
 * NOTE(review): getIndexFields() and parseConditionGroup() are not defined in
 * the code visible here; confirm they exist before using this method.
 *
 * @param \Drupal\search_api\Query\QueryInterface $query
 *   The query the facet belongs to.
 * @param array $facet_info
 *   The facet settings ('operator', 'field').
 *
 * @return mixed
 *   The first parsed filter, or the empty parse result when there is none.
 */
protected function getFacetSearchFilter(QueryInterface $query, array $facet_info) {
  $index_fields = $this->getIndexFields($query);
  $is_or_facet = isset($facet_info['operator']) && Unicode::strtolower($facet_info['operator']) == 'or';

  if ($is_or_facet) {
    $filter = $this->parseConditionGroup($query->getConditionGroup(), $index_fields, $facet_info['field']);
  }
  else {
    $filter = $this->parseConditionGroup($query->getConditionGroup(), $index_fields);
  }

  // Both variants keep only the first parsed filter.
  return empty($filter) ? $filter : $filter[0];
}
/**
 * Builds the date histogram definition for a date facet.
 *
 * @param string $facet_id
 *   The facet key inside $facet.
 * @param array $facet
 *   The facet definitions keyed by facet ID.
 *
 * @return array
 *   The definition of $facet_id with 'date_histogram' interval, time zone
 *   ('UTC') and factor (1000) set.
 */
protected function createDateFieldFacet($facet_id, array $facet) {
  $definition = $facet[$facet_id];
  $interval = $this->getDateFacetInterval($facet_id);

  $definition['date_histogram']['interval'] = $interval;
  $definition['date_histogram']['time_zone'] = 'UTC';
  $definition['date_histogram']['factor'] = 1000;

  return $definition;
}
/**
 * Determines the number of terms to request for a facet.
 *
 * NOTE(review): getOption() is not defined in the code visible here; confirm
 * it exists before using this method.
 *
 * @param array $facet_info
 *   The facet settings; 'limit' is optional.
 *
 * @return mixed
 *   The facet's own limit when it is non-empty and not negative, otherwise
 *   the 'facet_limit' option (default 10).
 */
protected function getFacetLimit(array $facet_info) {
  $limit = empty($facet_info['limit']) ? -1 : $facet_info['limit'];
  if ($limit >= 0) {
    return $limit;
  }
  return $this->getOption('facet_limit', 10);
}
/**
 * Maps the date granularity of a facet to a histogram interval.
 *
 * The interval is one step finer than the granularity: 'YEAR' gives 'month',
 * 'MONTH' gives 'day', 'DAY' gives 'hour'; anything else gives 'year'.
 *
 * NOTE(review): facetapi_get_active_searchers() and facetapi_adapter_load()
 * are not defined in the code visible here; confirm they are available.
 *
 * @param string $facet_id
 *   The facet whose active items determine the granularity.
 *
 * @return string
 *   The interval name.
 */
protected function getDateFacetInterval($facet_id) {
// Use the first active searcher, if any, to find the facet adapter.
$searcher = key(facetapi_get_active_searchers());
$adapter = isset($searcher) ? facetapi_adapter_load($searcher) : NULL;
$date_gap = $this
->getDateGranularity($adapter, $facet_id);
switch ($date_gap) {
case 'YEAR':
$date_interval = 'month';
break;
case 'MONTH':
$date_interval = 'day';
break;
case 'DAY':
$date_interval = 'hour';
break;
default:
$date_interval = 'year';
}
return $date_interval;
}
/**
 * Finds the finest date gap among the active items of a facet.
 *
 * Only active items whose value contains ' TO ' (a range) are considered.
 *
 * @param mixed $adapter
 *   The facet adapter providing getActiveItems(), or NULL.
 * @param string $facet_id
 *   The facet name used to look up active items.
 *
 * @return string
 *   'YEAR', 'MONTH' or 'DAY'; 'YEAR' when there is no adapter, no active
 *   item or no usable range.
 */
public function getDateGranularity($adapter, $facet_id) {
  $gap_weight = [
    'YEAR' => 2,
    'MONTH' => 1,
    'DAY' => 0,
  ];
  $fallback = 'YEAR';

  if (!isset($adapter)) {
    return $fallback;
  }

  $active_items = $adapter->getActiveItems([
    'name' => $facet_id,
  ]);
  if (empty($active_items)) {
    return $fallback;
  }

  $weights = [];
  foreach ($active_items as $item) {
    $range = $item['value'];
    if (strpos($range, ' TO ') > 0) {
      // Strip the brackets, then split into the two range bounds.
      $bounds = explode(' TO ', str_replace(['[', ']'], '', $range), 2);
      list($from, $to) = $bounds;
      $gap = self::getDateGap($from, $to, FALSE);
      if (isset($gap_weight[$gap])) {
        $weights[] = $gap_weight[$gap];
      }
    }
  }

  // The lowest weight is the finest granularity.
  return empty($weights) ? $fallback : array_search(min($weights), $gap_weight);
}
/**
 * Converts the 'facets' section of a response into facet result rows.
 *
 * NOTE(review): relies on getIndexFields() and
 * search_api_extract_inner_type(), neither of which is defined in the code
 * visible here; confirm they exist before using this method.
 *
 * @param array $response
 *   The raw response.
 * @param \Drupal\search_api\Query\QueryInterface $query
 *   The query whose 'search_api_facets' option lists the facets.
 *
 * @return array
 *   Rows with 'count' and quoted 'filter', keyed by facet ID; only entries
 *   reaching the facet's 'min_count' are included.
 */
protected function parseSearchFacets(array $response, QueryInterface $query) {
  $parsed = [];
  $index_fields = $this->getIndexFields($query);
  $facets = $query->getOption('search_api_facets');

  if (empty($facets) || !isset($response['facets'])) {
    return $parsed;
  }

  foreach ($response['facets'] as $facet_id => $facet_data) {
    if (!isset($facets[$facet_id])) {
      continue;
    }

    $facet_info = $facets[$facet_id];
    $min_count = $facet_info['min_count'];
    $field_id = $facet_info['field'];
    $is_date = search_api_extract_inner_type($index_fields[$field_id]['type']) === 'date';

    // Date facets list 'entries' with a 'time' divided by 1000 for the
    // filter; all other facets list 'terms' with a 'term'.
    $rows = $is_date ? $facet_data['entries'] : $facet_data['terms'];
    foreach ($rows as $row) {
      if ($row['count'] >= $min_count) {
        $value = $is_date ? $row['time'] / 1000 : $row['term'];
        $parsed[$facet_id][] = [
          'count' => $row['count'],
          'filter' => '"' . $value . '"',
        ];
      }
    }
  }

  return $parsed;
}
/**
 * Hook called by search() right before the request is sent.
 *
 * Intentionally empty here; subclasses may override it.
 *
 * @param \Drupal\search_api\Query\QueryInterface $query
 *   The query about to be executed.
 */
protected function preQuery(QueryInterface $query) {
}
/**
 * Hook called by search() after results (and facets) have been parsed.
 *
 * Intentionally empty here; subclasses may override it.
 *
 * @param \Drupal\search_api\Query\ResultSetInterface $results
 *   The parsed results.
 * @param \Drupal\search_api\Query\QueryInterface $query
 *   The executed query.
 * @param mixed $response
 *   The raw response the results were parsed from.
 */
protected function postQuery(ResultSetInterface $results, QueryInterface $query, $response) {
}
/**
 * Tells whether a data type is among the backend's supported data types.
 *
 * @param string $type
 *   The data type identifier.
 *
 * @return bool
 *   TRUE if getSupportedDataTypes() lists exactly this identifier.
 */
public function supportsDataType($type) {
  // Strict comparison: with loose comparison a non-string such as 0 or TRUE
  // can match the string identifiers in the list.
  return in_array($type, $this->getSupportedDataTypes(), TRUE);
}
/**
 * Serializes none of the object's properties.
 *
 * @return array
 *   An empty list of property names.
 */
public function __sleep() {
  return [];
}
}