You are here

function apdqc_query in Asynchronous Prefetch Database Query Cache 7

Runs a query in the database.

Parameters

array $tables: Array of tables accessed in this query.

array $cids: Array of cache ids used in this query.

string $query: The query to run.

array $options: Various options to control how this query is ran. async: TRUE to make query be asynchronous & non blocking. fetch_all: TRUE to return fetch_all(MYSQLI_ASSOC) on the query result. log: FALSE to not log this query inside of devel. get_affected_rows: TRUE returns the affected rows. get_mysqli: TRUE returns the $mysqli object.

Return value

mixed Returns the string "NO DB" when it can't connect to the database. Returns an int when get_affected_rows is set to TRUE. Returns the mysqli object when get_mysqli is set to TRUE. Returns the result array when fetch_all is set to TRUE. Returns -1 when async is set to TRUE.

21 calls to apdqc_query()
apdqc.session.inc in ./apdqc.session.inc
User session handling functions.
APDQCache::clear in ./apdqc.cache.inc
Implements DrupalCacheInterface::clear().
APDQCache::garbageCollection in ./apdqc.cache.inc
Generic garbage collection method.
APDQCache::getMultiple in ./apdqc.cache.inc
Implements DrupalCacheInterface::getMultiple().
APDQCache::isEmpty in ./apdqc.cache.inc
Implements DrupalCacheInterface::isEmpty().

... See full list

3 string references to 'apdqc_query'
apdqc_admin_change_table_collation_queries in ./apdqc.admin.inc
Convert the table to the specified collation.
apdqc_convert_cache_index in ./apdqc.admin.inc
Converts a database index from one form to another.
apdqc_requirements in ./apdqc.install
Implements hook_requirements().

File

./apdqc.mysql.inc, line 699
APDQC Database interface code for MySQL database servers.

Code

function apdqc_query(array $tables, array $cids, $query, array $options = array()) {

  // Add in defaults.
  $options += array(
    'async' => FALSE,
    'fetch_all' => FALSE,
    'log' => TRUE,
    'get_affected_rows' => FALSE,
    'get_mysqli' => FALSE,
  );
  if (!empty($options['log']) && variable_get('devel_query_display', 0)) {
    $logger = Database::getConnection()
      ->getLogger();
  }

  // Start timer if DB logger is enabled.
  if (!empty($logger)) {
    $query_start = microtime(TRUE);
  }
  $mysqli = apdqc_get_db_object($tables, $cids, $options);
  if (empty($mysqli)) {
    return "NO DB";
  }
  if (strpos($query, 'SELECT ') === 0) {

    // Get the mysql database type.
    static $mysql_db_type;
    if (!isset($mysql_db_type)) {
      $mysql_db_type =& drupal_static('apdqc_mysql_db_type');
      if (stripos($mysql_db_type, 'mariadb') !== FALSE) {
        static $version_alt;
        $version_alt = $mysqli->server_info;
        $pos_a = stripos($version_alt, 'mariadb') - 7;
        $pos_b = stripos($version_alt, '-');
        if ($pos_a !== FALSE && $pos_b !== FALSE && $pos_a > $pos_b) {
          $version_alt = substr($version_alt, $pos_b + 1, $pos_a);
        }
      }
    }

    // Use a SELECT statement timeout if this is MySQL 5.7.4 or higher.
    $alter_select = FALSE;
    if ($mysqli->server_version >= 50704 && $mysqli->server_version < 50708) {
      $alter_select = TRUE;
    }
    elseif ($mysqli->server_version >= 50614) {
      if (stripos($mysql_db_type, 'percona') !== FALSE) {
        $alter_select = TRUE;
      }
    }
    if ($alter_select) {
      $query = str_replace('SELECT ', 'SELECT MAX_STATEMENT_TIME=2000 ', $query);
    }
    else {
      if ($mysqli->server_version >= 50708) {
        $query = str_replace('SELECT ', 'SELECT /*+ MAX_EXECUTION_TIME(2000) */ ', $query);
      }
      elseif (stripos($mysql_db_type, 'mariadb') !== FALSE && version_compare($version_alt, '10.1.2', '>=')) {
        $query = str_replace('SELECT ', 'SET STATEMENT max_statement_time=2 FOR SELECT ', $query);
      }
    }
  }

  // Clear static cache on insert or delete & disable function for the rest of
  // this request.
  if (strpos($query, 'INSERT ') === 0 || strpos($query, 'DELETE ') === 0 || strpos($query, 'TRUNCATE ') === 0) {
    $apdqc_async_data =& drupal_static('apdqc_async_data');
    $do_not_use_async_data =& drupal_static('apdqc_async_data_do_not_use_async');
    $do_not_run_prefetch_array =& drupal_static('apdqc_run_prefetch_array');
    foreach ($tables as $table) {
      $apdqc_async_data[$table] = array();
      $do_not_use_async_data[$table] = TRUE;
      $do_not_run_prefetch_array[$table] = TRUE;
    }
  }

  // Catch mysql disconnect errors with our own error handler.
  drupal_static_reset('_apdqc_query_error_handler');
  set_error_handler('_apdqc_query_error_handler');

  // About to query the semaphore table, set tx_isolation to READ-UNCOMMITTED.
  if ($tables[0] == 'semaphore' || $tables[0] == 'sessions') {
    $mysqli
      ->query("SET SESSION tx_isolation='READ-UNCOMMITTED'");
  }

  // Run query.
  if ($options['async']) {
    $results = $mysqli
      ->query($query, MYSQLI_ASYNC);
  }
  else {
    $results = $mysqli
      ->query($query);
  }

  // Recover if query failed.
  $apdqc_errormsg =& drupal_static('_apdqc_query_error_handler');
  if (!empty($apdqc_errormsg)) {

    // Restore the Drupal error handler.
    restore_error_handler();
    $db_info =& drupal_static('apdqc_get_db_object');

    // Check this connection.
    $mysqli = apdqc_mysqli_ping($mysqli, $db_info, $tables, $cids, $options['async'], TRUE);
    if (empty($mysqli)) {
      $mysqli = apdqc_get_db_object($tables, $cids, $options);
    }
    if (!empty($mysqli)) {

      // Run query again.
      if ($options['async']) {
        $results = $mysqli
          ->query($query, MYSQLI_ASYNC);
      }
      else {
        $results = $mysqli
          ->query($query);
      }
    }
    else {
      watchdog('apdqc', 'This query failed: @query', array(
        '@query' => $query,
      ));
    }
  }

  // Restore the Drupal error handler.
  restore_error_handler();

  // Get affected_rows if requested to do so.
  if (!$options['async'] && $options['get_affected_rows']) {
    $results = $mysqli->affected_rows;
  }

  // Stop timer & write to the log if DB logger is enabled.
  if (!empty($logger)) {
    $query_end = microtime(TRUE);
    $affected_rows = $mysqli->affected_rows;
    $extra_data = '';
    if ($options['async']) {
      $extra_data .= 'ASYNC ';
    }
    else {
      if ($affected_rows == -1) {
        $extra_data .= 'ERROR ';
      }
      elseif (empty($affected_rows)) {
        $extra_data .= 'MISS ';
      }
      else {
        $extra_data .= 'HIT ';
      }
    }
    if (variable_get('apdqc_verbose_devel_output', APDQC_VERBOSE_DEVEL_OUTPUT) || !$options['async']) {
      $extra_data .= implode(', ', $tables) . ' Rows affected: ' . $affected_rows . ' Error code: ' . $mysqli->sqlstate . ' ';
      require_once 'apdqc.log.inc';
      $extra_data .= 'thread_id:' . $mysqli->thread_id . ' ';
      $statement = new ApdqcFakeDatabaseStatement($query, $extra_data);
      $logger
        ->log($statement, array(), $query_end - $query_start);
    }
  }
  if ($options['get_mysqli']) {
    $return = $mysqli;
  }
  if (empty($return)) {
    if (!$options['get_affected_rows'] && $options['fetch_all']) {
      $result = array();
      if (!empty($results) && $results instanceof mysqli_result) {
        $result = $results
          ->fetch_all(MYSQLI_ASSOC);
      }
      $return = $result;
    }
    else {
      $return = $results;
    }
  }

  // Done quering the semaphore table, set tx_isolation back to READ-COMMITTED.
  if ($tables[0] == 'semaphore' || $tables[0] == 'sessions') {
    $mysqli
      ->query("SET SESSION tx_isolation='READ-COMMITTED'");
  }
  return $return;
}