migration.inc in Migrate 6.2
Same filename and directory in other branches
Defines the base class for import/rollback processes.
File
includes/migration.inc (View source)
<?php
/**
* @file
* Defines the base class for import/rollback processes.
*/
/**
* The base class for all import objects. This is where most of the smarts
* of the migrate module reside. Migrations are created by deriving from this
* class, and in the constructor (after calling parent::__construct()) initializing
* at a minimum the name, description, source, and destination properties. The constructor
* will also usually make several calls to addFieldMapping().
*/
abstract class Migration extends MigrationBase {
/**
* Source object for the migration, derived from MigrateSource.
*
* @var MigrateSource
*/
protected $source;
/**
* Returns the source object held by this migration.
*
* @return MigrateSource
*   The current value of $this->source.
*/
public function getSource() {
return $this->source;
}
/**
* Destination object for the migration, derived from MigrateDestination.
*
* @var MigrateDestination
*/
protected $destination;
/**
* Returns the destination object held by this migration.
*
* @return MigrateDestination
*   The current value of $this->destination.
*/
public function getDestination() {
return $this->destination;
}
/**
* Map object tracking relationships between source and destination data
*
* @var MigrateMap
*/
protected $map;
/**
* Returns the map object held by this migration.
*
* @return MigrateMap
*   The current value of $this->map.
*/
public function getMap() {
return $this->map;
}
/**
* Indicate whether the primary system of record for this migration is the
* source, or the destination (Drupal). In the source case, migration of
* an existing object will completely replace the Drupal object with data from
* the source side. In the destination case, the existing Drupal object will
* be loaded, then changes from the source applied; also, rollback will not be
* supported.
*
* The two constants below are the permitted values of $systemOfRecord, which
* defaults to Migration::SOURCE.
*
* @var int
*/
const SOURCE = 1;
const DESTINATION = 2;
protected $systemOfRecord = Migration::SOURCE;
/**
* Returns the system of record for this migration.
*
* @return int
*   The current value of $this->systemOfRecord (Migration::SOURCE unless
*   changed).
*/
public function getSystemOfRecord() {
return $this->systemOfRecord;
}
/**
* Specify value of needs_update for current map row. Usually set by
* MigrateFieldHandler implementations.
*
* @var int
*/
public $needsUpdate = MigrateMap::STATUS_IMPORTED;
/**
* The default rollback action for this migration. Can be overridden on
* a per-row basis by setting $row->rollbackAction in prepareRow().
*
* @var int
*/
protected $defaultRollbackAction = MigrateMap::ROLLBACK_DELETE;
/**
* The rollback action to be saved for the current row.
*
* @var int
*/
public $rollbackAction;
/**
* Simple mappings between destination fields (keys) and source fields (values).
* Entries are MigrateFieldMapping objects created by addFieldMapping();
* mappings without a destination field are appended with numeric keys.
*
* @var array
*/
protected $fieldMappings = array();
/**
* Returns all field mappings registered on this migration.
*
* @return array
*   The current value of $this->fieldMappings.
*/
public function getFieldMappings() {
return $this->fieldMappings;
}
/**
* An array of counts. Initially used for cache hit/miss tracking.
*
* @var array
*/
protected $counts = array();
/**
* When performing a bulkRollback(), the maximum number of items to pass in
* a single call. Can be overridden in derived class constructor.
*
* @var int
*/
protected $rollbackBatchSize = 50;
/**
* If present, an array with keys name and alias (optional). Name refers to
* the source columns used for tracking highwater marks. alias is an
* optional table alias.
*
* @var array
*/
protected $highwaterField = array();
/**
* Returns the highwater field definition.
*
* @return array
*   The current value of $this->highwaterField (an empty array by default).
*/
public function getHighwaterField() {
return $this->highwaterField;
}
/**
* Key of the row currently being processed. Assigned from getCurrentKey() on
* the source during import (and XML analysis), and from getCurrentKey() on
* the map during rollback, where it is read as an array ('sourceid1').
*
* @var array
*/
protected $currentSourceKey;
/**
* The object currently being constructed
* @var stdClass
*/
protected $destinationValues;
/**
* The current data row retrieved from the source.
* @var stdClass
*/
protected $sourceValues;
/**
* General initialization of a Migration object.
*
* @param mixed $group
*   Passed through unchanged to MigrationBase::__construct(); defaults to
*   NULL. NOTE(review): presumably the migration group - confirm against
*   MigrationBase.
*/
public function __construct($group = NULL) {
parent::__construct($group);
}
/**
 * Deregister a migration - remove all traces of it from the database (without
 * touching any content which was created by this migration).
 *
 * Ideally this would run at uninstall time, but by then the implementing
 * module is already disabled and the migration cannot be instantiated to
 * reach its map. hook_disable() is a workable place to call it instead.
 *
 * Any exception raised along the way is swallowed, on the assumption that the
 * migration is already gone.
 *
 * @param string $machine_name
 *   Machine name of the migration to deregister.
 */
public static function deregisterMigration($machine_name) {
  try {
    // Remove map and message tables.
    $instance = self::getInstance($machine_name);
    $map = $instance->map;
    $map->destroy();
    // TODO: Clear log entries? Or keep for historical purposes?
    // The parent deregistration (which clears migrate_status) must come last,
    // because the steps above still reference it.
    parent::deregisterMigration($machine_name);
  }
  catch (Exception $e) {
    // Fail silently if it's already gone
  }
}
////////////////////////////////////////////////////////////////////
// Processing
/**
 * Add a mapping for a destination field, specifying a source field and/or
 * a default value.
 *
 * @param string $destination_field
 *   Name of the destination field. When NULL, the mapping is appended to the
 *   list instead of being keyed by destination name.
 * @param string $source_field
 *   Name of the source field (optional).
 * @param boolean $warn_on_override
 *   Set to FALSE to prevent warnings when there's an existing mapping
 *   for this destination field.
 *
 * @return MigrateFieldMapping
 *   The newly created mapping object.
 */
public function addFieldMapping($destination_field, $source_field = NULL, $warn_on_override = TRUE) {
  $has_destination = !is_null($destination_field);
  // Warn of duplicate mappings
  if ($warn_on_override && $has_destination && isset($this->fieldMappings[$destination_field])) {
    $previous = $this->fieldMappings[$destination_field];
    $replacements = array(
      '!name' => $this->machineName,
      '!dest' => $destination_field,
      '!source' => $previous->getSourceField(),
    );
    $warning = t('!name addFieldMapping: !dest was previously mapped from !source, overridden', $replacements);
    self::displayMessage($warning, 'warning');
  }
  $mapping = new MigrateFieldMapping($destination_field, $source_field);
  if ($has_destination) {
    // Keyed by destination, so a later mapping replaces an earlier one.
    $this->fieldMappings[$destination_field] = $mapping;
  }
  else {
    $this->fieldMappings[] = $mapping;
  }
  return $mapping;
}
/**
 * Remove any existing mappings for a given destination or source field.
 *
 * Both arguments are optional in effect: pass NULL for the one that should
 * not be used as a filter.
 *
 * @param string $destination_field
 *   Name of the destination field whose mapping should be dropped.
 * @param string $source_field
 *   Name of the source field; every mapping reading from it is dropped.
 */
public function removeFieldMapping($destination_field, $source_field = NULL) {
  if (isset($destination_field)) {
    unset($this->fieldMappings[$destination_field]);
  }
  if (!isset($source_field)) {
    return;
  }
  // Walk a snapshot of the keys so entries can be removed while looping.
  foreach (array_keys($this->fieldMappings) as $key) {
    if ($this->fieldMappings[$key]->getSourceField() == $source_field) {
      unset($this->fieldMappings[$key]);
    }
  }
}
/**
 * Shortcut for mapping several fields whose names are identical on the
 * source and destination sides.
 *
 * @param array $fields
 *   List of field names; each is mapped to itself via addFieldMapping().
 */
public function addSimpleMappings(array $fields) {
  foreach ($fields as $field_name) {
    $this->addFieldMapping($field_name, $field_name);
  }
}
/**
 * Shortcut for flagging several destination fields as explicitly not
 * migrated.
 *
 * Each field gets a mapping with no source field, tagged with the issue
 * group.
 *
 * @param array $fields
 *   List of destination fields to mark as not for migration.
 * @param string $issue_group
 *   Issue group name to apply to the generated mappings (defaults to 'DNM').
 */
public function addUnmigratedDestinations(array $fields, $issue_group = NULL) {
  $group = $issue_group ? $issue_group : t('DNM');
  foreach ($fields as $destination_field) {
    $mapping = $this->addFieldMapping($destination_field);
    $mapping->issueGroup($group);
  }
}
/**
 * Shortcut for flagging several source fields as explicitly not migrated.
 *
 * Each field gets a mapping with a NULL destination, tagged with the issue
 * group.
 *
 * @param array $fields
 *   List of source fields to mark as not for migration.
 * @param string $issue_group
 *   Issue group name to apply to the generated mappings (defaults to 'DNM').
 */
public function addUnmigratedSources(array $fields, $issue_group = NULL) {
  $group = $issue_group ? $issue_group : t('DNM');
  foreach ($fields as $source_field) {
    $mapping = $this->addFieldMapping(NULL, $source_field);
    $mapping->issueGroup($group);
  }
}
/**
 * Reports whether this migration process is complete (i.e., all available
 * source rows have been processed).
 *
 * @return boolean
 *   TRUE when the processed count has reached the (refreshed) source count,
 *   or when the source reports a negative count.
 */
public function isComplete() {
  $available = $this->source->count(TRUE);
  // A negative count means the source is uncountable. There is then no way
  // of knowing whether it is complete, so stipulate that it is.
  if ($available < 0) {
    return TRUE;
  }
  return $this->processedCount() >= $available;
}
/**
 * Override MigrationBase::beginProcess, to pick up the feedback options,
 * reset the per-run counters and call the matching pre-process method.
 *
 * @param int $newStatus
 *   Migration::STATUS_IMPORTING or Migration::STATUS_ROLLING_BACK
 */
protected function beginProcess($newStatus) {
  parent::beginProcess($newStatus);
  // Do some standard setup. Feedback is only configured when both a value
  // and a unit were supplied.
  if (isset($this->options['feedback']['value'], $this->options['feedback']['unit'])) {
    $this->feedback = $this->options['feedback']['value'];
    $unit = $this->options['feedback']['unit'];
    // Normalize singular units to the plural spelling checkStatus() tests for.
    if ($unit == 'item') {
      $unit = 'items';
    }
    elseif ($unit == 'second') {
      $unit = 'seconds';
    }
    $this->feedback_unit = $unit;
  }
  $this->lastfeedback = $this->starttime;
  $this->successes_since_feedback = 0;
  $this->processed_since_feedback = 0;
  $this->total_successes = 0;
  $this->total_processed = 0;
  // Call pre-process methods
  switch ($this->status) {
    case Migration::STATUS_IMPORTING:
      $this->preImport();
      break;

    case Migration::STATUS_ROLLING_BACK:
      $this->preRollback();
      break;
  }
}
/**
 * Override MigrationBase::endProcess, to call post hooks. Note that it must
 * be public to be callable as the shutdown function.
 *
 * The post hook matching the current status runs first, then the parent
 * implementation.
 */
public function endProcess() {
  // Call post-process methods
  switch ($this->status) {
    case Migration::STATUS_IMPORTING:
      $this->postImport();
      break;

    case Migration::STATUS_ROLLING_BACK:
      $this->postRollback();
      break;
  }
  parent::endProcess();
}
/**
* Default implementations of pre/post import/rollback methods. These call
* the destination methods (if they exist) - when overriding, always
* call parent::preImport() etc.
*/
/**
 * Runs before an import: forwards to the destination's preImport(), when
 * the destination defines one.
 */
protected function preImport() {
  if (!method_exists($this->destination, 'preImport')) {
    return;
  }
  $this->destination->preImport();
}
/**
 * Runs before a rollback: forwards to the destination's preRollback(), when
 * the destination defines one.
 */
protected function preRollback() {
  if (!method_exists($this->destination, 'preRollback')) {
    return;
  }
  $this->destination->preRollback();
}
/**
 * Runs after an import: forwards to the destination's postImport(), when
 * the destination defines one.
 */
protected function postImport() {
  if (!method_exists($this->destination, 'postImport')) {
    return;
  }
  $this->destination->postImport();
}
/**
 * Runs after a rollback: forwards to the destination's postRollback(), when
 * the destination defines one.
 */
protected function postRollback() {
  if (!method_exists($this->destination, 'postRollback')) {
    return;
  }
  $this->destination->postRollback();
}
/**
 * Perform a rollback operation - remove migrated items from the destination.
 *
 * Iterates over the map. When the destination offers bulkRollback(), rows are
 * collected into batches of $rollbackBatchSize; otherwise each row is rolled
 * back individually. Destination objects are only removed when the system of
 * record is the source and the map row's rollback action is
 * MigrateMap::ROLLBACK_DELETE; the map rows themselves are removed either way.
 *
 * The loop stops early when the time option, the item limit or checkStatus()
 * says so. An optional 'idlist' option (comma-separated) restricts the
 * rollback to rows whose 'sourceid1' is in the list.
 *
 * @return int
 *   One of the MigrationBase::RESULT_* constants.
 */
protected function rollback() {
  $return = MigrationBase::RESULT_COMPLETED;
  $itemlimit = $this->getItemLimit();
  $idlist = $this->getOption('idlist');
  if ($idlist) {
    // Make the IDs keys, to more easily identify them
    $idlist = array_flip(explode(',', $idlist));
  }
  if (method_exists($this->destination, 'bulkRollback')) {
    // Too many at once can lead to memory issues, so batch 'em up
    $destids = array();
    $sourceids = array();
    $batch_count = 0;
    foreach ($this->map as $destination_key) {
      if ($this->timeOptionExceeded()) {
        break;
      }
      if (($return = $this->checkStatus()) != MigrationBase::RESULT_COMPLETED) {
        break;
      }
      // Rows already queued in the current batch count against the limit.
      if ($itemlimit && $this->total_processed + $batch_count >= $itemlimit) {
        break;
      }
      $this->currentSourceKey = $this->map->getCurrentKey();
      // If there's an idlist, skip anything not in the list
      if ($idlist && !isset($idlist[$this->currentSourceKey['sourceid1']])) {
        continue;
      }
      // Note that bulk rollback is only supported for single-column keys
      $sourceids[] = $this->currentSourceKey;
      if (!empty($destination_key->destid1)) {
        $map_row = $this->map->getRowByDestination((array) $destination_key);
        if ($map_row['rollback_action'] == MigrateMap::ROLLBACK_DELETE) {
          $destids[] = $destination_key->destid1;
        }
      }
      $batch_count++;
      if ($batch_count >= $this->rollbackBatchSize) {
        try {
          if ($this->systemOfRecord == Migration::SOURCE) {
            if (!empty($destids)) {
              migrate_instrument_start('destination bulkRollback');
              $this->destination->bulkRollback($destids);
              migrate_instrument_stop('destination bulkRollback');
            }
          }
          // Keep track in case of interruption
          migrate_instrument_start('rollback map/message update');
          $this->map->deleteBulk($sourceids);
          migrate_instrument_stop('rollback map/message update');
          $this->total_successes += $batch_count;
          $this->successes_since_feedback += $batch_count;
        }
        catch (Exception $e) {
          $this->handleException($e, FALSE);
          // Stop the timers under the same names they were started with
          // (the original stopped 'bulkRollback', which is never started).
          migrate_instrument_stop('destination bulkRollback');
          migrate_instrument_stop('rollback map/message update');
        }
        $destids = array();
        $sourceids = array();
        // Will increment even if there was an exception... But we don't
        // really have a way to know how many really were successfully rolled back
        $this->total_processed += $batch_count;
        $this->processed_since_feedback += $batch_count;
        $batch_count = 0;
      }
    }
    // Flush whatever is left in the final, partial batch.
    if ($batch_count > 0) {
      if ($this->systemOfRecord == Migration::SOURCE) {
        if (!empty($destids)) {
          migrate_instrument_start('destination bulkRollback');
          $this->destination->bulkRollback($destids);
          migrate_instrument_stop('destination bulkRollback');
        }
      }
      // Count the rows whatever the system of record is, exactly as full
      // batches do; previously they were only counted for Migration::SOURCE.
      $this->total_processed += $batch_count;
      $this->total_successes += $batch_count;
      $this->processed_since_feedback += $batch_count;
      $this->successes_since_feedback += $batch_count;
      migrate_instrument_start('rollback map/message update');
      $this->map->deleteBulk($sourceids);
      migrate_instrument_stop('rollback map/message update');
    }
  }
  else {
    foreach ($this->map as $destination_key) {
      if ($this->timeOptionExceeded()) {
        break;
      }
      if (($return = $this->checkStatus()) != MigrationBase::RESULT_COMPLETED) {
        break;
      }
      if ($this->itemOptionExceeded()) {
        break;
      }
      $this->currentSourceKey = $this->map->getCurrentKey();
      // If there's an idlist, skip anything not in the list
      if ($idlist && !isset($idlist[$this->currentSourceKey['sourceid1']])) {
        continue;
      }
      // Rollback one record
      try {
        if ($this->systemOfRecord == Migration::SOURCE) {
          // Skip when the destination key is null
          $skip = FALSE;
          foreach ($destination_key as $key_value) {
            if (is_null($key_value)) {
              $skip = TRUE;
              break;
            }
          }
          if (!$skip) {
            $map_row = $this->map->getRowByDestination((array) $destination_key);
            if ($map_row['rollback_action'] == MigrateMap::ROLLBACK_DELETE) {
              migrate_instrument_start('destination rollback');
              $this->destination->rollback((array) $destination_key);
              migrate_instrument_stop('destination rollback');
            }
          }
        }
        migrate_instrument_start('rollback map/message update');
        $this->map->delete($this->currentSourceKey);
        migrate_instrument_stop('rollback map/message update');
        $this->total_successes++;
        $this->successes_since_feedback++;
      }
      catch (Exception $e) {
        // Report the failure rather than discarding it, using the same call
        // as the bulk path. The row is still counted as processed below, so
        // item limits and feedback intervals also see failed rows.
        $this->handleException($e, FALSE);
      }
      $this->total_processed++;
      $this->processed_since_feedback++;
    }
  }
  $this->map->clearMessages();
  $this->progressMessage($return);
  // If we're using highwater marks, reset at completion of a full rollback
  // TODO: What about partial rollbacks? Probably little we can do to make
  // that work cleanly...
  if ($this->highwaterField) {
    $this->saveHighwater('', TRUE);
  }
  return $return;
}
/**
* Perform an import operation - migrate items from source to destination.
*
* Rewinds the source, then for every valid source row: applies the field
* mappings, hands the result to the destination's import(), and records the
* outcome in the map. Time, status and item limits are checked after each
* row, before the source is advanced.
*
* @return int
*   One of the MigrationBase::RESULT_* constants. RESULT_FAILED is returned
*   immediately when the source throws from rewind() or next().
*/
protected function import() {
$return = MigrationBase::RESULT_COMPLETED;
try {
$this->source
->rewind();
} catch (Exception $e) {
self::displayMessage(t('Migration failed with source plugin exception: !e', array(
'!e' => $e
->getMessage(),
)));
return MigrationBase::RESULT_FAILED;
}
while ($this->source
->valid()) {
$data_row = $this->source
->current();
$this->currentSourceKey = $this->source
->getCurrentKey();
// Wipe old messages
$this->map
->delete($this->currentSourceKey, TRUE);
$this->sourceValues = $data_row;
// Populates $this->destinationValues from $this->sourceValues.
$this
->applyMappings();
try {
migrate_instrument_start('destination import', TRUE);
$ids = $this->destination
->import($this->destinationValues, $this->sourceValues);
migrate_instrument_stop('destination import');
if ($ids) {
// Success: record the source-to-destination ID mapping.
$this->map
->saveIDMapping($this->sourceValues, $ids, $this->needsUpdate, $this->rollbackAction);
$this->successes_since_feedback++;
$this->total_successes++;
}
else {
// No IDs and no exception: record the row as failed with a generic message.
$this->map
->saveIDMapping($this->sourceValues, array(), MigrateMap::STATUS_FAILED, $this->rollbackAction);
$message = t('New object was not saved, no error provided');
$this
->saveMessage($message);
self::displayMessage($message);
}
} catch (MigrateException $e) {
// MigrateException carries its own message level, which is saved with it.
$this->map
->saveIDMapping($this->sourceValues, array(), MigrateMap::STATUS_FAILED, $this->rollbackAction);
$this
->saveMessage($e
->getMessage(), $e
->getLevel());
self::displayMessage($e
->getMessage());
} catch (Exception $e) {
// Any other exception: mark the row failed and use the generic handler.
$this->map
->saveIDMapping($this->sourceValues, array(), MigrateMap::STATUS_FAILED, $this->rollbackAction);
$this
->handleException($e);
}
// The row counts as processed whether or not it was imported successfully.
$this->total_processed++;
$this->processed_since_feedback++;
// The highwater mark is saved after every row, regardless of the outcome.
if ($this->highwaterField) {
$this
->saveHighwater($this->sourceValues->{$this->highwaterField['name']});
}
// Reset row properties.
unset($this->sourceValues, $this->destinationValues);
$this->needsUpdate = MigrateMap::STATUS_IMPORTED;
// TODO: Temporary. Remove when http://drupal.org/node/375494 is committed.
// TODO: Should be done in MigrateDestinationEntity
if (!empty($this->destination->entityType)) {
entity_get_controller($this->destination->entityType)
->resetCache();
}
// Limit checks come before next(), so the source is not advanced past a
// row that will not be processed in this run.
if ($this
->timeOptionExceeded()) {
break;
}
if (($return = $this
->checkStatus()) != MigrationBase::RESULT_COMPLETED) {
break;
}
if ($this
->itemOptionExceeded()) {
break;
}
try {
$this->source
->next();
} catch (Exception $e) {
self::displayMessage(t('Migration failed with source plugin exception: !e', array(
'!e' => $e
->getMessage(),
)));
return MigrationBase::RESULT_FAILED;
}
}
$this
->progressMessage($return);
return $return;
}
/**
 * Perform an analysis operation - report on field values in the source.
 *
 * Walks every source row and accumulates, per field: whether all non-empty
 * values were numeric, the numeric minimum/maximum, the string length
 * minimum/maximum, and a tally of distinct values. Fields whose names start
 * with 'migrate_map_' are ignored.
 *
 * If the source throws while rewinding, an empty array is returned; if it
 * throws while advancing, the partial results gathered so far are returned.
 *
 * @return array
 *   Array of analysis details - each element is keyed by field name and
 *   contains an array describing the field values (keys: is_numeric,
 *   min_numeric, max_numeric, min_strlen, max_strlen, distinct_values,
 *   description).
 */
public function analyze() {
  // The source needs this to find the map table.
  self::$currentMigration = $this;
  try {
    $this->source->rewind();
  }
  catch (Exception $e) {
    self::displayMessage(t('Migration analysis failed with source plugin exception: !e', array(
      '!e' => $e->getMessage(),
    )));
    self::$currentMigration = NULL;
    return array();
  }
  // Get the documented fields first
  $source_fields = $this->source->fields();
  $analysis = array();
  $field_init = array(
    'is_numeric' => TRUE,
    'min_numeric' => NULL,
    'max_numeric' => NULL,
    'min_strlen' => 0,
    'max_strlen' => 0,
    'distinct_values' => array(),
  );
  foreach ($source_fields as $field_name => $description) {
    // Ignore fields from the map table
    if (substr($field_name, 0, strlen('migrate_map_')) == 'migrate_map_') {
      continue;
    }
    $analysis[$field_name] = $field_init + array(
      'description' => $description,
    );
  }
  // For each data row...
  while ($this->source->valid()) {
    $row = $this->source->current();
    // Cheat for XML migrations, which don't pick up the source values
    // until applyMappings() applies the xpath()
    if ($this instanceof XMLMigration) {
      $this->currentSourceKey = $this->source->getCurrentKey();
      $this->sourceValues = $row;
      $this->applyMappings();
      $row = $this->sourceValues;
      unset($row->xml);
    }
    // For each field in this row...
    foreach ($row as $field_name => $raw_value) {
      // Ignore fields from the map table
      if (substr($field_name, 0, strlen('migrate_map_')) == 'migrate_map_') {
        continue;
      }
      // It might be an array of values, check each value
      if (!is_array($raw_value)) {
        $raw_value = array(
          $raw_value,
        );
      }
      foreach ($raw_value as $field_value) {
        // If this is an undocumented field, initialize it
        if (!isset($analysis[$field_name])) {
          $analysis[$field_name] = $field_init + array(
            'description' => '',
          );
        }
        // Ignore leading/trailing spaces in determining numerics
        $trimmed_value = trim($field_value);
        if (is_numeric($trimmed_value)) {
          $trimmed_value = floatval($trimmed_value);
          // First numeric value, initialize the min/max
          if (is_null($analysis[$field_name]['min_numeric'])) {
            $analysis[$field_name]['min_numeric'] = $trimmed_value;
            $analysis[$field_name]['max_numeric'] = $trimmed_value;
          }
          else {
            $analysis[$field_name]['min_numeric'] = min($trimmed_value, $analysis[$field_name]['min_numeric']);
            $analysis[$field_name]['max_numeric'] = max($trimmed_value, $analysis[$field_name]['max_numeric']);
          }
        }
        elseif ($trimmed_value !== '') {
          // Empty strings are !is_numeric(), but treat as empty rather than
          // assuming we don't have a numeric field
          $analysis[$field_name]['is_numeric'] = FALSE;
        }
        $strlen = strlen($field_value);
        // First string value, initialize both min and max
        if ($analysis[$field_name]['max_strlen'] == 0) {
          $analysis[$field_name]['min_strlen'] = $strlen;
          $analysis[$field_name]['max_strlen'] = $strlen;
        }
        else {
          $analysis[$field_name]['min_strlen'] = min($strlen, $analysis[$field_name]['min_strlen']);
          $analysis[$field_name]['max_strlen'] = max($strlen, $analysis[$field_name]['max_strlen']);
        }
        // Tally distinct values; tallying stops once the list holds more
        // than 10 entries.
        if (count($analysis[$field_name]['distinct_values']) <= 10) {
          // Numeric values are floats at this point, and a float array key
          // is truncated to an integer (1.2 and 1.9 would share key 1).
          // Casting to string keeps fractional values apart, while whole
          // numbers still normalize to the same integer key as before.
          $distinct_key = (string) $trimmed_value;
          // Start the tally explicitly instead of incrementing an undefined
          // index.
          if (!isset($analysis[$field_name]['distinct_values'][$distinct_key])) {
            $analysis[$field_name]['distinct_values'][$distinct_key] = 0;
          }
          $analysis[$field_name]['distinct_values'][$distinct_key]++;
        }
      }
    }
    try {
      $this->source->next();
    }
    catch (Exception $e) {
      self::displayMessage(t('Migration analysis failed with source plugin exception: !e. Partial results follow:', array(
        '!e' => $e->getMessage(),
      )));
      self::$currentMigration = NULL;
      return $analysis;
    }
  }
  self::$currentMigration = NULL;
  return $analysis;
}
/**
 * Default implementation of prepareKey. This method is called from the source
 * plugin immediately after retrieving the raw data from the source - by
 * default, it simply assigns the key values based on the field names passed
 * to MigrateSQLMap(). Override this if you need to generate your own key
 * (e.g., the source doesn't have a natural unique key). Be sure to also
 * set any values you generate in $row.
 *
 * @param array $source_key
 *   Key definition, keyed by source field name. Only the keys are used here.
 * @param object $row
 *   The raw source row; one property is read per key field.
 *
 * @return array
 *   The key values, keyed by source field name.
 */
public function prepareKey($source_key, $row) {
  $key_values = array();
  foreach ($source_key as $key_field => $unused_schema) {
    $key_values[$key_field] = $row->{$key_field};
  }
  return $key_values;
}
/**
 * Default implementation of prepareRow(). This method is called from the source
 * plugin upon first pulling the raw data from the source.
 *
 * The default resets the row's rollback action to the migration's default
 * and accepts every row.
 *
 * @param $row
 *   Object containing raw source data (unused by the default implementation).
 * @return bool
 *   TRUE to process this row, FALSE to have the source skip it.
 */
public function prepareRow($row) {
  $default_action = $this->defaultRollbackAction;
  $this->rollbackAction = $default_action;
  return TRUE;
}
////////////////////////////////////////////////////////////////////
// Utility methods
/**
 * Convenience function to return count of total source records
 *
 * @param boolean $refresh
 *   Pass TRUE to refresh the cached count.
 *
 * @return mixed
 *   The count reported by the source, or the translated string 'N/A' when
 *   the source throws (the exception message is displayed).
 */
public function sourceCount($refresh = FALSE) {
  try {
    return $this->source->count($refresh);
  }
  catch (Exception $e) {
    $unavailable = t('N/A');
    self::displayMessage($e->getMessage());
    return $unavailable;
  }
}
/**
 * Get the number of source records processed.
 *
 * @return mixed
 *   Number of processed records as reported by the map, or the translated
 *   string 'N/A' when the map throws (the exception message is displayed).
 */
public function processedCount() {
  try {
    return $this->map->processedCount();
  }
  catch (Exception $e) {
    $unavailable = t('N/A');
    self::displayMessage($e->getMessage());
    return $unavailable;
  }
}
/**
 * Get the number of records successfully imported.
 *
 * @return mixed
 *   Number of imported records as reported by the map, or the translated
 *   string 'N/A' when the map throws (the exception message is displayed).
 */
public function importedCount() {
  try {
    return $this->map->importedCount();
  }
  catch (Exception $e) {
    $unavailable = t('N/A');
    self::displayMessage($e->getMessage());
    return $unavailable;
  }
}
/**
 * Get the number of records marked as needing update.
 *
 * @return mixed
 *   The count reported by the map, or the translated string 'N/A' when the
 *   map throws (the exception message is displayed).
 */
public function updateCount() {
  try {
    return $this->map->updateCount();
  }
  catch (Exception $e) {
    $unavailable = t('N/A');
    self::displayMessage($e->getMessage());
    return $unavailable;
  }
}
/**
 * Test whether we've exceeded the designated item limit.
 *
 * An empty (falsy) limit means no limit, in which case this never reports
 * TRUE.
 *
 * @return boolean
 *   TRUE if the threshold is exceeded, FALSE if not.
 */
protected function itemOptionExceeded() {
  $limit = $this->getItemLimit();
  return $limit && ($this->total_processed >= $limit);
}
/**
 * Get the number of source records which failed to import.
 * TODO: Doesn't yet account for informationals, or multiple errors for
 * a source record.
 *
 * Unlike the other count helpers, exceptions from the map are not caught.
 *
 * @return int
 *   Number of records errored out, as reported by the map.
 */
public function errorCount() {
  $map = $this->map;
  return $map->errorCount();
}
/**
 * Get the number of messages associated with this migration
 *
 * Exceptions from the map are not caught.
 *
 * @return int
 *   Number of messages, as reported by the map.
 */
public function messageCount() {
  $map = $this->map;
  return $map->messageCount();
}
/**
 * Prepares this migration to run as an update - that is, in addition to
 * unmigrated content (source records not in the map table) being imported,
 * previously-migrated content will also be updated in place.
 *
 * The work is delegated entirely to the map's prepareUpdate().
 */
public function prepareUpdate() {
  $map = $this->map;
  $map->prepareUpdate();
}
/**
 * Outputs a progress message, reflecting the current status of a migration process.
 *
 * The message reports the items handled since the last feedback point, the
 * elapsed time and the per-minute rate. For any result other than
 * RESULT_INCOMPLETE, lookup-cache hit rates are also reported (at 'debug'
 * level) and then cleared. For RESULT_INCOMPLETE, the feedback clock and the
 * since-feedback counters are reset so the next message covers a fresh
 * interval.
 *
 * @param int $result
 *   Status of the process, represented by one of the Migration::RESULT_* constants.
 */
protected function progressMessage($result) {
  $time = microtime(TRUE) - $this->lastfeedback;
  if ($time > 0) {
    $perminute = round(60 * $this->processed_since_feedback / $time);
    $time = round($time, 1);
  }
  else {
    $perminute = '?';
  }
  if ($this->status == Migration::STATUS_IMPORTING) {
    switch ($result) {
      case Migration::RESULT_COMPLETED:
        $basetext = "Processed !numitems (!created created, !updated updated, !failed failed, !ignored ignored) in !time sec (!perminute/min) - done with '!name'";
        $type = 'completed';
        break;

      case Migration::RESULT_FAILED:
        $basetext = "Processed !numitems (!created created, !updated updated, !failed failed, !ignored ignored) in !time sec (!perminute/min) - failure with '!name'";
        $type = 'failed';
        break;

      case Migration::RESULT_INCOMPLETE:
        $basetext = "Processed !numitems (!created created, !updated updated, !failed failed, !ignored ignored) in !time sec (!perminute/min) - continuing with '!name'";
        $type = 'ok';
        break;

      case Migration::RESULT_STOPPED:
        $basetext = "Processed !numitems (!created created, !updated updated, !failed failed, !ignored ignored) in !time sec (!perminute/min) - stopped '!name'";
        $type = 'warning';
        break;
    }
  }
  else {
    switch ($result) {
      case Migration::RESULT_COMPLETED:
        $basetext = "Rolled back !numitems in !time sec (!perminute/min) - done with '!name'";
        $type = 'completed';
        break;

      case Migration::RESULT_FAILED:
        $basetext = "Rolled back !numitems in !time sec (!perminute/min) - failure with '!name'";
        $type = 'failed';
        break;

      case Migration::RESULT_INCOMPLETE:
        $basetext = "Rolled back !numitems in !time sec (!perminute/min) - continuing with '!name'";
        $type = 'ok';
        break;

      case Migration::RESULT_STOPPED:
        $basetext = "Rolled back !numitems in !time sec (!perminute/min) - stopped '!name'";
        $type = 'warning';
        break;
    }
  }
  $numitems = $this->processed_since_feedback + $this->source->getIgnored();
  $message = t($basetext, array(
    '!numitems' => $numitems,
    '!successes' => $this->successes_since_feedback,
    '!failed' => $this->processed_since_feedback - $this->successes_since_feedback,
    '!created' => $this->destination->getCreated(),
    '!updated' => $this->destination->getUpdated(),
    '!ignored' => $this->source->getIgnored(),
    '!time' => $time,
    '!perminute' => $perminute,
    '!name' => $this->machineName,
  ));
  self::displayMessage($message, $type);
  // Report on lookup_cache hit rate. Only visible at 'debug' level.
  if ($result != Migration::RESULT_INCOMPLETE && !empty($this->counts['lookup_cache'])) {
    foreach ($this->counts['lookup_cache'] as $name => $tallies) {
      // Set defaults to avoid NOTICE.
      $tallies += array(
        'hit' => 0,
        'miss_hit' => 0,
        'miss_miss' => 0,
      );
      $sum = $tallies['hit'] + $tallies['miss_hit'] + $tallies['miss_miss'];
      // With no lookups recorded there is no rate to report, and the
      // percentages below would divide by zero.
      if ($sum <= 0) {
        continue;
      }
      self::displayMessage(t('Lookup cache: !mn SM=!name !hit hit, !miss_hit miss_hit, !miss_miss miss_miss (!total total).', array(
        '!mn' => $this->machineName,
        '!name' => $name,
        '!hit' => round(100 * $tallies['hit'] / $sum) . '%',
        '!miss_hit' => round(100 * $tallies['miss_hit'] / $sum) . '%',
        '!miss_miss' => round(100 * $tallies['miss_miss'] / $sum) . '%',
        '!total' => $sum,
      )), 'debug');
    }
    $this->counts['lookup_cache'] = array();
  }
  if ($result == Migration::RESULT_INCOMPLETE) {
    // Use the same clock as the elapsed-time calculation at the top of this
    // method. Resetting with whole-second time() drops the fractional part
    // and skews the next interval's elapsed time and rate.
    $this->lastfeedback = microtime(TRUE);
    $this->processed_since_feedback = $this->successes_since_feedback = 0;
    $this->source->resetStats();
    $this->destination->resetStats();
  }
}
/**
 * Standard top-of-loop stuff, common between rollback and import - check
 * for exceptional conditions, and display feedback.
 *
 * @return int
 *   MigrationBase::RESULT_INCOMPLETE when the memory or time check trips,
 *   MigrationBase::RESULT_STOPPED when the migration status is
 *   STATUS_STOPPING, otherwise MigrationBase::RESULT_COMPLETED (meaning
 *   processing may carry on).
 */
protected function checkStatus() {
  if ($this->memoryExceeded() || $this->timeExceeded()) {
    return MigrationBase::RESULT_INCOMPLETE;
  }
  if ($this->getStatus() == Migration::STATUS_STOPPING) {
    return MigrationBase::RESULT_STOPPED;
  }
  // If feedback is requested, produce a progress message at the proper time
  if (isset($this->feedback)) {
    $seconds_due = $this->feedback_unit == 'seconds'
      && (time() - $this->lastfeedback >= $this->feedback);
    $items_due = $this->feedback_unit == 'items'
      && ($this->processed_since_feedback >= $this->feedback);
    if ($seconds_due || $items_due) {
      $this->progressMessage(MigrationBase::RESULT_INCOMPLETE);
    }
  }
  return MigrationBase::RESULT_COMPLETED;
}
/**
 * Apply field mappings to the current source row ($this->sourceValues),
 * storing the populated destination object in $this->destinationValues.
 *
 * For each mapping with a destination field, the value is taken from the
 * source row (or the mapping's default) and then run through, in order: the
 * separator split, the source-migration lookup, any callbacks, and
 * deduping. Mapping arguments are attached under an 'arguments' key.
 * Destinations written as 'field:subfield' are stored as an argument of the
 * primary field.
 *
 * Nothing is returned; callers read $this->destinationValues.
 */
protected function applyMappings() {
  $this->destinationValues = new stdClass();
  foreach ($this->fieldMappings as $mapping) {
    $destination = $mapping->getDestinationField();
    // Skip mappings with no destination (source fields marked DNM)
    if (!$destination) {
      continue;
    }
    $source = $mapping->getSourceField();
    $default = $mapping->getDefaultValue();
    // When updating existing items, make sure we don't create a destination
    // field that is not mapped to anything (a source field or a default value)
    if (!$source && !isset($default)) {
      continue;
    }
    $destination_values = NULL;
    // If there's a source mapping, and a source value in the data row, copy
    // to the destination
    if ($source && property_exists($this->sourceValues, $source)) {
      $destination_values = $this->sourceValues->{$source};
    }
    elseif (!is_null($default)) {
      $destination_values = $default;
    }
    // If there's a separator specified for this destination, then it
    // will be populated as an array exploded from the source value
    $separator = $mapping->getSeparator();
    if ($separator && isset($destination_values)) {
      $destination_values = explode($separator, $destination_values);
    }
    // If a source migration is supplied, use the current value for this field
    // to look up a destination ID from the provided migration
    $source_migration = $mapping->getSourceMigration();
    if ($source_migration && isset($destination_values)) {
      $destination_values = $this->handleSourceMigration($source_migration, $destination_values, $default, $this);
    }
    // Call any designated callbacks
    $callbacks = $mapping->getCallbacks();
    foreach ($callbacks as $callback) {
      if (isset($destination_values)) {
        $destination_values = call_user_func($callback, $destination_values);
      }
    }
    // If specified, assure a unique value for this property.
    $dedupe = $mapping->getDedupe();
    if ($dedupe && isset($destination_values)) {
      $destination_values = $this->handleDedupe($dedupe, $destination_values);
    }
    // Assign any arguments
    if (isset($destination_values)) {
      $arguments = $mapping->getArguments();
      if ($arguments) {
        if (!is_array($destination_values)) {
          $destination_values = array(
            $destination_values,
          );
        }
        // TODO: Stuffing arguments into the destination field is gross - can
        // we come up with a better way to communicate them to the field
        // handlers?
        $destination_values['arguments'] = array();
        foreach ($arguments as $argname => $destarg) {
          if (is_array($destarg) && isset($destarg['source_field']) && property_exists($this->sourceValues, $destarg['source_field'])) {
            // The property name is the value of $destarg['source_field'].
            // The previous form, ->{$destarg}['source_field'], used the
            // array itself as the property name and so never read the
            // intended source field.
            $destination_values['arguments'][$argname] = $this->sourceValues->{$destarg['source_field']};
          }
          elseif (is_array($destarg) && isset($destarg['default_value'])) {
            $destination_values['arguments'][$argname] = $destarg['default_value'];
          }
          else {
            $destination_values['arguments'][$argname] = $destarg;
          }
        }
      }
    }
    // Are we dealing with the primary value of the destination field, or a
    // subfield?
    $destination = explode(':', $destination);
    $destination_field = $destination[0];
    if (isset($destination[1])) {
      $subfield = $destination[1];
      if (!property_exists($this->destinationValues, $destination_field)) {
        // The subfield is being processed before (or without) the primary
        // value: start from an empty array rather than reading an undefined
        // property.
        $this->destinationValues->{$destination_field} = array();
      }
      elseif (!is_array($this->destinationValues->{$destination_field})) {
        // A scalar primary value must become an array before arguments can
        // be attached to it.
        $this->destinationValues->{$destination_field} = array(
          $this->destinationValues->{$destination_field},
        );
      }
      $this->destinationValues->{$destination_field}['arguments'][$subfield] = $destination_values;
    }
    else {
      $this->destinationValues->{$destination_field} = $destination_values;
    }
  }
}
/**
 * Look up a value migrated in another migration.
 *
 * @param mixed $source_migrations
 *   An array of source migrations, or string for a single migration.
 * @param mixed $source_keys
 *   Key(s) to be looked up against the source migration(s). This may be a
 *   simple value (one single-field key), an array of values (multiple
 *   single-field keys to each be looked up), or an array of arrays (multiple
 *   multi-field keys to each be looked up).
 * @param mixed $default
 *   The default value, if no ID was found.
 * @param $migration
 *   The implementing migration. It is handed to createStubWrapper() when a
 *   stub has to be created.
 * @return
 *   Destination value(s) from the source migration(s): an array of values if
 *   more than one key was looked up, otherwise a single value. When a single
 *   lookup finds nothing and no default was supplied, NULL is returned.
 */
protected function handleSourceMigration($source_migrations, $source_keys, $default = NULL, $migration = NULL) {
  // Handle the source migration(s) as an array.
  $source_migrations = (array) $source_migrations;

  // Normalize the source keys to an array of arrays, each inner array
  // representing one (possibly multi-field) key.
  if (!is_array($source_keys)) {
    // A simple value - make it an array within an array.
    $source_keys = array(array($source_keys));
  }
  elseif (!is_array(reset($source_keys))) {
    // An array of single-field values - wrap each one in its own array. Only
    // the first element is inspected to tell this case apart from an array
    // that already holds key arrays.
    $wrapped_keys = array();
    foreach ($source_keys as $source_key) {
      $wrapped_keys[] = array($source_key);
    }
    $source_keys = $wrapped_keys;
  }

  // Instantiate each migration, and store back in the array.
  foreach ($source_migrations as $key => $source_migration) {
    $source_migrations[$key] = Migration::getInstance($source_migration);
  }

  $results = array();
  // Each $source_key will be an array of key values.
  foreach ($source_keys as $source_key) {
    // Skip this key if it has no values at all, or if any of its values is
    // empty. 0 and '0' are legitimate key values, so they do not count as
    // empty here.
    $continue = FALSE;
    foreach ($source_key as $value) {
      if (empty($value) && $value !== 0 && $value !== '0') {
        $continue = TRUE;
        break;
      }
    }
    if ($continue || empty($source_key)) {
      continue;
    }

    // Start every lookup from a defined state, so an empty list of source
    // migrations does not leave $destids undefined.
    $destids = NULL;

    // Loop through each source migration, checking for an existing dest ID.
    foreach ($source_migrations as $source_migration) {
      // Break out of the loop as soon as a destination ID is found.
      if ($destids = $source_migration->getMap()->lookupDestinationID($source_key)) {
        break;
      }
    }

    // If no destination ID was found, give each source migration a chance to
    // create a stub.
    if (!$destids) {
      foreach ($source_migrations as $source_migration) {
        // Break out of the loop if a stub was successfully created.
        if ($destids = $source_migration->createStubWrapper($source_key, $migration)) {
          break;
        }
      }
    }

    if ($destids) {
      // Assume that if the destination key is a single value, it should be
      // passed as such.
      if (count($destids) == 1) {
        $results[] = reset($destids);
      }
      else {
        $results[] = $destids;
      }
    }
    elseif (!is_null($default)) {
      $results[] = $default;
    }
  }

  // Multiple keys always yield an array of results.
  if (count($source_keys) > 1) {
    return $results;
  }

  // Single key: return the lone result. reset() yields FALSE for an empty
  // array, which cannot be told apart from a genuine value by isset() or
  // is_null(), so "nothing found" is reported as NULL instead.
  return $results ? reset($results) : NULL;
}
/**
 * For fields which require uniqueness, assign a new unique value if necessary.
 *
 * @param array $dedupe
 *   An array with two keys, 'table' the name of the Drupal table and 'column'
 *   the column within that table where uniqueness must be maintained.
 * @param $original
 *   The value coming in, which must be checked for uniqueness.
 * @return string
 *   The value to use - either the original, the value previously written for
 *   this row, or a variation of the original created by appending "_" and a
 *   sequence number (starting at 2).
 */
protected function handleDedupe($dedupe, $original) {
  // If we're remigrating a previously-existing value, simply running through
  // our normal process will re-dedupe it - we must be sure to preserve the
  // previously-written value. Note that this means that you cannot migrate
  // changes to this field - the originally-migrated value will always
  // remain, because we can't tell what the original was.
  if (isset($this->sourceValues->migrate_map_destid1)) {
    // The first field of the destination key schema identifies the row.
    $key_field = key($this->destination->getKeySchema());
    $existing_value = db_select($dedupe['table'], 't')
      ->fields('t', array($dedupe['column']))
      ->range(0, 1)
      ->condition($key_field, $this->sourceValues->migrate_map_destid1)
      ->execute()
      ->fetchField();
    // If, for some reason, we don't find a value (no row, or an empty
    // column), fall through to the normal deduping process. The test is
    // against the empty string rather than plain truthiness so that a stored
    // value of '0' is still recognised and preserved.
    if ((string) $existing_value !== '') {
      return $existing_value;
    }
  }

  $i = 1;
  $candidate = $original;
  while (TRUE) {
    $match = db_select($dedupe['table'], 't')
      ->fields('t', array($dedupe['column']))
      ->range(0, 1)
      ->condition($dedupe['column'], $candidate)
      ->execute()
      ->fetchField();
    // No row (FALSE) or an empty stored value means the candidate is free.
    // A stored '0' is a real collision even though PHP treats it as falsy.
    if ((string) $match === '') {
      break;
    }
    // We already have the candidate value. Find a non-existing value.
    $i++;
    // @TODO: support custom replacement pattern instead of just append.
    $candidate = $original . '_' . $i;
  }

  // Record an informational message whenever the value had to be changed.
  if ($i > 1) {
    $message = t('Replacing !column !original with !candidate', array(
      '!column' => $dedupe['column'],
      '!original' => $original,
      '!candidate' => $candidate,
    ));
    $migration = Migration::currentMigration();
    $migration->saveMessage($message, Migration::MESSAGE_INFORMATIONAL);
  }
  return $candidate;
}
/**
 * If stub creation is enabled, try to create a stub and save the mapping.
 *
 * Stub creation counts as enabled when this object has a createStub()
 * method.
 *
 * @param array $source_key
 *   Source key values, matched by position against the fields of the map's
 *   source key.
 * @param $migration
 *   Passed through as the first argument to createStub().
 * @return
 *   Whatever createStub() returned, or NULL if there is no createStub()
 *   method.
 */
protected function createStubWrapper(array $source_key, $migration = NULL) {
  // Without a createStub() implementation there is nothing to do.
  if (!method_exists($this, 'createStub')) {
    return NULL;
  }

  $stub_ids = $this->createStub($migration, $source_key);
  if (!$stub_ids) {
    // Stub creation failed; hand back the falsy result untouched.
    return $stub_ids;
  }

  // Fake a data row with the source key in it, pairing each source key field
  // name with the value at the same position in $source_key.
  $key_fields = $this->map->getSourceKey();
  $fake_row = new stdClass();
  $position = 0;
  foreach ($key_fields as $field_name => $definition) {
    $fake_row->{$field_name} = $source_key[$position];
    $position++;
  }

  // Save the mapping flagged as needing update.
  $this->map->saveIDMapping($fake_row, $stub_ids, MigrateMap::STATUS_NEEDS_UPDATE, $this->defaultRollbackAction);
  return $stub_ids;
}
/**
 * Pass messages through to the map class.
 *
 * The message is recorded against the migration's current source key.
 *
 * @param string $message
 *   The message to record.
 * @param int $level
 *   Optional message severity (defaults to MESSAGE_ERROR).
 */
public function saveMessage($message, $level = MigrationBase::MESSAGE_ERROR) {
  $source_key = $this->currentSourceKey;
  $this->map->saveMessage($source_key, $message, $level);
}
/**
 * Set the specified row to be updated, if it exists.
 *
 * @param array $source_key
 *   Source key of the row to flag. When omitted (or empty), the migration's
 *   current source key is used instead.
 */
public function setUpdate(array $source_key = NULL) {
  // Fall back to the current source key when no key was given.
  $target_key = $source_key ? $source_key : $this->currentSourceKey;
  $this->map->setUpdate($target_key);
}
}
/**
 * Convenience base class for dynamic migrations.
 *
 * Deriving from this rather than directly from Migration ensures that a class
 * will not be registered as a migration itself - it is the implementor's
 * responsibility to register each instance of a dynamic migration class.
 */
abstract class DynamicMigration extends Migration {

  /**
   * Reports this class as dynamic, overriding the default of FALSE.
   *
   * @return bool
   *   Always TRUE.
   */
  public static function isDynamic() {
    return TRUE;
  }

}
Classes
Name | Description |
---|---|
DynamicMigration | Convenience class - deriving from this rather than directly from Migration ensures that a class will not be registered as a migration itself - it is the implementor's responsibility to register each instance of a dynamic migration class. |
Migration | The base class for all import objects. This is where most of the smarts of the migrate module resides. Migrations are created by deriving from this class, and in the constructor (after calling parent::__construct()) initializing at a minimum the name,… |