protected function MigrateExecutable::processPipeline in Drupal 10

Runs a process pipeline.

Parameters

\Drupal\migrate\Row $row: The $row to be processed.

string $destination: The destination property name.

array $plugins: The process pipeline plugins.

mixed $value: Initial value of the pipeline for the destination. The signature declares no default, so it must be passed explicitly.

Throws

\Drupal\migrate\MigrateException
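
The code on this page catches a MigrateException raised inside a plugin's transform() and rethrows it with the plugin ID prepended to the message. The following is a minimal standalone sketch of that pattern, not core code: `transform_with_prefix()` and the `my_plugin` ID are hypothetical, and PHP's `RuntimeException` stands in for `MigrateException` so the snippet runs without Drupal.

```php
<?php

/**
 * Hypothetical helper: runs a callable and, on failure, rethrows with the
 * given plugin ID prepended, similar to what processPipeline() does.
 * RuntimeException is an assumed stand-in for MigrateException.
 */
function transform_with_prefix(string $plugin_id, callable $transform, $value) {
  try {
    return $transform($value);
  }
  catch (RuntimeException $e) {
    // Prepend the plugin ID so the failing step is visible in the message.
    throw new RuntimeException(sprintf('%s: %s', $plugin_id, $e->getMessage()));
  }
}

$message = NULL;
try {
  transform_with_prefix('my_plugin', function ($value) {
    throw new RuntimeException('Value is not usable.');
  }, 'input');
}
catch (RuntimeException $e) {
  $message = $e->getMessage();
}
echo $message, PHP_EOL;
```

As in the method itself, the rethrown exception carries only the combined message, so the caller sees which step failed without inspecting the original exception.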

See also

\Drupal\migrate\MigrateExecutableInterface::processRow

2 calls to MigrateExecutable::processPipeline()
MigrateExecutable::import in core/modules/migrate/src/MigrateExecutable.php
Performs an import operation - migrate items from source to destination.
MigrateExecutable::processRow in core/modules/migrate/src/MigrateExecutable.php
Processes a row.

File

core/modules/migrate/src/MigrateExecutable.php, line 412

Class

MigrateExecutable
Defines a migrate executable class.

Namespace

Drupal\migrate

Code

protected function processPipeline(Row $row, string $destination, array $plugins, $value) {
  $multiple = FALSE;

  /** @var \Drupal\migrate\Plugin\MigrateProcessInterface $plugin */
  foreach ($plugins as $plugin) {
    $definition = $plugin->getPluginDefinition();

    // Many plugins expect a scalar value but the current value of the
    // pipeline might be multiple scalars (this is set by the previous plugin)
    // and in this case the current value needs to be iterated and each scalar
    // separately transformed.
    if ($multiple && !$definition['handle_multiples']) {
      $new_value = [];
      if (!is_array($value)) {
        throw new MigrateException(sprintf('Pipeline failed at %s plugin for destination %s: %s received instead of an array,', $plugin->getPluginId(), $destination, $value));
      }
      $break = FALSE;
      foreach ($value as $scalar_value) {
        try {
          $new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);
        } catch (MigrateSkipProcessException $e) {
          $new_value[] = NULL;
          $break = TRUE;
        } catch (MigrateException $e) {

          // Prepend the process plugin id to the message.
          $message = sprintf("%s: %s", $plugin->getPluginId(), $e->getMessage());
          throw new MigrateException($message);
        }
      }
      $value = $new_value;
      if ($break) {
        break;
      }
    }
    else {
      try {
        $value = $plugin->transform($value, $this, $row, $destination);
      } catch (MigrateSkipProcessException $e) {
        $value = NULL;
        break;
      } catch (MigrateException $e) {

        // Prepend the process plugin id to the message.
        $message = sprintf("%s: %s", $plugin->getPluginId(), $e->getMessage());
        throw new MigrateException($message);
      }
      $multiple = $plugin->multiple();
    }
  }

  // Ensure all values, including nulls, are migrated.
  if ($plugins) {
    if (isset($value)) {
      $row->setDestinationProperty($destination, $value);
    }
    else {
      $row->setEmptyDestinationProperty($destination);
    }
  }
}
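
The multiple-value handling in the method above can be modelled outside Drupal. The following is a minimal standalone sketch, not the core implementation: `run_pipeline()` and the array-based step format are hypothetical stand-ins, where the `transform` key plays the role of a plugin's transform(), `handle_multiples` mirrors the plugin definition key, and `multiple` mirrors the return value of multiple(). Skip handling, exception prefixing, and writing to the row are left out.

```php
<?php

/**
 * Hypothetical stand-in for processPipeline(). Each step is an array with
 * a 'transform' callable, a 'handle_multiples' flag, and a 'multiple' flag.
 */
function run_pipeline(array $steps, $value) {
  $multiple = FALSE;
  foreach ($steps as $step) {
    if ($multiple && !$step['handle_multiples']) {
      if (!is_array($value)) {
        throw new InvalidArgumentException('Expected an array of values.');
      }
      // The previous step produced several values: transform each one.
      $new_value = [];
      foreach ($value as $scalar_value) {
        $new_value[] = $step['transform']($scalar_value);
      }
      $value = $new_value;
    }
    else {
      $value = $step['transform']($value);
      // Only a step that ran on the whole value updates the flag.
      $multiple = $step['multiple'];
    }
  }
  return $value;
}

$steps = [
  // Splits a string into a list and reports multiple output values.
  [
    'transform' => fn($v) => explode(',', $v),
    'handle_multiples' => FALSE,
    'multiple' => TRUE,
  ],
  // Scalar-only step: applied to each list item separately.
  [
    'transform' => fn($v) => strtoupper(trim($v)),
    'handle_multiples' => FALSE,
    'multiple' => FALSE,
  ],
  // Handles the whole list at once.
  [
    'transform' => fn($v) => implode('|', $v),
    'handle_multiples' => TRUE,
    'multiple' => FALSE,
  ],
];

$result = run_pipeline($steps, 'a, b ,c');
echo $result, PHP_EOL;
```

Note that in this sketch, as in the method above, the `$multiple` flag is left unchanged after a per-item pass, so the third step still receives the whole list because it declares `handle_multiples`.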