View source  
  <?php
namespace Drupal\migrate_upgrade;
use Drupal\Core\Link;
use Drupal\Core\Url;
use Drupal\migrate\Entity\Migration;
use Drupal\migrate\Entity\MigrationInterface;
use Drupal\migrate\Event\MigrateEvents;
use Drupal\migrate\Event\MigrateIdMapMessageEvent;
use Drupal\migrate\Event\MigrateMapDeleteEvent;
use Drupal\migrate\Event\MigrateMapSaveEvent;
use Drupal\migrate\Event\MigratePostRowSaveEvent;
use Drupal\migrate\Event\MigrateRowDeleteEvent;
use Drupal\migrate\MigrateExecutable;
class MigrateUpgradeRunBatch {
  
  /**
   * Maximum number of recent messages shown in the batch progress message.
   *
   * run() only renders the last MESSAGE_LENGTH entries of the sandbox message
   * list and appends an ellipsis when older entries were left out.
   */
  const MESSAGE_LENGTH = 20;
  
  /**
   * Number of map rows handled during the current run() invocation.
   *
   * Reset to 0 at the start of each run() call and incremented by onMapSave()
   * and onMapDelete().
   *
   * @var int
   */
  protected static $numProcessed = 0;
  
  /**
   * Whether run() has already registered the event listeners.
   *
   * Checked by run() so the listeners and the time budget are only set up
   * once while this static state persists.
   *
   * @var bool
   */
  protected static $listenersAdded = FALSE;
  
  /**
   * Time budget, in seconds, compared against time() - REQUEST_TIME.
   *
   * Set in run() from the max_execution_time ini value (60 when that value is
   * zero or negative), minus 3 seconds.
   *
   * @var int
   */
  protected static $maxExecTime;
  
  /**
   * Message collector for the migration currently being executed.
   *
   * Assigned a new MigrateMessageCapture instance by run(); onIdMapMessage()
   * forwards ID map messages to it.
   *
   * @var \Drupal\migrate_upgrade\MigrateMessageCapture
   */
  protected static $messages;
  
  /**
   * Batch operation callback: executes the migration at the head of the queue.
   *
   * On the first invocation the queue of migration IDs and the result counters
   * are initialised in the batch context. Every invocation then loads the
   * first queued migration, imports it or rolls it back, records progress
   * messages, and removes the migration from the queue unless it reported
   * RESULT_INCOMPLETE (in which case the same migration is resumed on the next
   * invocation).
   *
   * @param array $initial_ids
   *   Ordered list of migration IDs to process. Only read on the first
   *   invocation; afterwards the queue lives in $context['sandbox'].
   * @param string $operation
   *   'import' to import the migrations; any other value rolls them back.
   * @param array $context
   *   The batch context, modified in place. The 'sandbox', 'results',
   *   'message' and 'finished' keys are written.
   */
  public static function run($initial_ids, $operation, &$context) {
    if (!static::$listenersAdded) {
      $event_dispatcher = \Drupal::service('event_dispatcher');
      if ($operation == 'import') {
        $event_dispatcher->addListener(MigrateEvents::POST_ROW_SAVE, [static::class, 'onPostRowSave']);
        $event_dispatcher->addListener(MigrateEvents::MAP_SAVE, [static::class, 'onMapSave']);
        $event_dispatcher->addListener(MigrateEvents::IDMAP_MESSAGE, [static::class, 'onIdMapMessage']);
      }
      else {
        $event_dispatcher->addListener(MigrateEvents::POST_ROW_DELETE, [static::class, 'onPostRowDelete']);
        $event_dispatcher->addListener(MigrateEvents::MAP_DELETE, [static::class, 'onMapDelete']);
      }

      // Derive the per-request time budget from PHP's execution limit, using
      // 60 seconds when no positive limit is configured.
      static::$maxExecTime = ini_get('max_execution_time');
      if (static::$maxExecTime <= 0) {
        static::$maxExecTime = 60;
      }
      // Stop 3 seconds short of the limit (presumably to leave headroom for
      // the remainder of the request).
      static::$maxExecTime -= 3;
      static::$listenersAdded = TRUE;
    }

    // First invocation: set up the queue and the result counters.
    if (!isset($context['sandbox']['migration_ids'])) {
      $context['sandbox']['max'] = count($initial_ids);
      $context['sandbox']['current'] = 1;
      // Items processed so far for the migration currently in progress.
      $context['sandbox']['num_processed'] = 0;
      $context['sandbox']['migration_ids'] = $initial_ids;
      $context['sandbox']['messages'] = [];
      $context['results']['failures'] = 0;
      $context['results']['successes'] = 0;
      $context['results']['operation'] = $operation;
    }

    // Nothing queued: report completion right away. Without this guard an
    // empty ID list would reach the progress calculation below with a total
    // of zero and divide by zero.
    if (empty($context['sandbox']['migration_ids'])) {
      $context['finished'] = 1;
      return;
    }

    // Count only the rows handled during this invocation.
    static::$numProcessed = 0;
    $migration_id = reset($context['sandbox']['migration_ids']);

    $migration = Migration::load($migration_id);
    if ($migration) {
      static::$messages = new MigrateMessageCapture();
      $executable = new MigrateExecutable($migration, static::$messages);
      $migration_name = $migration->label() ?: $migration_id;

      try {
        if ($operation == 'import') {
          $migration_status = $executable->import();
        }
        else {
          $migration_status = $executable->rollback();
        }
      }
      catch (\Exception $e) {
        // Treat any exception from the executable as a failed migration.
        static::logger()->error($e->getMessage());
        $migration_status = MigrationInterface::RESULT_FAILED;
      }

      switch ($migration_status) {
        case MigrationInterface::RESULT_COMPLETED:
          // Add this invocation's rows to the running total for the migration.
          $context['sandbox']['num_processed'] += static::$numProcessed;
          if ($operation == 'import') {
            $message = static::getTranslation()->formatPlural(
              $context['sandbox']['num_processed'],
              'Upgraded @migration (processed 1 item total)',
              'Upgraded @migration (processed @num_processed items total)',
              [
                '@migration' => $migration_name,
                '@num_processed' => $context['sandbox']['num_processed'],
              ]
            );
          }
          else {
            $message = static::getTranslation()->formatPlural(
              $context['sandbox']['num_processed'],
              'Rolled back @migration (processed 1 item total)',
              'Rolled back @migration (processed @num_processed items total)',
              [
                '@migration' => $migration_name,
                '@num_processed' => $context['sandbox']['num_processed'],
              ]
            );
            // A fully rolled back migration entity is removed.
            $migration->delete();
          }
          $context['sandbox']['messages'][] = $message;
          static::logger()->notice($message);
          // Reset the running total for the next migration in the queue.
          $context['sandbox']['num_processed'] = 0;
          $context['results']['successes']++;
          break;

        case MigrationInterface::RESULT_INCOMPLETE:
          $context['sandbox']['messages'][] = static::getTranslation()->formatPlural(
            static::$numProcessed,
            'Continuing with @migration (processed 1 item)',
            'Continuing with @migration (processed @num_processed items)',
            [
              '@migration' => $migration_name,
              '@num_processed' => static::$numProcessed,
            ]
          );
          $context['sandbox']['num_processed'] += static::$numProcessed;
          break;

        case MigrationInterface::RESULT_STOPPED:
          $context['sandbox']['messages'][] = t('Operation stopped by request');
          break;

        case MigrationInterface::RESULT_FAILED:
          $context['sandbox']['messages'][] = t('Operation on @migration failed', [
            '@migration' => $migration_name,
          ]);
          $context['results']['failures']++;
          static::logger()->error('Operation on @migration failed', [
            '@migration' => $migration_name,
          ]);
          break;

        case MigrationInterface::RESULT_SKIPPED:
          $context['sandbox']['messages'][] = t('Operation on @migration skipped due to unfulfilled dependencies', [
            '@migration' => $migration_name,
          ]);
          static::logger()->error('Operation on @migration skipped due to unfulfilled dependencies', [
            '@migration' => $migration_name,
          ]);
          break;

        case MigrationInterface::RESULT_DISABLED:
          // Nothing is reported for this status.
          break;
      }

      // Unless the migration must be resumed, advance to the next one.
      if ($migration_status != MigrationInterface::RESULT_INCOMPLETE) {
        array_shift($context['sandbox']['migration_ids']);
        $context['sandbox']['current']++;
      }

      // Surface and log every message captured while the migration ran.
      foreach (static::$messages->getMessages() as $message) {
        $context['sandbox']['messages'][] = $message;
        static::logger()->error($message);
      }

      // Render only the last MESSAGE_LENGTH messages, most recent first.
      $message_count = count($context['sandbox']['messages']);
      $context['message'] = '';
      for ($index = max(0, $message_count - self::MESSAGE_LENGTH); $index < $message_count; $index++) {
        $context['message'] = $context['sandbox']['messages'][$index] . "<br />\n" . $context['message'];
      }
      if ($message_count > self::MESSAGE_LENGTH) {
        // Indicate that older messages were left out.
        $context['message'] .= '…';
      }

      // Prepend a headline naming the migration that will run next.
      if (!empty($context['sandbox']['migration_ids'])) {
        $migration_id = reset($context['sandbox']['migration_ids']);
        $next_migration = Migration::load($migration_id);
        // The next migration may fail to load; fall back to its ID instead of
        // calling label() on NULL.
        $migration_name = ($next_migration && $next_migration->label()) ? $next_migration->label() : $migration_id;
        if ($operation == 'import') {
          $context['message'] = t('Currently upgrading @migration (@current of @max total tasks)', [
            '@migration' => $migration_name,
            '@current' => $context['sandbox']['current'],
            '@max' => $context['sandbox']['max'],
          ]) . "<br />\n" . $context['message'];
        }
        else {
          $context['message'] = t('Currently rolling back @migration (@current of @max total tasks)', [
            '@migration' => $migration_name,
            '@current' => $context['sandbox']['current'],
            '@max' => $context['sandbox']['max'],
          ]) . "<br />\n" . $context['message'];
        }
      }
    }
    else {
      // The migration could not be loaded: drop it from the queue and move on.
      array_shift($context['sandbox']['migration_ids']);
      $context['sandbox']['current']++;
    }

    // Fraction of queued migrations finished; reaches 1 when the queue is
    // empty.
    $context['finished'] = 1 - count($context['sandbox']['migration_ids']) / $context['sandbox']['max'];
  }
  
  /**
   * Returns the logger channel used by this class.
   *
   * @return mixed
   *   Whatever \Drupal::logger() returns for the 'migrate_upgrade' channel.
   */
  protected static function logger() {
    $channel = \Drupal::logger('migrate_upgrade');
    return $channel;
  }
  
  /**
   * Returns the translation service used to build plural messages.
   *
   * @return mixed
   *   Whatever \Drupal::translation() returns.
   */
  protected static function getTranslation() {
    $translation = \Drupal::translation();
    return $translation;
  }
  
  /**
   * Batch "finished" callback: reports the collected results to the user.
   *
   * Only $results is used; the remaining parameters are accepted but ignored.
   *
   * @param mixed $success
   *   Unused.
   * @param array $results
   *   The results gathered by run(), handed on to displayResults().
   * @param mixed $operations
   *   Unused.
   * @param mixed $elapsed
   *   Unused.
   */
  public static function finished($success, $results, $operations, $elapsed) {
    static::displayResults($results);
  }
  
  /**
   * Turns the batch results into messages shown to the user.
   *
   * Emits, in order: a success count (when at least one task succeeded), then
   * either a failure count plus a "not completed" error or a completion
   * message, and finally a link to the upgrade log when dblog is enabled.
   *
   * @param array $results
   *   Batch results; the 'successes', 'failures' and 'operation' keys are read.
   */
  protected static function displayResults($results) {
    $succeeded = $results['successes'];
    $failed = $results['failures'];
    $is_import = $results['operation'] == 'import';

    // Success summary, worded for the operation that ran.
    if ($succeeded > 0 && $is_import) {
      drupal_set_message(static::getTranslation()->formatPlural($succeeded, 'Completed 1 upgrade task successfully', 'Completed @count upgrade tasks successfully'));
    }
    elseif ($succeeded > 0) {
      drupal_set_message(static::getTranslation()->formatPlural($succeeded, 'Completed 1 rollback task successfully', 'Completed @count rollback tasks successfully'));
    }

    // Either the failure summary or the completion message.
    $has_failures = $failed > 0;
    if ($has_failures && $is_import) {
      drupal_set_message(static::getTranslation()->formatPlural($failed, '1 upgrade failed', '@count upgrades failed'));
      drupal_set_message(t('Upgrade process not completed'), 'error');
    }
    elseif ($has_failures) {
      drupal_set_message(static::getTranslation()->formatPlural($failed, '1 rollback failed', '@count rollbacks failed'));
      drupal_set_message(t('Rollback process not completed'), 'error');
    }
    elseif ($is_import) {
      drupal_set_message(t('Congratulations, you upgraded Drupal!'));
    }
    else {
      drupal_set_message(t('Rollback of the upgrade is complete - you may now start the upgrade process from scratch.'));
    }

    // Link to the detailed log, flagged as an error when anything failed.
    if (\Drupal::moduleHandler()->moduleExists('dblog')) {
      $url = Url::fromRoute('migrate_upgrade.log');
      $log_link = Link::fromTextAndUrl(t('Review the detailed upgrade log'), $url);
      $severity = $failed ? 'error' : 'status';
      drupal_set_message($log_link, $severity);
    }
  }
  
  /**
   * Reacts to a row having been saved during import.
   *
   * Interrupts the migration with RESULT_INCOMPLETE once the time elapsed
   * since REQUEST_TIME exceeds the configured budget.
   *
   * @param \Drupal\migrate\Event\MigratePostRowSaveEvent $event
   *   The post-save event.
   */
  public static function onPostRowSave(MigratePostRowSaveEvent $event) {
    $elapsed = time() - REQUEST_TIME;
    if ($elapsed <= static::$maxExecTime) {
      // Still within budget: let the migration continue.
      return;
    }
    $migration = $event->getMigration();
    $migration->interruptMigration(MigrationInterface::RESULT_INCOMPLETE);
  }
  
  /**
   * Reacts to a row having been deleted during rollback.
   *
   * Interrupts the migration with RESULT_INCOMPLETE once the time elapsed
   * since REQUEST_TIME exceeds the configured budget.
   *
   * @param \Drupal\migrate\Event\MigrateRowDeleteEvent $event
   *   The post-delete event.
   */
  public static function onPostRowDelete(MigrateRowDeleteEvent $event) {
    $elapsed = time() - REQUEST_TIME;
    if ($elapsed <= static::$maxExecTime) {
      // Still within budget: let the rollback continue.
      return;
    }
    $migration = $event->getMigration();
    $migration->interruptMigration(MigrationInterface::RESULT_INCOMPLETE);
  }
  
  /**
   * Counts one processed item each time a map row is saved.
   *
   * @param \Drupal\migrate\Event\MigrateMapSaveEvent $event
   *   The map save event (not inspected).
   */
  public static function onMapSave(MigrateMapSaveEvent $event) {
    static::$numProcessed += 1;
  }
  
  /**
   * Counts one processed item each time a map row is deleted.
   *
   * @param \Drupal\migrate\Event\MigrateMapDeleteEvent $event
   *   The map delete event (not inspected).
   */
  public static function onMapDelete(MigrateMapDeleteEvent $event) {
    static::$numProcessed += 1;
  }
  
  /**
   * Forwards an ID map message to the current message collector.
   *
   * Notice and informational levels are passed on with type 'status'; every
   * other level is passed on with type 'error'. The message is prefixed with
   * the comma-separated source ID values.
   *
   * @param \Drupal\migrate\Event\MigrateIdMapMessageEvent $event
   *   The ID map message event.
   */
  public static function onIdMapMessage(MigrateIdMapMessageEvent $event) {
    $level = $event->getLevel();
    $is_status = $level == MigrationInterface::MESSAGE_NOTICE
      || $level == MigrationInterface::MESSAGE_INFORMATIONAL;
    $type = $is_status ? 'status' : 'error';

    $arguments = [
      '@source_id' => implode(',', $event->getSourceIdValues()),
      '@message' => $event->getMessage(),
    ];
    $text = t('Source ID @source_id: @message', $arguments);
    static::$messages->display($text, $type);
  }
}