You are here

protected function Sql::ensureTables in Drupal 9

Same name and namespace in other branches
  1. 8 core/modules/migrate/src/Plugin/migrate/id_map/Sql.php \Drupal\migrate\Plugin\migrate\id_map\Sql::ensureTables()

Create the map and message tables if they don't already exist.

Throws

\Drupal\Core\Database\DatabaseException

2 calls to Sql::ensureTables()
Sql::init in core/modules/migrate/src/Plugin/migrate/id_map/Sql.php
Initialize the plugin.
TestSqlIdMap::ensureTables in core/modules/migrate/tests/src/Unit/TestSqlIdMap.php
Create the map and message tables if they don't already exist.
1 method overrides Sql::ensureTables()
TestSqlIdMap::ensureTables in core/modules/migrate/tests/src/Unit/TestSqlIdMap.php
Create the map and message tables if they don't already exist.

File

core/modules/migrate/src/Plugin/migrate/id_map/Sql.php, line 318

Class

Sql
Defines the SQL-based ID map implementation.

Namespace

Drupal\migrate\Plugin\migrate\id_map

Code

/**
 * Create the map and message tables if they don't already exist.
 *
 * When the map table is missing it is created, followed by the message table
 * if that one is missing as well. When the map table already exists, the
 * 'rollback_action', 'hash' and source IDs hash columns are added to it if
 * they are not present yet.
 *
 * @throws \Drupal\Core\Database\DatabaseException
 *   Rethrown when creating the map table fails for any reason other than the
 *   index key-length error, or when the source ID columns can not be split
 *   into smaller index groups any further.
 */
protected function ensureTables() {
  $schema_handler = $this->getDatabase()->schema();

  // Column definitions shared by the "create table" and "add missing column"
  // paths, declared once so both paths always agree.
  $source_ids_hash = [
    $this::SOURCE_IDS_HASH => [
      'type' => 'varchar',
      'length' => '64',
      'not null' => TRUE,
      'description' => 'Hash of source ids. Used as primary key',
    ],
  ];
  $rollback_action_spec = [
    'type' => 'int',
    'size' => 'tiny',
    'unsigned' => TRUE,
    'not null' => TRUE,
    'default' => MigrateIdMapInterface::ROLLBACK_DELETE,
    'description' => 'Flag indicating what to do for this item on rollback',
  ];
  $hash_spec = [
    'type' => 'varchar',
    'length' => '64',
    'not null' => FALSE,
    'description' => 'Hash of source row data, for detecting changes',
  ];

  if ($schema_handler->tableExists($this->mapTableName)) {
    // Add any missing columns to the map table. A list of pairs (rather than
    // a keyed array) keeps the check order fixed.
    $expected_columns = [
      ['rollback_action', $rollback_action_spec],
      ['hash', $hash_spec],
      [$this::SOURCE_IDS_HASH, $source_ids_hash[$this::SOURCE_IDS_HASH]],
    ];
    foreach ($expected_columns as [$column_name, $column_spec]) {
      if (!$schema_handler->fieldExists($this->mapTableName, $column_name)) {
        $schema_handler->addField($this->mapTableName, $column_name, $column_spec);
      }
    }
    return;
  }

  // Generate appropriate schema info for the map and message tables,
  // and map from the source field names to the map/msg field names.
  $count = 1;
  $source_id_schema = [];
  foreach ($this->migration->getSourcePlugin()->getIds() as $id_definition) {
    $mapkey = 'sourceid' . $count++;
    $source_id_schema[$mapkey] = $this->getFieldSchema($id_definition);
    $source_id_schema[$mapkey]['not null'] = TRUE;
  }
  $fields = $source_ids_hash + $source_id_schema;

  // Add destination identifiers to map table.
  // @todo How do we discover the destination schema?
  $count = 1;
  foreach ($this->migration->getDestinationPlugin()->getIds() as $id_definition) {
    // Allow dest identifier fields to be NULL (for IGNORED/FAILED cases).
    $mapkey = 'destid' . $count++;
    $fields[$mapkey] = $this->getFieldSchema($id_definition);
    $fields[$mapkey]['not null'] = FALSE;
  }
  $fields['source_row_status'] = [
    'type' => 'int',
    'size' => 'tiny',
    'unsigned' => TRUE,
    'not null' => TRUE,
    'default' => MigrateIdMapInterface::STATUS_IMPORTED,
    'description' => 'Indicates current status of the source row',
  ];
  $fields['rollback_action'] = $rollback_action_spec;
  $fields['last_imported'] = [
    'type' => 'int',
    'unsigned' => TRUE,
    'not null' => TRUE,
    'default' => 0,
    'description' => 'UNIX timestamp of the last time this row was imported',
  ];
  $fields['hash'] = $hash_spec;

  // To keep within the MySQL maximum key length of 3072 bytes we try
  // different groupings of the source IDs. Groups are created in chunks
  // starting at a chunk size equivalent to the number of the source IDs.
  // On each failed attempt the chunk size is reduced by one; the loop ends
  // when the map table is successfully created, or the exception is rethrown
  // once the chunk size would reach zero. If there are no source IDs the
  // table is created without any source index.
  $chunk_size = count($source_id_schema);
  while ($chunk_size >= 0) {
    $indexes = [];
    if ($chunk_size > 0) {
      foreach (array_chunk(array_keys($source_id_schema), $chunk_size) as $key => $index_columns) {
        $index_name = $key === 0 ? 'source' : "source{$key}";
        $indexes[$index_name] = $index_columns;
      }
    }
    $schema = [
      'description' => 'Mappings from source identifier value(s) to destination identifier value(s).',
      'fields' => $fields,
      'primary key' => [
        $this::SOURCE_IDS_HASH,
      ],
      'indexes' => $indexes,
    ];
    try {
      $schema_handler->createTable($this->mapTableName, $schema);
      break;
    }
    catch (DatabaseException $e) {
      $pdo_exception = $e->getPrevious();
      // errorInfo may be unset on a PDOException; a missing driver code is
      // treated as "not the key-length error" instead of raising a warning.
      $mysql_index_error = $pdo_exception instanceof \PDOException
        && $pdo_exception->getCode() === '42000'
        && ($pdo_exception->errorInfo[1] ?? NULL) === 1071;
      $chunk_size--;

      // Rethrow the exception if the source IDs can not be in smaller
      // groups.
      if (!$mysql_index_error || $chunk_size <= 0) {
        throw $e;
      }
    }
  }

  // Now do the message table.
  $message_table = $this->messageTableName();
  if (!$schema_handler->tableExists($message_table)) {
    $fields = [];
    $fields['msgid'] = [
      'type' => 'serial',
      'unsigned' => TRUE,
      'not null' => TRUE,
    ];
    $fields += $source_ids_hash;
    $fields['level'] = [
      'type' => 'int',
      'unsigned' => TRUE,
      'not null' => TRUE,
      'default' => 1,
    ];
    $fields['message'] = [
      'type' => 'text',
      'size' => 'medium',
      'not null' => TRUE,
    ];
    $schema = [
      'description' => 'Messages generated during a migration process',
      'fields' => $fields,
      'primary key' => [
        'msgid',
      ],
    ];
    $schema_handler->createTable($message_table, $schema);
  }
}