You are here

FeedImportProcessor.php in Feed Import 8

File

feed_import_base/src/FeedImportProcessor.php
View source
<?php

namespace Drupal\feed_import_base;


/**
 * Class that processes the import.
 */
class FeedImportProcessor extends FeedImportConfigurable {

  // Field update modes. In this class they are only used as keys of the
  // $merge_classes array given to setFields(); UPDATE_COMBINE is the one
  // that must always be present there.
  // No duplicates
  const UPDATE_COMBINE = 0;

  // Duplicates
  const UPDATE_MERGE = 1;

  // Overwrite
  const UPDATE_OVERWRITE = 2;

  // Overwrite fast
  const UPDATE_OVERWRITE_FAST = 3;

  // Actions taken by createEntity() when no path yields a value for a field:
  // use the default value, use the default value passed through the field
  // filters, leave the field out of the entity, or skip the whole item.
  const ACTION_DEFAULT_VALUE = 0;
  const ACTION_DEFAULT_FILTERED_VALUE = 1;
  const ACTION_IGNORE_FIELD = 2;
  const ACTION_SKIP_ITEM = 3;

  // Return codes that saveEntities() understands from the before/after
  // combine callbacks (ENTITY_SKIP is also honoured for the before create
  // and before save callbacks).
  const ENTITY_CONTINUE = 0;
  const ENTITY_RESCHEDULE = 1;
  const ENTITY_SKIP = 2;
  const ENTITY_NO_COMBINE = 3;
  const ENTITY_MARK_PROTECTED = 4;

  // Return codes that saveEntities() understands from the before create and
  // before save callbacks: jump over entity creation or over entity saving.
  const ENTITY_SKIP_CREATE = 5;
  const ENTITY_SKIP_SAVE = 6;

  // Temporary key under which createEntity() stores the item hash inside the
  // entity array; saveEntities() removes it before the entity is used.
  const TEMP_HASH = '$_FI~HASH_$';

  // Dynamic fields (keyed by field name) whose values createEntity() reads
  // from every item.
  protected $fields = array();

  // Static field properties (keyed by field name) that attachField() merges
  // into each value of the field.
  protected $staticFields = array();

  // Base entity array; createEntity() starts every new entity from a copy.
  protected $baseEntity = array();

  // Info about all fields (dynamic and static), used by combineEntities().
  protected $fieldsInfo = array();

  // Formatted path of the unique value of an item. NULL means items are not
  // monitored (no hash is created).
  protected $uniq;

  // Feed group.
  protected $group;

  // Feed machine name (set by the constructor).
  protected $machineName;

  // Flag to skip items that were already imported.
  protected $skipImportedItems = FALSE;

  // Flag to only apply updates (new items are skipped).
  protected $updatesOnly = FALSE;

  // Entity info.
  protected $entityInfo;

  // Current reader.
  protected $reader;

  // Current hash manager.
  protected $hashes;

  // Filters object.
  protected $filters;

  // Number of created entities after which they are saved. With 0 all
  // entities are saved after the reader is exhausted.
  protected $itemsCount = 0;

  // Number of processed entities after which the entity static cache is
  // reset.
  protected $resetStaticCache = 0;

  // Number of entities that might currently be in static cache.
  protected $staticCacheEntities = 0;

  // Callbacks for before/after combine entities (FALSE when not used).
  protected $beforeCombine = FALSE;
  protected $afterCombine = FALSE;

  // Callbacks for before create, before save and after save (FALSE when not
  // used).
  protected $beforeCreate = FALSE;
  protected $beforeSave = FALSE;
  protected $afterSave = FALSE;

  // Hash ids whose entity could not be loaded; they are deleted at the end
  // of process().
  protected $orphanHashes = array();

  // A null value, needed because createEntity() returns by reference.
  protected $NULL = NULL;

  // Flag indicating to skip creating dynamic functions if already exists.
  protected $skipDefFunChk = FALSE;

  // This will break import if a filter function is missing.
  protected $breakOnUndefinedFilter = TRUE;

  // Import report. process() returns it and then resets it to these values.
  protected $report = array(
    'total' => 0,
    'updated' => 0,
    'new' => 0,
    'rescheduled' => 0,
    'skipped' => 0,
    'protected' => 0,
    'protected_skipped' => 0,
    'missing' => 0,
    'started' => 0,
    'finished' => 0,
    'errors' => array(),
  );

  // Entity array currently built by createEntity() (NULL otherwise).
  public $current;

  // Value returned by the prefilters for the field currently processed by
  // createEntity() (NULL otherwise).
  public $prefilteredValue;

  // Callback used to alter the unique value before it is hashed (FALSE when
  // not used).
  protected $uniqAlter = FALSE;

  /**
   * Creates a processor for a feed.
   *
   * @param string $machine_name
   *    Feed machine name, kept in the machineName property
   */
  public function __construct($machine_name) {
    $this->machineName = $machine_name;
  }

  // TRUE while errorHandler() is installed as PHP error handler.
  protected $errHandlerEnabled = FALSE;

  /**
   * Installs or removes errorHandler() as the PHP error handler.
   *
   * Nothing happens when the handler already is in the requested state, so
   * the method can safely be called several times in a row.
   *
   * @param bool $enable
   *    TRUE to install the handler, FALSE to restore the previous one
   */
  public function setErrorHandler($enable) {
    $enable = (bool) $enable;
    if ($enable == (bool) $this->errHandlerEnabled) {

      // Already in the requested state.
      return;
    }
    $this->errHandlerEnabled = $enable;
    if ($enable) {
      set_error_handler(array($this, 'errorHandler'));
    }
    else {
      restore_error_handler();
    }
  }

  // Number of errors the error handler may still add to the report.
  protected $errorsLeft = 100;

  // When TRUE the error handler converts PHP errors into exceptions.
  protected $throwException = TRUE;

  /**
   * Error handler callback.
   * This is set with set_error_handler(), see setErrorHandler().
   *
   * @param int $errno
   *    Level of the raised error
   * @param string $errmsg
   *    Error message
   * @param string $file
   *    File where the error was raised
   * @param int $line
   *    Line where the error was raised
   *
   * @return bool|null
   *    FALSE when the error is not handled here (PHP continues with its
   *    standard handler), nothing otherwise
   *
   * @throws \Exception
   *    When throwing exceptions is enabled
   */
  public function errorHandler($errno, $errmsg, $file, $line) {

    // Ignore errors excluded by error_reporting, including those silenced
    // with @. Comparing with 0 is not enough: starting with PHP 8 the @
    // operator leaves the fatal error levels enabled, so error_reporting()
    // is no longer 0 while an error is silenced.
    if (!(error_reporting() & $errno)) {
      return FALSE;
    }

    // Add error to reports while the limit isn't reached.
    if ($this->errorsLeft) {
      $this->report['errors'][] = array(
        'error' => $errmsg,
        'error number' => $errno,
        'line' => $line,
        'file' => $file,
      );
      $this->errorsLeft--;
    }

    // Throw an exception to be caught by a try-catch statement.
    if ($this->throwException) {
      throw new \Exception("(Uncaught Feed Import Exception) [{$errmsg}; file: {$file}; line: {$line}]", $errno);
    }
  }

  /**
   * Returns the errors collected so far in the report.
   *
   * @return array
   *    List of errors, each one being an array with the keys
   *    error, error number, line and file
   */
  public function getErrors() {
    $errors = $this->report['errors'];
    return $errors;
  }

  /**
   * Appends an error to the report.
   *
   * Unlike the error handler, this method doesn't take into account the
   * maximum number of reported errors.
   *
   * @param string $errmsg
   *    Error message
   * @param int $errno
   *    Error number
   * @param int $line
   *    Line where the error occurred
   * @param string $file
   *    File where the error occurred
   */
  public function addError($errmsg, $errno = 0, $line = NULL, $file = NULL) {
    $entry = array();
    $entry['error'] = $errmsg;
    $entry['error number'] = $errno;
    $entry['line'] = $line;
    $entry['file'] = $file;
    $this->report['errors'][] = $entry;
  }

  /**
   * {@inheritdoc}
   *
   * Recognized options: items_count, reset_cache, max_reported_errors,
   * skip_imported, throw_exception, skip_defined_functions_check,
   * break_on_undefined_filter, updates_only and the callbacks
   * before_combine, after_combine, before_create, before_save, after_save
   * and uniq_callback. Options that are missing keep their current value.
   * The $overwrite parameter is not used by this implementation.
   */
  public function setOptions(array $options, $overwrite = FALSE) {

    // Counters: accepted only if not negative.
    $counters = array(
      'items_count' => 'itemsCount',
      'reset_cache' => 'resetStaticCache',
    );
    foreach ($counters as $name => $property) {
      if (isset($options[$name]) && $options[$name] >= 0) {
        $this->{$property} = (int) $options[$name];
      }
    }

    // Max number of errors reported by the error handler.
    if (isset($options['max_reported_errors'])) {
      $this->errorsLeft = (int) $options['max_reported_errors'];
    }

    // Boolean flags.
    $flags = array(
      'skip_imported' => 'skipImportedItems',
      'throw_exception' => 'throwException',
      'skip_defined_functions_check' => 'skipDefFunChk',
      'break_on_undefined_filter' => 'breakOnUndefinedFilter',
      'updates_only' => 'updatesOnly',
    );
    foreach ($flags as $name => $property) {
      if (isset($options[$name])) {
        $this->{$property} = (bool) $options[$name];
      }
    }

    // Callbacks: empty values don't replace the current callback.
    $callbacks = array(
      'before_combine' => 'beforeCombine',
      'after_combine' => 'afterCombine',
      'before_create' => 'beforeCreate',
      'before_save' => 'beforeSave',
      'after_save' => 'afterSave',
      'uniq_callback' => 'uniqAlter',
    );
    foreach ($callbacks as $name => $property) {
      if (!empty($options[$name])) {
        $this->{$property} = $options[$name];
      }
    }
    return TRUE;
  }

  /**
   * Sets the entity info object.
   *
   * @param mixed $entity_info
   *    Info about the imported entity type
   *
   * @return bool
   *    TRUE if the info was set, FALSE (and an error is reported) when an
   *    empty value was provided
   */
  public function setEntityInfo($entity_info) {
    if ($entity_info) {
      $this->entityInfo = $entity_info;
      return TRUE;
    }
    $this->addError('No entity info provided');
    return FALSE;
  }

  /**
   * Sets the filters object and creates the dynamic filter functions.
   *
   * @param FeedImportMultiFilter $filter
   *    An instance of FeedImportMultiFilter
   * @param array $functions
   *    Functions to create, each one an array with the keys name, args
   *    and body
   *
   * @return bool
   *    TRUE on success, FALSE (and an error is reported) as soon as a
   *    function cannot be created
   */
  public function setFilter(FeedImportMultiFilter $filter, $functions = array()) {
    $this->filters = $filter;
    if (!$functions) {

      // Nothing to create.
      return TRUE;
    }
    foreach ($functions as $definition) {

      // An existing function may be reused if the check is turned off.
      $reuse = $this->skipDefFunChk && function_exists($definition['name']);
      if ($reuse) {
        continue;
      }
      $result = $this->filters->createFunction($definition['name'], $definition['args'], $definition['body']);
      if ($result !== TRUE) {

        // Anything but TRUE is the error message.
        $this->addError($result);
        return FALSE;
      }
    }
    return TRUE;
  }

  /**
   * Sets the reader and inits it.
   *
   * While the reader is initialized the error handler doesn't convert PHP
   * errors into exceptions. After that the previous setting is restored
   * (even if init() fails with an exception), so a value configured with
   * the throw_exception option is not lost.
   *
   * @param FeedImportReader $reader
   *     An instance of reader
   *
   * @return boolean
   *     The result of reader's init(), TRUE if success
   */
  public function setReader(FeedImportReader $reader) {
    $this->reader = $reader;

    // Remember current setting to restore it after init.
    $throw = $this->throwException;
    $this->throwException = FALSE;
    try {
      return $this->reader->init();
    }
    finally {
      $this->throwException = $throw;
    }
  }

  /**
   * Sets the path of the unique value used to monitor items.
   *
   * The path is formatted by the reader, so the reader must be set first.
   * If the formatted path has no content then items are not monitored.
   *
   * @param string $uniq
   *     A path. An empty value leaves the current path untouched.
   *
   * @return boolean
   *     TRUE if success, FALSE (and an error is reported) if there is no
   *     reader
   */
  public function setUniq($uniq) {
    if ($uniq == NULL) {
      return TRUE;
    }
    if (!$this->reader) {
      $this->addError('You must have a reader to add unique path');
      return FALSE;
    }
    $path = $this->reader->formatPath($uniq);

    // A path without content means no monitoring.
    $blank = is_scalar($path) ? (string) $path === '' : empty($path);
    $this->uniq = $blank ? NULL : $path;
    return TRUE;
  }

  /**
   * Sets the hash manager used to monitor imported items.
   *
   * @param FeedImportHashManager $hm
   *     An instance of FeedImportHashManager
   *
   * @return bool
   *     Always TRUE
   */
  public function setHashManager(FeedImportHashManager $hm) {
    $this->hashes = $hm;
    return TRUE;
  }

  /**
   * Sets entity fields.
   *
   * Entity info, reader and filters must be set before calling this method
   * and the method can be successfully called only once. On failure an
   * error is added to the report.
   *
   * @param array $fields
   *     List of dynamic fields
   * @param array $static_fields
   *     List of static fields
   * @param array $compare_functions
   *     List of compare functions, keyed by "module:type" of the field
   * @param array $merge_classes
   *     Merge field classes (class names or instances) keyed by update mode.
   *     The class for static::UPDATE_COMBINE is mandatory and must be given
   *     as class name.
   * @param string $default_compare
   *     Default compare function
   *
   * @return boolean
   *     TRUE if fields were set
   */
  public function setFields(array $fields = array(), array $static_fields = array(), array $compare_functions = array(), array $merge_classes = array(), $default_compare = '_feed_import_base_compare_other_fields') {

    // Check for requirements.
    if (!$this->entityInfo || !$this->reader || !$this->filters || !empty($this->baseEntity)) {
      $this->addError('Could not set fields because there are missing dependencies or fields were already added');
      return FALSE;
    }

    // Create default merge
    if (!isset($merge_classes[static::UPDATE_COMBINE])) {
      $this->addError('No default merge field class found!');
      return FALSE;
    }
    $class = $merge_classes[static::UPDATE_COMBINE];

    // NOTE(review): 'FeedImportMergeField' is not namespace qualified, so a
    // class from this file's namespace would not match - confirm where the
    // base class is declared.
    if (!class_exists($class)) {
      $this->addError(sprintf('Merge field default class %s was not found!', $class));
      return FALSE;
    }
    elseif (!is_subclass_of($class, 'FeedImportMergeField')) {
      $this->addError(sprintf('Merge field default class %s must extend %s class!', $class, 'FeedImportMergeField'));
      return FALSE;
    }
    $merge_classes[static::UPDATE_COMBINE] = new $class();

    // Create format paths callback.
    $fp = array(
      $this->reader,
      'formatPath',
    );

    // Get language key.
    $lk = $this->entityInfo->langKey;

    // Set static fields.
    $this->staticFields =& $static_fields;

    // Parse all fields.
    foreach ($fields as &$f) {

      // Set field info.
      $this->fieldsInfo[$f['field']] =& $f;
      if (!isset($merge_classes[$f['update_mode']])) {
        $this->addError(sprintf('No field merge class found for field %s!', $f['field']));
        return FALSE;
      }

      // Merge classes are instantiated on first use, then the instance is
      // shared by all fields having the same update mode.
      if (is_string($merge_classes[$f['update_mode']])) {
        $class = $merge_classes[$f['update_mode']];
        if (!class_exists($class)) {
          $this->addError(sprintf('Merge field class %s for field %s was not found!', $class, $f['field']));
          return FALSE;
        }
        elseif (!is_subclass_of($class, 'FeedImportMergeField')) {
          $this->addError(sprintf('Merge field class %s for field %s must extend %s class!', $class, $f['field'], 'FeedImportMergeField'));
          return FALSE;
        }
        $merge_classes[$f['update_mode']] = new $class();
      }
      $f['update_mode'] = $merge_classes[$f['update_mode']];
      $f['overwrite'] = $f['update_mode']->overwriteEmpty();

      // Add cardinality
      $f['cardinality'] = isset($this->entityInfo->fields[$f['field']]['cardinality']) ? $this->entityInfo->fields[$f['field']]['cardinality'] : 1;

      // Add compare function
      if ($f['column']) {
        $f['compare'] = $this->entityInfo->fields[$f['field']]['module'] . ':' . $this->entityInfo->fields[$f['field']]['type'];
        if (isset($compare_functions[$f['compare']])) {
          $f['compare'] = $compare_functions[$f['compare']];
        }
        else {
          $f['compare'] = $default_compare;
        }
      }

      // Check if column must be guessed.
      if ($f['column'] === TRUE && isset($this->entityInfo->fields[$f['field']])) {
        $f['column'] = $this->entityInfo->fields[$f['field']]['column'];
      }

      // Get paths count
      $f['paths_count'] = count($f['paths']);

      // Check if can be used as static field to improve performance
      // NOTE(review): default_action is compared with the string
      // 'default_value' while createEntity() compares it with the integer
      // ACTION_* constants - confirm which values are really stored.
      if (!$f['paths_count'] && $f['default_action'] == 'default_value') {
        if ($f['column']) {

          // Its a field, add it to static fields.
          $this->staticFields[$f['field']][$f['column']] = $f['default_value'];
        }
        else {

          // Its a property.
          $this->staticFields[$f['field']] = $f['default_value'];
        }

        // Ignore filters.
        $f['prefilters'] = $f['filters'] = FALSE;

        // Next field.
        continue;
      }

      // Check (pre)filters
      foreach (array(
        'prefilters',
        'filters',
      ) as $fgroup) {
        if ($f[$fgroup]) {
          $ok = TRUE;

          // Add filters
          foreach ($f[$fgroup] as &$v) {
            if (!$this->filters->add($fgroup, $f['field'], $v['function'], $v['params'])) {
              $ok = FALSE;
              $this->addError(sprintf('Could not add function %s in %s for %s field because is not defined', $v['function'], $fgroup, $f['field']));
              if ($this->breakOnUndefinedFilter) {
                $this->addError('Import stopped because a filter function is missing!');
                return FALSE;
              }
            }
          }

          // From now on this is just a flag telling if filters are applied.
          $f[$fgroup] = $ok;
        }
        else {

          // No filters
          $f[$fgroup] = FALSE;
        }
      }

      // Format paths
      $f['paths'] = array_map($fp, $f['paths']);

      // Add to fields
      $this->fields[$f['field']] =& $f;
    }

    // Remove reference to $f.
    unset($f);

    // Set all static fields properties.
    // In the end just fields remain in static fields (no properties).
    foreach ($this->staticFields as $f => &$value) {
      if (is_scalar($value)) {
        if (!isset($this->fieldsInfo[$f])) {
          $this->fieldsInfo[$f] = array(
            'field' => $f,
            'column' => FALSE,
            'update_mode' => $merge_classes[static::UPDATE_COMBINE],
            'overwrite' => $merge_classes[static::UPDATE_COMBINE]->overwriteEmpty(),
            'cardinality' => 1,
          );
        }
        $this->baseEntity[$f] = $value;
        unset($this->staticFields[$f]);
      }
      elseif (!isset($this->fieldsInfo[$f])) {
        $this->fieldsInfo[$f] = array(
          'field' => $f,
          'column' => isset($this->entityInfo->fields[$f]) ? $this->entityInfo->fields[$f]['column'] : 'value',
          'update_mode' => $merge_classes[static::UPDATE_COMBINE],
          'overwrite' => $merge_classes[static::UPDATE_COMBINE]->overwriteEmpty(),
          'cardinality' => isset($this->entityInfo->fields[$f]['cardinality']) ? $this->entityInfo->fields[$f]['cardinality'] : 1,
        );
      }
    }

    // There isn't lang in base entity nor in fields so use default.
    if (!isset($this->baseEntity[$lk]) && !isset($this->fields[$lk])) {
      $this->baseEntity[$lk] = \Drupal\Core\Language\Language::LANGCODE_NOT_SPECIFIED;
    }

    // Check for language in base entity.
    if (isset($this->baseEntity[$lk])) {

      // We have lang so set static fields in base entity.
      $lang = $this->baseEntity[$lk];
      foreach ($this->staticFields as $f => &$value) {
        if (!isset($this->fields[$f])) {

          // This field is not dynamically generated so use it in base entity.
          $this->baseEntity[$f][$lang][] = $value;

          // Remove it from static fields because now is part of base entity.
          unset($this->staticFields[$f]);
        }
      }
    }
    else {

      // No language, it means language is dynamically created.
      // Move language on top to be available for other fields.
      $this->fields = array_merge(array(
        $lk => NULL,
      ), $this->fields);

      // However, set it to NONE, and if used will be overwritten.
      $this->baseEntity[$lk] = \Drupal\Core\Language\Language::LANGCODE_NOT_SPECIFIED;
    }
    return TRUE;
  }

  /**
   * Sets custom settings, after the base settings were set.
   *
   * This implementation accepts any settings and does nothing with them;
   * it is an extension point for child classes.
   *
   * @param array $settings
   *    An array of settings.
   *
   * @return bool
   *    TRUE if success. If not, the import will be stopped.
   */
  public function setCustomSettings(array $settings) {
    return TRUE;
  }

  /**
   * Tells if a variable holds some content.
   *
   * A scalar has content if its string form is not the empty string (so 0
   * and FALSE differ: "0" has content, "" does not). Any other value has
   * content if it is not empty().
   *
   * @param mixed &$var
   *     Variable to check
   *
   * @return bool
   *     TRUE if there is content FALSE otherwise
   */
  protected function hasContent(&$var) {
    return is_scalar($var) ? (string) $var !== '' : !empty($var);
  }

  /**
   * Attaches a field value to entity.
   *
   * For a field (a column is set) every value is appended to
   * $entity[field][language] as an array keyed by column, merged with the
   * static properties of the field if there are any. Values that are
   * objects are cast to array and used as they are. For a property (no
   * column) only the first value is set.
   *
   * @param array &$entity
   *     Entity where to attach field.
   * @param array &$field
   *     Field settings.
   * @param mixed &$value
   *     Field value. Note that it is altered: for fields it is turned into
   *     an array (trimmed to field cardinality), for properties it is
   *     reduced to the first value.
   */
  protected function attachField(array &$entity, &$field, &$value) {
    if ($field['column']) {

      // It is a field.
      $fn =& $field['field'];
      $fc =& $field['column'];
      $lang =& $entity[$this->entityInfo->langKey];

      // Check if field exists.
      if (!isset($entity[$fn][$lang])) {
        $entity[$fn][$lang] = array();
      }

      // Get field reference.
      $ef =& $entity[$fn][$lang];

      // Check for static fields.
      $st = FALSE;
      if (!empty($this->staticFields[$fn])) {
        $st =& $this->staticFields[$fn];
      }

      // Make it look multivalued.
      if (!is_array($value)) {
        $value = array(
          $value,
        );
      }
      elseif ($field['cardinality'] != -1 && ($cnt = count($value)) > $field['cardinality']) {

        // Remove extra values.
        array_splice($value, $field['cardinality'], $cnt - $field['cardinality'], NULL);
      }

      // Set field properties.
      if ($st) {

        // There are static field props.
        foreach ($value as &$v) {
          if (is_object($v)) {
            $ef[] = (array) $v + $st;
          }
          else {

            // Use a fresh array for each value. Reusing one array whose
            // column slot is a reference doesn't work: copies of an array
            // keep its reference slots, so all items would share the same
            // slot and end up with the last value.
            $ef[] = array(
              $fc => $v,
            ) + $st;
          }
        }
      }
      else {

        // No static field props.
        foreach ($value as &$v) {
          if (is_object($v)) {
            $ef[] = (array) $v;
          }
          else {
            $ef[][$fc] = $v;
          }
        }
      }
    }
    else {

      // If this is a property then get only first value.
      if (is_array($value) || is_object($value)) {

        // This still can be array but if so then problem is elsewhere.
        $value = reset($value);
      }

      // Set property.
      $entity[$field['field']] = $value;
    }
  }

  /**
   * Creates a new entity (as array) from an item.
   *
   * The entity starts as a copy of the base entity, gets the hash of the
   * item's unique value under the TEMP_HASH key (NULL if items are not
   * monitored) and then a value for each dynamic field. For a field the
   * paths are tried in order and the first one that passes the prefilters
   * and has content wins. If no path provides a value then the default
   * action of the field is applied.
   *
   * While the entity is built it is available in the public property
   * "current" and the prefiltered value of the current field in
   * "prefilteredValue".
   *
   * @param mixed $item
   *     Item to be mapped to entity
   *
   * @return array
   *     The entity array or NULL if the item must be skipped. The value is
   *     returned by reference.
   */
  protected function &createEntity(&$item) {

    // Create the entity.
    $entity = $this->baseEntity;

    // Set a ref for current entity.
    $this->current =& $entity;

    // Check for unique id of item.
    if ($this->uniq === NULL) {

      // No hash, so not monitored.
      $entity[static::TEMP_HASH] = NULL;
    }
    else {

      // Get uniq.
      $uniq = $this->reader
        ->map($item, $this->uniq);

      // If there are more values then use the first one.
      if (is_array($uniq)) {
        $uniq = isset($uniq[0]) ? $uniq[0] : reset($uniq);
      }

      // Check if uniq alter callback exists.
      if ($this->uniqAlter) {
        $uniq = call_user_func($this->uniqAlter, $uniq);
      }

      // Create hash.
      $entity[static::TEMP_HASH] = $this->hashes
        ->hash($uniq);
    }

    // Set entity fields.
    foreach ($this->fields as $name => &$field) {

      // Get filtered field value.
      $val = NULL;
      $i = -1;
      while (++$i < $field['paths_count']) {

        // Get value for specified path.
        $val = $this->reader
          ->map($item, $field['paths'][$i]);

        // Check if passes prefilter.
        // The prefiltered value only decides if the path is accepted, the
        // field value is still the mapped one.
        if ($field['prefilters']) {
          $this->prefilteredValue = $this->filters
            ->apply("prefilters", $name, $val);
          if (!$this
            ->hasContent($this->prefilteredValue)) {
            $val = $this->prefilteredValue = NULL;

            // If item doesn't pass prefilter than go to next path.
            continue;
          }
        }
        if ($this
          ->hasContent($val)) {

          // We have content, check for filters.
          if ($field['filters']) {
            $val = $this->filters
              ->apply("filters", $name, $val);
            if (!$this
              ->hasContent($val)) {
              $val = NULL;
            }
          }

          // We matched one path.
          // Other paths are not tried even if filters removed the content.
          break;
        }
        else {
          $val = NULL;
        }
      }

      // Check if default action is needed.
      if ($val === NULL) {
        switch ($field['default_action']) {

          // Provide default value.
          // This is also default action.
          case static::ACTION_DEFAULT_VALUE:
          default:
            $val = $field['default_value'];
            break;

          // Provide filtered default value.
          case static::ACTION_DEFAULT_FILTERED_VALUE:
            $val = $this->filters
              ->apply("filters", $name, $field['default_value']);
            break;

          // Skip this item by returning NULL.
          // A property is returned because the function returns by
          // reference.
          case static::ACTION_SKIP_ITEM:
            $this->report['skipped']++;
            $this->current = $this->prefilteredValue = NULL;
            return $this->NULL;

          // Don't add this field to entity.
          // Continue with next field (switch counts as a loop structure).
          case static::ACTION_IGNORE_FIELD:
            $this->prefilteredValue = NULL;
            continue 2;
        }
      }

      // Remove prefiltered value.
      $this->prefilteredValue = NULL;

      // Set field value in entity.
      $this
        ->attachField($entity, $field, $val);

      // Not needed anymore.
      unset($val);
    }

    // Remove current reference.
    // Unset breaks the reference first, so the entity isn't set to NULL.
    unset($this->current);
    $this->current = NULL;

    // Return the entity array.
    return $entity;
  }

  /**
   * Combines two entities.
   *
   * Goes through all known fields and moves the values of the created
   * entity into the current entity. Fields (having a column) are merged,
   * for the entity language, using the merge object from field's
   * update_mode. Properties are replaced when they differ. A value that is
   * missing from the created entity is removed from the current entity
   * only if the field has the overwrite flag.
   *
   * @param array $new
   *     Created entity. Properties that were moved into current entity are
   *     removed from this array.
   * @param object $curr
   *     Current entity
   *
   * @return boolean
   *     TRUE if current entity was changed
   */
  protected function combineEntities(array &$new, $curr) {

    // Initial entity status.
    $changed = FALSE;

    // Get entity language
    $lang =& $new[$this->entityInfo->langKey];

    // Compare fields from entities.
    foreach ($this->fieldsInfo as &$field) {

      // Get field name and column.
      $fn =& $field['field'];
      $fc =& $field['column'];
      if ($fc) {

        // It is a field
        if (isset($new[$fn][$lang])) {
          if (isset($curr->{$fn}[$lang])) {

            // Both have values, the merge object tells if something changed.
            if ($field['update_mode']
              ->merge($curr->{$fn}[$lang], $new[$fn][$lang], $field)) {

              // Mark as changed.
              $changed = TRUE;
            }
          }
          else {

            // Overwrite.
            $curr->{$fn}[$lang] = $new[$fn][$lang];

            // Mark as changed.
            $changed = TRUE;
          }
        }
        elseif ($field['overwrite'] && isset($curr->{$fn}[$lang])) {

          // Forced update mode.
          unset($curr->{$fn}[$lang]);

          // Don't leave an empty property.
          if (empty($curr->{$fn})) {
            unset($curr->{$fn});
          }

          // Mark as changed.
          $changed = TRUE;
        }
      }
      else {

        // It is a property.
        if (isset($new[$fn])) {

          // Values are compared loosely.
          if (isset($curr->{$fn}) && $curr->{$fn} == $new[$fn]) {

            // Nothing changed.
            continue;
          }

          // Set the new property value.
          $curr->{$fn} = $new[$fn];

          // Mark as changed.
          $changed = TRUE;

          // Remove from memory.
          unset($new[$fn]);
        }
        elseif ($field['overwrite'] && isset($curr->{$fn})) {

          // Forced update mode is selected.
          unset($curr->{$fn});

          // Mark as changed.
          $changed = TRUE;
        }
      }
    }
    return $changed;
  }

  /**
   * Saves created entities.
   *
   * An entity whose hash is already known is an update: the current entity
   * is loaded, combined with the created one and saved only if something
   * changed. Any other entity is new: it is created, saved and its hash is
   * registered (if the item is monitored). The before/after callbacks can
   * change this flow using the ENTITY_* return codes. The report counters
   * are updated along the way.
   *
   * @param array &$entities
   *   An array of entities to be saved, having consecutive keys starting
   *   from 0. The entities are removed from the array while processed.
   */
  protected function saveEntities(array &$entities) {

    // Reusable array of ids to load, the only element is the load id.
    static $loadArr = array(
      NULL,
    );

    // Number of entities.
    $total = count($entities);
    $this->report['total'] += $total;

    // Get ids from hashes.
    $ids = $this->hashes
      ->get();

    // Entity create callback.
    $createCallback =& $this->entityInfo->createCallback;

    // Entity save callback.
    $saveCallback =& $this->entityInfo->saveCallback;

    // Entity before combine callback
    $beforeCombine =& $this->beforeCombine;

    // Entity after combine callback
    $afterCombine =& $this->afterCombine;

    // Entity before create callback
    $beforeCreate =& $this->beforeCreate;

    // Entity before save callback
    $beforeSave =& $this->beforeSave;

    // Entity after save callback
    $afterSave =& $this->afterSave;

    // Entity load id.
    $loadId =& $loadArr[0];
    $i = -1;

    // Save each entity.
    while (++$i < $total) {
      $entity =& $entities[$i];
      unset($entities[$i]);

      // Skipped items are NULL.
      if ($entity == NULL) {
        continue;
      }

      // Save hash.
      $hash =& $entity[static::TEMP_HASH];

      // Not needed anymore.
      // The value is still available in $hash.
      unset($entity[static::TEMP_HASH]);

      // Check if item is already imported or is not monitored.
      if ($hash !== NULL && isset($ids[$hash])) {

        // Check if is used option to skip item if already imported
        // or entity is protected (import cannot make changes).
        if ($this->skipImportedItems || $ids[$hash]->expire == FeedImportHashManager::MARK_PROTECTED) {
          $this->report[$this->skipImportedItems ? 'skipped' : 'protected_skipped']++;
          unset($ids[$hash]);
          continue;
        }

        // Current entity id.
        $loadId = $ids[$hash]->entity_id;

        // Load current entity.
        // The storage handler only provides the loader, the entities are
        // returned by loadMultiple() keyed by id.
        $current = \Drupal::entityManager()
          ->getStorage($this->entityInfo->name)
          ->loadMultiple($loadArr);

        // If entity is missing then skip.
        if (!isset($current[$loadId])) {
          $this->report['missing']++;
          $this->orphanHashes[] = $ids[$hash]->id;
          unset($ids[$hash], $current);
          continue;
        }

        // Get current entity value.
        $current =& $current[$loadId];

        // Entity might be in static cache.
        $this->staticCacheEntities++;

        // Merge status of entity.
        $changed = FALSE;

        // Check for before combine callback.
        if ($beforeCombine) {
          switch ($beforeCombine($entity, $current, $changed)) {
            case static::ENTITY_RESCHEDULE:
              $this->report['rescheduled']++;
              goto ADD_HASH_UPDATE;
            case static::ENTITY_SKIP:
              $this->report['skipped']++;
              goto UNSET_VARS;
            case static::ENTITY_NO_COMBINE:
              goto COMBINE_FINISHED;
            case static::ENTITY_MARK_PROTECTED:
              $this->report['protected']++;
              $this->hashes
                ->protect($ids[$hash]->id);
              goto UNSET_VARS;
          }
        }

        // Check for diff.
        $changed = $this
          ->combineEntities($entity, $current);

        // Check for after combine callback.
        if ($afterCombine) {
          switch ($afterCombine($entity, $current, $changed)) {
            case static::ENTITY_RESCHEDULE:
              $this->report['rescheduled']++;
              goto ADD_HASH_UPDATE;
            case static::ENTITY_SKIP:
              $this->report['skipped']++;
              goto UNSET_VARS;
            case static::ENTITY_MARK_PROTECTED:
              $this->report['protected']++;
              $this->hashes
                ->protect($ids[$hash]->id);
              goto UNSET_VARS;
          }
        }
        COMBINE_FINISHED:

        // Not needed anymore.
        unset($entity);

        // Check if entity is changed and save changes.
        if ($changed) {

          // Save entity.
          if ($saveCallback) {
            $saveCallback($current);
          }
          elseif (method_exists($current, 'save')) {
            if (!$current
              ->save()) {
              unset($current);
              continue;
            }
          }
          elseif ($this->entityInfo->controller->canSave) {
            if (!$this->entityInfo->controller
              ->save($current)) {
              unset($current);
              continue;
            }
          }
          else {

            // Don't know how to save entity...
            unset($current);
            continue;
          }

          // Set report about updated items.
          $this->report['updated']++;

          // Check after save callback
          $afterSave && $afterSave($current, FALSE);
        }
        else {

          // Set report about rescheduled items.
          $this->report['rescheduled']++;
        }
        ADD_HASH_UPDATE:

        // Add to update ids.
        $this->hashes
          ->update($ids[$hash]->id);
        UNSET_VARS:

        // Free some memory.
        unset($ids[$hash], $current, $entity, $hash);
      }
      else {

        // Check if this imports accepts only updates.
        if ($this->updatesOnly) {
          $this->report['skipped']++;
          goto UNSET_E_VARS;
        }

        // Check before create callback
        if ($beforeCreate) {
          switch ($beforeCreate($entity)) {
            case static::ENTITY_SKIP:
              $this->report['skipped']++;
              goto UNSET_E_VARS;
            case static::ENTITY_SKIP_CREATE:
              goto ENTITY_SAVE;
            case static::ENTITY_SKIP_SAVE:
              goto ENTITY_SAVED;
          }
        }

        // Create a new entity.
        if ($createCallback) {
          $entity = $createCallback($entity, $this->entityInfo->name);
        }
        elseif ($this->entityInfo->controller->canCreate) {
          $entity = $this->entityInfo->controller
            ->create($entity);
        }
        else {
          $entity = (object) $entity;
        }
        ENTITY_SAVE:

        // Check before save callback
        if ($beforeSave) {
          switch ($beforeSave($entity)) {
            case static::ENTITY_SKIP:
              $this->report['skipped']++;
              goto UNSET_E_VARS;
            case static::ENTITY_SKIP_SAVE:
              goto ENTITY_SAVED;
          }
        }

        // No entity to save.
        if (!$entity) {
          unset($entity);
          continue;
        }
        elseif ($saveCallback) {
          $saveCallback($entity);
        }
        elseif (method_exists($entity, 'save')) {
          if (!$entity
            ->save()) {
            unset($entity);
            continue;
          }
        }
        elseif ($this->entityInfo->controller->canSave) {
          if (!$this->entityInfo->controller
            ->save($entity)) {
            unset($entity);
            continue;
          }
        }
        else {

          // Don't know how to save entity...
          unset($entity);
          continue;
        }
        ENTITY_SAVED:

        // Entity might be added in static cache.
        $this->staticCacheEntities++;

        // Check after save callback
        $afterSave && $afterSave($entity, TRUE);

        // Check if is monitored.
        if ($hash !== NULL) {

          // Insert into feed import hash table.
          $this->hashes
            ->insert($entity->{$this->entityInfo->idKey}, $hash);
        }

        // Set report about new items.
        $this->report['new']++;
        UNSET_E_VARS:

        // Not needed anymore.
        unset($entity, $hash);
      }

      // Remove entity cache.
      if ($this->staticCacheEntities == $this->resetStaticCache) {
        $this->entityInfo->controller->canResetCache && $this->entityInfo->controller
          ->resetCache();
        $this->staticCacheEntities = 0;
      }
    }
  }

  /**
   * Processes the import.
   *
   * Reads items until the reader returns an empty value, creates an entity
   * for each item and saves the entities, either in batches of itemsCount
   * entities or all at once if itemsCount is 0. At the end the pending hash
   * changes are committed and the hashes of missing entities are deleted.
   *
   * @return array
   *    The import report. The internal report is reset after that.
   */
  public function process() {

    // Large imports need time. With safe mode on the limit cannot be
    // changed, so the import may break.
    if (!ini_get('safe_mode')) {
      set_time_limit(0);
    }
    $this->report['started'] = time();
    $reader = $this->reader;

    // Save mode is decided only once, before reading the items.
    $batched = (bool) $this->itemsCount;
    $pending = 0;
    $entities = array();
    while ($item = $reader->get()) {

      // Created entity is kept by reference.
      $entities[] =& $this->createEntity($item);

      // Save the batch when it is full.
      if ($batched && ++$pending == $this->itemsCount) {
        unset($item);
        $this->saveEntities($entities);
        $pending = 0;
        $entities = array();
      }
    }

    // Save what is left (everything, if not in batch mode).
    if ($entities) {
      $this->saveEntities($entities);
    }

    // Free memory.
    unset($entities, $reader);

    // Reset the entity static cache.
    if ($this->staticCacheEntities && $this->resetStaticCache) {
      if ($this->entityInfo->controller->canResetCache) {
        $this->entityInfo->controller->resetCache();
      }
      $this->staticCacheEntities = 0;
    }

    // Commit pending hash updates, then the new hashes.
    $this->hashes->updateCommit();
    $this->hashes->insertCommit();

    // Delete hashes of missing entities.
    if ($this->orphanHashes) {
      $this->hashes->delete($this->orphanHashes);
      $this->orphanHashes = array();
    }
    $this->report['finished'] = time();
    $report = $this->report;

    // Start over with an empty report.
    $counters = array(
      'total',
      'updated',
      'new',
      'rescheduled',
      'skipped',
      'protected',
      'protected_skipped',
      'missing',
      'started',
      'finished',
    );
    $this->report = array_fill_keys($counters, 0) + array(
      'errors' => array(),
    );
    return $report;
  }

}

Classes

Namesort descending Description
FeedImportProcessor Class that processess the import.