sqlmap.inc in Migrate 7.2
Same filename and directory in other branches
Defines a Drupal db-based implementation of MigrateMap.
File
plugins/sources/sqlmap.inc (View source)
<?php
/**
* @file
* Defines a Drupal db-based implementation of MigrateMap.
*/
class MigrateSQLMap extends MigrateMap {
/**
* Names of tables created for tracking the migration.
*
* $mapTable holds the mappings from source key to destination key;
* $messageTable holds the messages saved against source rows. Both names are
* generated from the migration machine name in the constructor.
*
* @var string
*/
protected $mapTable, $messageTable;
/**
* Returns the name of the map table.
*
* @return string
* The (unprefixed, unqualified) map table name.
*/
public function getMapTable() {
return $this->mapTable;
}
/**
* Returns the name of the message table.
*
* @return string
* The (unprefixed, unqualified) message table name.
*/
public function getMessageTable() {
return $this->messageTable;
}
/**
 * Returns the map table name, qualified with its database name when possible.
 *
 * Qualifying the table with the database name makes cross-database joins
 * possible. Table prefixes are applied to whatever string is returned here,
 * so a prefixed table cannot be qualified and its bare name is returned
 * instead. That is fine when the source data lives in the default (prefixed)
 * database (in particular, for simpletest), but not when the primary query
 * runs against an external database.
 *
 * @return string
 *   Either the bare map table name (prefixed connection) or
 *   "database.table", with the database name backtick-quoted on MySQL.
 */
public function getQualifiedMapTable() {
  $connection_options = $this->connection->getConnectionOptions();

  // A prefix would be prepended to the qualified string, so do not qualify.
  if ($this->connection->tablePrefix($this->mapTable)) {
    return $this->mapTable;
  }

  $is_mysql = !empty($connection_options['driver']) && $connection_options['driver'] == 'mysql';
  // Backtick the db name on mysql to ensure dbs with hyphens work.
  $database = $is_mysql
    ? "`{$connection_options['database']}`"
    : $connection_options['database'];

  return $database . '.' . $this->mapTable;
}
/**
* Returns the source key definition given to the constructor.
*
* sourceKey and destinationKey arrays are keyed by the field names; values
* are the Drupal schema definition for the field.
*
* @return array
* The source key schema array.
*/
public function getSourceKey() {
return $this->sourceKey;
}
/**
* Returns the destination key definition given to the constructor.
*
* @return array
* The destination key schema array, keyed by field name; values are the
* Drupal schema definition for the field.
*/
public function getDestinationKey() {
return $this->destinationKey;
}
/**
* Drupal connection object on which to create the map/message tables.
*
* Obtained in the constructor from Database::getConnection() using the
* connection key passed by the caller.
*
* @var DatabaseConnection
*/
protected $connection;
/**
* Returns the connection holding the map and message tables.
*
* @return DatabaseConnection
*/
public function getConnection() {
return $this->connection;
}
/**
* We don't need to check the tables more than once per request.
*
* Set to TRUE by ensureTables() once the tables have been checked, so that
* later calls on this object return without touching the schema.
*
* @var boolean
*/
protected $ensured;
/**
* Provide caching for Source or Destination Map Lookups.
*
* Taken from the 'cache_map_lookups' constructor option (FALSE when the
* option is absent). When truthy, lookupSourceID() and lookupDestinationID()
* keep their results in drupal_static() storage.
*
* @var mixed
*/
protected $cacheMapLookups;
/**
 * Constructor.
 *
 * Records the options, opens the connection, derives the map/message table
 * names and the key-to-column maps, then makes sure the tables exist.
 *
 * @param string $machine_name
 *   The unique reference to the migration that we are mapping.
 * @param array $source_key
 *   The database schema for the source key.
 * @param array $destination_key
 *   The database schema for the destination key.
 * @param string $connection_key
 *   Optional - The connection used to create the mapping tables. By default
 *   this is the destination (Drupal). If it's not possible to make joins
 *   between the destination database and your source database you can specify
 *   a different connection to create the mapping tables on.
 * @param array $options
 *   Optional - Options applied to this source. Recognised keys:
 *   - track_last_imported: when truthy, saveIDMapping() records the import
 *     time of each row.
 *   - cache_map_lookups: when truthy, ID lookups are cached statically.
 */
public function __construct($machine_name, array $source_key, array $destination_key, $connection_key = 'default', $options = array()) {
  // Honour the option's value: isset() alone would switch tracking on even
  // when the caller explicitly passed FALSE.
  if (!empty($options['track_last_imported'])) {
    $this->trackLastImported = TRUE;
  }
  $this->cacheMapLookups = isset($options['cache_map_lookups']) ? $options['cache_map_lookups'] : FALSE;

  $this->connection = Database::getConnection('default', $connection_key);

  // Default generated table names, limited to 63 characters (including any
  // table prefix the connection will add).
  $max_length = 63 - strlen($this->connection->tablePrefix());
  $machine_name = drupal_strtolower($machine_name);
  $this->mapTable = drupal_substr('migrate_map_' . $machine_name, 0, $max_length);
  $this->messageTable = drupal_substr('migrate_message_' . $machine_name, 0, $max_length);

  $this->sourceKey = $source_key;
  $this->destinationKey = $destination_key;

  // Build the source and destination key maps: each key field name is mapped
  // to its positional column (sourceid1, sourceid2, ... / destid1, ...).
  $this->sourceKeyMap = array();
  $count = 1;
  foreach ($source_key as $field => $schema) {
    $this->sourceKeyMap[$field] = 'sourceid' . $count++;
  }
  $this->destinationKeyMap = array();
  $count = 1;
  foreach ($destination_key as $field => $schema) {
    $this->destinationKeyMap[$field] = 'destid' . $count++;
  }

  $this->ensureTables();
}
/**
 * Create the map and message tables if they don't already exist.
 *
 * When the map table is missing, both the map and the message table are
 * created. When it is present, only the 'rollback_action' and 'hash' columns
 * are added to it if they are missing. The check runs at most once per
 * object; afterwards $this->ensured short-circuits the method.
 */
protected function ensureTables() {
  if ($this->ensured) {
    return;
  }

  $schema_api = $this->connection->schema();

  if ($schema_api->tableExists($this->mapTable)) {
    // Add any missing columns to the map table.
    if (!$schema_api->fieldExists($this->mapTable, 'rollback_action')) {
      $schema_api->addField($this->mapTable, 'rollback_action', array(
        'type' => 'int',
        'size' => 'tiny',
        'unsigned' => TRUE,
        'not null' => TRUE,
        'default' => 0,
        'description' => 'Flag indicating what to do for this item on rollback',
      ));
    }
    if (!$schema_api->fieldExists($this->mapTable, 'hash')) {
      $schema_api->addField($this->mapTable, 'hash', array(
        'type' => 'varchar',
        'length' => '32',
        'not null' => FALSE,
        'description' => 'Hash of source row data, for detecting changes',
      ));
    }
  }
  else {
    // Source key columns (sourceid1..N) are shared by both tables and are
    // always NOT NULL; in the map table they form the primary key.
    $source_columns = array();
    $position = 0;
    foreach ($this->sourceKey as $field_schema) {
      $field_schema['not null'] = TRUE;
      $source_columns['sourceid' . ++$position] = $field_schema;
    }
    $source_column_names = array_keys($source_columns);

    // Map table: source keys, then destination keys, then bookkeeping.
    // TODO: How do we discover the destination schema?
    $map_fields = $source_columns;
    $position = 0;
    foreach ($this->destinationKey as $field_schema) {
      // Allow dest key fields to be NULL (for IGNORED/FAILED cases).
      $field_schema['not null'] = FALSE;
      $field_schema['default'] = NULL;
      $map_fields['destid' . ++$position] = $field_schema;
    }
    $map_fields['needs_update'] = array(
      'type' => 'int',
      'size' => 'tiny',
      'unsigned' => TRUE,
      'not null' => TRUE,
      'default' => MigrateMap::STATUS_IMPORTED,
      'description' => 'Indicates current status of the source row',
    );
    $map_fields['rollback_action'] = array(
      'type' => 'int',
      'size' => 'tiny',
      'unsigned' => TRUE,
      'not null' => TRUE,
      'default' => MigrateMap::ROLLBACK_DELETE,
      'description' => 'Flag indicating what to do for this item on rollback',
    );
    $map_fields['last_imported'] = array(
      'type' => 'int',
      'unsigned' => TRUE,
      'not null' => TRUE,
      'default' => 0,
      'description' => 'UNIX timestamp of the last time this row was imported',
    );
    $map_fields['hash'] = array(
      'type' => 'varchar',
      'length' => '32',
      'not null' => FALSE,
      'description' => 'Hash of source row data, for detecting changes',
    );
    $schema_api->createTable($this->mapTable, array(
      'description' => t('Mappings from source key to destination key'),
      'fields' => $map_fields,
      'primary key' => $source_column_names,
    ));

    // Message table: serial id, the source key columns, level and text.
    $message_fields = array(
      'msgid' => array(
        'type' => 'serial',
        'unsigned' => TRUE,
        'not null' => TRUE,
      ),
    );
    $message_fields += $source_columns;
    $message_fields['level'] = array(
      'type' => 'int',
      'unsigned' => TRUE,
      'not null' => TRUE,
      'default' => 1,
    );
    $message_fields['message'] = array(
      'type' => 'text',
      'size' => 'medium',
      'not null' => TRUE,
    );
    $schema_api->createTable($this->messageTable, array(
      'description' => t('Messages generated during a migration process'),
      'fields' => $message_fields,
      'primary key' => array(
        'msgid',
      ),
      'indexes' => array(
        'sourcekey' => $source_column_names,
      ),
    ));
  }

  $this->ensured = TRUE;
}
/**
 * Retrieve a row from the map table, given a source ID.
 *
 * @param array $source_id
 *   Source key values, in the same order as the source key fields.
 *
 * @return mixed
 *   The result of fetchAssoc() on the query: the first matching map row as
 *   an associative array of all map columns.
 */
public function getRowBySource(array $source_id) {
  migrate_instrument_start('mapRowBySource');

  $select = $this->connection->select($this->mapTable, 'map');
  $select->fields('map');
  // Key values are consumed positionally, one per source key column.
  foreach ($this->sourceKeyMap as $column) {
    $value = array_shift($source_id);
    $select->condition("map.{$column}", $value, '=');
  }
  $statement = $select->execute();

  migrate_instrument_stop('mapRowBySource');
  return $statement->fetchAssoc();
}
/**
 * Retrieve a row from the map table, given a destination ID.
 *
 * @param array $destination_id
 *   Destination key values, in the same order as the destination key fields.
 *
 * @return mixed
 *   The result of fetchAssoc() on the query: the first matching map row as
 *   an associative array of all map columns.
 */
public function getRowByDestination(array $destination_id) {
  migrate_instrument_start('getRowByDestination');

  $select = $this->connection->select($this->mapTable, 'map');
  $select->fields('map');
  // Key values are consumed positionally, one per destination key column.
  foreach ($this->destinationKeyMap as $column) {
    $value = array_shift($destination_id);
    $select->condition("map.{$column}", $value, '=');
  }
  $statement = $select->execute();

  migrate_instrument_stop('getRowByDestination');
  return $statement->fetchAssoc();
}
/**
 * Retrieve an array of map rows marked as needing update.
 *
 * @param int $count
 *   Maximum number of rows to return.
 *
 * @return array
 *   Array of map row objects whose needs_update column equals
 *   MigrateMap::STATUS_NEEDS_UPDATE.
 */
public function getRowsNeedingUpdate($count) {
  $select = $this->connection->select($this->mapTable, 'map');
  $select->fields('map');
  $select->condition('needs_update', MigrateMap::STATUS_NEEDS_UPDATE);
  $select->range(0, $count);

  $pending = array();
  foreach ($select->execute() as $map_row) {
    $pending[] = $map_row;
  }
  return $pending;
}
/**
 * Given a (possibly multi-field) destination key, return the (possibly
 * multi-field) source key mapped to it.
 *
 * When map lookup caching is enabled, results are kept in drupal_static()
 * storage keyed by the JSON-encoded destination key.
 *
 * @param array $destination_id
 *   Array of destination key values.
 *
 * @return mixed
 *   Array of source key values keyed by map column (sourceid1, ...), or the
 *   empty result of fetchAssoc() when no map row matches.
 */
public function lookupSourceID(array $destination_id) {
  migrate_instrument_start('lookupSourceID');
  // Try a cache lookup if enabled.
  if ($this->cacheMapLookups) {
    $cache =& drupal_static($this->mapTable . '_sourceIDCache');
    // Compute the cache key before the loop below consumes $destination_id.
    $serialized = json_encode($destination_id);
    if (isset($cache[$serialized])) {
      migrate_instrument_stop('lookupSourceID');
      return $cache[$serialized];
    }
  }

  $query = $this->connection
    ->select($this->mapTable, 'map')
    ->fields('map', $this->sourceKeyMap);
  foreach ($this->destinationKeyMap as $key_name) {
    $query = $query->condition("map.{$key_name}", array_shift($destination_id), '=');
  }
  $result = $query->execute();
  $source_id = $result->fetchAssoc();

  // Store the looked-up source ID in the cache if enabled. The value cached
  // must be the query result, not $destination_id (which array_shift() has
  // already emptied), otherwise later cached lookups return the wrong data.
  if ($this->cacheMapLookups) {
    $cache[$serialized] = $source_id;
  }
  migrate_instrument_stop('lookupSourceID');
  return $source_id;
}
/**
 * Given a (possibly multi-field) source key, return the (possibly
 * multi-field) destination key it is mapped to.
 *
 * When map lookup caching is enabled, results are kept in drupal_static()
 * storage keyed by the JSON-encoded source key.
 *
 * @param array $source_id
 *   Array of source key values.
 *
 * @return mixed
 *   Array of destination key values keyed by map column (destid1, ...), or
 *   the empty result of fetchAssoc() when no map row matches.
 */
public function lookupDestinationID(array $source_id) {
  migrate_instrument_start('lookupDestinationID');

  // Try a cache lookup if enabled.
  if ($this->cacheMapLookups) {
    $cache = &drupal_static($this->mapTable . '_destinationIDCache');
    $cache_key = json_encode($source_id);
    if (isset($cache[$cache_key])) {
      migrate_instrument_stop('lookupDestinationID');
      return $cache[$cache_key];
    }
  }

  $select = $this->connection->select($this->mapTable, 'map');
  $select->fields('map', $this->destinationKeyMap);
  // Source key values are matched positionally against sourceid1..N.
  foreach ($this->sourceKeyMap as $column) {
    $select->condition("map.{$column}", array_shift($source_id), '=');
  }
  $destination_id = $select->execute()->fetchAssoc();

  // Store the id in a cache if enabled.
  if ($this->cacheMapLookups) {
    $cache[$cache_key] = $destination_id;
  }

  migrate_instrument_stop('lookupDestinationID');
  return $destination_id;
}
/**
 * Called upon import of one record, we record a mapping from the source key
 * to the destination key. Also may be called, setting the third parameter to
 * NEEDS_UPDATE, to signal an existing record should be remigrated.
 *
 * The row is written with a merge query keyed on the source key columns, so
 * an existing mapping for the same source key is updated in place.
 *
 * @param stdClass $source_row
 *   The raw source data. We use the key map derived from the source object
 *   to get the source key values.
 * @param array $dest_ids
 *   The destination key values.
 * @param int $needs_update
 *   Status of the source row in the map. Defaults to STATUS_IMPORTED.
 * @param int $rollback_action
 *   How to handle the destination object on rollback. Defaults to
 *   ROLLBACK_DELETE.
 * @param string $hash
 *   If hashing is enabled, the hash of the raw source row.
 */
public function saveIDMapping(stdClass $source_row, array $dest_ids, $needs_update = MigrateMap::STATUS_IMPORTED, $rollback_action = MigrateMap::ROLLBACK_DELETE, $hash = NULL) {
  migrate_instrument_start('saveIDMapping');
  // Construct the source key.
  $keys = array();
  foreach ($this->sourceKeyMap as $field_name => $key_name) {
    // A NULL key value will fail. isset() is FALSE for a NULL value and for
    // a property that is missing altogether, so a row lacking the key field
    // is reported here instead of raising an undefined-property notice.
    if (!isset($source_row->{$field_name})) {
      Migration::displayMessage(t('Could not save to map table due to NULL value for key field !field', array(
        '!field' => $field_name,
      )));
      migrate_instrument_stop('saveIDMapping');
      return;
    }
    $keys[$key_name] = $source_row->{$field_name};
  }

  $fields = array(
    'needs_update' => (int) $needs_update,
    'rollback_action' => (int) $rollback_action,
    'hash' => $hash,
  );
  // Destination values are stored positionally in destid1..N; with no
  // destination IDs the destid columns are simply left out of the merge.
  $count = 1;
  foreach ($dest_ids as $dest_id) {
    $fields['destid' . $count++] = $dest_id;
  }
  if ($this->trackLastImported) {
    $fields['last_imported'] = time();
  }

  $this->connection
    ->merge($this->mapTable)
    ->key($keys)
    ->fields($fields)
    ->execute();
  migrate_instrument_stop('saveIDMapping');
}
/**
 * Record a message in the migration's message table.
 *
 * If the source key is not an array the message is handed to
 * MigrationBase::saveMessage() instead; if any key value is unset the
 * message is only displayed and nothing is stored.
 *
 * @param array $source_key
 *   Source ID of the record in error.
 * @param string $message
 *   The message to record.
 * @param int $level
 *   Optional message severity (defaults to MESSAGE_ERROR).
 */
public function saveMessage($source_key, $message, $level = Migration::MESSAGE_ERROR) {
  if (!is_array($source_key)) {
    // TODO: What else can we do?
    // Display Message with correct level instead of default level 'error'.
    MigrationBase::saveMessage($message, $level);
    return;
  }

  // Source IDs as arguments.
  $record = array();
  $position = 0;
  foreach ($source_key as $key_value) {
    // If any key value is not set, we can't save - print out and abort.
    if (!isset($key_value)) {
      Migration::displayMessage($message);
      return;
    }
    $record['sourceid' . ++$position] = $key_value;
  }
  $record['level'] = $level;
  $record['message'] = $message;

  $insert = $this->connection->insert($this->messageTable);
  $insert->fields($record);
  $insert->execute();
}
/**
 * Prepares this migration to run as an update - that is, in addition to
 * unmigrated content (source records not in the map table) being imported,
 * previously-migrated content will also be updated in place.
 *
 * Every row of the map table is flagged MigrateMap::STATUS_NEEDS_UPDATE.
 */
public function prepareUpdate() {
  $flag_all = $this->connection->update($this->mapTable);
  $flag_all->fields(array(
    'needs_update' => MigrateMap::STATUS_NEEDS_UPDATE,
  ));
  $flag_all->execute();
}
/**
 * Returns a count of records in the map table (i.e., the number of
 * source records which have been processed for this migration).
 *
 * @return int
 *   The COUNT(*) value as returned by fetchField().
 */
public function processedCount() {
  $select = $this->connection->select($this->mapTable);
  $select->addExpression('COUNT(*)', 'count');
  return $select->execute()->fetchField();
}
/**
 * Returns a count of imported records in the map table.
 *
 * Rows with status STATUS_IMPORTED or STATUS_NEEDS_UPDATE are counted.
 *
 * @return int
 *   The COUNT(*) value as returned by fetchField().
 */
public function importedCount() {
  $imported_statuses = array(
    MigrateMap::STATUS_IMPORTED,
    MigrateMap::STATUS_NEEDS_UPDATE,
  );

  $select = $this->connection->select($this->mapTable);
  $select->addExpression('COUNT(*)', 'count');
  $select->condition('needs_update', $imported_statuses, 'IN');
  return $select->execute()->fetchField();
}
/**
 * Returns a count of records which are marked as needing update.
 *
 * @return int
 *   The COUNT(*) value as returned by fetchField().
 */
public function updateCount() {
  $select = $this->connection->select($this->mapTable);
  $select->addExpression('COUNT(*)', 'count');
  $select->condition('needs_update', MigrateMap::STATUS_NEEDS_UPDATE);
  return $select->execute()->fetchField();
}
/**
 * Get the number of source records which failed to import.
 *
 * @return int
 *   Number of map rows whose status is MigrateMap::STATUS_FAILED, as
 *   returned by fetchField().
 */
public function errorCount() {
  $select = $this->connection->select($this->mapTable);
  $select->addExpression('COUNT(*)', 'count');
  $select->condition('needs_update', MigrateMap::STATUS_FAILED);
  return $select->execute()->fetchField();
}
/**
 * Get the number of messages saved.
 *
 * @return int
 *   Number of rows in the message table, as returned by fetchField().
 */
public function messageCount() {
  $select = $this->connection->select($this->messageTable);
  $select->addExpression('COUNT(*)', 'count');
  return $select->execute()->fetchField();
}
/**
 * Delete the map entry and any message table entries for the specified
 * source row.
 *
 * @param array $source_key
 *   Source key values, matched positionally against sourceid1..N.
 * @param bool $messages_only
 *   When TRUE, the map entry is kept and only the messages are deleted.
 */
public function delete(array $source_key, $messages_only = FALSE) {
  // The map deletion (when requested) runs before the message deletion.
  $deletions = array();
  if (!$messages_only) {
    $deletions[] = $this->connection->delete($this->mapTable);
  }
  $deletions[] = $this->connection->delete($this->messageTable);

  foreach ($deletions as $deletion) {
    $position = 0;
    foreach ($source_key as $key_value) {
      $deletion->condition('sourceid' . ++$position, $key_value);
    }
    $deletion->execute();
  }
}
/**
 * Delete the map entry and any message table entries for the specified
 * destination row.
 *
 * Nothing is deleted when no source key is mapped to the destination key.
 *
 * @param array $destination_key
 *   Destination key values, matched positionally against destid1..N.
 */
public function deleteDestination(array $destination_key) {
  // Messages are keyed by source ID, so resolve that first.
  $source_key = $this->lookupSourceID($destination_key);
  if (empty($source_key)) {
    return;
  }

  $map_delete = $this->connection->delete($this->mapTable);
  $position = 0;
  foreach ($destination_key as $key_value) {
    $map_delete->condition('destid' . ++$position, $key_value);
  }
  $map_delete->execute();

  $message_delete = $this->connection->delete($this->messageTable);
  $position = 0;
  foreach ($source_key as $key_value) {
    $message_delete->condition('sourceid' . ++$position, $key_value);
  }
  $message_delete->execute();
}
/**
 * Set the specified row to be updated, if it exists.
 *
 * @param array $source_key
 *   Source key values, matched positionally against sourceid1..N.
 */
public function setUpdate(array $source_key) {
  $update = $this->connection->update($this->mapTable);
  $update->fields(array(
    'needs_update' => MigrateMap::STATUS_NEEDS_UPDATE,
  ));

  $position = 0;
  foreach ($source_key as $key_value) {
    $update->condition('sourceid' . ++$position, $key_value);
  }
  $update->execute();
}
/**
 * Delete all map and message table entries specified.
 *
 * @param array $source_keys
 *   Each array member is an array of key fields for one source row. For a
 *   single-column key a bare scalar member is accepted as well.
 */
public function deleteBulk(array $source_keys) {
  // Nothing to delete; also avoids issuing an empty IN () list below.
  if (empty($source_keys)) {
    return;
  }

  // If we have a single-column key, we can shortcut it with one IN() query
  // per table.
  if (count($this->sourceKey) == 1) {
    $sourceids = array();
    foreach ($source_keys as $source_key) {
      if (!is_array($source_key)) {
        $sourceids[] = $source_key;
      }
      elseif (!empty($source_key)) {
        // Each member is an array of key fields; pull out the single value
        // so that IN() receives a flat list of scalars rather than nested
        // arrays.
        $sourceids[] = reset($source_key);
      }
    }
    if (empty($sourceids)) {
      return;
    }
    $this->connection
      ->delete($this->mapTable)
      ->condition('sourceid1', $sourceids, 'IN')
      ->execute();
    $this->connection
      ->delete($this->messageTable)
      ->condition('sourceid1', $sourceids, 'IN')
      ->execute();
    return;
  }

  // Multi-column keys: one pair of delete queries per source row.
  foreach ($source_keys as $source_key) {
    $map_query = $this->connection->delete($this->mapTable);
    $message_query = $this->connection->delete($this->messageTable);
    $count = 1;
    foreach ($source_key as $key_value) {
      $map_query->condition('sourceid' . $count, $key_value);
      $message_query->condition('sourceid' . $count, $key_value);
      $count++;
    }
    $map_query->execute();
    $message_query->execute();
  }
}
/**
 * Clear all messages from the message table.
 *
 * The table is emptied with a truncate query; the map table is untouched.
 */
public function clearMessages() {
  $truncate = $this->connection->truncate($this->messageTable);
  $truncate->execute();
}
/**
 * Remove the associated map and message tables.
 *
 * The map table is dropped first, then the message table.
 */
public function destroy() {
  $schema_api = $this->connection->schema();
  foreach (array($this->mapTable, $this->messageTable) as $table) {
    $schema_api->dropTable($table);
  }
}
/**
* Result of the map table query executed in rewind(); next() fetches rows
* from it one at a time.
*/
protected $result = NULL;
/**
* The map row the iterator currently points at, reduced to its destination
* columns by next(); NULL when there is no current row.
*/
protected $currentRow = NULL;
/**
* Source key values of the current row, keyed by map column name
* (sourceid1, ...). Reset to an empty array on every call to next().
*
* @var array
*/
protected $currentKey = array();
/**
* Returns the source key of the row the iterator currently points at.
*
* @return array
* Source key values keyed by map column name; empty when there is no
* current row.
*/
public function getCurrentKey() {
return $this->currentKey;
}
/**
 * Implementation of Iterator::rewind() - called before beginning a foreach
 * loop. TODO: Support idlist, itemlimit.
 *
 * Runs a fresh query selecting the source and destination key columns of
 * every map row, then advances to the first row.
 */
public function rewind() {
  $this->currentRow = NULL;

  // Source key columns first, destination key columns after.
  $columns = array_merge(
    array_values($this->sourceKeyMap),
    array_values($this->destinationKeyMap)
  );

  $select = $this->connection->select($this->mapTable, 'map');
  $select->fields('map', $columns);
  $this->result = $select->execute();

  $this->next();
}
/**
* Implementation of Iterator::current() - called when entering a loop
* iteration, returning the current row.
*
* @return object|null
* The current map row holding only its destination columns (next() strips
* the source columns), or NULL when there is no current row.
*/
public function current() {
return $this->currentRow;
}
/**
* Implementation of Iterator::key - called when entering a loop iteration,
* returning the key of the current row. It must be a scalar - we will
* serialize to fulfill the requirement, but using getCurrentKey() is
* preferable.
*
* @return string
* The serialize()d source key array of the current row.
*/
public function key() {
return serialize($this->currentKey);
}
/**
 * Implementation of Iterator::next() - called at the bottom of the loop
 * implicitly, as well as explicitly from rewind().
 *
 * Fetches the next map row. Its source key columns are moved into
 * $this->currentKey, leaving only destination columns on the row object.
 * When no further row is available the current row becomes NULL and the
 * current key an empty array.
 */
public function next() {
  $row = $this->result->fetchObject();
  $this->currentKey = array();

  if (!is_object($row)) {
    $this->currentRow = NULL;
    return;
  }

  foreach ($this->sourceKeyMap as $map_field) {
    $this->currentKey[$map_field] = $row->{$map_field};
    // Leave only destination fields.
    unset($row->{$map_field});
  }
  $this->currentRow = $row;
}
/**
 * Implementation of Iterator::valid() - called at the top of the loop,
 * returning TRUE to process the loop and FALSE to terminate it.
 *
 * @return bool
 *   TRUE while there is a current row.
 */
public function valid() {
  // TODO: Check numProcessed against itemlimit
  return $this->currentRow !== NULL;
}
}
Classes
Name | Description |
---|---|
MigrateSQLMap | Defines a Drupal db-based implementation of MigrateMap. |