abstract class SqlBase in Drupal 10

Same name in this branch
  1. 10 core/modules/views/src/Plugin/views/pager/SqlBase.php \Drupal\views\Plugin\views\pager\SqlBase
  2. 10 core/modules/migrate/src/Plugin/migrate/source/SqlBase.php \Drupal\migrate\Plugin\migrate\source\SqlBase
Same name and namespace in other branches
  1. 8 core/modules/migrate/src/Plugin/migrate/source/SqlBase.php \Drupal\migrate\Plugin\migrate\source\SqlBase
  2. 9 core/modules/migrate/src/Plugin/migrate/source/SqlBase.php \Drupal\migrate\Plugin\migrate\source\SqlBase

Sources whose data may be fetched via a database connection.

Available configuration keys:

  • database_state_key: (optional) Name of the state key which contains an array with database connection information.
  • key: (optional) The database key name. Defaults to 'migrate'.
  • target: (optional) The database target name. Defaults to 'default'.
  • batch_size: (optional) Number of records to fetch from the database during each batch. If omitted, all records are fetched in a single query.
  • ignore_map: (optional) Source data is joined to the map table by default to improve migration performance. If set to TRUE, the map table will not be joined. Using expressions in the query may result in column aliases in the JOIN clause which would be invalid SQL. If you run into this, set ignore_map to TRUE.

For other optional configuration keys inherited from the parent class, refer to \Drupal\migrate\Plugin\migrate\source\SourcePluginBase.
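
The effect of batch_size can be sketched in plain PHP. This is a minimal sketch, not the class's implementation: the function name fetchInBatches() and the in-memory $rows array standing in for the source table are assumptions made for illustration. It mirrors the offset arithmetic the class passes to range(), namely batch number times batch size, and stops when a batch comes back empty.

```php
<?php

/**
 * Yields rows batch by batch (hypothetical helper, for illustration only).
 *
 * @param array $rows
 *   Stand-in for the source table.
 * @param int $batchSize
 *   Number of rows per batch; 0 means everything is read in one pass.
 */
function fetchInBatches(array $rows, int $batchSize): \Generator {
  if ($batchSize < 0) {
    throw new \InvalidArgumentException('batch_size must be greater than or equal to zero');
  }
  if ($batchSize === 0) {
    // No batching: a single "query" returns every row.
    foreach ($rows as $row) {
      yield $row;
    }
    return;
  }
  $batch = 0;
  while (TRUE) {
    // Same arithmetic as range($batch * $batchSize, $batchSize).
    $chunk = array_slice($rows, $batch * $batchSize, $batchSize);
    if ($chunk === []) {
      // An empty batch means the source is exhausted.
      return;
    }
    foreach ($chunk as $row) {
      yield $row;
    }
    $batch++;
  }
}
```

With five rows and a batch size of 2, the sketch reads at offsets 0, 2 and 4, then issues one more read at offset 6 that returns nothing and ends the iteration.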

How the source database is determined:

  • If the source plugin configuration contains 'database_state_key', its value is taken as the name of a state key which contains an array with the database configuration.
  • Otherwise, if the source plugin configuration contains 'key', the database configuration with that name is used.
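  • If both 'database_state_key' and 'key' are omitted in the source plugin configuration, the state value 'migrate.fallback_state_key' is checked. If it is set, it names the state key which contains the database configuration. Otherwise, the database connection named 'migrate' is used by default.
  • If the lookup falls back to the 'migrate' connection and no such connection is defined, a RequirementsException is thrown. For any other undefined connection, the underlying ConnectionNotDefinedException is rethrown.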
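
The lookup order above can be sketched as a standalone function. This is an illustration under stated assumptions, not the class's code: resolveSourceConnection() is a hypothetical name, and a plain array stands in for the state service. The sketch also includes the 'migrate.fallback_state_key' check that appears in getDatabase() in the class listing.

```php
<?php

/**
 * Resolves the connection key and target (hypothetical helper).
 *
 * @param array $configuration
 *   The source plugin configuration.
 * @param array $state
 *   Stand-in for the state service, keyed by state key.
 *
 * @return array
 *   An array with 'key' and 'target' entries.
 */
function resolveSourceConnection(array $configuration, array $state = []): array {
  if (isset($configuration['database_state_key'])) {
    // An explicit state key wins over everything else.
    $info = $state[$configuration['database_state_key']] ?? [];
  }
  elseif (isset($configuration['key'])) {
    // The plugin configuration itself carries 'key' and 'target'.
    $info = $configuration;
  }
  elseif (!empty($state['migrate.fallback_state_key'])) {
    // A fallback state key names another state entry to read.
    $info = $state[$state['migrate.fallback_state_key']] ?? [];
  }
  else {
    $info = [];
  }
  return [
    'key' => $info['key'] ?? 'migrate',
    'target' => $info['target'] ?? 'default',
  ];
}
```

Calling the sketch with an empty configuration yields the 'migrate' key and the 'default' target, which matches the defaults listed for 'key' and 'target'.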

The Drupal Database API supports multiple database connections. The connection parameters are defined in the $databases array in settings.php or settings.local.php. The $databases array can also be modified at runtime. For example, Migrate Drupal, which provides the migrations from Drupal 6 / 7, asks for the source database connection parameters in the UI and then adds the $databases['migrate'] connection at runtime before the migrations are executed.

As described above, the default source database is $databases['migrate']. If the source plugin needs another source connection, the database connection parameters should be added to the $databases array as, for instance, $databases['foo']. The source plugin can then use this connection by setting 'key' to 'foo' in its configuration.
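
As a sketch of the setup described above, a settings.php or settings.local.php entry for an additional connection might look like the following. Every value here (database name, credentials, host, port, driver) is a placeholder assumption and must be replaced with the real parameters of the source database.

```php
<?php

// Hypothetical extra connection named 'foo' with the 'default' target.
// All values below are placeholders, not recommendations.
$databases['foo']['default'] = [
  'database' => 'legacy_site',
  'username' => 'legacy_user',
  'password' => 'change-me',
  'host' => '127.0.0.1',
  'port' => '3306',
  'driver' => 'mysql',
  'prefix' => '',
];
```

A source plugin whose configuration sets 'key' to 'foo' would then read from this connection instead of $databases['migrate'].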

For a complete example of migrating data from an SQL source, refer to https://www.drupal.org/docs/8/api/migrate-api/migrating-data-from-sql-so...

See also

https://www.drupal.org/docs/8/api/database-api

\Drupal\migrate_drupal\Plugin\migrate\source\DrupalSqlBase

8 files declare their use of SqlBase
DrupalSqlBase.php in core/modules/migrate_drupal/src/Plugin/migrate/source/DrupalSqlBase.php
HighWaterTest.php in core/modules/migrate/tests/modules/migrate_high_water_test/src/Plugin/migrate/source/HighWaterTest.php
MigrationPluginListTest.php in core/modules/migrate/tests/src/Kernel/Plugin/MigrationPluginListTest.php
QueryBatchTest.php in core/modules/migrate/tests/modules/migrate_query_batch_test/src/Plugin/migrate/source/QueryBatchTest.php
SqlBaseTest.php in core/modules/migrate/tests/src/Unit/SqlBaseTest.php
Contains \Drupal\Tests\migrate\Unit\SqlBaseTest.

File

core/modules/migrate/src/Plugin/migrate/source/SqlBase.php, line 68

Namespace

Drupal\migrate\Plugin\migrate\source
View source
abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPluginInterface, RequirementsInterface {

  /**
   * The query object.
   *
   * @var \Drupal\Core\Database\Query\SelectInterface
   */
  protected $query;

  /**
   * The database object.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $database;

  /**
   * State service for retrieving database info.
   *
   * @var \Drupal\Core\State\StateInterface
   */
  protected $state;

  /**
   * The count of the number of batches run.
   *
   * @var int
   */
  protected $batch = 0;

  /**
   * Number of records to fetch from the database during each batch.
   *
   * A value of zero indicates no batching is to be done.
   *
   * @var int
   */
  protected $batchSize = 0;

  /**
   * {@inheritdoc}
   */
  public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, StateInterface $state) {
    parent::__construct($configuration, $plugin_id, $plugin_definition, $migration);
    $this->state = $state;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
    return new static($configuration, $plugin_id, $plugin_definition, $migration, $container->get('state'));
  }

  /**
   * Prints the query string when the object is used as a string.
   *
   * @return string
   *   The query string.
   */
  public function __toString() {
    return (string) $this->query();
  }

  /**
   * Gets the database connection object.
   *
   * @return \Drupal\Core\Database\Connection
   *   The database connection.
   */
  public function getDatabase() {
    if (!isset($this->database)) {

      // Look first for an explicit state key containing the configuration.
      if (isset($this->configuration['database_state_key'])) {
        $this->database = $this->setUpDatabase($this->state->get($this->configuration['database_state_key']));
      }
      elseif (isset($this->configuration['key'])) {
        $this->database = $this->setUpDatabase($this->configuration);
      }
      elseif ($fallback_state_key = $this->state->get('migrate.fallback_state_key')) {
        $this->database = $this->setUpDatabase($this->state->get($fallback_state_key));
      }
      else {
        $this->database = $this->setUpDatabase([]);
      }
    }
    return $this->database;
  }

  /**
   * Gets a connection to the referenced database.
   *
   * This method will add the database connection if necessary.
   *
   * @param array $database_info
   *   Configuration for the source database connection. The keys are:
   *    'key' - The database connection key.
   *    'target' - The database connection target.
   *    'database' - Database configuration array as accepted by
   *      Database::addConnectionInfo.
   *
   * @return \Drupal\Core\Database\Connection
   *   The connection to use for this plugin's queries.
   *
   * @throws \Drupal\migrate\Exception\RequirementsException
   *   Thrown if no source database connection is configured.
   */
  protected function setUpDatabase(array $database_info) {
    if (isset($database_info['key'])) {
      $key = $database_info['key'];
    }
    else {

      // If there is no explicit database configuration at all, fall back to a
      // connection named 'migrate'.
      $key = 'migrate';
    }
    if (isset($database_info['target'])) {
      $target = $database_info['target'];
    }
    else {
      $target = 'default';
    }
    if (isset($database_info['database'])) {
      Database::addConnectionInfo($key, $target, $database_info['database']);
    }
    try {
      $connection = Database::getConnection($target, $key);
    } catch (ConnectionNotDefinedException $e) {

      // If we fell back to the magic 'migrate' connection and it doesn't exist,
      // treat the lack of the connection as a RequirementsException.
      if ($key == 'migrate') {
        throw new RequirementsException("No database connection configured for source plugin " . $this->pluginId, [], 0, $e);
      }
      else {
        throw $e;
      }
    }
    return $connection;
  }

  /**
   * {@inheritdoc}
   */
  public function checkRequirements() {
    if ($this->pluginDefinition['requirements_met'] === TRUE) {
      $this->getDatabase();
    }
  }

  /**
   * Wrapper for database select.
   */
  protected function select($table, $alias = NULL, array $options = []) {
    $options['fetch'] = \PDO::FETCH_ASSOC;
    return $this->getDatabase()->select($table, $alias, $options);
  }

  /**
   * Adds tags and metadata to the query.
   *
   * @return \Drupal\Core\Database\Query\SelectInterface
   *   The query with additional tags and metadata.
   */
  protected function prepareQuery() {
    $this->query = clone $this->query();
    $this->query->addTag('migrate');
    $this->query->addTag('migrate_' . $this->migration->id());
    $this->query->addMetaData('migration', $this->migration);
    return $this->query;
  }

  /**
   * {@inheritdoc}
   */
  protected function initializeIterator() {

    // Initialize the batch size.
    if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) {

      // Valid batch sizes are integers >= 0.
      if (is_int($this->configuration['batch_size']) && $this->configuration['batch_size'] >= 0) {
        $this->batchSize = $this->configuration['batch_size'];
      }
      else {
        throw new MigrateException("batch_size must be greater than or equal to zero");
      }
    }

    // If a batch has run the query is already setup.
    if ($this->batch == 0) {
      $this->prepareQuery();

      // The rules for determining what conditions to add to the query are as
      // follows (applying first applicable rule):
      // 1. If the map is joinable, join it. We will want to accept all rows
      //    which are either not in the map, or marked in the map as NEEDS_UPDATE.
      //    Note that if high water fields are in play, we want to accept all rows
      //    above the high water mark in addition to those selected by the map
      //    conditions, so we need to OR them together (but AND with any existing
      //    conditions in the query). So, ultimately the SQL condition will look
      //    like (original conditions) AND (map IS NULL OR map needs update
      //    OR above high water).
      $conditions = $this->query->orConditionGroup();
      $condition_added = FALSE;
      $added_fields = [];
      if ($this->mapJoinable()) {

        // Build the join to the map table. Because the source key could have
        // multiple fields, we need to build things up.
        $count = 1;
        $map_join = '';
        $delimiter = '';
        foreach ($this->getIds() as $field_name => $field_schema) {
          if (isset($field_schema['alias'])) {
            $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name);
          }
          $map_join .= "{$delimiter}{$field_name} = map.sourceid" . $count++;
          $delimiter = ' AND ';
        }
        $alias = $this->query->leftJoin($this->migration->getIdMap()->getQualifiedMapTableName(), 'map', $map_join);
        $conditions->isNull($alias . '.sourceid1');
        $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
        $condition_added = TRUE;

        // And as long as we have the map table, add its data to the row.
        $n = count($this->getIds());
        for ($count = 1; $count <= $n; $count++) {
          $map_key = 'sourceid' . $count;
          $this->query->addField($alias, $map_key, "migrate_map_{$map_key}");
          $added_fields[] = "{$alias}.{$map_key}";
        }
        if ($n = count($this->migration->getDestinationIds())) {
          for ($count = 1; $count <= $n; $count++) {
            // The loop header already advances $count; incrementing it again
            // here would skip every other destination ID column.
            $map_key = 'destid' . $count;
            $this->query->addField($alias, $map_key, "migrate_map_{$map_key}");
            $added_fields[] = "{$alias}.{$map_key}";
          }
        }
        $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
        $added_fields[] = "{$alias}.source_row_status";
      }

      // 2. If we are using high water marks, also include rows above the mark.
      //    But, include all rows if the high water mark is not set.
      if ($this->getHighWaterProperty()) {
        $high_water_field = $this->getHighWaterField();
        $high_water = $this->getHighWater();

        // We check against NULL because 0 is an acceptable value for the high
        // water mark.
        if ($high_water !== NULL) {
          $conditions->condition($high_water_field, $high_water, '>');
          $condition_added = TRUE;
        }

        // Always sort by the high water field, to ensure that the first run
        // (before we have a high water value) also has the results in a
        // consistent order.
        $this->query->orderBy($high_water_field);
      }
      if ($condition_added) {
        $this->query->condition($conditions);
      }

      // If the query has a group by, our added fields need it too, to keep the
      // query valid.
      // @see https://dev.mysql.com/doc/refman/5.7/en/group-by-handling.html
      $group_by = $this->query->getGroupBy();
      if ($group_by && $added_fields) {
        foreach ($added_fields as $added_field) {
          $this->query->groupBy($added_field);
        }
      }
    }

    // Download data in batches for performance.
    if ($this->batchSize > 0) {
      $this->query->range($this->batch * $this->batchSize, $this->batchSize);
    }
    $statement = $this->query->execute();
    $statement->setFetchMode(\PDO::FETCH_ASSOC);
    return new \IteratorIterator($statement);
  }

  /**
   * Position the iterator to the following row.
   */
  protected function fetchNextRow() {
    $this->getIterator()->next();

    // We might be out of data entirely, or just out of data in the current
    // batch. Attempt to fetch the next batch and see.
    if ($this->batchSize > 0 && !$this->getIterator()->valid()) {
      $this->fetchNextBatch();
    }
  }

  /**
   * Prepares query for the next set of data from the source database.
   */
  protected function fetchNextBatch() {
    $this->batch++;
    unset($this->iterator);
    $this->getIterator()->rewind();
  }

  /**
   * Returns the query that selects the source rows.
   *
   * @return \Drupal\Core\Database\Query\SelectInterface
   *   The source query.
   */
  abstract public function query();

  /**
   * Gets the source count using countQuery().
   */
  protected function doCount() {
    return (int) $this->query()->countQuery()->execute()->fetchField();
  }

  /**
   * Checks if we can join against the map table.
   *
   * This function specifically catches issues when we're migrating with
   * unique sets of credentials for the source and destination database.
   *
   * @return bool
   *   TRUE if we can join against the map table otherwise FALSE.
   */
  protected function mapJoinable() {

    // Do not join map if explicitly configured not to.
    if (isset($this->configuration['ignore_map']) && $this->configuration['ignore_map']) {
      return FALSE;
    }

    // If we are using high water, but haven't yet set a high water mark, do not
    // join the map table, as we want to get all available records.
    if ($this->getHighWaterProperty() && $this->getHighWater() === NULL) {
      return FALSE;
    }

    // If we are tracking changes, we also need to retrieve all rows to compare
    // hashes.
    if ($this->trackChanges) {
      return FALSE;
    }
    if (!$this->getIds()) {
      return FALSE;
    }

    // With batching, we want a later batch to return the same rows that would
    // have been returned at the same point within a monolithic query. If we
    // join to the map table, the first batch is writing to the map table and
    // thus affecting the results of subsequent batches. To be safe, we avoid
    // joining to the map table when batching.
    if ($this->batchSize > 0) {
      return FALSE;
    }
    $id_map = $this->migration->getIdMap();
    if (!$id_map instanceof Sql) {
      return FALSE;
    }
    $id_map_database_options = $id_map->getDatabase()->getConnectionOptions();
    $source_database_options = $this->getDatabase()->getConnectionOptions();

    // Special handling for sqlite which deals with files.
    if ($id_map_database_options['driver'] === 'sqlite' && $source_database_options['driver'] === 'sqlite' && $id_map_database_options['database'] != $source_database_options['database']) {
      return FALSE;
    }

    // FALSE if driver is PostgreSQL and database doesn't match.
    if ($id_map_database_options['driver'] === 'pgsql' && $source_database_options['driver'] === 'pgsql' && $id_map_database_options['database'] != $source_database_options['database']) {
      return FALSE;
    }
    foreach ([
      'username',
      'password',
      'host',
      'port',
      'namespace',
      'driver',
    ] as $key) {
      if (isset($source_database_options[$key]) && isset($id_map_database_options[$key])) {
        if ($id_map_database_options[$key] != $source_database_options[$key]) {
          return FALSE;
        }
      }
    }
    return TRUE;
  }

  /**
   * {@inheritdoc}
   */
  public function __sleep() {
    return array_diff(parent::__sleep(), [
      'database',
    ]);
  }

}
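
The way initializeIterator() assembles the join condition against the map table can be illustrated outside Drupal. The sketch below is a simplified stand-in, not the class's code: buildMapJoinCondition() is a hypothetical name, the escapeField() call is left out, and the $ids argument only imitates the shape returned by getIds() (field name as key, with an optional 'alias' entry).

```php
<?php

/**
 * Builds the ON clause joining source IDs to map columns (hypothetical).
 *
 * @param array $ids
 *   Source ID definitions keyed by field name; 'alias' is optional.
 *
 * @return string
 *   Conditions of the form "<field> = map.sourceidN" joined with AND.
 */
function buildMapJoinCondition(array $ids): string {
  $parts = [];
  $count = 1;
  foreach ($ids as $field_name => $field_schema) {
    if (isset($field_schema['alias'])) {
      // Qualify the field with its table alias when one is given.
      $field_name = $field_schema['alias'] . '.' . $field_name;
    }
    // The Nth source ID field is compared with the map's sourceidN column.
    $parts[] = "{$field_name} = map.sourceid" . $count++;
  }
  return implode(' AND ', $parts);
}
```

For a source keyed by nid (alias n) and langcode, the sketch returns "n.nid = map.sourceid1 AND langcode = map.sourceid2". In the listing, the string is passed to leftJoin(), and the query then keeps rows whose map entry is missing, is marked as needing an update, or lies above the high water mark.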

Members

Name Modifiers Type Description Overrides
DependencySerializationTrait::$_entityStorages protected property
DependencySerializationTrait::$_serviceIds protected property
DependencySerializationTrait::__wakeup public function 2
MessengerTrait::$messenger protected property The messenger. 18
MessengerTrait::messenger public function Gets the messenger. 18
MessengerTrait::setMessenger public function Sets the messenger.
MigrateSourceInterface::fields public function Returns available fields on the source. 87
MigrateSourceInterface::getIds public function Defines the source fields uniquely identifying a source row. 87
MigrateSourceInterface::NOT_COUNTABLE constant Indicates that the source is not countable.
PluginBase::$configuration protected property Configuration information passed into the plugin. 1
PluginBase::$pluginDefinition protected property The plugin implementation definition.
PluginBase::$pluginId protected property The plugin_id.
PluginBase::DERIVATIVE_SEPARATOR constant A string which is used to separate base plugin IDs from the derivative ID.
PluginBase::getBaseId public function
PluginBase::getDerivativeId public function
PluginBase::getPluginDefinition public function 2
PluginBase::getPluginId public function
PluginBase::isConfigurable public function Determines if the plugin is configurable.
SourcePluginBase::$cache protected property The backend cache.
SourcePluginBase::$cacheCounts protected property Whether this instance should cache the source count. 1
SourcePluginBase::$cacheKey protected property Key to use for caching counts.
SourcePluginBase::$currentRow protected property The current row from the query.
SourcePluginBase::$currentSourceIds protected property The primary key of the current row.
SourcePluginBase::$highWaterProperty protected property Information on the property used as the high-water mark.
SourcePluginBase::$highWaterStorage protected property The key-value storage for the high-water value.
SourcePluginBase::$idMap protected property The migration ID map.
SourcePluginBase::$iterator protected property The iterator to iterate over the source rows.
SourcePluginBase::$mapRowAdded protected property Flags whether source plugin will read the map row and add to data row.
SourcePluginBase::$migration protected property The entity migration object.
SourcePluginBase::$moduleHandler protected property The module handler service. 2
SourcePluginBase::$originalHighWater protected property The high water mark at the beginning of the import operation.
SourcePluginBase::$skipCount protected property Whether this instance should not attempt to count the source. 1
SourcePluginBase::$trackChanges protected property Flags whether to track changes to incoming data. 1
SourcePluginBase::aboveHighWater protected function Check if the incoming data is newer than what we've previously imported.
SourcePluginBase::count public function 2
SourcePluginBase::current public function
SourcePluginBase::getCache protected function Gets the cache object.
SourcePluginBase::getCurrentIds public function Gets the currentSourceIds data member.
SourcePluginBase::getHighWater protected function The current value of the high water mark.
SourcePluginBase::getHighWaterField protected function Get the name of the field used as the high watermark.
SourcePluginBase::getHighWaterProperty protected function Get information on the property used as the high watermark.
SourcePluginBase::getHighWaterStorage protected function Get the high water storage object.
SourcePluginBase::getIterator protected function Returns the iterator that will yield the row arrays to be processed.
SourcePluginBase::getModuleHandler protected function Gets the module handler.
SourcePluginBase::getSourceModule public function Gets the source module providing the source data. Overrides MigrateSourceInterface::getSourceModule
SourcePluginBase::key public function
SourcePluginBase::next public function
SourcePluginBase::postRollback public function Performs post-rollback tasks. Overrides RollbackAwareInterface::postRollback
SourcePluginBase::prepareRow public function Adds additional data to the row. Overrides MigrateSourceInterface::prepareRow 50
SourcePluginBase::preRollback public function Performs pre-rollback tasks. Overrides RollbackAwareInterface::preRollback
SourcePluginBase::rewind public function
SourcePluginBase::rowChanged protected function Checks if the incoming row has changed since our last import.
SourcePluginBase::saveHighWater protected function Save the new high water mark.
SourcePluginBase::valid public function
SqlBase::$batch protected property The count of the number of batches run.
SqlBase::$batchSize protected property Number of records to fetch from the database during each batch.
SqlBase::$database protected property The database object. 1
SqlBase::$query protected property The query string. 82
SqlBase::$state protected property State service for retrieving database info.
SqlBase::checkRequirements public function Checks if requirements for this plugin are OK. Overrides RequirementsInterface::checkRequirements 1
SqlBase::create public static function Creates an instance of the plugin. Overrides ContainerFactoryPluginInterface::create 1
SqlBase::doCount protected function Gets the source count using countQuery(). Overrides SourcePluginBase::doCount 6
SqlBase::fetchNextBatch protected function Prepares query for the next set of data from the source database.
SqlBase::fetchNextRow protected function Position the iterator to the following row. Overrides SourcePluginBase::fetchNextRow
SqlBase::getDatabase public function Gets the database connection object. 2
SqlBase::initializeIterator protected function Initializes the iterator with the source data. Overrides SourcePluginBase::initializeIterator 18
SqlBase::mapJoinable protected function Checks if we can join against the map table. 1
SqlBase::prepareQuery protected function Adds tags and metadata to the query.
SqlBase::query abstract public function 82
SqlBase::select protected function Wrapper for database select.
SqlBase::setUpDatabase protected function Gets a connection to the referenced database.
SqlBase::__construct public function Constructs a \Drupal\Component\Plugin\PluginBase object. Overrides SourcePluginBase::__construct 3
SqlBase::__sleep public function Overrides DependencySerializationTrait::__sleep
SqlBase::__toString public function Prints the query string when the object is used as a string. Overrides MigrateSourceInterface::__toString
StringTranslationTrait::$stringTranslation protected property The string translation service. 3
StringTranslationTrait::formatPlural protected function Formats a string containing a count of items.
StringTranslationTrait::getNumberOfPlurals protected function Returns the number of plurals supported by a given language.
StringTranslationTrait::getStringTranslation protected function Gets the string translation service.
StringTranslationTrait::setStringTranslation public function Sets the string translation service to use. 1
StringTranslationTrait::t protected function Translates a string to the current language or to a given language.