public function MigrateSourceSQL::performRewind in Migrate 7.2
Same name and namespace in other branches
- 6.2 plugins/sources/sql.inc \MigrateSourceSQL::performRewind()
Implementation of MigrateSource::performRewind().
We could simply execute the query and be functionally correct, but we will take advantage of the PDO-based API to optimize the query up-front.
File
- plugins/sources/sql.inc, line 275
- Define a MigrateSource for importing from Drupal connections.
Class
- MigrateSourceSQL
- Implementation of MigrateSource, to handle imports from Drupal connections.
Code
/**
 * Implementation of MigrateSource::performRewind().
 *
 * Resets the iteration state, rebuilds the working query from a clone of the
 * original query, narrows it (by idlist, or otherwise by map status and
 * highwater mark), then executes it and stores the result set in
 * $this->result. A clone of the narrowed query is kept in
 * $this->alteredQuery.
 *
 * We could simply execute the query and be functionally correct, but we will
 * take advantage of the PDO-based API to optimize the query up-front.
 */
public function performRewind() {
  $this->result = NULL;
  $this->query = clone $this->originalQuery;
  $this->batch = 0;

  // Get the (alias-qualified) source key field names, for use in joining to
  // the map table and in enforcing idlist. The position of each entry
  // corresponds to the map table column sourceid<position + 1>.
  $keys = array();
  foreach ($this->activeMap->getSourceKey() as $field_name => $field_schema) {
    if (isset($field_schema['alias'])) {
      $field_name = $field_schema['alias'] . '.' . $field_name;
    }
    $keys[] = $field_name;
  }

  // The rules for determining what conditions to add to the query are as
  // follows (applying first applicable rule).
  // 1. If idlist is provided, then only process items in that list (AND key
  //    IN (idlist)). Ids containing the multi-key separator are split and
  //    matched field-by-field against a compound source key.
  if ($this->idList) {
    $simple_ids = array();
    $compound_ids = array();
    $key_count = count($keys);
    foreach ($this->idList as $id) {
      // Look for multi-key separator. If there is only 1 key, ignore.
      if (strpos($id, $this->multikeySeparator) === FALSE || $key_count == 1) {
        $simple_ids[] = $id;
        continue;
      }
      $compound_ids[] = explode($this->multikeySeparator, $id);
    }

    // Check for compound ids. If present add them with subsequent
    // OR statements.
    if (!empty($compound_ids)) {
      $condition = db_or();
      if (!empty($simple_ids)) {
        $condition->condition($keys[0], $simple_ids, 'IN');
      }
      foreach ($compound_ids as $values) {
        // Each compound id becomes (key1 = v1 AND key2 = v2 ...).
        $temp_and = db_and();
        foreach ($values as $pos => $value) {
          $temp_and->condition($keys[$pos], $value);
        }
        $condition->condition($temp_and);
      }
      $this->query->condition($condition);
    }
    else {
      $this->query->condition($keys[0], $simple_ids, 'IN');
    }
  }
  else {
    // 2. If the map is joinable, join it. We will want to accept all rows
    //    which are either not in the map, or marked in the map as
    //    NEEDS_UPDATE. Note that if highwater fields are in play, we want to
    //    accept all rows above the highwater mark in addition to those
    //    selected by the map conditions, so we need to OR them together (but
    //    AND with any existing conditions in the query). So, ultimately the
    //    SQL condition will look like (original conditions) AND (map IS NULL
    //    OR map needs update OR above highwater).
    $conditions = db_or();
    $condition_added = FALSE;
    if ($this->mapJoinable) {
      // Build the join to the map table. Because the source key could have
      // multiple fields, emit one equality per key field and AND them.
      $join_clauses = array();
      foreach ($keys as $index => $key) {
        $join_clauses[] = $key . ' = map.sourceid' . ($index + 1);
      }
      $map_join = implode(' AND ', $join_clauses);

      // leftJoin() returns the alias actually assigned to the map table.
      $alias = $this->query->leftJoin($this->activeMap->getQualifiedMapTable(), 'map', $map_join);
      $conditions->isNull($alias . '.sourceid1');
      $conditions->condition($alias . '.needs_update', MigrateMap::STATUS_NEEDS_UPDATE);
      $condition_added = TRUE;

      // And as long as we have the map table, add its data to the row.
      foreach (array_keys($keys) as $index) {
        $map_key = 'sourceid' . ($index + 1);
        $this->query->addField($alias, $map_key, "migrate_map_{$map_key}");
      }
      $count = 1;
      foreach ($this->activeMap->getDestinationKey() as $field_schema) {
        $map_key = 'destid' . $count++;
        $this->query->addField($alias, $map_key, "migrate_map_{$map_key}");
      }
      $this->query->addField($alias, 'needs_update', 'migrate_map_needs_update');
    }

    // 3. If we are using highwater marks, also include rows above the mark.
    //    But, include all rows if the highwater mark is not set. The mark is
    //    read once (and only when a highwater field is configured) so every
    //    use below sees the same value.
    $highwater_mark = isset($this->highwaterField['name']) ? $this->activeMigration->getHighwater() : '';
    if ($highwater_mark !== '') {
      // But, if there are any existing items marked as needing update which
      // fall below the highwater mark, and map_joinable is FALSE, those
      // items will be skipped. Thus, in that case do not add the highwater
      // optimization to the query.
      $add_highwater_condition = TRUE;
      if (!$this->mapJoinable) {
        // Use the same status constant as the joined-map condition above
        // rather than a hard-coded literal.
        $count_needs_update = db_query(
          'SELECT COUNT(*) FROM {' . $this->activeMap->getQualifiedMapTable() . '} WHERE needs_update = :needs_update',
          array(':needs_update' => MigrateMap::STATUS_NEEDS_UPDATE)
        )->fetchField();
        if ($count_needs_update > 0) {
          $add_highwater_condition = FALSE;
        }
      }

      if ($add_highwater_condition) {
        if (isset($this->highwaterField['alias'])) {
          $highwater = $this->highwaterField['alias'] . '.' . $this->highwaterField['name'];
        }
        else {
          $highwater = $this->highwaterField['name'];
        }

        // If highwaterField is an aggregate function add
        // as a having condition.
        if (isset($this->highwaterField['aggregate'])) {
          $this->query->havingCondition($highwater, $highwater_mark, '>');
        }
        else {
          $conditions->condition($highwater, $highwater_mark, '>');
          $condition_added = TRUE;
        }
      }
    }

    // Only attach the OR group if something was put into it.
    if ($condition_added) {
      $this->query->condition($conditions);
    }

    // 4. Download data in batches for performance.
    if ($this->batchSize > 0) {
      $this->query->range($this->batch * $this->batchSize, $this->batchSize);
    }
  }

  // Save our fixed-up query so getNextBatch() matches it.
  $this->alteredQuery = clone $this->query;

  migrate_instrument_start('MigrateSourceSQL execute');
  $this->result = $this->query->execute();
  migrate_instrument_stop('MigrateSourceSQL execute');
}