View source  
  <?php
namespace Drupal\migrate\Plugin;
use Drupal\Component\Plugin\Exception\InvalidPluginDefinitionException;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Plugin\PluginBase;
use Drupal\migrate\Exception\RequirementsException;
use Drupal\migrate\MigrateException;
use Drupal\migrate\MigrateSkipRowException;
use Drupal\Component\Utility\NestedArray;
use Symfony\Component\DependencyInjection\ContainerInterface;
class Migration extends PluginBase implements MigrationInterface, RequirementsInterface, ContainerFactoryPluginInterface {
  
  protected $id;
  
  protected $label;
  
  protected $row;
  
  protected $source;
  
  protected $sourcePlugin;
  
  protected $process = [];
  
  protected $processPlugins = [];
  
  protected $destination;
  
  protected $destinationPlugin;
  
  protected $idMap = [];
  
  protected $idMapPlugin;
  
  protected $sourceIds = [];
  
  protected $destinationIds = [];
  
  protected $sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;
  
  protected $trackLastImported = FALSE;
  
  protected $requirements = [];
  
  protected $migration_tags = [];
  
  protected $audit = FALSE;
  
  protected $migration_dependencies = [];
  
  protected $dependencies = [];
  
  protected $migrationPluginManager;
  
  protected $sourcePluginManager;
  
  protected $processPluginManager;
  
  protected $destinationPluginManager;
  
  protected $idMapPluginManager;
  
  protected $statusLabels = [
    self::STATUS_IDLE => 'Idle',
    self::STATUS_IMPORTING => 'Importing',
    self::STATUS_ROLLING_BACK => 'Rolling back',
    self::STATUS_STOPPING => 'Stopping',
    self::STATUS_DISABLED => 'Disabled',
  ];
  
  public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationPluginManagerInterface $migration_plugin_manager, MigratePluginManagerInterface $source_plugin_manager, MigratePluginManagerInterface $process_plugin_manager, MigrateDestinationPluginManager $destination_plugin_manager, MigratePluginManagerInterface $idmap_plugin_manager) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    $this->migrationPluginManager = $migration_plugin_manager;
    $this->sourcePluginManager = $source_plugin_manager;
    $this->processPluginManager = $process_plugin_manager;
    $this->destinationPluginManager = $destination_plugin_manager;
    $this->idMapPluginManager = $idmap_plugin_manager;
    foreach (NestedArray::mergeDeepArray([
      $plugin_definition,
      $configuration,
    ], TRUE) as $key => $value) {
      $this->{$key} = $value;
    }
  }
  
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static($configuration, $plugin_id, $plugin_definition, $container
      ->get('plugin.manager.migration'), $container
      ->get('plugin.manager.migrate.source'), $container
      ->get('plugin.manager.migrate.process'), $container
      ->get('plugin.manager.migrate.destination'), $container
      ->get('plugin.manager.migrate.id_map'));
  }
  
  public function id() {
    return $this->pluginId;
  }
  
  public function label() {
    return $this->label;
  }
  
  public function get($property) {
    @trigger_error('\\Drupal\\migrate\\Plugin\\Migration::get() is deprecated in Drupal 8.1.x, will be removed before Drupal 9.0.x. Use more specific getters instead. See https://www.drupal.org/node/2873795', E_USER_DEPRECATED);
    return isset($this->{$property}) ? $this->{$property} : NULL;
  }
  
  public function getIdMapPlugin() {
    return $this->idMapPlugin;
  }
  
  public function getSourcePlugin() {
    if (!isset($this->sourcePlugin)) {
      $this->sourcePlugin = $this->sourcePluginManager
        ->createInstance($this->source['plugin'], $this->source, $this);
    }
    return $this->sourcePlugin;
  }
  
  public function getProcessPlugins(array $process = NULL) {
    if (!isset($process)) {
      $process = $this
        ->getProcess();
    }
    $index = serialize($process);
    if (!isset($this->processPlugins[$index])) {
      $this->processPlugins[$index] = [];
      foreach ($this
        ->getProcessNormalized($process) as $property => $configurations) {
        $this->processPlugins[$index][$property] = [];
        if (!is_array($configurations) && !$this->processPlugins[$index][$property]) {
          throw new MigrateException(sprintf("Process configuration for '{$property}' must be an array", $property));
        }
        foreach ($configurations as $configuration) {
          if (isset($configuration['source'])) {
            $this->processPlugins[$index][$property][] = $this->processPluginManager
              ->createInstance('get', $configuration, $this);
          }
          
          if ($configuration['plugin'] != 'get') {
            $this->processPlugins[$index][$property][] = $this->processPluginManager
              ->createInstance($configuration['plugin'], $configuration, $this);
          }
          if (!$this->processPlugins[$index][$property]) {
            throw new MigrateException("Invalid process configuration for {$property}");
          }
        }
      }
    }
    return $this->processPlugins[$index];
  }
  
  protected function getProcessNormalized(array $process) {
    $normalized_configurations = [];
    foreach ($process as $destination => $configuration) {
      if (is_string($configuration)) {
        $configuration = [
          'plugin' => 'get',
          'source' => $configuration,
        ];
      }
      if (isset($configuration['plugin'])) {
        $configuration = [
          $configuration,
        ];
      }
      if (!is_array($configuration)) {
        $migration_id = $this
          ->getPluginId();
        throw new MigrateException("Invalid process for destination '{$destination}' in migration '{$migration_id}'");
      }
      $normalized_configurations[$destination] = $configuration;
    }
    return $normalized_configurations;
  }
  
  public function getDestinationPlugin($stub_being_requested = FALSE) {
    if ($stub_being_requested && !empty($this->destination['no_stub'])) {
      throw new MigrateSkipRowException('Stub requested but not made because no_stub configuration is set.');
    }
    if (!isset($this->destinationPlugin)) {
      $this->destinationPlugin = $this->destinationPluginManager
        ->createInstance($this->destination['plugin'], $this->destination, $this);
    }
    return $this->destinationPlugin;
  }
  
  public function getIdMap() {
    if (!isset($this->idMapPlugin)) {
      $configuration = $this->idMap;
      $plugin = isset($configuration['plugin']) ? $configuration['plugin'] : 'sql';
      $this->idMapPlugin = $this->idMapPluginManager
        ->createInstance($plugin, $configuration, $this);
    }
    return $this->idMapPlugin;
  }
  
  public function checkRequirements() {
    
    if ($this
      ->getSourcePlugin() instanceof RequirementsInterface) {
      $this
        ->getSourcePlugin()
        ->checkRequirements();
    }
    if ($this
      ->getDestinationPlugin() instanceof RequirementsInterface) {
      $this
        ->getDestinationPlugin()
        ->checkRequirements();
    }
    if (empty($this->requirements)) {
      
      return;
    }
    
    $required_migrations = $this
      ->getMigrationPluginManager()
      ->createInstances($this->requirements);
    $missing_migrations = array_diff($this->requirements, array_keys($required_migrations));
    
    foreach ($required_migrations as $migration_id => $required_migration) {
      if (!$required_migration
        ->allRowsProcessed()) {
        $missing_migrations[] = $migration_id;
      }
    }
    if ($missing_migrations) {
      throw new RequirementsException('Missing migrations ' . implode(', ', $missing_migrations) . '.', [
        'requirements' => $missing_migrations,
      ]);
    }
  }
  
  protected function getMigrationPluginManager() {
    return $this->migrationPluginManager;
  }
  
  public function setStatus($status) {
    \Drupal::keyValue('migrate_status')
      ->set($this
      ->id(), $status);
  }
  
  public function getStatus() {
    return \Drupal::keyValue('migrate_status')
      ->get($this
      ->id(), static::STATUS_IDLE);
  }
  
  public function getStatusLabel() {
    $status = $this
      ->getStatus();
    if (isset($this->statusLabels[$status])) {
      return $this->statusLabels[$status];
    }
    else {
      return '';
    }
  }
  
  public function getInterruptionResult() {
    return \Drupal::keyValue('migrate_interruption_result')
      ->get($this
      ->id(), static::RESULT_INCOMPLETE);
  }
  
  public function clearInterruptionResult() {
    \Drupal::keyValue('migrate_interruption_result')
      ->delete($this
      ->id());
  }
  
  public function interruptMigration($result) {
    $this
      ->setStatus(MigrationInterface::STATUS_STOPPING);
    \Drupal::keyValue('migrate_interruption_result')
      ->set($this
      ->id(), $result);
  }
  
  public function allRowsProcessed() {
    $source_count = $this
      ->getSourcePlugin()
      ->count();
    
    if ($source_count < 0) {
      return TRUE;
    }
    $processed_count = $this
      ->getIdMap()
      ->processedCount();
    
    return $source_count <= $processed_count;
  }
  
  public function set($property_name, $value) {
    if ($property_name == 'source') {
      
      unset($this->sourcePlugin);
    }
    elseif ($property_name === 'destination') {
      
      unset($this->destinationPlugin);
    }
    $this->{$property_name} = $value;
    return $this;
  }
  
  public function getProcess() {
    return $this
      ->getProcessNormalized($this->process);
  }
  
  public function setProcess(array $process) {
    $this->process = $process;
    return $this;
  }
  
  public function setProcessOfProperty($property, $process_of_property) {
    $this->process[$property] = $process_of_property;
    return $this;
  }
  
  public function mergeProcessOfProperty($property, array $process_of_property) {
    
    $current_process = $this
      ->getProcess();
    if (isset($current_process[$property])) {
      $this->process = NestedArray::mergeDeepArray([
        $current_process,
        $this
          ->getProcessNormalized([
          $property => $process_of_property,
        ]),
      ], TRUE);
    }
    else {
      $this
        ->setProcessOfProperty($property, $process_of_property);
    }
    return $this;
  }
  
  public function isTrackLastImported() {
    return $this->trackLastImported;
  }
  
  public function setTrackLastImported($track_last_imported) {
    $this->trackLastImported = (bool) $track_last_imported;
    return $this;
  }
  
  public function getMigrationDependencies() {
    $this->migration_dependencies = ($this->migration_dependencies ?: []) + [
      'required' => [],
      'optional' => [],
    ];
    if (count($this->migration_dependencies) !== 2 || !is_array($this->migration_dependencies['required']) || !is_array($this->migration_dependencies['optional'])) {
      throw new InvalidPluginDefinitionException($this
        ->id(), "Invalid migration dependencies configuration for migration {$this->id()}");
    }
    $this->migration_dependencies['optional'] = array_unique(array_merge($this->migration_dependencies['optional'], $this
      ->findMigrationDependencies($this->process)));
    return $this->migration_dependencies;
  }
  
  protected function findMigrationDependencies($process) {
    $return = [];
    foreach ($this
      ->getProcessNormalized($process) as $process_pipeline) {
      foreach ($process_pipeline as $plugin_configuration) {
        if (in_array($plugin_configuration['plugin'], [
          'migration',
          'migration_lookup',
        ], TRUE)) {
          $return = array_merge($return, (array) $plugin_configuration['migration']);
        }
        if (in_array($plugin_configuration['plugin'], [
          'iterator',
          'sub_process',
        ], TRUE)) {
          $return = array_merge($return, $this
            ->findMigrationDependencies($plugin_configuration['process']));
        }
      }
    }
    return $return;
  }
  
  public function getPluginDefinition() {
    $definition = [];
    
    foreach (parent::getPluginDefinition() as $key => $value) {
      $definition[$key] = isset($this->{$key}) ? $this->{$key} : $value;
    }
    return $definition;
  }
  
  public function getDestinationConfiguration() {
    return $this->destination;
  }
  
  public function getSourceConfiguration() {
    return $this->source;
  }
  
  public function getTrackLastImported() {
    return $this->trackLastImported;
  }
  
  public function getDestinationIds() {
    return $this->destinationIds;
  }
  
  public function getMigrationTags() {
    return $this->migration_tags;
  }
  
  public function isAuditable() {
    return (bool) $this->audit;
  }
}