function apdqc_get_db_object in Asynchronous Prefetch Database Query Cache 7
Return a mysqli object that is ready to be used.
Parameters
array $tables: An array of table names.
array $cids: An array of cache IDs.
array $options: An array of options controlling this function:
- reap: Wait for all async queries to finish. Defaults to FALSE.
- fast_get: Skip most checks and just return a mysqli object. Defaults to FALSE.
- async: If TRUE then this connection is being used for an async query. Defaults to TRUE.
Return value
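mysqli|bool: A mysqli object on success, or FALSE on failure. When the 'reap' option is set and matching is attempted, a boolean is returned instead: TRUE if an async query was reaped, FALSE otherwise.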
8 calls to apdqc_get_db_object()
- apdqc.lock.inc in ./
apdqc.lock.inc - A database-mediated implementation of a locking mechanism.
- apdqc_admin_change_table_collation_queries in ./
apdqc.admin.inc - Convert the table to the specified collation.
- apdqc_async_data in ./
apdqc.mysql.inc - Used to get & parse async cached data.
- apdqc_convert_cache_index in ./
apdqc.admin.inc - Converts a database index from one form to another.
- apdqc_cron in ./
apdqc.module - Implements hook_cron().
6 string references to 'apdqc_get_db_object'
- apdqc.lock.inc in ./
apdqc.lock.inc - A database-mediated implementation of a locking mechanism.
- ApdqcFakeDatabaseStatement::findCallerfunction in ./
apdqc.log.inc - Determine the routine that called this query.
- apdqc_admin_change_table_collation_queries in ./
apdqc.admin.inc - Convert the table to the specified collation.
- apdqc_convert_cache_index in ./
apdqc.admin.inc - Converts a database index from one form to another.
- apdqc_cron in ./
apdqc.module - Implements hook_cron().
File
- ./
apdqc.mysql.inc, line 224 - APDQC Database interface code for MySQL database servers.
Code
/**
 * Return a mysqli object that is ready to be used.
 *
 * Connections are kept in a pool stored via drupal_static(). Every pool entry
 * is keyed by the connection's thread id and has the shape
 * array(0 => mysqli, 1 => tables, 2 => cids, 3 => async flag).
 *
 * Lookup order:
 * - With 'fast_get' and a non-empty pool, the first pooled connection is
 *   returned without any further checks.
 * - If $tables and $cids overlap with a pending async query, that query is
 *   reaped and its connection is reused, so a table/cid pair always goes
 *   through the same connection.
 * - Otherwise a pooled connection whose async flag is empty is reused.
 * - Otherwise the pool is polled for a connection that is ready.
 * - Otherwise a new connection is opened while the pool is below
 *   'apdqc_max_connections_per_thread'; above that limit the function waits
 *   for a pooled connection to become ready.
 *
 * @param array $tables
 *   An array of table names.
 * @param array $cids
 *   An array of cache IDs.
 * @param array $options
 *   An array of options controlling this function.
 *   - reap: Wait for all async queries to finish. Defaults to FALSE.
 *   - fast_get: Skip most checks and just return a mysqli object. Defaults to
 *     FALSE.
 *   - async: If TRUE then this connection is being used for an async query.
 *     Defaults to TRUE.
 *
 * @return mysqli|bool
 *   A mysqli object on success or FALSE on failure. When 'reap' is set and the
 *   pool, $tables and $cids are all non-empty, a boolean is returned instead:
 *   TRUE if at least one matching async query was reaped, FALSE otherwise.
 */
function apdqc_get_db_object(array $tables = array(), array $cids = array(), array $options = array()) {
  $options += array(
    'reap' => FALSE,
    'fast_get' => FALSE,
    'async' => TRUE,
  );

  // Bail out if not mysql that is async capable.
  $mysqli = FALSE;
  if (!function_exists('mysqli_init') || !defined('MYSQLI_ASYNC')) {
    return $mysqli;
  }

  // Use the advanced drupal_static() pattern for $db_info. The reference has
  // to live in an array element: assigning a reference directly to a static
  // variable is not remembered by PHP between calls.
  static $drupal_static_fast;
  if (!isset($drupal_static_fast)) {
    $drupal_static_fast['db_info'] = &drupal_static(__FUNCTION__);
  }
  $db_info = &$drupal_static_fast['db_info'];

  // The guarded initialisation below runs on every call so the structure is
  // rebuilt if the drupal_static() data has been reset in the meantime.
  // 'var' stores variables about the mysql database.
  if (!isset($db_info['var'])) {
    $db_info['var'] = array();
  }
  $db_info['var'] += array(
    'max_connections' => 90,
    'wait_timeout' => 90,
    'connect_timeout' => 2,
    'innodb_lock_wait_timeout' => 10,
    // 33554432 = 32MB.
    'max_allowed_packet' => 33554432,
  );

  // 'connection' stores the info needed to make a new connection to mysql.
  if (!isset($db_info['connection'])) {
    // Use default connection info.
    if (class_exists('Database')) {
      $connection_info = Database::getConnectionInfo();
      $db_info['connection'] = reset($connection_info);
    }
    else {
      $db_info['connection'] = $GLOBALS['databases']['default']['default'];
    }

    // Normalize optional connection settings: empty values become NULL and a
    // given port is cast to an integer.
    if (empty($db_info['connection']['port'])) {
      $db_info['connection']['port'] = NULL;
    }
    else {
      $db_info['connection']['port'] = (int) $db_info['connection']['port'];
    }
    if (empty($db_info['connection']['unix_socket'])) {
      $db_info['connection']['unix_socket'] = NULL;
    }
    if (empty($db_info['connection']['password'])) {
      $db_info['connection']['password'] = NULL;
    }

    // Publish the database flavour in its own drupal_static() slot.
    $mysql_db_type =& drupal_static('apdqc_mysql_db_type');
    if (!isset($mysql_db_type)) {
      // Default to MySQL.
      $mysql_db_type = 'MySQL';
      if (isset($GLOBALS['databases']['default']['default']['mysql_db_type'])) {
        $mysql_db_type = $GLOBALS['databases']['default']['default']['mysql_db_type'];
      }
    }
  }

  // 'pool' stores a collection of open mysql connections.
  if (!isset($db_info['pool'])) {
    $db_info['pool'] = array();
  }

  // Fast path: hand back the first pooled connection without any checks.
  if (!empty($options['fast_get']) && !empty($db_info['pool'])) {
    $values = reset($db_info['pool']);
    return $values[0];
  }

  // Make sure a table/cid pair is used by the same connection in order to avoid
  // record level deadlocks with async queries.
  if (!empty($db_info['pool']) && !empty($tables) && !empty($cids)) {
    $match = FALSE;
    $new_data = FALSE;
    foreach ($db_info['pool'] as $key => $values) {
      // Skip if not an async query.
      if (empty($values[3])) {
        continue;
      }

      // Match the table.
      $intersect = array_intersect($tables, $values[1]);
      if (!empty($intersect)) {
        $intersect = array();

        // Match the cache id.
        $done = FALSE;
        if (isset($cids['*'])) {
          // Whole table query.
          $intersect = array(
            TRUE,
          );
          $done = TRUE;
        }
        elseif (isset($values[2]['*'])) {
          // Whole table query.
          // NOTE(review): array_diff(array_intersect(A, B), A) is always
          // empty, so $missing_cids can never be non-empty and this branch
          // never reports a match. The intended comparison is not clear from
          // this function alone; behavior is left unchanged - confirm against
          // the callers that pass a '*' key.
          $not_intersect = array_intersect($values[2]['*'], $cids);
          $missing_cids = array_diff($not_intersect, $values[2]['*']);
          if (!empty($missing_cids)) {
            $intersect = array(
              TRUE,
            );
          }
          $done = TRUE;
        }

        if (!$done) {
          // Check for like queries: a pending cid containing '%' matches any
          // new cid sharing the prefix that precedes the wildcard.
          foreach ($values[2] as $old_cid) {
            $like_pos = strpos($old_cid, '%');
            if ($like_pos !== FALSE) {
              foreach ($cids as $new_cid) {
                if (substr($old_cid, 0, $like_pos) === substr($new_cid, 0, $like_pos)) {
                  $intersect = array(
                    TRUE,
                  );
                  $done = TRUE;
                  break 2;
                }
              }
            }
          }
        }

        if (!$done) {
          // Match cids.
          $intersect = array_intersect($cids, $values[2]);
        }

        // If the cache id is matched.
        if (!empty($intersect)) {
          // Wait for the query to finish.
          apdqc_reap_async_query($db_info, $values[0], $cids, $tables);
          $new_data = TRUE;
          $match = $values[0];
          // Report that the connection is ready.
          $db_info['pool'][$key][3] = FALSE;
        }
      }
    }

    // In reap mode the caller only wants to know if anything was reaped.
    if (!empty($options['reap'])) {
      return $new_data;
    }
    if (!empty($match)) {
      apdqc_mysqli_ping($match, $db_info, $tables, $cids, $options['async']);
      return $match;
    }
  }

  // Try to reuse an old connection.
  if (!empty($db_info['pool'])) {
    $mysqli_pool = array();
    foreach ($db_info['pool'] as $values) {
      $mysqli_pool[] = $values[0];
      // If query was not async, use it.
      if (empty($values[3])) {
        $mysqli = $values[0];
        apdqc_mysqli_ping($mysqli, $db_info, $tables, $cids, $options['async']);
        return $mysqli;
      }
    }

    // Every pooled connection is flagged async; check (1 microsecond timeout)
    // whether one of them is usable right now.
    $links = $errors = $reject = $mysqli_pool;
    $ready = @mysqli_poll($links, $errors, $reject, 0, 1);
    if (!empty($reject)) {
      // A non async connection is ready; use the first one.
      $mysqli = reset($reject);
      apdqc_mysqli_ping($mysqli, $db_info, $tables, $cids, $options['async']);
      return $mysqli;
    }
    if (!empty($links)) {
      // An async connection is ready; use the ready one. $ready is the number
      // of ready connections, so fall back to the first link if that offset
      // does not exist.
      if (isset($links[$ready - 1])) {
        $mysqli = $links[$ready - 1];
      }
      else {
        $mysqli = reset($links);
      }
      apdqc_reap_async_query($db_info, $mysqli, $cids, $tables);
      apdqc_mysqli_ping($mysqli, $db_info, $tables, $cids, $options['async']);
      return $mysqli;
    }

    // All current connections are in use.
    if (count($db_info['pool']) < variable_get('apdqc_max_connections_per_thread', APDQC_MAX_CONNECTIONS_PER_THREAD)) {
      // Create a new DB connection. Once the pool has grown past
      // 'apdqc_max_connections_before_check' the helper is called with its
      // second argument set to TRUE.
      if (count($db_info['pool']) < variable_get('apdqc_max_connections_before_check', APDQC_MAX_CONNECTIONS_BEFORE_CHECK)) {
        $mysqli = apdqc_mysqli_new_connection($db_info);
      }
      else {
        $mysqli = apdqc_mysqli_new_connection($db_info, TRUE);
      }
      if (!empty($mysqli)) {
        $db_info['pool'][$mysqli->thread_id] = array(
          $mysqli,
          $tables,
          $cids,
          $options['async'],
        );
      }
      return $mysqli;
    }
    else {
      // Wait for a db connection to be ready.
      $ready = FALSE;
      // Consecutive mysqli_poll() errors. Without a cap a persistent poll
      // error would keep this loop spinning forever.
      $poll_failures = 0;
      $max_poll_failures = 100;
      while (!$ready) {
        $mysqli_pool = array();
        foreach ($db_info['pool'] as $values) {
          $mysqli_pool[] = $values[0];
        }
        $links = $errors = $reject = $mysqli_pool;
        $ready = @mysqli_poll($links, $errors, $reject, 0, 5000);
        if (!$ready && !empty($reject)) {
          // A rejected link has no pending async query and can be used.
          $ready = TRUE;
        }
        elseif ($ready === FALSE) {
          // The poll itself failed and no link was rejected.
          $poll_failures++;
          if ($poll_failures >= $max_poll_failures) {
            return FALSE;
          }
        }
        else {
          // The poll worked (possibly timing out with 0 ready links).
          $poll_failures = 0;
        }
      }
      if (!empty($reject)) {
        // A non async connection is ready; use the first one.
        $mysqli = reset($reject);
        apdqc_mysqli_ping($mysqli, $db_info, $tables, $cids, $options['async']);
        return $mysqli;
      }
      if (!empty($links)) {
        // An async connection is ready; use the ready one.
        if (isset($links[$ready - 1])) {
          $mysqli = $links[$ready - 1];
        }
        else {
          $mysqli = reset($links);
        }
        apdqc_reap_async_query($db_info, $mysqli, $cids, $tables);
        apdqc_mysqli_ping($mysqli, $db_info, $tables, $cids, $options['async']);
        return $mysqli;
      }
    }
  }

  // Nothing pooled yet: open the first connection and register it.
  if (empty($db_info['pool'])) {
    $mysqli = apdqc_mysqli_new_connection($db_info);
    if (!empty($mysqli)) {
      $db_info['pool'][$mysqli->thread_id] = array(
        $mysqli,
        $tables,
        $cids,
        $options['async'],
      );
    }
  }
  return $mysqli;
}