View source
<?php
namespace Drupal\migrate_upgrade;
use Drupal\Core\Link;
use Drupal\Core\Url;
use Drupal\migrate\Entity\Migration;
use Drupal\migrate\Entity\MigrationInterface;
use Drupal\migrate\Event\MigrateEvents;
use Drupal\migrate\Event\MigrateIdMapMessageEvent;
use Drupal\migrate\Event\MigrateMapDeleteEvent;
use Drupal\migrate\Event\MigrateMapSaveEvent;
use Drupal\migrate\Event\MigratePostRowSaveEvent;
use Drupal\migrate\Event\MigrateRowDeleteEvent;
use Drupal\migrate\MigrateExecutable;
/**
 * Batch callbacks that run upgrade migrations (import or rollback).
 *
 * Each call to run() executes (or resumes) the first migration remaining in
 * the batch sandbox, records progress messages, and updates the batch
 * 'finished' ratio. Event listeners registered on the first call count the
 * processed rows and interrupt long-running migrations so that the current
 * request can end and the batch can continue in a new request.
 */
class MigrateUpgradeRunBatch {

  /**
   * Maximum number of collected messages shown in the batch progress message.
   */
  const MESSAGE_LENGTH = 20;

  /**
   * Number of map rows saved or deleted during the current run() call.
   *
   * @var int
   */
  protected static $numProcessed = 0;

  /**
   * Whether the event listeners were already registered in this request.
   *
   * @var bool
   */
  protected static $listenersAdded = FALSE;

  /**
   * Elapsed request time, in seconds, after which a migration is interrupted.
   *
   * Derived from PHP's max_execution_time setting (60 when that setting is not
   * positive), minus 3 seconds.
   *
   * @var int
   */
  protected static $maxExecTime;

  /**
   * Message collector handed to the executable for the current run() call.
   *
   * @var \Drupal\migrate_upgrade\MigrateMessageCapture
   */
  protected static $messages;

  /**
   * Runs (or resumes) a single migration as one batch operation step.
   *
   * @param int[]|string[] $initial_ids
   *   The full set of migration IDs to process. Only used to initialise the
   *   sandbox on the first call.
   * @param string $operation
   *   'import' to import; any other value performs a rollback.
   * @param array $context
   *   The batch context. This method reads and writes the 'sandbox',
   *   'results', 'message' and 'finished' keys.
   */
  public static function run($initial_ids, $operation, &$context) {
    if (!static::$listenersAdded) {
      $event_dispatcher = \Drupal::service('event_dispatcher');
      if ($operation == 'import') {
        $event_dispatcher->addListener(MigrateEvents::POST_ROW_SAVE, [static::class, 'onPostRowSave']);
        $event_dispatcher->addListener(MigrateEvents::MAP_SAVE, [static::class, 'onMapSave']);
        $event_dispatcher->addListener(MigrateEvents::IDMAP_MESSAGE, [static::class, 'onIdMapMessage']);
      }
      else {
        $event_dispatcher->addListener(MigrateEvents::POST_ROW_DELETE, [static::class, 'onPostRowDelete']);
        $event_dispatcher->addListener(MigrateEvents::MAP_DELETE, [static::class, 'onMapDelete']);
      }
      // Fall back to 60 seconds when max_execution_time is not positive, then
      // subtract 3 seconds so the interrupt threshold sits below the limit.
      static::$maxExecTime = ini_get('max_execution_time');
      if (static::$maxExecTime <= 0) {
        static::$maxExecTime = 60;
      }
      static::$maxExecTime -= 3;
      static::$listenersAdded = TRUE;
    }

    // First call for this operation: initialise the sandbox and the results.
    if (!isset($context['sandbox']['migration_ids'])) {
      $context['sandbox']['max'] = count($initial_ids);
      $context['sandbox']['current'] = 1;
      // Total number processed for the migration currently being worked on,
      // accumulated across resumed run() calls.
      $context['sandbox']['num_processed'] = 0;
      $context['sandbox']['migration_ids'] = $initial_ids;
      $context['sandbox']['messages'] = [];
      $context['results']['failures'] = 0;
      $context['results']['successes'] = 0;
      $context['results']['operation'] = $operation;
    }

    // Number processed in this call only; incremented by the map listeners.
    static::$numProcessed = 0;

    // reset() returns FALSE on an empty list; do not try to load that.
    $migration_id = reset($context['sandbox']['migration_ids']);
    $migration = $migration_id !== FALSE ? Migration::load($migration_id) : NULL;
    if ($migration) {
      static::$messages = new MigrateMessageCapture();
      $executable = new MigrateExecutable($migration, static::$messages);

      $migration_name = $migration->label() ? $migration->label() : $migration_id;

      try {
        if ($operation == 'import') {
          $migration_status = $executable->import();
        }
        else {
          $migration_status = $executable->rollback();
        }
      }
      catch (\Exception $e) {
        // Log the exception and treat the migration as failed so the batch
        // can move on to the next migration.
        static::logger()->error($e->getMessage());
        $migration_status = MigrationInterface::RESULT_FAILED;
      }

      switch ($migration_status) {
        case MigrationInterface::RESULT_COMPLETED:
          // Add what was processed in this call to the running total.
          $context['sandbox']['num_processed'] += static::$numProcessed;
          if ($operation == 'import') {
            $message = static::getTranslation()->formatPlural(
              $context['sandbox']['num_processed'], 'Upgraded @migration (processed 1 item total)', 'Upgraded @migration (processed @num_processed items total)',
              [
                '@migration' => $migration_name,
                '@num_processed' => $context['sandbox']['num_processed'],
              ]);
          }
          else {
            $message = static::getTranslation()->formatPlural(
              $context['sandbox']['num_processed'], 'Rolled back @migration (processed 1 item total)', 'Rolled back @migration (processed @num_processed items total)',
              [
                '@migration' => $migration_name,
                '@num_processed' => $context['sandbox']['num_processed'],
              ]);
            // A completed rollback also deletes the migration entity.
            $migration->delete();
          }
          $context['sandbox']['messages'][] = $message;
          static::logger()->notice($message);
          // Reset the running total for the next migration.
          $context['sandbox']['num_processed'] = 0;
          $context['results']['successes']++;
          break;

        case MigrationInterface::RESULT_INCOMPLETE:
          // Interrupted by the time-limit listeners; the same migration is
          // resumed on the next call.
          $context['sandbox']['messages'][] = static::getTranslation()->formatPlural(
            static::$numProcessed, 'Continuing with @migration (processed 1 item)', 'Continuing with @migration (processed @num_processed items)',
            [
              '@migration' => $migration_name,
              '@num_processed' => static::$numProcessed,
            ]);
          $context['sandbox']['num_processed'] += static::$numProcessed;
          break;

        case MigrationInterface::RESULT_STOPPED:
          $context['sandbox']['messages'][] = t('Operation stopped by request');
          break;

        case MigrationInterface::RESULT_FAILED:
          $context['sandbox']['messages'][] = t('Operation on @migration failed', [
            '@migration' => $migration_name,
          ]);
          $context['results']['failures']++;
          static::logger()->error('Operation on @migration failed', [
            '@migration' => $migration_name,
          ]);
          break;

        case MigrationInterface::RESULT_SKIPPED:
          $context['sandbox']['messages'][] = t('Operation on @migration skipped due to unfulfilled dependencies', [
            '@migration' => $migration_name,
          ]);
          static::logger()->error('Operation on @migration skipped due to unfulfilled dependencies', [
            '@migration' => $migration_name,
          ]);
          break;

        case MigrationInterface::RESULT_DISABLED:
          // Nothing is reported for disabled migrations.
          break;
      }

      // Unless the migration must be resumed, remove it from the queue.
      if ($migration_status != MigrationInterface::RESULT_INCOMPLETE) {
        array_shift($context['sandbox']['migration_ids']);
        $context['sandbox']['current']++;
      }

      // Append and log every message captured while the executable ran.
      foreach (static::$messages->getMessages() as $captured_message) {
        $context['sandbox']['messages'][] = $captured_message;
        static::logger()->error($captured_message);
      }

      // Build the progress text from the last MESSAGE_LENGTH messages, most
      // recent first, with an ellipsis when older messages were left out.
      $message_count = count($context['sandbox']['messages']);
      $context['message'] = '';
      for ($index = max(0, $message_count - self::MESSAGE_LENGTH); $index < $message_count; $index++) {
        $context['message'] = $context['sandbox']['messages'][$index] . "<br />\n" . $context['message'];
      }
      if ($message_count > self::MESSAGE_LENGTH) {
        $context['message'] .= '…';
      }

      // Prepend a line announcing the migration that will run next.
      if (!empty($context['sandbox']['migration_ids'])) {
        $migration_id = reset($context['sandbox']['migration_ids']);
        $migration = Migration::load($migration_id);
        // The next migration may fail to load (it is skipped on the next
        // call in that case); fall back to its ID rather than calling a
        // method on NULL.
        $migration_name = ($migration && $migration->label()) ? $migration->label() : $migration_id;
        if ($operation == 'import') {
          $context['message'] = t('Currently upgrading @migration (@current of @max total tasks)', [
            '@migration' => $migration_name,
            '@current' => $context['sandbox']['current'],
            '@max' => $context['sandbox']['max'],
          ]) . "<br />\n" . $context['message'];
        }
        else {
          $context['message'] = t('Currently rolling back @migration (@current of @max total tasks)', [
            '@migration' => $migration_name,
            '@current' => $context['sandbox']['current'],
            '@max' => $context['sandbox']['max'],
          ]) . "<br />\n" . $context['message'];
        }
      }
    }
    else {
      // The migration could not be loaded (or the queue is empty): skip it.
      array_shift($context['sandbox']['migration_ids']);
      $context['sandbox']['current']++;
    }

    // Report the fraction of migrations done. With nothing to process there
    // is nothing left to do, and dividing by a zero 'max' must be avoided.
    $max = $context['sandbox']['max'];
    $context['finished'] = $max > 0
      ? 1 - count($context['sandbox']['migration_ids']) / $max
      : 1;
  }

  /**
   * Returns the logger for the 'migrate_upgrade' channel.
   *
   * @return \Psr\Log\LoggerInterface
   *   The logger returned by \Drupal::logger('migrate_upgrade').
   */
  protected static function logger() {
    return \Drupal::logger('migrate_upgrade');
  }

  /**
   * Returns the string translation service.
   *
   * @return \Drupal\Core\StringTranslation\TranslationInterface
   *   The service returned by \Drupal::translation().
   */
  protected static function getTranslation() {
    return \Drupal::translation();
  }

  /**
   * Batch 'finished' callback: reports the outcome to the user.
   *
   * @param bool $success
   *   Unused.
   * @param array $results
   *   The results collected by run(): 'successes', 'failures', 'operation'.
   * @param array $operations
   *   Unused.
   * @param string $elapsed
   *   Unused.
   */
  public static function finished($success, $results, $operations, $elapsed) {
    static::displayResults($results);
  }

  /**
   * Sets user-facing messages summarising the batch results.
   *
   * @param array $results
   *   Array with the keys 'successes' and 'failures' (counts) and 'operation'
   *   ('import', or anything else for a rollback).
   */
  protected static function displayResults($results) {
    $successes = $results['successes'];
    $failures = $results['failures'];

    // Report the number of successfully completed tasks, if any.
    if ($successes > 0) {
      if ($results['operation'] == 'import') {
        drupal_set_message(static::getTranslation()->formatPlural($successes, 'Completed 1 upgrade task successfully', 'Completed @count upgrade tasks successfully'));
      }
      else {
        drupal_set_message(static::getTranslation()->formatPlural($successes, 'Completed 1 rollback task successfully', 'Completed @count rollback tasks successfully'));
      }
    }

    // Either report the failures, or announce overall completion.
    if ($failures > 0) {
      if ($results['operation'] == 'import') {
        drupal_set_message(static::getTranslation()->formatPlural($failures, '1 upgrade failed', '@count upgrades failed'));
        drupal_set_message(t('Upgrade process not completed'), 'error');
      }
      else {
        drupal_set_message(static::getTranslation()->formatPlural($failures, '1 rollback failed', '@count rollbacks failed'));
        drupal_set_message(t('Rollback process not completed'), 'error');
      }
    }
    else {
      if ($results['operation'] == 'import') {
        drupal_set_message(t('Congratulations, you upgraded Drupal!'));
      }
      else {
        drupal_set_message(t('Rollback of the upgrade is complete - you may now start the upgrade process from scratch.'));
      }
    }

    // Link to the detailed log when the dblog module is enabled.
    if (\Drupal::moduleHandler()->moduleExists('dblog')) {
      $url = Url::fromRoute('migrate_upgrade.log');
      drupal_set_message(Link::fromTextAndUrl(t('Review the detailed upgrade log'), $url), $failures ? 'error' : 'status');
    }
  }

  /**
   * Reacts to a row being saved during import.
   *
   * Interrupts the migration with RESULT_INCOMPLETE once the request has been
   * running longer than the computed time limit.
   *
   * @param \Drupal\migrate\Event\MigratePostRowSaveEvent $event
   *   The post-save event.
   */
  public static function onPostRowSave(MigratePostRowSaveEvent $event) {
    if (time() - REQUEST_TIME > static::$maxExecTime) {
      $event->getMigration()->interruptMigration(MigrationInterface::RESULT_INCOMPLETE);
    }
  }

  /**
   * Reacts to a row being deleted during rollback.
   *
   * Interrupts the migration with RESULT_INCOMPLETE once the request has been
   * running longer than the computed time limit.
   *
   * @param \Drupal\migrate\Event\MigrateRowDeleteEvent $event
   *   The post-delete event.
   */
  public static function onPostRowDelete(MigrateRowDeleteEvent $event) {
    if (time() - REQUEST_TIME > static::$maxExecTime) {
      $event->getMigration()->interruptMigration(MigrationInterface::RESULT_INCOMPLETE);
    }
  }

  /**
   * Counts a map row save as one processed item.
   *
   * @param \Drupal\migrate\Event\MigrateMapSaveEvent $event
   *   The map save event (not inspected).
   */
  public static function onMapSave(MigrateMapSaveEvent $event) {
    static::$numProcessed++;
  }

  /**
   * Counts a map row deletion as one processed item.
   *
   * @param \Drupal\migrate\Event\MigrateMapDeleteEvent $event
   *   The map delete event (not inspected).
   */
  public static function onMapDelete(MigrateMapDeleteEvent $event) {
    static::$numProcessed++;
  }

  /**
   * Forwards an ID map message to the message collector.
   *
   * Notice and informational levels are displayed with type 'status'; every
   * other level with type 'error'. The message is prefixed with the
   * comma-separated source ID values.
   *
   * @param \Drupal\migrate\Event\MigrateIdMapMessageEvent $event
   *   The message event.
   */
  public static function onIdMapMessage(MigrateIdMapMessageEvent $event) {
    if ($event->getLevel() == MigrationInterface::MESSAGE_NOTICE || $event->getLevel() == MigrationInterface::MESSAGE_INFORMATIONAL) {
      $type = 'status';
    }
    else {
      $type = 'error';
    }
    $source_id_string = implode(',', $event->getSourceIdValues());
    $message = t('Source ID @source_id: @message', [
      '@source_id' => $source_id_string,
      '@message' => $event->getMessage(),
    ]);
    static::$messages->display($message, $type);
  }

}