You are here

function apdqc_get_db_object in Asynchronous Prefetch Database Query Cache 7

Return a mysqli object that is ready to be used.

Parameters

array $tables: An array of table names.

array $cids: An array of cache IDs.

array $options: An array of options controlling this function. reap: Wait for all async queries to finish. fast_get: Skip most checks and just return a mysqli object. async: If TRUE then this connection is being used for an async query.

Return value

mysqli returns a mysqli object on success or FALSE on failure.

8 calls to apdqc_get_db_object()
apdqc.lock.inc in ./apdqc.lock.inc
A database-mediated implementation of a locking mechanism.
apdqc_admin_change_table_collation_queries in ./apdqc.admin.inc
Convert the table to the specified collation.
apdqc_async_data in ./apdqc.mysql.inc
Used to get & parse async cached data.
apdqc_convert_cache_index in ./apdqc.admin.inc
Converts a database index from one form to another.
apdqc_cron in ./apdqc.module
Implements hook_cron().

... See full list

6 string references to 'apdqc_get_db_object'
apdqc.lock.inc in ./apdqc.lock.inc
A database-mediated implementation of a locking mechanism.
ApdqcFakeDatabaseStatement::findCallerfunction in ./apdqc.log.inc
Determine the routine that called this query.
apdqc_admin_change_table_collation_queries in ./apdqc.admin.inc
Convert the table to the specified collation.
apdqc_convert_cache_index in ./apdqc.admin.inc
Converts a database index from one form to another.
apdqc_cron in ./apdqc.module
Implements hook_cron().

... See full list

File

./apdqc.mysql.inc, line 224
APDQC Database interface code for MySQL database servers.

Code

function apdqc_get_db_object(array $tables = array(), array $cids = array(), array $options = array()) {
  $options += array(
    'reap' => FALSE,
    'fast_get' => FALSE,
    'async' => TRUE,
  );

  // Bail out if not mysql that is async capable.
  $mysqli = FALSE;
  if (!function_exists('mysqli_init') || !defined('MYSQLI_ASYNC')) {
    return $mysqli;
  }

  // Use the advanced drupal_static() pattern for $db_info.
  static $db_info;
  if (!isset($db_info)) {
    $db_info =& drupal_static(__FUNCTION__);

    // 'var' stores variables about the mysql database.
    if (!isset($db_info['var'])) {
      $db_info['var'] = array();
    }
    $db_info['var'] += array(
      'max_connections' => 90,
      'wait_timeout' => 90,
      'connect_timeout' => 2,
      'innodb_lock_wait_timeout' => 10,
      // 33554432 = 32MB.
      'max_allowed_packet' => 33554432,
    );

    // 'connection' stores the info needed to make a new connection to mysql.
    if (!isset($db_info['connection'])) {

      // Use default connection info.
      if (class_exists('Database')) {
        $connection_info = Database::getConnectionInfo();
        $db_info['connection'] = reset($connection_info);
      }
      else {
        $db_info['connection'] = $GLOBALS['databases']['default']['default'];
      }
      if (empty($db_info['connection']['port'])) {
        $db_info['connection']['port'] = NULL;
      }
      else {
        $db_info['connection']['port'] = (int) $db_info['connection']['port'];
      }
      if (empty($db_info['connection']['unix_socket'])) {
        $db_info['connection']['unix_socket'] = NULL;
      }
      if (empty($db_info['connection']['password'])) {
        $db_info['connection']['password'] = NULL;
      }
      $mysql_db_type =& drupal_static('apdqc_mysql_db_type');
      if (!isset($mysql_db_type)) {

        // Default to MySQL.
        $mysql_db_type = 'MySQL';
        if (isset($GLOBALS['databases']['default']['default']['mysql_db_type'])) {
          $mysql_db_type = $GLOBALS['databases']['default']['default']['mysql_db_type'];
        }
      }
    }

    // 'pool' stores a collection of open mysql connections.
    if (!isset($db_info['pool'])) {
      $db_info['pool'] = array();
    }
  }
  if (!empty($options['fast_get']) && !empty($db_info['pool'])) {
    $values = reset($db_info['pool']);
    return $values[0];
  }

  // Make sure a table/cid pair is used by the same connection in order to avoid
  // record level deadlocks with async queries.
  if (!empty($db_info['pool']) && !empty($tables) && !empty($cids)) {
    $match = FALSE;
    $new_data = FALSE;
    foreach ($db_info['pool'] as $key => $values) {

      // Skip if not an async query.
      if (empty($values[3])) {
        continue;
      }

      // Match the table.
      $intersect = array_intersect($tables, $values[1]);
      if (!empty($intersect)) {
        $intersect = array();

        // Match the cache id.
        $done = FALSE;
        if (isset($cids['*'])) {

          // Whole table query.
          $intersect = array(
            TRUE,
          );
          $done = TRUE;
        }
        elseif (isset($values[2]['*'])) {

          // Whole table query.
          $not_intersect = array_intersect($values[2]['*'], $cids);
          $missing_cids = array_diff($not_intersect, $values[2]['*']);
          if (!empty($missing_cids)) {
            $intersect = array(
              TRUE,
            );
          }
          $done = TRUE;
        }
        if (!$done) {

          // Check for like queries.
          foreach ($values[2] as $old_cid) {
            $like_pos = strpos($old_cid, '%');
            if ($like_pos !== FALSE) {
              foreach ($cids as $new_cid) {
                if (substr($old_cid, 0, $like_pos) === substr($new_cid, 0, $like_pos)) {
                  $intersect = array(
                    TRUE,
                  );
                  $done = TRUE;
                  break 2;
                }
              }
            }
          }
        }
        if (!$done) {

          // Match cids.
          $intersect = array_intersect($cids, $values[2]);
        }

        // If the cache id is matched.
        if (!empty($intersect)) {

          // Wait for the query to finish.
          apdqc_reap_async_query($db_info, $values[0], $cids, $tables);
          $new_data = TRUE;
          $match = $values[0];

          // Report that the connection is ready.
          $db_info['pool'][$key][3] = FALSE;
        }
      }
    }
    if (!empty($options['reap'])) {
      return $new_data;
    }
    if (!empty($match)) {
      apdqc_mysqli_ping($match, $db_info, $tables, $cids, $options['async']);
      return $match;
    }
  }

  // Try to reuse an old connection.
  if (!empty($db_info['pool'])) {
    $mysqli_pool = array();
    foreach ($db_info['pool'] as $values) {
      $mysqli_pool[] = $values[0];

      // If query was not async, use it.
      if (empty($values[3])) {
        $mysqli = $values[0];
        apdqc_mysqli_ping($mysqli, $db_info, $tables, $cids, $options['async']);
        return $mysqli;
      }
    }
    $links = $errors = $reject = $mysqli_pool;
    $ready = @mysqli_poll($links, $errors, $reject, 0, 1);
    if (!empty($reject)) {

      // A non async connection is ready; use the first one.
      $mysqli = reset($reject);
      apdqc_mysqli_ping($mysqli, $db_info, $tables, $cids, $options['async']);
      return $mysqli;
    }
    if (!empty($links)) {

      // An async connection is ready; use the ready one.
      if (isset($links[$ready - 1])) {
        $mysqli = $links[$ready - 1];
      }
      else {
        $mysqli = reset($links);
      }
      apdqc_reap_async_query($db_info, $mysqli, $cids, $tables);
      apdqc_mysqli_ping($mysqli, $db_info, $tables, $cids, $options['async']);
      return $mysqli;
    }

    // All current connections are in use.
    if (count($db_info['pool']) < variable_get('apdqc_max_connections_per_thread', APDQC_MAX_CONNECTIONS_PER_THREAD)) {

      // Create a new DB connection.
      if (count($db_info['pool']) < variable_get('apdqc_max_connections_before_check', APDQC_MAX_CONNECTIONS_BEFORE_CHECK)) {
        $mysqli = apdqc_mysqli_new_connection($db_info);
      }
      else {
        $mysqli = apdqc_mysqli_new_connection($db_info, TRUE);
      }
      if (!empty($mysqli)) {
        $db_info['pool'][$mysqli->thread_id] = array(
          $mysqli,
          $tables,
          $cids,
          $options['async'],
        );
      }
      return $mysqli;
    }
    else {

      // Wait for a db connection to be ready.
      $ready = FALSE;
      while (!$ready) {
        $mysqli_pool = array();
        foreach ($db_info['pool'] as $values) {
          $mysqli_pool[] = $values[0];
        }
        $links = $errors = $reject = $mysqli_pool;
        $ready = @mysqli_poll($links, $errors, $reject, 0, 5000);
        if (!$ready && !empty($reject)) {
          $ready = TRUE;
        }
      }
      if (!empty($reject)) {

        // A non async connection is ready; use the first one.
        $mysqli = reset($reject);
        apdqc_mysqli_ping($mysqli, $db_info, $tables, $cids, $options['async']);
        return $mysqli;
      }
      if (!empty($links)) {

        // An async connection is ready; use the ready one.
        if (isset($links[$ready - 1])) {
          $mysqli = $links[$ready - 1];
        }
        else {
          $mysqli = reset($links);
        }
        apdqc_reap_async_query($db_info, $mysqli, $cids, $tables);
        apdqc_mysqli_ping($mysqli, $db_info, $tables, $cids, $options['async']);
        return $mysqli;
      }
    }
  }
  if (empty($db_info['pool'])) {
    $mysqli = apdqc_mysqli_new_connection($db_info);
    if (!empty($mysqli)) {
      $db_info['pool'][$mysqli->thread_id] = array(
        $mysqli,
        $tables,
        $cids,
        $options['async'],
      );
    }
  }
  return $mysqli;
}