You are here

public function MigrateSourceSQL::performRewind in Migrate 7.2

Same name and namespace in other branches
  1. 6.2 plugins/sources/sql.inc \MigrateSourceSQL::performRewind()

Implementation of MigrateSource::performRewind().

We could simply execute the query and be functionally correct, but we will take advantage of the PDO-based API to optimize the query up-front.

File

plugins/sources/sql.inc, line 275
Define a MigrateSource for importing from Drupal connections.

Class

MigrateSourceSQL
Implementation of MigrateSource, to handle imports from Drupal connections.

Code

/**
 * Implementation of MigrateSource::performRewind().
 *
 * Starts from a fresh clone of the original query, narrows it (by idlist, or
 * by a join to the map table and/or the highwater mark), saves a copy of the
 * narrowed query in $this->alteredQuery and executes it into $this->result.
 *
 * We could simply execute the query and be functionally correct, but we will
 * take advantage of the PDO-based API to optimize the query up-front.
 */
public function performRewind() {
  $this->result = NULL;
  $this->query = clone $this->originalQuery;
  $this->batch = 0;

  // Get the (alias-qualified) source key fields, in source-key order, for
  // potential use in joining to the map table, or enforcing idlist.
  $keys = array();
  foreach ($this->activeMap->getSourceKey() as $field_name => $field_schema) {
    if (isset($field_schema['alias'])) {
      $field_name = $field_schema['alias'] . '.' . $field_name;
    }
    $keys[] = $field_name;
  }
  $key_count = count($keys);

  // The rules for determining what conditions to add to the query are as
  // follows (applying first applicable rule).
  // 1. If idlist is provided, then only process items in that list. Plain
  // ids are matched against the first key (AND key IN (idlist)); ids that
  // contain the multi-key separator are split and matched key by key.
  if ($this->idList) {
    $simple_ids = array();
    $compound_ids = array();
    foreach ($this->idList as $id) {
      // Look for multi-key separator. If there is only 1 key, ignore.
      if ($key_count == 1 || strpos($id, $this->multikeySeparator) === FALSE) {
        $simple_ids[] = $id;
      }
      else {
        $compound_ids[] = explode($this->multikeySeparator, $id);
      }
    }

    if (empty($compound_ids)) {
      $this->query->condition($keys[0], $simple_ids, 'IN');
    }
    else {
      // Compound ids are present: OR together the simple-id IN clause (if
      // any) and one AND group per compound id.
      $condition = db_or();
      if (!empty($simple_ids)) {
        $condition->condition($keys[0], $simple_ids, 'IN');
      }
      foreach ($compound_ids as $values) {
        // NOTE(review): assumes each compound id has no more components
        // than there are source keys; $keys[$pos] is otherwise undefined.
        $temp_and = db_and();
        foreach ($values as $pos => $value) {
          $temp_and->condition($keys[$pos], $value);
        }
        $condition->condition($temp_and);
      }
      $this->query->condition($condition);
    }
  }
  else {
    // 2. If the map is joinable, join it. We will want to accept all rows
    // which are either not in the map, or marked in the map as NEEDS_UPDATE.
    // Note that if highwater fields are in play, we want to accept all rows
    // above the highwater mark in addition to those selected by the map
    // conditions, so we need to OR them together (but AND with any existing
    // conditions in the query). So, ultimately the SQL condition will look
    // like (original conditions) AND (map IS NULL OR map needs update
    // OR above highwater).
    $conditions = db_or();
    $condition_added = FALSE;
    if ($this->mapJoinable) {
      // Build the join to the map table. Because the source key could have
      // multiple fields, the n-th key field is matched to map.sourceid<n>.
      $join_parts = array();
      foreach ($keys as $index => $field_name) {
        $join_parts[] = "{$field_name} = map.sourceid" . ($index + 1);
      }
      $alias = $this->query->leftJoin($this->activeMap->getQualifiedMapTable(), 'map', implode(' AND ', $join_parts));
      $conditions->isNull($alias . '.sourceid1');
      $conditions->condition($alias . '.needs_update', MigrateMap::STATUS_NEEDS_UPDATE);
      $condition_added = TRUE;

      // And as long as we have the map table, add its data to the row.
      for ($count = 1; $count <= $key_count; $count++) {
        $map_key = 'sourceid' . $count;
        $this->query->addField($alias, $map_key, "migrate_map_{$map_key}");
      }
      $count = 1;
      foreach ($this->activeMap->getDestinationKey() as $field_schema) {
        $map_key = 'destid' . $count++;
        $this->query->addField($alias, $map_key, "migrate_map_{$map_key}");
      }
      $this->query->addField($alias, 'needs_update', 'migrate_map_needs_update');
    }

    // 3. If we are using highwater marks, also include rows above the mark.
    // But, include all rows if the highwater mark is not set.
    if (isset($this->highwaterField['name'])) {
      // Read the mark once; it is needed for both the check and the
      // condition below.
      $highwater_value = $this->activeMigration->getHighwater();
      if ($highwater_value !== '') {
        // But, if there are any existing items marked as needing update which
        // fall below the highwater mark, and map_joinable is FALSE, those
        // items will be skipped. Thus, in that case do not add the highwater
        // optimization to the query.
        $add_highwater_condition = TRUE;
        if (!$this->mapJoinable) {
          // Use the same status constant as the joined-map condition above
          // rather than a literal, passed as a bound placeholder.
          $count_needs_update = db_query(
            'SELECT COUNT(*) FROM {' . $this->activeMap->getQualifiedMapTable() . '} WHERE needs_update = :needs_update',
            array(':needs_update' => MigrateMap::STATUS_NEEDS_UPDATE)
          )->fetchField();
          if ($count_needs_update > 0) {
            $add_highwater_condition = FALSE;
          }
        }
        if ($add_highwater_condition) {
          if (isset($this->highwaterField['alias'])) {
            $highwater = $this->highwaterField['alias'] . '.' . $this->highwaterField['name'];
          }
          else {
            $highwater = $this->highwaterField['name'];
          }

          // If highwaterField is an aggregate function add
          // as a having condition.
          if (isset($this->highwaterField['aggregate'])) {
            $this->query->havingCondition($highwater, $highwater_value, '>');
          }
          else {
            $conditions->condition($highwater, $highwater_value, '>');
            $condition_added = TRUE;
          }
        }
      }
    }
    // Only attach the OR group if something was put in it.
    if ($condition_added) {
      $this->query->condition($conditions);
    }

    // 4. Download data in batches for performance. $this->batch was reset
    // to 0 above, so this selects the first batch.
    if ($this->batchSize > 0) {
      $this->query->range($this->batch * $this->batchSize, $this->batchSize);
    }
  }

  // Save our fixed-up query so getNextBatch() matches it.
  $this->alteredQuery = clone $this->query;
  migrate_instrument_start('MigrateSourceSQL execute');
  $this->result = $this->query->execute();
  migrate_instrument_stop('MigrateSourceSQL execute');
}