View source
<?php
namespace Drupal\migrate\Plugin\migrate\source;
use Drupal\Core\Database\Database;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\State\StateInterface;
use Drupal\migrate\Entity\MigrationInterface;
use Drupal\migrate\Plugin\migrate\id_map\Sql;
use Drupal\migrate\Plugin\MigrateIdMapInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPluginInterface {
protected $query;
protected $database;
protected $state;
public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, StateInterface $state) {
parent::__construct($configuration, $plugin_id, $plugin_definition, $migration);
$this->state = $state;
}
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
return new static($configuration, $plugin_id, $plugin_definition, $migration, $container
->get('state'));
}
public function __toString() {
return (string) $this->query;
}
public function getDatabase() {
if (!isset($this->database)) {
if (isset($this->configuration['database_state_key'])) {
$this->database = $this
->setUpDatabase($this->state
->get($this->configuration['database_state_key']));
}
else {
$this->database = $this
->setUpDatabase($this->configuration);
}
}
return $this->database;
}
protected function setUpDatabase(array $database_info) {
if (isset($database_info['key'])) {
$key = $database_info['key'];
}
else {
$key = 'migrate';
}
if (isset($database_info['target'])) {
$target = $database_info['target'];
}
else {
$target = 'default';
}
if (isset($database_info['database'])) {
Database::addConnectionInfo($key, $target, $database_info['database']);
}
return Database::getConnection($target, $key);
}
protected function select($table, $alias = NULL, array $options = array()) {
$options['fetch'] = \PDO::FETCH_ASSOC;
return $this
->getDatabase()
->select($table, $alias, $options);
}
protected function prepareQuery() {
$this->query = clone $this
->query();
$this->query
->addTag('migrate');
$this->query
->addTag('migrate_' . $this->migration
->id());
$this->query
->addMetaData('migration', $this->migration);
return $this->query;
}
protected function initializeIterator() {
$this
->prepareQuery();
$high_water_property = $this->migration
->get('highWaterProperty');
$keys = array();
$conditions = $this->query
->orConditionGroup();
$condition_added = FALSE;
if (empty($this->configuration['ignore_map']) && $this
->mapJoinable()) {
$count = 1;
$map_join = '';
$delimiter = '';
foreach ($this
->getIds() as $field_name => $field_schema) {
if (isset($field_schema['alias'])) {
$field_name = $field_schema['alias'] . '.' . $this->query
->escapeField($field_name);
}
$map_join .= "{$delimiter}{$field_name} = map.sourceid" . $count++;
$delimiter = ' AND ';
}
$alias = $this->query
->leftJoin($this->migration
->getIdMap()
->getQualifiedMapTableName(), 'map', $map_join);
$conditions
->isNull($alias . '.sourceid1');
$conditions
->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
$condition_added = TRUE;
$n = count($this
->getIds());
for ($count = 1; $count <= $n; $count++) {
$map_key = 'sourceid' . $count;
$this->query
->addField($alias, $map_key, "migrate_map_{$map_key}");
}
if ($n = count($this->migration
->get('destinationIds'))) {
for ($count = 1; $count <= $n; $count++) {
$map_key = 'destid' . $count++;
$this->query
->addField($alias, $map_key, "migrate_map_{$map_key}");
}
}
$this->query
->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
}
if (isset($high_water_property['name']) && ($high_water = $this->migration
->getHighWater()) !== '') {
if (isset($high_water_property['alias'])) {
$high_water = $high_water_property['alias'] . '.' . $high_water_property['name'];
}
else {
$high_water = $high_water_property['name'];
}
$conditions
->condition($high_water, $high_water, '>');
$condition_added = TRUE;
}
if ($condition_added) {
$this->query
->condition($conditions);
}
return new \IteratorIterator($this->query
->execute());
}
public abstract function query();
public function count() {
return $this
->query()
->countQuery()
->execute()
->fetchField();
}
protected function mapJoinable() {
if (!$this
->getIds()) {
return FALSE;
}
$id_map = $this->migration
->getIdMap();
if (!$id_map instanceof Sql) {
return FALSE;
}
$id_map_database_options = $id_map
->getDatabase()
->getConnectionOptions();
$source_database_options = $this
->getDatabase()
->getConnectionOptions();
foreach (array(
'username',
'password',
'host',
'port',
'namespace',
'driver',
) as $key) {
if (isset($source_database_options[$key])) {
if ($id_map_database_options[$key] != $source_database_options[$key]) {
return FALSE;
}
}
}
return TRUE;
}
}