class Schema in Drupal driver for SQL Server and SQL Azure 3.0.x
Same name and namespace in other branches
- 8.2 drivers/lib/Drupal/Driver/Database/sqlsrv/Schema.php \Drupal\Driver\Database\sqlsrv\Schema
- 8 drivers/lib/Drupal/Driver/Database/sqlsrv/Schema.php \Drupal\Driver\Database\sqlsrv\Schema
Hierarchy
- class \Drupal\Core\Database\Schema implements PlaceholderInterface
- class \Drupal\Driver\Database\sqlsrv\Schema
Expanded class hierarchy of Schema
1 string reference to 'Schema'
- Tasks::getFormOptions in drivers/
lib/ Drupal/ Driver/ Database/ sqlsrv/ Install/ Tasks.php - Return driver specific configuration options.
File
- drivers/
lib/ Drupal/ Driver/ Database/ sqlsrv/ Schema.php, line 13
Namespace
Drupal\Driver\Database\sqlsrvView source
class Schema extends DatabaseSchema {
/**
* The database connection.
*
* @var \Drupal\Driver\Database\sqlsrv\Connection
*/
protected $connection;
/**
* Default schema for SQL Server databases.
*
* @var string
*/
protected $defaultSchema;
/**
* Maximum length of a comment in SQL Server.
*
* @var int
*/
const COMMENT_MAX_BYTES = 7500;
/**
* Maximum length of a Primary Key.
*
* @var int
*/
const PRIMARY_KEY_BYTES = 900;
/**
* Maximum length of a clustered index.
*
* @var int
*/
const CLUSTERED_INDEX_BYTES = 900;
/**
* Maximum length of a non-clustered index.
*
* @var int
*/
const NONCLUSTERED_INDEX_BYTES = 1700;
/**
* Maximum index length with XML field.
*
* @var int
*/
const XML_INDEX_BYTES = 128;
// Name for the technical column used for computed key sor technical primary
// key.
// IMPORTANT: They both start with "__" because the statement class will
// remove those columns from the final result set.
/**
* Computed primary key name.
*
* @var string
*/
const COMPUTED_PK_COLUMN_NAME = '__pkc';
/**
* Computed primary key index.
*
* @var string
*/
const COMPUTED_PK_COLUMN_INDEX = '__ix_pkc';
/**
* Technical primary key name.
*
* @var string
*/
const TECHNICAL_PK_COLUMN_NAME = '__pk';
/**
* Version information for the SQL Server engine.
*
* @var array
*/
protected $engineVersion;
/**
* Should we cache table schema?
*
* @var bool
*/
private $cacheSchema;
/**
* Table schema.
*
* @var mixed
*/
private $columnInformation = [];
/**
* {@inheritdoc}
*/
public function getFieldTypeMap() {
// Put :normal last so it gets preserved by array_flip. This makes
// it much easier for modules (such as schema.module) to map
// database types back into schema types.
$utf8_string_types = [
'varchar:normal' => 'varchar',
'char:normal' => 'char',
'text:tiny' => 'varchar(255)',
'text:small' => 'varchar(255)',
'text:medium' => 'varchar(max)',
'text:big' => 'varchar(max)',
'text:normal' => 'varchar(max)',
];
$ucs2_string_types = [
'varchar:normal' => 'nvarchar',
'char:normal' => 'nchar',
'text:tiny' => 'nvarchar(255)',
'text:small' => 'nvarchar(255)',
'text:medium' => 'nvarchar(max)',
'text:big' => 'nvarchar(max)',
'text:normal' => 'nvarchar(max)',
];
$standard_types = [
'varchar_ascii:normal' => 'varchar(255)',
'serial:tiny' => 'smallint',
'serial:small' => 'smallint',
'serial:medium' => 'int',
'serial:big' => 'bigint',
'serial:normal' => 'int',
'int:tiny' => 'smallint',
'int:small' => 'smallint',
'int:medium' => 'int',
'int:big' => 'bigint',
'int:normal' => 'int',
'float:tiny' => 'real',
'float:small' => 'real',
'float:medium' => 'real',
'float:big' => 'float(53)',
'float:normal' => 'real',
'numeric:normal' => 'numeric',
'blob:big' => 'varbinary(max)',
'blob:normal' => 'varbinary(max)',
'date:normal' => 'date',
'datetime:normal' => 'datetime2(0)',
'time:normal' => 'time(0)',
];
$standard_types += $this
->isUtf8() ? $utf8_string_types : $ucs2_string_types;
return $standard_types;
}
/**
* {@inheritdoc}
*/
public function renameTable($table, $new_name) {
if (!$this
->tableExists($table)) {
throw new SchemaObjectDoesNotExistException(t("Cannot rename %table to %table_new: table %table doesn't exist.", [
'%table' => $table,
'%table_new' => $new_name,
]));
}
if ($this
->tableExists($new_name)) {
throw new SchemaObjectExistsException(t("Cannot rename %table to %table_new: table %table_new already exists.", [
'%table' => $table,
'%table_new' => $new_name,
]));
}
$old_table_info = $this
->getPrefixInfo($table);
$new_table_info = $this
->getPrefixInfo($new_name);
// We don't support renaming tables across schemas (yet).
if ($old_table_info['schema'] != $new_table_info['schema']) {
throw new \PDOException(t('Cannot rename a table across schema.'));
}
$this->connection
->queryDirect('EXEC sp_rename :old, :new', [
':old' => $old_table_info['schema'] . '.' . $old_table_info['table'],
':new' => $new_table_info['table'],
]);
// Constraint names are global in SQL Server, so we need to rename them
// when renaming the table. For some strange reason, indexes are local to
// a table.
$objects = $this->connection
->queryDirect('SELECT name FROM sys.objects WHERE parent_object_id = OBJECT_ID(:table)', [
':table' => $new_table_info['schema'] . '.' . $new_table_info['table'],
]);
foreach ($objects as $object) {
if (preg_match('/^' . preg_quote($old_table_info['table']) . '_(.*)$/', $object->name, $matches)) {
$this->connection
->queryDirect('EXEC sp_rename :old, :new, :type', [
':old' => $old_table_info['schema'] . '.' . $object->name,
':new' => $new_table_info['table'] . '_' . $matches[1],
':type' => 'OBJECT',
]);
}
}
$this
->resetColumnInformation($table);
}
/**
* {@inheritdoc}
*/
public function dropTable($table) {
if (!$this
->tableExists($table)) {
return FALSE;
}
$this->connection
->queryDirect('DROP TABLE {' . $table . '}');
$this
->resetColumnInformation($table);
return TRUE;
}
/**
* {@inheritdoc}
*/
public function fieldExists($table, $field) {
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
return $this->connection
->queryDirect('SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name = :table AND column_name = :name', [
':table' => $prefixInfo['table'],
':name' => $field,
])
->fetchField() !== FALSE;
}
/**
* {@inheritdoc}
*/
public function addField($table, $field, $spec, $keys_new = []) {
if (!$this
->tableExists($table)) {
throw new SchemaObjectDoesNotExistException(t("Cannot add field @table.@field: table doesn't exist.", [
'@field' => $field,
'@table' => $table,
]));
}
if ($this
->fieldExists($table, $field)) {
throw new SchemaObjectExistsException(t("Cannot add field @table.@field: field already exists.", [
'@field' => $field,
'@table' => $table,
]));
}
// Fields that are part of a PRIMARY KEY must be added as NOT NULL.
$is_primary_key = isset($keys_new['primary key']) && in_array($field, $keys_new['primary key'], TRUE);
if ($is_primary_key) {
$this
->ensureNotNullPrimaryKey($keys_new['primary key'], [
$field => $spec,
]);
}
$transaction = $this->connection
->startTransaction();
// Prepare the specifications.
$spec = $this
->processField($spec);
// Use already prefixed table name.
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
$table_prefixed = $prefixInfo['table'];
if ($this
->findPrimaryKeyColumns($table) !== [] && isset($keys_new['primary key']) && in_array($field, $keys_new['primary key'])) {
$this
->cleanUpPrimaryKey($table);
}
// If the field is declared NOT NULL, we have to first create it NULL insert
// the initial data (or populate default values) and then switch to NOT
// NULL.
$fixnull = FALSE;
if (!empty($spec['not null'])) {
$fixnull = TRUE;
$spec['not null'] = FALSE;
}
// Create the field.
// Because the default values of fields can contain string literals
// with braces, we CANNOT allow the driver to prefix tables because the
// algorithm to do so is a crappy str_replace.
$query = "ALTER TABLE {{$table}} ADD ";
$query .= $this
->createFieldSql($table, $field, $spec);
$this->connection
->queryDirect($query, []);
$this
->resetColumnInformation($table);
// Load the initial data.
if (isset($spec['initial_from_field'])) {
if (isset($spec['initial'])) {
$expression = 'COALESCE(' . $spec['initial_from_field'] . ', :default_initial_value)';
$arguments = [
':default_initial_value' => $spec['initial'],
];
}
else {
$expression = $spec['initial_from_field'];
$arguments = [];
}
$this->connection
->update($table)
->expression($field, $expression, $arguments)
->execute();
}
elseif (isset($spec['initial'])) {
$this->connection
->update($table)
->fields([
$field => $spec['initial'],
])
->execute();
}
// Switch to NOT NULL now.
if ($fixnull === TRUE) {
// There is no warranty that the old data did not have NULL values, we
// need to populate nulls with the default value because this won't be
// done by MSSQL by default.
if (isset($spec['default'])) {
$default_expression = $this
->defaultValueExpression($spec['sqlsrv_type'], $spec['default']);
$sql = "UPDATE {{$table}} SET {$field}={$default_expression} WHERE {$field} IS NULL";
$this->connection
->queryDirect($sql);
}
// Now it's time to make this non-nullable.
$spec['not null'] = TRUE;
$field_sql = $this
->createFieldSql($table, $field, $spec, TRUE);
$this->connection
->queryDirect("ALTER TABLE {{$table}} ALTER COLUMN {$field_sql}");
$this
->resetColumnInformation($table);
}
$this
->recreateTableKeys($table, $keys_new);
if (isset($spec['description'])) {
$this->connection
->queryDirect($this
->createCommentSql($spec['description'], $table, $field));
}
}
/**
* {@inheritdoc}
*
* Should this be in a Transaction?
*/
public function dropField($table, $field) {
if (!$this
->fieldExists($table, $field)) {
return FALSE;
}
$primary_key_fields = $this
->findPrimaryKeyColumns($table);
if (in_array($field, $primary_key_fields)) {
// Let's drop the PK.
$this
->cleanUpPrimaryKey($table);
$this
->createTechnicalPrimaryColumn($table);
}
// Drop the related objects.
$this
->dropFieldRelatedObjects($table, $field);
// Drop field comments.
if ($this
->getComment($table, $field) !== FALSE) {
$this->connection
->queryDirect($this
->deleteCommentSql($table, $field));
}
$this->connection
->query('ALTER TABLE {' . $table . '} DROP COLUMN ' . $field);
$this
->resetColumnInformation($table);
return TRUE;
}
/**
* {@inheritdoc}
*/
public function fieldSetDefault($table, $field, $default) {
@trigger_error('fieldSetDefault() is deprecated in drupal:8.7.0 and will be removed before drupal:9.0.0. Instead, call ::changeField() passing a full field specification. See https://www.drupal.org/node/2999035', E_USER_DEPRECATED);
if (!$this
->fieldExists($table, $field)) {
throw new SchemaObjectDoesNotExistException(t("Cannot set default value of field %table.%field: field doesn't exist.", [
'%table' => $table,
'%field' => $field,
]));
}
$default = $this
->escapeDefaultValue($default);
// Try to remove any existing default first.
try {
$this
->fieldSetNoDefault($table, $field);
} catch (\Exception $e) {
}
// Create the new default.
$this->connection
->query('ALTER TABLE [{' . $table . '}] ADD CONSTRAINT {' . $table . '}_' . $field . '_df DEFAULT ' . $default . ' FOR [' . $field . ']');
$this
->resetColumnInformation($table);
}
/**
* {@inheritdoc}
*/
public function fieldSetNoDefault($table, $field) {
@trigger_error('fieldSetNoDefault() is deprecated in drupal:8.7.0 and will be removed before drupal:9.0.0. Instead, call ::changeField() passing a full field specification. See https://www.drupal.org/node/2999035', E_USER_DEPRECATED);
if (!$this
->fieldExists($table, $field)) {
throw new SchemaObjectDoesNotExistException(t("Cannot remove default value of field %table.%field: field doesn't exist.", [
'%table' => $table,
'%field' => $field,
]));
}
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
$constraint_name = $prefixInfo['table'] . '_' . $field . '_df';
$this
->dropConstraint($table, $constraint_name, FALSE);
}
/**
* {@inheritdoc}
*/
public function indexExists($table, $name) {
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
return (bool) $this->connection
->query('SELECT 1 FROM sys.indexes WHERE object_id = OBJECT_ID(:table) AND name = :name', [
':table' => $prefixInfo['table'],
':name' => $name . '_idx',
])
->fetchField();
}
/**
* {@inheritdoc}
*/
public function addPrimaryKey($table, $fields) {
if (!$this
->tableExists($table)) {
throw new SchemaObjectDoesNotExistException(t("Cannot add primary key to table %table: table doesn't exist.", [
'%table' => $table,
]));
}
if ($primary_key_name = $this
->primaryKeyName($table)) {
if ($this
->isTechnicalPrimaryKey($primary_key_name)) {
// Destroy the existing technical primary key.
$this->connection
->queryDirect('ALTER TABLE {' . $table . '} DROP CONSTRAINT [' . $primary_key_name . ']');
$this
->resetColumnInformation($table);
$this
->cleanUpTechnicalPrimaryColumn($table);
}
else {
throw new SchemaObjectExistsException(t("Cannot add primary key to table %table: primary key already exists.", [
'%table' => $table,
]));
}
}
// The size limit of the primary key depends on the
// coexistence with an XML field.
if ($this
->tableHasXmlIndex($table)) {
$this
->createPrimaryKey($table, $fields, self::XML_INDEX_BYTES);
}
else {
$this
->createPrimaryKey($table, $fields);
}
return TRUE;
}
/**
* {@inheritdoc}
*/
public function dropPrimaryKey($table) {
if (!$this
->primaryKeyName($table)) {
return FALSE;
}
$this
->cleanUpPrimaryKey($table);
return TRUE;
}
/**
* {@inheritdoc}
*/
protected function findPrimaryKeyColumns($table) {
if (!$this
->tableExists($table)) {
return FALSE;
}
// Use already prefixed table name.
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
$query = "SELECT column_name FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC " . "INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU " . "ON TC.CONSTRAINT_TYPE = 'PRIMARY KEY' AND " . "TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME AND " . "KU.table_name=:table AND column_name != '__pk' AND column_name != '__pkc' " . "ORDER BY KU.ORDINAL_POSITION";
$result = $this->connection
->query($query, [
':table' => $prefixInfo['table'],
])
->fetchAllAssoc('column_name');
return array_keys($result);
}
/**
* {@inheritdoc}
*/
public function addUniqueKey($table, $name, $fields) {
if (!$this
->tableExists($table)) {
throw new SchemaObjectDoesNotExistException(t("Cannot add unique key %name to table %table: table doesn't exist.", [
'%table' => $table,
'%name' => $name,
]));
}
if ($this
->uniqueKeyExists($table, $name)) {
throw new SchemaObjectExistsException(t("Cannot add unique key %name to table %table: unique key already exists.", [
'%table' => $table,
'%name' => $name,
]));
}
$this
->createTechnicalPrimaryColumn($table);
// Then, build a expression based on the columns.
$column_expression = [];
foreach ($fields as $field) {
if (is_array($field)) {
$column_expression[] = 'SUBSTRING(CAST(' . $field[0] . ' AS varbinary(max)),1,' . $field[1] . ')';
}
else {
$column_expression[] = 'CAST(' . $field . ' AS varbinary(max))';
}
}
$column_expression = implode(' + ', $column_expression);
// Build a computed column based on the expression that replaces NULL
// values with the globally unique identifier generated previously.
// This is (very) unlikely to result in a collision with any actual value
// in the columns of the unique key.
$this->connection
->query("ALTER TABLE {{$table}} ADD __unique_{$name} AS CAST(HashBytes('MD4', COALESCE({$column_expression}, CAST(" . self::TECHNICAL_PK_COLUMN_NAME . " AS varbinary(max)))) AS varbinary(16))");
$this->connection
->query("CREATE UNIQUE INDEX {$name}_unique ON {{$table}} (__unique_{$name})");
$this
->resetColumnInformation($table);
}
/**
* {@inheritdoc}
*/
public function dropUniqueKey($table, $name) {
if (!$this
->uniqueKeyExists($table, $name)) {
return FALSE;
}
$this->connection
->query("DROP INDEX {$name}_unique ON {{$table}}");
$this->connection
->query("ALTER TABLE {{$table}} DROP COLUMN __unique_{$name}");
$this
->resetColumnInformation($table);
// Try to clean-up the technical primary key if possible.
$this
->cleanUpTechnicalPrimaryColumn($table);
return TRUE;
}
/**
* {@inheritdoc}
*/
public function addIndex($table, $name, $fields, array $spec = []) {
if (!$this
->tableExists($table)) {
throw new SchemaObjectDoesNotExistException(t("Cannot add index %name to table %table: table doesn't exist.", [
'%table' => $table,
'%name' => $name,
]));
}
if ($this
->indexExists($table, $name)) {
throw new SchemaObjectExistsException(t("Cannot add index %name to table %table: index already exists.", [
'%table' => $table,
'%name' => $name,
]));
}
$xml_field = NULL;
foreach ($fields as $field) {
if (isset($info['columns'][$field]['type']) && $info['columns'][$field]['type'] == 'xml') {
$xml_field = $field;
break;
}
}
$sql = $this
->createIndexSql($table, $name, $fields, $xml_field);
$pk_fields = $this
->introspectPrimaryKeyFields($table);
$size = $this
->calculateClusteredIndexRowSizeBytes($table, $pk_fields, TRUE);
if (!empty($xml_field)) {
// We can create an XML field, but the current primary key index
// size needs to be under 128bytes.
if ($size > self::XML_INDEX_BYTES) {
// Alright the compress the index.
$this
->compressPrimaryKeyIndex($table, self::XML_INDEX_BYTES);
}
$this->connection
->query($sql);
$this
->resetColumnInformation($table);
}
elseif ($size <= self::NONCLUSTERED_INDEX_BYTES) {
$this->connection
->query($sql);
$this
->resetColumnInformation($table);
}
// If the field is too large, do not create an index.
}
/**
* {@inheritdoc}
*/
public function dropIndex($table, $name) {
if (!$this
->indexExists($table, $name)) {
return FALSE;
}
$expand = FALSE;
if (($index = $this
->tableHasXmlIndex($table)) && $index == $name . '_idx') {
$expand = TRUE;
}
$this->connection
->query('DROP INDEX ' . $name . '_idx ON [{' . $table . '}]');
$this
->resetColumnInformation($table);
// If we just dropped an XML index, we can re-expand the original primary
// key index.
if ($expand) {
$this
->compressPrimaryKeyIndex($table);
}
return TRUE;
}
/**
* {@inheritdoc}
*/
public function introspectIndexSchema($table) {
if (!$this
->tableExists($table)) {
throw new SchemaObjectDoesNotExistException("The table {$table} doesn't exist.");
}
$index_schema = [
'primary key' => $this
->findPrimaryKeyColumns($table),
'unique keys' => [],
'indexes' => [],
];
$column_information = $this
->queryColumnInformation($table);
foreach ($column_information['indexes'] as $key => $values) {
if ($values['is_primary_key'] !== 1 && $values['data_space_id'] == 1 && $values['is_unique'] == 0) {
foreach ($values['columns'] as $num => $stats) {
$index_schema['indexes'][substr($key, 0, -4)][] = $stats['name'];
}
}
}
foreach ($column_information['columns'] as $name => $spec) {
if (substr($name, 0, 9) == '__unique_' && $column_information['indexes'][substr($name, 9) . '_unique']['is_unique'] == 1) {
$definition = $spec['definition'];
$matches = [];
preg_match_all("/CONVERT\\(\\[varbinary\\]\\(max\\),\\[([a-zA-Z0-9_]*)\\]/", $definition, $matches);
foreach ($matches[1] as $match) {
if ($match != '__pk') {
$index_schema['unique keys'][substr($name, 9)][] = $match;
}
}
}
}
return $index_schema;
}
/**
* {@inheritdoc}
*/
public function changeField($table, $field, $field_new, $spec, $keys_new = []) {
if (!$this
->fieldExists($table, $field)) {
throw new SchemaObjectDoesNotExistException(t("Cannot change the definition of field %table.%name: field doesn't exist.", [
'%table' => $table,
'%name' => $field,
]));
}
if ($field != $field_new && $this
->fieldExists($table, $field_new)) {
throw new SchemaObjectExistsException(t("Cannot rename field %table.%name to %name_new: target field already exists.", [
'%table' => $table,
'%name' => $field,
'%name_new' => $field_new,
]));
}
if (isset($keys_new['primary key']) && in_array($field_new, $keys_new['primary key'], TRUE)) {
$this
->ensureNotNullPrimaryKey($keys_new['primary key'], [
$field_new => $spec,
]);
}
// Check if we need to drop field comments.
$drop_field_comment = FALSE;
if ($this
->getComment($table, $field) !== FALSE) {
$drop_field_comment = TRUE;
}
// SQL Server supports transactional DDL, so we can just start a transaction
// here and pray for the best.
/** @var Transaction $transaction */
$transaction = $this->connection
->startTransaction();
// Prepare the specifications.
$spec = $this
->processField($spec);
/*
* IMPORTANT NOTE: To maintain database portability, you have to explicitly
* recreate all indices and primary keys that are using the changed field.
* That means that you ohave to drop all affected keys and indexes with
* db_drop_{primary_key,unique_key,index}() before calling
* db_change_field().
*
* @see https://api.drupal.org/api/drupal/includes!database!database.inc/function/db_change_field/7
*
* What we are going to do in the SQL Server Driver is a best-effort try to
* preserve original keys if they do not conflict with the keys_new
* parameter, and if the callee has done it's job (droping constraints/keys)
* then they will of course not be recreated.
*
* Introspect the schema and save the current primary key if the column
* we are modifying is part of it. Make sure the schema is FRESH.
*/
$primary_key_fields = $this
->findPrimaryKeyColumns($table);
if (in_array($field, $primary_key_fields)) {
// Let's drop the PK.
$this
->cleanUpPrimaryKey($table);
}
// If there is a generated unique key for this field, we will need to
// add it back in when we are done.
$unique_key = $this
->uniqueKeyExists($table, $field);
// Drop the related objects.
$this
->dropFieldRelatedObjects($table, $field);
if ($drop_field_comment) {
$this->connection
->queryDirect($this
->deleteCommentSql($table, $field));
}
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
// Start by renaming the current column.
$this->connection
->queryDirect('EXEC sp_rename :old, :new, :type', [
':old' => $prefixInfo['table'] . '.' . $field,
':new' => $field . '_old',
':type' => 'COLUMN',
]);
$this
->resetColumnInformation($table);
// If the new column does not allow nulls, we need to
// create it first as nullable, then either migrate
// data from previous column or populate default values.
$fixnull = FALSE;
if (!empty($spec['not null'])) {
$fixnull = TRUE;
$spec['not null'] = FALSE;
}
// Create a new field.
$this
->addField($table, $field_new, $spec);
// Don't need to do this if there is no data
// Cannot do this it column is serial.
if ($spec['type'] != 'serial') {
$new_data_type = $this
->createDataType($table, $field_new, $spec);
// Migrate the data over.
// Explicitly cast the old value to the new value to avoid conversion
// errors.
$sql = "UPDATE {{$table}} SET {$field_new}=CAST({$field}_old AS {$new_data_type})";
$this->connection
->queryDirect($sql);
$this
->resetColumnInformation($table);
}
// Switch to NOT NULL now.
if ($fixnull === TRUE) {
// There is no warranty that the old data did not have NULL values, we
// need to populate nulls with the default value because this won't be
// done by MSSQL by default.
if (!empty($spec['default'])) {
$default_expression = $this
->defaultValueExpression($spec['sqlsrv_type'], $spec['default']);
$sql = "UPDATE {{$table}} SET {$field_new} = {$default_expression} WHERE {$field_new} IS NULL";
$this->connection
->queryDirect($sql);
$this
->resetColumnInformation($table);
}
// Now it's time to make this non-nullable.
$spec['not null'] = TRUE;
$field_sql = $this
->createFieldSql($table, $field_new, $spec, TRUE);
$sql = "ALTER TABLE {{$table}} ALTER COLUMN {$field_sql}";
$this->connection
->queryDirect($sql);
$this
->resetColumnInformation($table);
}
// Recreate the primary key if no new primary key has been sent along with
// the change field.
if (in_array($field, $primary_key_fields) && (!isset($keys_new['primary keys']) || empty($keys_new['primary keys']))) {
// The new primary key needs to have the new column name, and be in the
// same order.
if ($field !== $field_new) {
$primary_key_fields[array_search($field, $primary_key_fields)] = $field_new;
}
$keys_new['primary key'] = $primary_key_fields;
}
// Recreate the unique constraint if it existed.
if ($unique_key && (!isset($keys_new['unique keys']) || !in_array($field_new, $keys_new['unique keys']))) {
$keys_new['unique keys'][$field] = [
$field_new,
];
}
// Drop the old field.
$this
->dropField($table, $field . '_old');
// Add the new keys.
$this
->recreateTableKeys($table, $keys_new);
}
/**
* {@inheritdoc}
*
* Adding abilty to pass schema in configuration.
*/
public function __construct($connection) {
parent::__construct($connection);
$options = $connection
->getConnectionOptions();
if (isset($options['schema'])) {
$this->defaultSchema = $options['schema'];
}
$this->cacheSchema = $options['cache_schema'] ?? FALSE;
}
/**
* {@inheritdoc}
*
* Temporary tables and regular tables cannot be verified in the same way.
*/
public function tableExists($table) {
if (empty($table)) {
return FALSE;
}
// Temporary tables and regular tables cannot be verified in the same way.
$query = NULL;
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
$args = [];
if ($this->connection
->isTemporaryTable($table)) {
$query = "SELECT 1 FROM tempdb.sys.tables WHERE [object_id] = OBJECT_ID(:table)";
$args = [
':table' => 'tempdb.[' . $this
->getDefaultSchema() . '].[' . $prefixInfo['table'] . ']',
];
}
else {
$query = "SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE [table_name] = :table";
$args = [
':table' => $prefixInfo['table'],
];
}
return (bool) $this->connection
->queryDirect($query, $args)
->fetchField();
}
/**
* Drupal specific functions.
*
* Returns a list of functions that are not available by default on SQL
* Server, but used in Drupal Core or contributed modules because they are
* available in other databases such as MySQL.
*
* @return array
* List of functions.
*/
public function drupalSpecificFunctions() {
$functions = [
'SUBSTRING',
'SUBSTRING_INDEX',
'GREATEST',
'MD5',
'LPAD',
'REGEXP',
'IF',
'CONNECTION_ID',
];
return $functions;
}
/**
* Return active default Schema.
*/
public function getDefaultSchema() {
if (!isset($this->defaultSchema)) {
$result = $this->connection
->queryDirect("SELECT SCHEMA_NAME()")
->fetchField();
$this->defaultSchema = $result;
}
return $this->defaultSchema;
}
/**
* Database introspection: fetch technical information about a table.
*
* @return array
* An array with the following structure:
* - blobs[]: Array of column names that should be treated as blobs in this
* table.
* - identities[]: Array of column names that are identities in this table.
* - identity: The name of the identity column
* - columns[]: An array of specification details for the columns
* - name: Column name.
* - max_length: Maximum length.
* - precision: Precision.
* - collation_name: Collation.
* - is_nullable: Is nullable.
* - is_ansi_padded: Is ANSI padded.
* - is_identity: Is identity.
* - definition: If a computed column, the computation formulae.
* - default_value: Default value for the column (if any).
*/
public function queryColumnInformation($table) {
if (empty($table) || !$this
->tableExists($table)) {
return [];
}
if ($this->cacheSchema && isset($this->columnInformation[$table])) {
return $this->columnInformation[$table];
}
$table_info = $this
->getPrefixInfo($table);
// We could adapt the current code to support temporary table introspection,
// but for now this is not supported.
if ($this->connection
->isTemporaryTable($table)) {
throw new \Exception('Temporary table introspection is not supported.');
}
$info = [];
// Don't use {} around information_schema.columns table.
$sql = "SELECT sysc.name, sysc.max_length, sysc.precision, sysc.collation_name,\n sysc.is_nullable, sysc.is_ansi_padded, sysc.is_identity, sysc.is_computed, TYPE_NAME(sysc.user_type_id) as type,\n syscc.definition, sm.[text] as default_value\n FROM sys.columns AS sysc\n INNER JOIN sys.syscolumns AS sysc2 ON sysc.object_id = sysc2.id and sysc.name = sysc2.name\n LEFT JOIN sys.computed_columns AS syscc ON sysc.object_id = syscc.object_id AND sysc.name = syscc.name\n LEFT JOIN sys.syscomments sm ON sm.id = sysc2.cdefault\n WHERE sysc.object_id = OBJECT_ID(:table)";
$args = [
':table' => $table_info['schema'] . '.' . $table_info['table'],
];
$result = $this->connection
->queryDirect($sql, $args);
foreach ($result as $column) {
if ($column->type == 'varbinary') {
$info['blobs'][$column->name] = TRUE;
}
$info['columns'][$column->name] = (array) $column;
// Provide a clean list of columns that excludes the ones internally
// created by the database driver.
if (!(isset($column->name[1]) && substr($column->name, 0, 2) == "__")) {
$info['columns_clean'][$column->name] = (array) $column;
}
}
// If we have computed columns, it is important to know what other columns
// they depend on!
$column_names = array_keys($info['columns']);
$column_regex = implode('|', $column_names);
foreach ($info['columns'] as &$column) {
$dependencies = [];
if (!empty($column['definition'])) {
$matches = [];
if (preg_match_all("/\\[[{$column_regex}\\]]*\\]/", $column['definition'], $matches) > 0) {
$dependencies = array_map(function ($m) {
return trim($m, "[]");
}, array_shift($matches));
}
}
$column['dependencies'] = array_flip($dependencies);
}
// Don't use {} around system tables.
$result = $this->connection
->queryDirect('SELECT name FROM sys.identity_columns WHERE object_id = OBJECT_ID(:table)', [
':table' => $table_info['schema'] . '.' . $table_info['table'],
]);
unset($column);
$info['identities'] = [];
$info['identity'] = NULL;
foreach ($result as $column) {
$info['identities'][$column->name] = $column->name;
$info['identity'] = $column->name;
}
// Now introspect information about indexes.
$result = $this->connection
->queryDirect("select tab.[name] as [table_name],\n idx.[name] as [index_name],\n allc.[name] as [column_name],\n idx.[type_desc],\n idx.[is_unique],\n idx.[data_space_id],\n idx.[ignore_dup_key],\n idx.[is_primary_key],\n idx.[is_unique_constraint],\n idx.[fill_factor],\n idx.[is_padded],\n idx.[is_disabled],\n idx.[is_hypothetical],\n idx.[allow_row_locks],\n idx.[allow_page_locks],\n idxc.[is_descending_key],\n idxc.[is_included_column],\n idxc.[index_column_id],\n idxc.[key_ordinal]\n FROM sys.[tables] as tab\n INNER join sys.[indexes] idx ON tab.[object_id] = idx.[object_id]\n INNER join sys.[index_columns] idxc ON idx.[object_id] = idxc.[object_id] and idx.[index_id] = idxc.[index_id]\n INNER join sys.[all_columns] allc ON tab.[object_id] = allc.[object_id] and idxc.[column_id] = allc.[column_id]\n WHERE tab.object_id = OBJECT_ID(:table)\n ORDER BY tab.[name], idx.[index_id], idxc.[index_column_id]\n ", [
':table' => $table_info['schema'] . '.' . $table_info['table'],
]);
foreach ($result as $index_column) {
if (!isset($info['indexes'][$index_column->index_name])) {
$ic = clone $index_column;
// Only retain index specific details.
unset($ic->column_name);
unset($ic->index_column_id);
unset($ic->is_descending_key);
unset($ic->table_name);
unset($ic->key_ordinal);
$info['indexes'][$index_column->index_name] = (array) $ic;
if ($index_column->is_primary_key) {
$info['primary_key_index'] = $ic->index_name;
}
}
$index =& $info['indexes'][$index_column->index_name];
$index['columns'][$index_column->key_ordinal] = [
'name' => $index_column->column_name,
'is_descending_key' => $index_column->is_descending_key,
'key_ordinal' => $index_column->key_ordinal,
];
// Every columns keeps track of what indexes it is part of.
$info['columns'][$index_column->column_name]['indexes'][] = $index_column->index_name;
if (isset($info['columns_clean'][$index_column->column_name])) {
$info['columns_clean'][$index_column->column_name]['indexes'][] = $index_column->index_name;
}
}
if ($this->cacheSchema) {
$this->columnInformation[$table] = $info;
}
return $info;
}
/**
* Unset cached table schema.
*/
public function resetColumnInformation($table) {
unset($this->columnInformation[$table]);
}
/**
* {@inheritdoc}
*/
public function createTable($name, $table) {
// Build the table and its unique keys in a transaction, and fail the whole
// creation in case of an error.
$transaction = $this->connection
->startTransaction();
parent::createTable($name, $table);
// If the spec had a primary key, set it now after all fields have been
// created. We are creating the keys after creating the table so that
// createPrimaryKey is able to introspect column definition from the
// database to calculate index sizes. This adds quite quite some overhead,
// but is only noticeable during table creation.
if (!empty($table['primary key']) && is_array($table['primary key'])) {
$this
->ensureNotNullPrimaryKey($table['primary key'], $table['fields']);
$this
->createPrimaryKey($name, $table['primary key']);
}
// Now all the unique keys.
if (isset($table['unique keys']) && is_array($table['unique keys'])) {
foreach ($table['unique keys'] as $key_name => $key) {
$this
->addUniqueKey($name, $key_name, $key);
}
}
unset($transaction);
// Create the indexes but ignore any error during the creation. We do that
// do avoid pulling the carpet under modules that try to implement indexes
// with invalid data types (long columns), before we come up with a better
// solution.
if (isset($table['indexes']) && is_array($table['indexes'])) {
foreach ($table['indexes'] as $key_name => $key) {
try {
$this
->addIndex($name, $key_name, $key);
} catch (\Exception $e) {
// Log the exception but do not rollback the transaction.
if ($this
->tableExists('watchdog')) {
watchdog_exception('database', $e);
}
}
}
}
}
/**
* Remove comments from an SQL statement.
*
* @param mixed $sql
* SQL statement to remove the comments from.
* @param mixed $comments
* Comments removed from the statement.
*
* @return string
* SQL statement without comments.
*
* @see http://stackoverflow.com/questions/9690448/regular-expression-to-remove-comments-from-sql-statement
*/
public function removeSqlComments($sql, &$comments = NULL) {
$sqlComments = '@(([\'"]).*?[^\\\\]\\2)|((?:\\#|--).*?$|/\\*(?:[^/*]|/(?!\\*)|\\*(?!/)|(?R))*\\*\\/)\\s*|(?<=;)\\s+@ms';
/* Commented version
$sqlComments = '@
(([\'"]).*?[^\\\]\2) # $1 : Skip single & double quoted expressions
|( # $3 : Match comments
(?:\#|--).*?$ # - Single line comments
| # - Multi line (nested) comments
/\* # . comment open marker
(?: [^/*] # . non comment-marker characters
|/(?!\*) # . ! not a comment open
|\*(?!/) # . ! not a comment close
|(?R) # . recursive case
)* # . repeat eventually
\*\/ # . comment close marker
)\s* # Trim after comments
|(?<=;)\s+ # Trim after semi-colon
@msx';
*/
$uncommentedSQL = trim(preg_replace($sqlComments, '$1', $sql));
if (is_array($comments)) {
preg_match_all($sqlComments, $sql, $comments);
$comments = array_filter($comments[3]);
}
return $uncommentedSQL;
}
/**
* Returns an array of current connection user options.
*
* Textsize 2147483647
* language us_english
* dateformat mdy
* datefirst 7
* lock_timeout -1
* quoted_identifier SET
* arithabort SET
* ansi_null_dflt_on SET
* ansi_warnings SET
* ansi_padding SET
* ansi_nulls SET
* concat_null_yields_null SET
* isolation level read committed.
*
* @return mixed
* User options.
*/
public function userOptions() {
$result = $this->connection
->queryDirect('DBCC UserOptions')
->fetchAllKeyed();
return $result;
}
/**
* Retrieve Engine Version information.
*
* @return array
* Engine version.
*/
public function engineVersion() {
if (!isset($this->engineVersion)) {
$this->engineVersion = $this->connection
->queryDirect(<<<EOF
SELECT CONVERT (varchar,SERVERPROPERTY('productversion')) AS VERSION,
CONVERT (varchar,SERVERPROPERTY('productlevel')) AS LEVEL,
CONVERT (varchar,SERVERPROPERTY('edition')) AS EDITION
EOF
)
->fetchAssoc();
}
return $this->engineVersion;
}
/**
* Retrieve Major Engine Version Number as integer.
*
* @return int
* Engine Version Number.
*/
public function engineVersionNumber() {
$version = $this
->EngineVersion();
$start = strpos($version['VERSION'], '.');
return intval(substr($version['VERSION'], 0, $start));
}
/**
* Find if a table function exists.
*
* @param string $function
* Name of the function.
*
* @return bool
* True if the function exists, false otherwise.
*/
public function functionExists($function) {
// FN = Scalar Function
// IF = Inline Table Function
// TF = Table Function
// FS | AF = Assembly (CLR) Scalar Function
// FT | AT = Assembly (CLR) Table Valued Function.
return $this->connection
->queryDirect("SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID('" . $function . "') AND type in (N'FN', N'IF', N'TF', N'FS', N'FT', N'AF')")
->fetchField() !== FALSE;
}
/**
* Check if CLR is enabled.
*
* Required to run GROUP_CONCAT.
*
* @return bool
* Is CLR enabled?
*/
public function clrEnabled() {
return $this->connection
->queryDirect("SELECT CONVERT(int, [value]) as [enabled] FROM sys.configurations WHERE name = 'clr enabled'")
->fetchField() !== 1;
}
/**
* Check if a column is of variable length.
*/
private function isVariableLengthType($type) {
$types = [
'nvarchar' => TRUE,
'ntext' => TRUE,
'varchar' => TRUE,
'varbinary' => TRUE,
'image' => TRUE,
];
return isset($types[$type]);
}
/**
* Load field spec.
*
* Retrieve an array of field specs from
* an array of field names.
*
* @param array $fields
* Table fields.
* @param mixed $table
* Table name.
*/
private function loadFieldsSpec(array $fields, $table) {
$result = [];
$info = $this
->queryColumnInformation($table);
foreach ($fields as $field) {
$result[$field] = $info['columns'][$field];
}
return $result;
}
/**
* Estimates the row size of a clustered index.
*
* @see https://msdn.microsoft.com/en-us/library/ms178085.aspx
*/
public function calculateClusteredIndexRowSizeBytes($table, $fields, $unique = TRUE) {
// The fields must already be in the database to retrieve their real size.
$info = $this
->queryColumnInformation($table);
// Specify the number of fixed-length and variable-length columns
// and calculate the space that is required for their storage.
$num_cols = count($fields);
$num_variable_cols = 0;
$max_var_size = 0;
$max_fixed_size = 0;
foreach ($fields as $field) {
if ($this
->isVariableLengthType($info['columns'][$field]['type'])) {
$num_variable_cols++;
$max_var_size += $info['columns'][$field]['max_length'];
}
else {
$max_fixed_size += $info['columns'][$field]['max_length'];
}
}
// If the clustered index is nonunique, account for the uniqueifier column.
if (!$unique) {
$num_cols++;
$num_variable_cols++;
$max_var_size += 4;
}
// Part of the row, known as the null bitmap, is reserved to manage column
// nullability. Calculate its size.
$null_bitmap = 2 + ($num_cols + 7) / 8;
// Calculate the variable-length data size.
$variable_data_size = empty($num_variable_cols) ? 0 : 2 + $num_variable_cols * 2 + $max_var_size;
// Calculate total row size.
$row_size = $max_fixed_size + $variable_data_size + $null_bitmap + 4;
return $row_size;
}
/**
* Recreate primary key.
*
* Drops the current primary key and creates
* a new one. If the previous primary key
* was an internal primary key, it tries to cleant it up.
*
* @param string $table
* Table name.
* @param mixed $fields
* Array of fields.
*/
protected function recreatePrimaryKey($table, $fields) {
// Drop the existing primary key if exists, if it was a TPK
// it will get completely dropped.
$this
->cleanUpPrimaryKey($table);
$this
->createPrimaryKey($table, $fields);
}
/**
* Create primary key.
*
* Create a Primary Key for the table, does not drop
* any prior primary keys neither it takes care of cleaning
* technical primary column. Only call this if you are sure
* the table does not currently hold a primary key.
*
* @param string $table
* Table name.
* @param mixed $fields
* Array of fields.
* @param int $limit
* Size limit.
*/
private function createPrimaryKey($table, $fields, $limit = 900) {
// To be on the safe side, on the most restrictive use case the limit
// for a primary key clustered index is of 128 bytes (usually 900).
// @see https://web.archive.org/web/20140510074940/http://blogs.msdn.com/b/jgalla/archive/2005/08/18/453189.aspx
// If that is going to be exceeded, use a computed column.
$csv_fields = $this
->createKeySql($fields);
$size = $this
->calculateClusteredIndexRowSizeBytes($table, $this
->createKeySql($fields, TRUE));
$result = [];
$index = FALSE;
// Add support for nullable columns in a primary key.
$nullable = FALSE;
$field_specs = $this
->loadFieldsSpec($fields, $table);
foreach ($field_specs as $field) {
if ($field['is_nullable'] == TRUE) {
$nullable = TRUE;
break;
}
}
if ($nullable || $size >= $limit) {
// Use a computed column instead, and create a custom index.
$result[] = self::COMPUTED_PK_COLUMN_NAME . " AS (CONVERT(VARCHAR(32), HASHBYTES('MD5', CONCAT('',{$csv_fields})), 2)) PERSISTED NOT NULL";
$result[] = "CONSTRAINT {{$table}}_pkey PRIMARY KEY CLUSTERED (" . self::COMPUTED_PK_COLUMN_NAME . ")";
$index = TRUE;
}
else {
$result[] = "CONSTRAINT {{$table}}_pkey PRIMARY KEY CLUSTERED ({$csv_fields})";
}
$this->connection
->queryDirect('ALTER TABLE [{' . $table . '}] ADD ' . implode(' ', $result));
$this
->resetColumnInformation($table);
// If we relied on a computed column for the Primary Key,
// at least index the fields with a regular index.
if ($index) {
$this
->addIndex($table, self::COMPUTED_PK_COLUMN_INDEX, $fields);
}
}
/**
* Create technical primary key index SQL.
*
* Create the SQL needed to add a new technical primary key based on a
* computed column.
*
* @param string $table
* Table name.
*
* @return string
* SQL string.
*/
private function createTechnicalPrimaryKeyIndexSql($table) {
$result = [];
$result[] = self::TECHNICAL_PK_COLUMN_NAME . " UNIQUEIDENTIFIER DEFAULT NEWID() NOT NULL";
$result[] = "CONSTRAINT {{$table}}_pkey_technical PRIMARY KEY CLUSTERED (" . self::TECHNICAL_PK_COLUMN_NAME . ")";
return implode(' ', $result);
}
/**
* Generate SQL to create a new table from a Drupal schema definition.
*
* @param string $name
* The name of the table to create.
* @param array $table
* A Schema API table definition array.
*
* @return array
* A collection of SQL statements to create the table.
*/
protected function createTableSql($name, array $table) {
$statements = [];
$sql_fields = [];
foreach ($table['fields'] as $field_name => $field) {
$sql_fields[] = $this
->createFieldSql($name, $field_name, $this
->processField($field));
if (isset($field['description'])) {
$statements[] = $this
->createCommentSQL($field['description'], $name, $field_name);
}
}
$sql = "CREATE TABLE {{$name}} (" . PHP_EOL;
$sql .= implode("," . PHP_EOL, $sql_fields);
$sql .= PHP_EOL . ")";
array_unshift($statements, $sql);
if (!empty($table['description'])) {
$statements[] = $this
->createCommentSql($table['description'], $name);
}
return $statements;
}
/**
* Create Field SQL.
*
* Create an SQL string for a field to be used in table creation or
* alteration.
*
* Before passing a field out of a schema definition into this
* function it has to be processed by _db_process_field().
*
* @param string $table
* The name of the table.
* @param string $name
* Name of the field.
* @param mixed $spec
* The field specification, as per the schema data structure format.
* @param bool $skip_checks
* Skip checks.
*
* @return string
* The SQL statement to create the field.
*/
protected function createFieldSql($table, $name, $spec, $skip_checks = FALSE) {
$sql = $this->connection
->escapeField($name) . ' ';
$sql .= $this
->createDataType($table, $name, $spec);
$sqlsrv_type = $spec['sqlsrv_type'];
$sqlsrv_type_native = $spec['sqlsrv_type_native'];
$is_text = in_array($sqlsrv_type_native, [
'char',
'varchar',
'text',
'nchar',
'nvarchar',
'ntext',
]);
if ($is_text === TRUE) {
// If collation is set in the spec array, use it.
// Otherwise use the database default.
if (isset($spec['binary'])) {
$default_collation = $this
->getCollation();
if ($spec['binary'] === TRUE) {
$sql .= ' COLLATE ' . preg_replace("/_C[IS]_/", "_CS_", $default_collation);
}
elseif ($spec['binary'] === FALSE) {
$sql .= ' COLLATE ' . preg_replace("/_C[IS]_/", "_CI_", $default_collation);
}
}
}
if (isset($spec['not null']) && $spec['not null']) {
$sql .= ' NOT NULL';
}
if (!$skip_checks) {
if (isset($spec['default'])) {
$default = $this
->defaultValueExpression($sqlsrv_type, $spec['default']);
$sql .= " CONSTRAINT {{$table}_{$name}_df} DEFAULT {$default}";
}
if (!empty($spec['identity'])) {
$sql .= ' IDENTITY';
}
if (!empty($spec['unsigned'])) {
$sql .= ' CHECK (' . $this->connection
->escapeField($name) . ' >= 0)';
}
}
return $sql;
}
/**
* Create the data type from a field specification.
*/
protected function createDataType($table, $name, $spec) {
$sqlsrv_type = $spec['sqlsrv_type'];
$sqlsrv_type_native = $spec['sqlsrv_type_native'];
$lengthable = in_array($sqlsrv_type_native, [
'char',
'varchar',
'nchar',
'nvarchar',
]);
if (!empty($spec['length']) && $lengthable) {
$length = $spec['length'];
if (is_int($length) && $this
->isUtf8()) {
// Do we need to check if this exceeds the max length?
// If so, use varchar(max).
$length *= 3;
}
return $sqlsrv_type_native . '(' . $length . ')';
}
elseif (in_array($sqlsrv_type_native, [
'numeric',
'decimal',
]) && isset($spec['precision']) && isset($spec['scale'])) {
// Maximum precision for SQL Server 2008 or greater is 38.
// For previous versions it's 28.
if ($spec['precision'] > 38) {
// Logs an error.
\Drupal::logger('sqlsrv')
->warning("Field '@field' in table '@table' has had it's precision dropped from @precision to 38", [
'@field' => $name,
'@table' => $table,
'@precision' => $spec['precision'],
]);
$spec['precision'] = 38;
}
return $sqlsrv_type_native . '(' . $spec['precision'] . ', ' . $spec['scale'] . ')';
}
else {
return $sqlsrv_type;
}
}
/**
* Get the SQL expression for a default value.
*
* @param string $sqlsr_type
* Database data type.
* @param mixed $default
* Default value.
*
* @return string
* An SQL Default expression.
*/
private function defaultValueExpression($sqlsr_type, $default) {
// The actual expression depends on the target data type as it might require
// conversions.
$result = is_string($default) ? $this->connection
->quote($default) : $default;
if (Utils::GetMSSQLType($sqlsr_type) == 'varbinary') {
$default = addslashes($default);
$result = "CONVERT({$sqlsr_type}, '{$default}')";
}
return $result;
}
/**
* Create key SQL.
*
* Returns a list of field names comma separated ready
* to be used in a SQL Statement.
*
* @param array $fields
* Array of field names.
* @param bool $as_array
* Return an array or a string?
*
* @return array|string
* The comma separated fields, or an array of fields
*/
protected function createKeySql(array $fields, $as_array = FALSE) {
$ret = [];
foreach ($fields as $field) {
if (is_array($field)) {
$ret[] = $field[0];
}
else {
$ret[] = $field;
}
}
if ($as_array) {
return $ret;
}
return implode(', ', $ret);
}
/**
* Returns the SQL needed to create an index.
*
* Supports XML indexes. Incomplete.
*
* @param string $table
* Table to create the index on.
* @param string $name
* Name of the index.
* @param array $fields
* Fields to be included in the Index.
* @param mixed $xml_field
* The xml field.
*
* @return string
* SQL string.
*/
protected function createIndexSql($table, $name, array $fields, $xml_field) {
// Get information about current columns.
$info = $this
->queryColumnInformation($table);
// Flatten $fields array if neccesary.
$fields = $this
->createKeySql($fields, TRUE);
// XML indexes can only have 1 column.
if (!empty($xml_field) && isset($fields[1])) {
throw new \Exception("Cannot include an XML field on a multiple column index.");
}
// No more than one XML index per table.
if ($xml_field && $this
->tableHasXmlIndex($table)) {
throw new \Exception("Only one primary clustered XML index is allowed per table.");
}
if (empty($xml_field)) {
$fields_csv = implode(', ', $fields);
return "CREATE INDEX {$name}_idx ON [{{$table}}] ({$fields_csv})";
}
else {
return "CREATE PRIMARY XML INDEX {$name}_idx ON [{{$table}}] ({$xml_field})";
}
}
/**
* Set database-engine specific properties for a field.
*
* @param mixed $field
* A field description array, as specified in the schema documentation.
*/
protected function processField($field) {
$field['size'] = $field['size'] ?? 'normal';
if (isset($field['type']) && ($field['type'] == 'serial' || $field['type'] == 'int') && isset($field['unsigned']) && $field['unsigned'] === TRUE && $field['size'] == 'normal') {
$field['size'] = 'big';
}
// Set the correct database-engine specific datatype.
// In case one is already provided, force it to lowercase.
if (isset($field['sqlsrv_type'])) {
$field['sqlsrv_type'] = mb_strtolower($field['sqlsrv_type']);
}
else {
$map = $this
->getFieldTypeMap();
$field['sqlsrv_type'] = $map[$field['type'] . ':' . $field['size']];
}
$field['sqlsrv_type_native'] = Utils::GetMSSQLType($field['sqlsrv_type']);
if (isset($field['type']) && $field['type'] == 'serial') {
$field['identity'] = TRUE;
}
return $field;
}
/**
* Compress Primary key Index.
*
* Sometimes the size of a table's primary key index needs
* to be reduced to allow for Primary XML Indexes.
*
* @param string $table
* Table name.
* @param int $limit
* Limit size.
*/
public function compressPrimaryKeyIndex($table, $limit = 900) {
// Introspect the schema and save the current primary key if the column
// we are modifying is part of it.
$primary_key_fields = $this
->introspectPrimaryKeyFields($table);
// SQL Server supports transactional DDL, so we can just start a transaction
// here and pray for the best.
$transaction = $this->connection
->startTransaction();
// Clear current Primary Key.
$this
->cleanUpPrimaryKey($table);
// Recreate the Primary Key with the given limit size.
$this
->createPrimaryKey($table, $primary_key_fields, $limit);
}
/**
* Return size information for current database.
*
* @return mixed
* Size info.
*/
public function getSizeInfo() {
$sql = <<<EOF
SELECT
DB_NAME(db.database_id) DatabaseName,
(CAST(mfrows.RowSize AS FLOAT)*8)/1024 RowSizeMB,
(CAST(mflog.LogSize AS FLOAT)*8)/1024 LogSizeMB,
(CAST(mfstream.StreamSize AS FLOAT)*8)/1024 StreamSizeMB,
(CAST(mftext.TextIndexSize AS FLOAT)*8)/1024 TextIndexSizeMB
FROM sys.databases db
LEFT JOIN (SELECT database_id, SUM(size) RowSize FROM sys.master_files WHERE type = 0 GROUP BY database_id, type) mfrows ON mfrows.database_id = db.database_id
LEFT JOIN (SELECT database_id, SUM(size) LogSize FROM sys.master_files WHERE type = 1 GROUP BY database_id, type) mflog ON mflog.database_id = db.database_id
LEFT JOIN (SELECT database_id, SUM(size) StreamSize FROM sys.master_files WHERE type = 2 GROUP BY database_id, type) mfstream ON mfstream.database_id = db.database_id
LEFT JOIN (SELECT database_id, SUM(size) TextIndexSize FROM sys.master_files WHERE type = 4 GROUP BY database_id, type) mftext ON mftext.database_id = db.database_id
WHERE DB_NAME(db.database_id) = :database
EOF;
// Database is defaulted from active connection.
$options = $this->connection
->getConnectionOptions();
$database = $options['database'];
return $this->connection
->query($sql, [
':database' => $database,
])
->fetchObject();
}
/**
* Get database information from sys.databases.
*
* @return mixed
* Database info.
*/
public function getDatabaseInfo() {
static $result;
if (isset($result)) {
return $result;
}
$sql = <<<EOF
select name
, db.snapshot_isolation_state
, db.snapshot_isolation_state_desc
, db.is_read_committed_snapshot_on
, db.recovery_model
, db.recovery_model_desc
, db.collation_name
from sys.databases db
WHERE DB_NAME(db.database_id) = :database
EOF;
// Database is defaulted from active connection.
$options = $this->connection
->getConnectionOptions();
$database = $options['database'];
$result = $this->connection
->queryDirect($sql, [
':database' => $database,
])
->fetchObject();
return $result;
}
/**
* Get the collation.
*
* Get the collation of current connection whether
* it has or not a database defined in it.
*
* @param string $table
* Table name.
* @param string $column
* Column name.
*
* @return string
* Collation type.
*/
public function getCollation($table = NULL, $column = NULL) {
// No table or column provided, then get info about
// database (if exists) or server default collation.
if (empty($table) && empty($column)) {
// Database is defaulted from active connection.
$options = $this->connection
->getConnectionOptions();
$database = $options['database'];
if (!empty($database)) {
// Default collation for specific table.
// CONVERT defaults to returning only 30 chars.
$sql = "SELECT CONVERT (varchar(50), DATABASEPROPERTYEX('{$database}', 'collation'))";
return $this->connection
->queryDirect($sql)
->fetchField();
}
else {
// Server default collation.
$sql = "SELECT SERVERPROPERTY ('collation') as collation";
return $this->connection
->queryDirect($sql)
->fetchField();
}
}
$sql = <<<EOF
SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, COLLATION_NAME, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = ':schema'
AND TABLE_NAME = ':table'
AND COLUMN_NAME = ':column'
EOF;
$params = [];
$params[':schema'] = $this
->getDefaultSchema();
$params[':table'] = $table;
$params[':column'] = $column;
$result = $this->connection
->queryDirect($sql, $params)
->fetchObject();
return $result->COLLATION_NAME;
}
/**
* Get the list of fields participating in the Primary Key.
*
* @param string $table
* Table name.
*
* @return string[]
* Fields participating in the Primary Key.
*/
public function introspectPrimaryKeyFields($table) {
$data = $this
->queryColumnInformation($table);
// All primary keys have a default index,
// use that to see if we have a primary key
// before iterating.
if (!isset($data['primary_key_index']) || !isset($data['indexes'][$data['primary_key_index']])) {
return [];
}
$result = [];
$index = $data['indexes'][$data['primary_key_index']];
foreach ($index['columns'] as $column) {
if ($column['name'] != self::COMPUTED_PK_COLUMN_NAME) {
$result[$column['name']] = $column['name'];
}
// Get full column definition.
$c = $data['columns'][$column['name']];
// If this column depends on other columns
// the other columns are also part of the index!
// We don't support nested computed columns here.
foreach ($c['dependencies'] as $name => $order) {
$result[$name] = $name;
}
}
return $result;
}
/**
* Re-create keys associated to a table.
*/
protected function recreateTableKeys($table, $new_keys) {
if (isset($new_keys['primary key'])) {
$this
->addPrimaryKey($table, $new_keys['primary key']);
}
if (isset($new_keys['unique keys'])) {
foreach ($new_keys['unique keys'] as $name => $fields) {
$this
->addUniqueKey($table, $name, $fields);
}
}
if (isset($new_keys['indexes'])) {
foreach ($new_keys['indexes'] as $name => $fields) {
$this
->addIndex($table, $name, $fields);
}
}
}
/**
* Drop a constraint.
*
* @param string $table
* Table name.
* @param string $name
* Constraint name.
* @param bool $check
* Check if the constraint exists?
*/
public function dropConstraint($table, $name, $check = TRUE) {
// Check if constraint exists.
if ($check) {
// Do Something.
}
$sql = 'ALTER TABLE {' . $table . '} DROP CONSTRAINT [' . $name . ']';
$this->connection
->query($sql);
$this
->resetColumnInformation($table);
}
/**
* Drop the related objects of a column (indexes, constraints, etc.).
*
* @param mixed $table
* Table name.
* @param mixed $field
* Field name.
*/
protected function dropFieldRelatedObjects($table, $field) {
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
// Fetch the list of indexes referencing this column.
$sql = 'SELECT DISTINCT i.name FROM sys.columns c INNER JOIN sys.index_columns ic ON ic.object_id = c.object_id AND ic.column_id = c.column_id INNER JOIN sys.indexes i ON i.object_id = ic.object_id AND i.index_id = ic.index_id WHERE i.is_primary_key = 0 AND i.is_unique_constraint = 0 AND c.object_id = OBJECT_ID(:table) AND c.name = :name';
$indexes = $this->connection
->query($sql, [
':table' => $prefixInfo['table'],
':name' => $field,
]);
foreach ($indexes as $index) {
$this->connection
->query('DROP INDEX [' . $index->name . '] ON {' . $table . '}');
$this
->resetColumnInformation($table);
}
// Fetch the list of check constraints referencing this column.
$sql = 'SELECT DISTINCT cc.name FROM sys.columns c INNER JOIN sys.check_constraints cc ON cc.parent_object_id = c.object_id AND cc.parent_column_id = c.column_id WHERE c.object_id = OBJECT_ID(:table) AND c.name = :name';
$constraints = $this->connection
->query($sql, [
':table' => $prefixInfo['table'],
':name' => $field,
]);
foreach ($constraints as $constraint) {
$this
->dropConstraint($table, $constraint->name, FALSE);
}
// Fetch the list of default constraints referencing this column.
$sql = 'SELECT DISTINCT dc.name FROM sys.columns c INNER JOIN sys.default_constraints dc ON dc.parent_object_id = c.object_id AND dc.parent_column_id = c.column_id WHERE c.object_id = OBJECT_ID(:table) AND c.name = :name';
$constraints = $this->connection
->query($sql, [
':table' => $prefixInfo['table'],
':name' => $field,
]);
foreach ($constraints as $constraint) {
$this
->dropConstraint($table, $constraint->name, FALSE);
}
// Drop any indexes on related computed columns when we have some.
if ($this
->uniqueKeyExists($table, $field)) {
$this
->dropUniqueKey($table, $field);
}
// If this column is part of a computed primary key, drop the key.
$data = $this
->queryColumnInformation($table);
if (isset($data['columns'][self::COMPUTED_PK_COLUMN_NAME]['dependencies'][$field])) {
$this
->cleanUpPrimaryKey($table);
}
}
/**
* Return the name of the primary key of a table if it exists.
*
* @param mixed $table
* Table name.
*/
protected function primaryKeyName($table) {
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
$sql = 'SELECT name FROM sys.key_constraints WHERE parent_object_id = OBJECT_ID(:table) AND type = :type';
return $this->connection
->query($sql, [
':table' => $prefixInfo['table'],
':type' => 'PK',
])
->fetchField();
}
/**
* Check if a key is a technical primary key.
*
* @param string $key_name
* Key name.
*/
protected function isTechnicalPrimaryKey($key_name) {
return $key_name && preg_match('/_pkey_technical$/', $key_name);
}
/**
* Is the database configured as UTF8 character encoding?
*/
protected function isUtf8() {
$collation = $this
->getCollation();
return stristr($collation, '_UTF8') !== FALSE;
}
/**
* Add a primary column to the table.
*
* @param mixed $table
* Table name.
*/
protected function createTechnicalPrimaryColumn($table) {
if (!$this
->fieldExists($table, self::TECHNICAL_PK_COLUMN_NAME)) {
$this->connection
->query("ALTER TABLE {{$table}} ADD " . self::TECHNICAL_PK_COLUMN_NAME . " UNIQUEIDENTIFIER DEFAULT NEWID() NOT NULL");
$this
->resetColumnInformation($table);
}
}
/**
* Drop the primary key constraint.
*
* @param mixed $table
* Table name.
*/
protected function cleanUpPrimaryKey($table) {
// We are droping the constraint, but not the column.
$existing_primary_key = $this
->primaryKeyName($table);
if ($existing_primary_key !== FALSE) {
$this
->dropConstraint($table, $existing_primary_key, FALSE);
}
// We are using computed columns to store primary keys,
// try to remove it if it exists.
if ($this
->fieldExists($table, self::COMPUTED_PK_COLUMN_NAME)) {
// The TCPK has compensation indexes that need to be cleared.
$this
->dropIndex($table, self::COMPUTED_PK_COLUMN_INDEX);
$this
->dropField($table, self::COMPUTED_PK_COLUMN_NAME);
}
// Try to get rid of the TPC.
$this
->cleanUpTechnicalPrimaryColumn($table);
}
/**
* Tries to clean up the technical primary column.
*
* It will be deleted if:
* (a) It is not being used as the current primary key and...
* (b) There is no unique constraint because they depend on this column
* (see addUniqueKey())
*
* @param string $table
* Table name.
*/
protected function cleanUpTechnicalPrimaryColumn($table) {
// Get the number of remaining unique indexes on the table, that
// are not primary keys and prune the technical primary column if possible.
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
$sql = 'SELECT COUNT(*) FROM sys.indexes WHERE object_id = OBJECT_ID(:table) AND is_unique = 1 AND is_primary_key = 0';
$args = [
':table' => $prefixInfo['table'],
];
$unique_indexes = $this->connection
->query($sql, $args)
->fetchField();
$primary_key_is_technical = $this
->isTechnicalPrimaryKey($this
->primaryKeyName($table));
if (!$unique_indexes && !$primary_key_is_technical) {
$this
->dropField($table, self::TECHNICAL_PK_COLUMN_NAME);
}
}
/**
* Find if an unique key exists.
*
* @param mixed $table
* Table name.
* @param mixed $name
* Index name.
*
* @return bool
* Does the key exist?
*/
protected function uniqueKeyExists($table, $name) {
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
return (bool) $this->connection
->query('SELECT 1 FROM sys.indexes WHERE object_id = OBJECT_ID(:table) AND name = :name', [
':table' => $prefixInfo['table'],
':name' => $name . '_unique',
])
->fetchField();
}
/**
* Check if a table already has an XML index.
*
* @param string $table
* Table name.
*
* @return mixed
* Name if exists, else FALSE.
*/
public function tableHasXmlIndex($table) {
$info = $this
->queryColumnInformation($table);
if (isset($info['indexes']) && is_array($info['indexes'])) {
foreach ($info['indexes'] as $name => $index) {
if (strcasecmp($index['type_desc'], 'XML') == 0) {
return $name;
}
}
}
return FALSE;
}
/**
* Create an SQL statement to delete a comment.
*/
protected function deleteCommentSql($table = NULL, $column = NULL) {
$schema = $this
->getDefaultSchema();
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
$prefixed_table = $prefixInfo['table'];
$sql = "EXEC sp_dropextendedproperty @name=N'MS_Description'";
$sql .= ",@level0type = N'Schema', @level0name = '" . $schema . "'";
if (isset($table)) {
$sql .= ",@level1type = N'Table', @level1name = '{$prefixed_table}'";
if (isset($column)) {
$sql .= ",@level2type = N'Column', @level2name = '{$column}'";
}
}
return $sql;
}
/**
* Create the SQL statement to add a new comment.
*/
protected function createCommentSql($value, $table = NULL, $column = NULL) {
$schema = $this
->getDefaultSchema();
$value = $this
->prepareComment($value);
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
$prefixed_table = $prefixInfo['table'];
$sql = "EXEC sp_addextendedproperty @name=N'MS_Description', @value={$value}";
$sql .= ",@level0type = N'Schema', @level0name = '{$schema}'";
if (isset($table)) {
$sql .= ",@level1type = N'Table', @level1name = '{$prefixed_table}'";
if (isset($column)) {
$sql .= ",@level2type = N'Column', @level2name = '{$column}'";
}
}
return $sql;
}
/**
* Retrieve a table or column comment.
*/
public function getComment($table, $column = NULL) {
$prefixInfo = $this
->getPrefixInfo($table, TRUE);
$prefixed_table = $prefixInfo['table'];
$schema = $this
->getDefaultSchema();
$column_string = isset($column) ? "'Column','{$column}'" : "NULL,NULL";
$sql = "SELECT value FROM fn_listextendedproperty ('MS_Description','Schema','{$schema}','Table','{$prefixed_table}',{$column_string})";
$comment = $this->connection
->query($sql)
->fetchField();
return $comment;
}
}
Members
Name | Modifiers | Type | Description | Overrides |
---|---|---|---|---|
Schema:: |
private | property | Should we cache table schema? | |
Schema:: |
private | property | Table schema. | |
Schema:: |
protected | property |
The database connection. Overrides Schema:: |
|
Schema:: |
protected | property |
Default schema for SQL Server databases. Overrides Schema:: |
|
Schema:: |
protected | property | Version information for the SQL Server engine. | |
Schema:: |
protected | property | The placeholder counter. | |
Schema:: |
protected | property | A unique identifier for this query object. | |
Schema:: |
public | function |
Add a new field to a table. Overrides Schema:: |
|
Schema:: |
public | function |
Add an index. Overrides Schema:: |
|
Schema:: |
public | function |
Add a primary key. Overrides Schema:: |
|
Schema:: |
public | function |
Add a unique key. Overrides Schema:: |
|
Schema:: |
protected | function | Build a condition to match a table name against a standard information_schema. | 1 |
Schema:: |
public | function | Estimates the row size of a clustered index. | |
Schema:: |
public | function |
Change a field definition. Overrides Schema:: |
|
Schema:: |
protected | function | Drop the primary key constraint. | |
Schema:: |
protected | function | Tries to clean up the technical primary column. | |
Schema:: |
public | function | Check if CLR is enabled. | |
Schema:: |
constant | Maximum length of a clustered index. | ||
Schema:: |
constant | Maximum length of a comment in SQL Server. | ||
Schema:: |
public | function | Compress Primary key Index. | |
Schema:: |
constant | Computed primary key index. | ||
Schema:: |
constant | Computed primary key name. | ||
Schema:: |
protected | function | Create the SQL statement to add a new comment. | |
Schema:: |
protected | function | Create the data type from a field specification. | |
Schema:: |
protected | function | Create Field SQL. | |
Schema:: |
protected | function | Returns the SQL needed to create an index. | |
Schema:: |
protected | function | Create key SQL. | |
Schema:: |
private | function | Create primary key. | |
Schema:: |
public | function |
Create a new table from a Drupal table definition. Overrides Schema:: |
|
Schema:: |
protected | function | Generate SQL to create a new table from a Drupal schema definition. | |
Schema:: |
protected | function | Add a primary column to the table. | |
Schema:: |
private | function | Create technical primary key index SQL. | |
Schema:: |
private | function | Get the SQL expression for a default value. | |
Schema:: |
protected | function | Create an SQL statement to delete a comment. | |
Schema:: |
public | function | Drop a constraint. | |
Schema:: |
public | function |
Should this be in a Transaction? Overrides Schema:: |
|
Schema:: |
protected | function | Drop the related objects of a column (indexes, constraints, etc.). | |
Schema:: |
public | function |
Drop an index. Overrides Schema:: |
|
Schema:: |
public | function |
Drop the primary key. Overrides Schema:: |
|
Schema:: |
public | function |
Drop a table. Overrides Schema:: |
|
Schema:: |
public | function |
Drop a unique key. Overrides Schema:: |
|
Schema:: |
public | function | Drupal specific functions. | |
Schema:: |
public | function | Retrieve Engine Version information. | |
Schema:: |
public | function | Retrieve Major Engine Version Number as integer. | |
Schema:: |
protected | function | Ensures that all the primary key fields are correctly defined. | |
Schema:: |
protected | function | Return an escaped version of its parameter to be used as a default value on a column. | |
Schema:: |
public | function |
Check if a column exists in the given table. Overrides Schema:: |
|
Schema:: |
public | function | Return an array of field names from an array of key/index column specifiers. | |
Schema:: |
public | function | ||
Schema:: |
public | function | ||
Schema:: |
protected | function |
Finds the primary key columns of a table, from the database. Overrides Schema:: |
|
Schema:: |
public | function | Finds all tables that are like the specified base table name. | 2 |
Schema:: |
public | function | Find if a table function exists. | |
Schema:: |
public | function | Get the collation. | |
Schema:: |
public | function | Retrieve a table or column comment. | |
Schema:: |
public | function | Get database information from sys.databases. | |
Schema:: |
public | function | Return active default Schema. | |
Schema:: |
public | function |
Returns a mapping of Drupal schema field names to DB-native field types. Overrides Schema:: |
|
Schema:: |
protected | function | Get information about the table name and schema from the prefix. | 1 |
Schema:: |
public | function | Return size information for current database. | |
Schema:: |
public | function |
Checks if an index exists in the given table. Overrides Schema:: |
|
Schema:: |
public | function |
Finds the columns for the primary key, unique keys and indexes of a table. Overrides Schema:: |
|
Schema:: |
public | function | Get the list of fields participating in the Primary Key. | |
Schema:: |
protected | function | Check if a key is a technical primary key. | |
Schema:: |
protected | function | Is the database configured as UTF8 character encoding? | |
Schema:: |
private | function | Check if a column is of variable length. | |
Schema:: |
private | function | Load field spec. | |
Schema:: |
public | function |
Returns the next placeholder ID for the query. Overrides PlaceholderInterface:: |
|
Schema:: |
constant | Maximum length of a non-clustered index. | ||
Schema:: |
public | function | Create names for indexes, primary keys and constraints. | |
Schema:: |
public | function | Prepare a table or column comment for database query. | 1 |
Schema:: |
protected | function | Return the name of the primary key of a table if it exists. | |
Schema:: |
constant | Maximum length of a Primary Key. | ||
Schema:: |
protected | function | Set database-engine specific properties for a field. | |
Schema:: |
public | function | Database introspection: fetch technical information about a table. | |
Schema:: |
protected | function | Recreate primary key. | |
Schema:: |
protected | function | Re-create keys associated to a table. | |
Schema:: |
public | function | Remove comments from an SQL statement. | |
Schema:: |
public | function |
Rename a table. Overrides Schema:: |
|
Schema:: |
public | function | Unset cached table schema. | |
Schema:: |
public | function |
Temporary tables and regular tables cannot be verified in the same way. Overrides Schema:: |
|
Schema:: |
public | function | Check if a table already has an XML index. | |
Schema:: |
constant | Technical primary key name. | ||
Schema:: |
public | function |
Returns a unique identifier for this object. Overrides PlaceholderInterface:: |
|
Schema:: |
protected | function | Find if an unique key exists. | |
Schema:: |
public | function | Returns an array of current connection user options. | |
Schema:: |
constant | Maximum index length with XML field. | ||
Schema:: |
public | function | Implements the magic __clone function. | |
Schema:: |
public | function |
Adding abilty to pass schema in configuration. Overrides Schema:: |