You are here

class Apache_Solr_Service_Balancer in Apache Solr Search 5

Reference Implementation for using multiple Solr services in a distribution. Functionality includes: routing of read / write operations failover (on selection) for multiple read servers

Hierarchy

Expanded class hierarchy of Apache_Solr_Service_Balancer

File

SolrPhpClient/Apache/Solr/Service/Balancer.php, line 31

View source
class Apache_Solr_Service_Balancer {
  protected $_createDocuments = true;
  protected $_readableServices = array();
  protected $_writeableServices = array();
  protected $_currentReadService = null;
  protected $_currentWriteService = null;
  protected $_readPingTimeout = 2;
  protected $_writePingTimeout = 4;

  // Configuration for server selection backoff intervals
  protected $_useBackoff = false;

  // Set to true to use more resillient write server selection
  protected $_backoffLimit = 600;

  // 10 minute default maximum
  protected $_backoffEscalation = 2.0;

  // Rate at which to increase backoff period
  protected $_defaultBackoff = 2.0;

  // Default backoff interval

  /**
   * Escape a value for special query characters such as ':', '(', ')', '*', '?', etc.
   *
   * NOTE: inside a phrase fewer characters need escaped, use {@link Apache_Solr_Service::escapePhrase()} instead
   *
   * @param string $value
   * @return string
   */
  public static function escape($value) {
    return Apache_Solr_Service::escape($value);
  }

  /**
   * Escape a value meant to be contained in a phrase for special query characters
   *
   * @param string $value
   * @return string
   */
  public static function escapePhrase($value) {
    return Apache_Solr_Service::escapePhrase($value);
  }

  /**
   * Convenience function for creating phrase syntax from a value
   *
   * @param string $value
   * @return string
   */
  public static function phrase($value) {
    return Apache_Solr_Service::phrase($value);
  }

  /**
   * Constructor. Takes arrays of read and write service instances or descriptions
   *
   * @param array $readableServices
   * @param array $writeableServices
   */
  public function __construct($readableServices = array(), $writeableServices = array()) {

    //setup readable services
    foreach ($readableServices as $service) {
      $this
        ->addReadService($service);
    }

    //setup writeable services
    foreach ($writeableServices as $service) {
      $this
        ->addWriteService($service);
    }
  }
  public function setReadPingTimeout($timeout) {
    $this->_readPingTimeout = $timeout;
  }
  public function setWritePingTimeout($timeout) {
    $this->_writePingTimeout = $timeout;
  }
  public function setUseBackoff($enable) {
    $this->_useBackoff = $enable;
  }

  /**
   * Generates a service ID
   *
   * @param string $host
   * @param integer $port
   * @param string $path
   * @return string
   */
  protected function _getServiceId($host, $port, $path) {
    return $host . ':' . $port . $path;
  }

  /**
   * Adds a service instance or service descriptor (if it is already
   * not added)
   *
   * @param mixed $service
   *
   * @throws Exception If service descriptor is not valid
   */
  public function addReadService($service) {
    if ($service instanceof Apache_Solr_Service) {
      $id = $this
        ->_getServiceId($service
        ->getHost(), $service
        ->getPort(), $service
        ->getPath());
      $this->_readableServices[$id] = $service;
    }
    else {
      if (is_array($service)) {
        if (isset($service['host']) && isset($service['port']) && isset($service['path'])) {
          $id = $this
            ->_getServiceId((string) $service['host'], (int) $service['port'], (string) $service['path']);
          $this->_readableServices[$id] = $service;
        }
        else {
          throw new Exception('A Readable Service description array does not have all required elements of host, port, and path');
        }
      }
    }
  }

  /**
   * Removes a service instance or descriptor from the available services
   *
   * @param mixed $service
   *
   * @throws Exception If service descriptor is not valid
   */
  public function removeReadService($service) {
    $id = '';
    if ($service instanceof Apache_Solr_Service) {
      $id = $this
        ->_getServiceId($service
        ->getHost(), $service
        ->getPort(), $service
        ->getPath());
    }
    else {
      if (is_array($service)) {
        if (isset($service['host']) && isset($service['port']) && isset($service['path'])) {
          $id = $this
            ->_getServiceId((string) $service['host'], (int) $service['port'], (string) $service['path']);
        }
        else {
          throw new Exception('A Readable Service description array does not have all required elements of host, port, and path');
        }
      }
      else {
        if (is_string($service)) {
          $id = $service;
        }
      }
    }
    if ($id && isset($this->_readableServices[$id])) {
      unset($this->_readableServices[$id]);
    }
  }

  /**
   * Adds a service instance or service descriptor (if it is already
   * not added)
   *
   * @param mixed $service
   *
   * @throws Exception If service descriptor is not valid
   */
  public function addWriteService($service) {
    if ($service instanceof Apache_Solr_Service) {
      $id = $this
        ->_getServiceId($service
        ->getHost(), $service
        ->getPort(), $service
        ->getPath());
      $this->_writeableServices[$id] = $service;
    }
    else {
      if (is_array($service)) {
        if (isset($service['host']) && isset($service['port']) && isset($service['path'])) {
          $id = $this
            ->_getServiceId((string) $service['host'], (int) $service['port'], (string) $service['path']);
          $this->_writeableServices[$id] = $service;
        }
        else {
          throw new Exception('A Writeable Service description array does not have all required elements of host, port, and path');
        }
      }
    }
  }

  /**
   * Removes a service instance or descriptor from the available services
   *
   * @param mixed $service
   *
   * @throws Exception If service descriptor is not valid
   */
  public function removeWriteService($service) {
    $id = '';
    if ($service instanceof Apache_Solr_Service) {
      $id = $this
        ->_getServiceId($service
        ->getHost(), $service
        ->getPort(), $service
        ->getPath());
    }
    else {
      if (is_array($service)) {
        if (isset($service['host']) && isset($service['port']) && isset($service['path'])) {
          $id = $this
            ->_getServiceId((string) $service['host'], (int) $service['port'], (string) $service['path']);
        }
        else {
          throw new Exception('A Readable Service description array does not have all required elements of host, port, and path');
        }
      }
      else {
        if (is_string($service)) {
          $id = $service;
        }
      }
    }
    if ($id && isset($this->_writeableServices[$id])) {
      unset($this->_writeableServices[$id]);
    }
  }

  /**
   * Iterate through available read services and select the first with a ping
   * that satisfies configured timeout restrictions (or the default)
   *
   * @return Apache_Solr_Service
   *
   * @throws Exception If there are no read services that meet requirements
   */
  protected function _selectReadService($forceSelect = false) {
    if (!$this->_currentReadService || !isset($this->_readableServices[$this->_currentReadService]) || $forceSelect) {
      if ($this->_currentReadService && isset($this->_readableServices[$this->_currentReadService]) && $forceSelect) {

        // we probably had a communication error, ping the current read service, remove it if it times out
        if ($this->_readableServices[$this->_currentReadService]
          ->ping($this->_readPingTimeout) === false) {
          $this
            ->removeReadService($this->_currentReadService);
        }
      }
      if (count($this->_readableServices)) {

        // select one of the read services at random
        $ids = array_keys($this->_readableServices);
        $id = $ids[rand(0, count($ids) - 1)];
        $service = $this->_readableServices[$id];
        if (is_array($service)) {

          //convert the array definition to a client object
          $service = new Apache_Solr_Service($service['host'], $service['port'], $service['path']);
          $this->_readableServices[$id] = $service;
        }
        $service
          ->setCreateDocuments($this->_createDocuments);
        $this->_currentReadService = $id;
      }
      else {
        throw new Exception('No read services were available');
      }
    }
    return $this->_readableServices[$this->_currentReadService];
  }

  /**
   * Iterate through available write services and select the first with a ping
   * that satisfies configured timeout restrictions (or the default)
   *
   * @return Apache_Solr_Service
   *
   * @throws Exception If there are no write services that meet requirements
   */
  protected function _selectWriteService($forceSelect = false) {
    if ($this->_useBackoff) {
      return $this
        ->_selectWriteServiceSafe($forceSelect);
    }
    if (!$this->_currentWriteService || !isset($this->_writeableServices[$this->_currentWriteService]) || $forceSelect) {
      if ($this->_currentWriteService && isset($this->_writeableServices[$this->_currentWriteService]) && $forceSelect) {

        // we probably had a communication error, ping the current read service, remove it if it times out
        if ($this->_writeableServices[$this->_currentWriteService]
          ->ping($this->_writePingTimeout) === false) {
          $this
            ->removeWriteService($this->_currentWriteService);
        }
      }
      if (count($this->_writeableServices)) {

        // select one of the read services at random
        $ids = array_keys($this->_writeableServices);
        $id = $ids[rand(0, count($ids) - 1)];
        $service = $this->_writeableServices[$id];
        if (is_array($service)) {

          //convert the array definition to a client object
          $service = new Apache_Solr_Service($service['host'], $service['port'], $service['path']);
          $this->_writeableServices[$id] = $service;
        }
        $this->_currentWriteService = $id;
      }
      else {
        throw new Exception('No write services were available');
      }
    }
    return $this->_writeableServices[$this->_currentWriteService];
  }

  /**
   * Iterate through available write services and select the first with a ping
   * that satisfies configured timeout restrictions (or the default).  The
   * timeout period will increase until a connection is made or the limit is
   * reached.   This will allow for increased reliability with heavily loaded
   * server(s).
   *
   * @return Apache_Solr_Service
   *
   * @throws Exception If there are no write services that meet requirements
   */
  protected function _selectWriteServiceSafe($forceSelect = false) {
    if (!$this->_currentWriteService || !isset($this->_writeableServices[$this->_currentWriteService]) || $forceSelect) {
      if (count($this->_writeableServices)) {
        $backoff = $this->_defaultBackoff;
        do {

          // select one of the read services at random
          $ids = array_keys($this->_writeableServices);
          $id = $ids[rand(0, count($ids) - 1)];
          $service = $this->_writeableServices[$id];
          if (is_array($service)) {

            //convert the array definition to a client object
            $service = new Apache_Solr_Service($service['host'], $service['port'], $service['path']);
            $this->_writeableServices[$id] = $service;
          }
          $this->_currentWriteService = $id;
          $backoff *= $this->_backoffEscalation;
          if ($backoff > $this->_backoffLimit) {
            throw new Exception('No write services were available.  All timeouts exceeded.');
          }
        } while ($this->_writeableServices[$this->_currentWriteService]
          ->ping($backoff) === false);
      }
      else {
        throw new Exception('No write services were available');
      }
    }
    return $this->_writeableServices[$this->_currentWriteService];
  }
  public function setCreateDocuments($createDocuments) {
    $this->_createDocuments = (bool) $createDocuments;
    if ($this->_currentReadService) {
      $service = $this
        ->_selectReadService();
      $service
        ->setCreateDocuments($createDocuments);
    }
  }
  public function getCreateDocuments() {
    return $this->_createDocuments;
  }

  /**
   * Raw Add Method. Takes a raw post body and sends it to the update service.  Post body
   * should be a complete and well formed "add" xml document.
   *
   * @param string $rawPost
   * @return Apache_Solr_Response
   *
   * @throws Exception If an error occurs during the service call
   */
  public function add($rawPost) {
    $service = $this
      ->_selectWriteService();
    do {
      try {
        return $service
          ->add($rawPost);
      } catch (Exception $e) {
        if ($e
          ->getCode() != 0) {

          //IF NOT COMMUNICATION ERROR
          throw $e;
        }
      }
      $service = $this
        ->_selectWriteService(true);
    } while ($service);
    return false;
  }

  /**
   * Add a Solr Document to the index
   *
   * @param Apache_Solr_Document $document
   * @param boolean $allowDups
   * @param boolean $overwritePending
   * @param boolean $overwriteCommitted
   * @return Apache_Solr_Response
   *
   * @throws Exception If an error occurs during the service call
   */
  public function addDocument(Apache_Solr_Document $document, $allowDups = false, $overwritePending = true, $overwriteCommitted = true) {
    $service = $this
      ->_selectWriteService();
    do {
      try {
        return $service
          ->addDocument($document, $allowDups, $overwritePending, $overwriteCommitted);
      } catch (Exception $e) {
        if ($e
          ->getCode() != 0) {

          //IF NOT COMMUNICATION ERROR
          throw $e;
        }
      }
      $service = $this
        ->_selectWriteService(true);
    } while ($service);
    return false;
  }

  /**
   * Add an array of Solr Documents to the index all at once
   *
   * @param array $documents Should be an array of Apache_Solr_Document instances
   * @param boolean $allowDups
   * @param boolean $overwritePending
   * @param boolean $overwriteCommitted
   * @return Apache_Solr_Response
   *
   * @throws Exception If an error occurs during the service call
   */
  public function addDocuments($documents, $allowDups = false, $overwritePending = true, $overwriteCommitted = true) {
    $service = $this
      ->_selectWriteService();
    do {
      try {
        return $service
          ->addDocuments($documents, $allowDups, $overwritePending, $overwriteCommitted);
      } catch (Exception $e) {
        if ($e
          ->getCode() != 0) {

          //IF NOT COMMUNICATION ERROR
          throw $e;
        }
      }
      $service = $this
        ->_selectWriteService(true);
    } while ($service);
    return false;
  }

  /**
   * Send a commit command.  Will be synchronous unless both wait parameters are set
   * to false.
   *
   * @param boolean $waitFlush
   * @param boolean $waitSearcher
   * @return Apache_Solr_Response
   *
   * @throws Exception If an error occurs during the service call
   */
  public function commit($optimize = true, $waitFlush = true, $waitSearcher = true, $timeout = 3600) {
    $service = $this
      ->_selectWriteService();
    do {
      try {
        return $service
          ->commit($optimize, $waitFlush, $waitSearcher, $timeout);
      } catch (Exception $e) {
        if ($e
          ->getCode() != 0) {

          //IF NOT COMMUNICATION ERROR
          throw $e;
        }
      }
      $service = $this
        ->_selectWriteService(true);
    } while ($service);
    return false;
  }

  /**
   * Raw Delete Method. Takes a raw post body and sends it to the update service. Body should be
   * a complete and well formed "delete" xml document
   *
   * @param string $rawPost
   * @return Apache_Solr_Response
   *
   * @throws Exception If an error occurs during the service call
   */
  public function delete($rawPost) {
    $service = $this
      ->_selectWriteService();
    do {
      try {
        return $service
          ->delete($rawPost);
      } catch (Exception $e) {
        if ($e
          ->getCode() != 0) {

          //IF NOT COMMUNICATION ERROR
          throw $e;
        }
      }
      $service = $this
        ->_selectWriteService(true);
    } while ($service);
    return false;
  }

  /**
   * Create a delete document based on document ID
   *
   * @param string $id
   * @param boolean $fromPending
   * @param boolean $fromCommitted
   * @return Apache_Solr_Response
   *
   * @throws Exception If an error occurs during the service call
   */
  public function deleteById($id, $fromPending = true, $fromCommitted = true) {
    $service = $this
      ->_selectWriteService();
    do {
      try {
        return $service
          ->deleteById($id, $fromPending, $fromCommitted);
      } catch (Exception $e) {
        if ($e
          ->getCode() != 0) {

          //IF NOT COMMUNICATION ERROR
          throw $e;
        }
      }
      $service = $this
        ->_selectWriteService(true);
    } while ($service);
    return false;
  }

  /**
   * Create a delete document based on a query and submit it
   *
   * @param string $rawQuery
   * @param boolean $fromPending
   * @param boolean $fromCommitted
   * @return Apache_Solr_Response
   *
   * @throws Exception If an error occurs during the service call
   */
  public function deleteByQuery($rawQuery, $fromPending = true, $fromCommitted = true) {
    $service = $this
      ->_selectWriteService();
    do {
      try {
        return $service
          ->deleteByQuery($rawQuery, $fromPending, $fromCommitted);
      } catch (Exception $e) {
        if ($e
          ->getCode() != 0) {

          //IF NOT COMMUNICATION ERROR
          throw $e;
        }
      }
      $service = $this
        ->_selectWriteService(true);
    } while ($service);
    return false;
  }

  /**
   * Send an optimize command.  Will be synchronous unless both wait parameters are set
   * to false.
   *
   * @param boolean $waitFlush
   * @param boolean $waitSearcher
   * @return Apache_Solr_Response
   *
   * @throws Exception If an error occurs during the service call
   */
  public function optimize($waitFlush = true, $waitSearcher = true) {
    $service = $this
      ->_selectWriteService();
    do {
      try {
        return $service
          ->optimize($waitFlush, $waitSearcher);
      } catch (Exception $e) {
        if ($e
          ->getCode() != 0) {

          //IF NOT COMMUNICATION ERROR
          throw $e;
        }
      }
      $service = $this
        ->_selectWriteService(true);
    } while ($service);
    return false;
  }

  /**
   * Simple Search interface
   *
   * @param string $query The raw query string
   * @param int $offset The starting offset for result documents
   * @param int $limit The maximum number of result documents to return
   * @param array $params key / value pairs for query parameters, use arrays for multivalued parameters
   * @return Apache_Solr_Response
   *
   * @throws Exception If an error occurs during the service call
   */
  public function search($query, $offset = 0, $limit = 10, $params = array()) {
    $service = $this
      ->_selectReadService();
    do {
      try {
        return $service
          ->search($query, $offset, $limit, $params);
      } catch (Exception $e) {
        if ($e
          ->getCode() != 0) {

          //IF NOT COMMUNICATION ERROR
          throw $e;
        }
      }
      $service = $this
        ->_selectReadService(true);
    } while ($service);
    return false;
  }

}

Members

Namesort descending Modifiers Type Description Overrides
Apache_Solr_Service_Balancer::$_backoffEscalation protected property
Apache_Solr_Service_Balancer::$_backoffLimit protected property
Apache_Solr_Service_Balancer::$_createDocuments protected property
Apache_Solr_Service_Balancer::$_currentReadService protected property
Apache_Solr_Service_Balancer::$_currentWriteService protected property
Apache_Solr_Service_Balancer::$_defaultBackoff protected property
Apache_Solr_Service_Balancer::$_readableServices protected property
Apache_Solr_Service_Balancer::$_readPingTimeout protected property
Apache_Solr_Service_Balancer::$_useBackoff protected property
Apache_Solr_Service_Balancer::$_writeableServices protected property
Apache_Solr_Service_Balancer::$_writePingTimeout protected property
Apache_Solr_Service_Balancer::add public function Raw Add Method. Takes a raw post body and sends it to the update service. Post body should be a complete and well formed "add" xml document.
Apache_Solr_Service_Balancer::addDocument public function Add a Solr Document to the index
Apache_Solr_Service_Balancer::addDocuments public function Add an array of Solr Documents to the index all at once
Apache_Solr_Service_Balancer::addReadService public function Adds a service instance or service descriptor (if it is already not added)
Apache_Solr_Service_Balancer::addWriteService public function Adds a service instance or service descriptor (if it is already not added)
Apache_Solr_Service_Balancer::commit public function Send a commit command. Will be synchronous unless both wait parameters are set to false.
Apache_Solr_Service_Balancer::delete public function Raw Delete Method. Takes a raw post body and sends it to the update service. Body should be a complete and well formed "delete" xml document
Apache_Solr_Service_Balancer::deleteById public function Create a delete document based on document ID
Apache_Solr_Service_Balancer::deleteByQuery public function Create a delete document based on a query and submit it
Apache_Solr_Service_Balancer::escape public static function Escape a value for special query characters such as ':', '(', ')', '*', '?', etc.
Apache_Solr_Service_Balancer::escapePhrase public static function Escape a value meant to be contained in a phrase for special query characters
Apache_Solr_Service_Balancer::getCreateDocuments public function
Apache_Solr_Service_Balancer::optimize public function Send an optimize command. Will be synchronous unless both wait parameters are set to false.
Apache_Solr_Service_Balancer::phrase public static function Convenience function for creating phrase syntax from a value
Apache_Solr_Service_Balancer::removeReadService public function Removes a service instance or descriptor from the available services
Apache_Solr_Service_Balancer::removeWriteService public function Removes a service instance or descriptor from the available services
Apache_Solr_Service_Balancer::search public function Simple Search interface
Apache_Solr_Service_Balancer::setCreateDocuments public function
Apache_Solr_Service_Balancer::setReadPingTimeout public function
Apache_Solr_Service_Balancer::setUseBackoff public function
Apache_Solr_Service_Balancer::setWritePingTimeout public function
Apache_Solr_Service_Balancer::_getServiceId protected function Generates a service ID
Apache_Solr_Service_Balancer::_selectReadService protected function Iterate through available read services and select the first with a ping that satisfies configured timeout restrictions (or the default)
Apache_Solr_Service_Balancer::_selectWriteService protected function Iterate through available write services and select the first with a ping that satisfies configured timeout restrictions (or the default)
Apache_Solr_Service_Balancer::_selectWriteServiceSafe protected function Iterate through available write services and select the first with a ping that satisfies configured timeout restrictions (or the default). The timeout period will increase until a connection is made or the limit is reached. This will allow for…
Apache_Solr_Service_Balancer::__construct public function Constructor. Takes arrays of read and write service instances or descriptions