You are here

feed_import.inc in Feed Import 7.3

This file contains Feed Import helpers.

File

feed_import_base/inc/feed_import.inc
View source
<?php

/**
 * @file
 * This file contains Feed Import helpers.
 */

/**
 * This class provides helper functions for feed import.
 */
class FeedImport {

  // Constants
  const FEED_OK = 1;
  const FEED_SOURCE_ERR = 2;
  const FEED_ITEMS_ERR = 3;
  const FEED_OVERLAP_ERR = 4;
  const FEED_CONFIG_ERR = 5;

  // Entity info cache.
  protected static $entityInfo = array();

  // Default compare function.

  /**
   * Gets info about an entity.
   * This info is cached.
   *
   * @param string $entity_name The entity name
   *
   * @return object|bool An object that describes the entity, or FALSE when the name is empty or the entity type is unknown
   */
  public static function getEntityInfo($entity_name) {
    // Nothing to describe without an entity name.
    if (empty($entity_name)) {
      return FALSE;
    }

    // Serve the description from the static cache when available.
    if (isset(static::$entityInfo[$entity_name])) {
      return static::$entityInfo[$entity_name];
    }

    // Unknown entity type.
    if (!($entity = entity_get_info($entity_name))) {
      return FALSE;
    }

    $keys = isset($entity['entity keys']) ? $entity['entity keys'] : array();

    // Set main entity info.
    $info = (object) array(
      'name' => $entity_name,
      'idKey' => isset($keys['id']) ? $keys['id'] : NULL,
      'langKey' => isset($keys['language']) ? $keys['language'] : 'language',
      'bundleKey' => isset($keys['bundle']) ? $keys['bundle'] : NULL,
      'createCallback' => isset($entity['creation callback']) ? $entity['creation callback'] : NULL,
      'saveCallback' => isset($entity['save callback']) ? $entity['save callback'] : NULL,
      'deleteCallback' => isset($entity['deletion callback']) ? $entity['deletion callback'] : NULL,
      'controller' => entity_get_controller($entity_name),
      'properties' => array(),
      'fields' => array(),
    );

    // Flag what the controller is able to do. When there is no controller a
    // stub object with all capabilities disabled is used instead, so callers
    // can always read the can* flags.
    if ($info->controller) {
      $info->controller->canCreate = method_exists($info->controller, 'create');
      $info->controller->canSave = method_exists($info->controller, 'save');
      $info->controller->canDelete = method_exists($info->controller, 'delete');
      $info->controller->canResetCache = method_exists($info->controller, 'resetCache');
    }
    else {
      $info->controller = (object) array(
        'canCreate' => FALSE,
        'canSave' => FALSE,
        'canDelete' => FALSE,
        'canResetCache' => FALSE,
      );
    }

    // Fall back to the conventional ENTITY_save() / ENTITY_delete() functions
    // when the entity does not declare explicit callbacks.
    if (!$info->saveCallback && function_exists($entity_name . '_save')) {
      $info->saveCallback = $entity_name . '_save';
    }
    if (!$info->deleteCallback && function_exists($entity_name . '_delete')) {
      $info->deleteCallback = $entity_name . '_delete';
    }

    // Get fields info (each field is described once, whatever the bundle).
    if ($instances = field_info_instances($entity_name)) {
      foreach ($instances as $bundle_instances) {
        foreach ($bundle_instances as $instance) {
          $field_name = $instance['field_name'];
          if (!empty($instance['deleted']) || isset($info->fields[$field_name])) {
            continue;
          }
          $field_info = field_info_field($field_name);
          if (empty($field_info)) {
            // The instance points to a field that cannot be loaded; skip it
            // instead of raising notices on a NULL definition.
            continue;
          }

          // Take the first column from the key list rather than key(), which
          // depends on the array's internal pointer position.
          $columns = isset($field_info['columns']) ? array_keys($field_info['columns']) : array();
          $info->fields[$field_name] = array(
            'name' => $field_name,
            'column' => $columns ? $columns[0] : NULL,
            'columns' => $columns,
            'cardinality' => $field_info['cardinality'],
            'type' => $field_info['type'],
            'module' => $field_info['module'],
          );
        }
      }
    }

    if (function_exists('entity_get_property_info')) {
      // Get properties info. A property is listed by its schema field when
      // one is declared, otherwise by its own name; names already known as
      // fields are left out.
      $prop_info = entity_get_property_info($entity_name);
      if (isset($prop_info['properties'])) {
        foreach ($prop_info['properties'] as $pname => $pval) {
          $property = isset($pval['schema field']) ? $pval['schema field'] : $pname;
          if (!isset($info->fields[$property])) {
            $info->properties[] = $property;
          }
        }
        $info->properties = array_unique($info->properties);
      }
    }
    elseif (isset($entity['schema_fields_sql']['base table'])) {
      // No property info provider: use base table columns that are not fields.
      $info->properties = array_diff($entity['schema_fields_sql']['base table'], array_keys($info->fields));
    }

    return static::$entityInfo[$entity_name] = $info;
  }

  /**
   * Cache used for entity names.
   */
  protected static $entityNames = array();

  /**
   * Get all entity names.
   *
   * @return array
   *    List of all entity names.
   */
  public static function getAllEntities() {
    // Reuse the labels collected by a previous call.
    if (static::$entityNames) {
      return static::$entityNames;
    }

    // Map every entity type to its label.
    foreach (entity_get_info() as $type => $definition) {
      static::$entityNames[$type] = $definition['label'];
    }
    return static::$entityNames;
  }

  // Default field compare function.
  public static $defaultFieldCompareFunction = '_feed_import_base_compare_other_fields';

  /**
   * Sets processor settings.
   *
   * @param FeedImportProcessor $fi
   *    The processor
   * @param object $feed
   *     Feed settings
   * @param string $filter_dir
   *     Path to extra filters dir
   *
   * @return bool|array
   *     TRUE on success, or the array of errors reported by the processor
   */
  public static function setProcessorSettings(FeedImportProcessor $fi, $feed, $filter_dir) {
    $s = $feed->settings;

    // Optional settings default to empty values.
    if (!isset($s['functions'])) {
      $s['functions'] = NULL;
    }
    if (!isset($s['uniq_path'])) {
      $s['uniq_path'] = NULL;
    }
    foreach (array('fields', 'static_fields') as $key) {
      if (!isset($s[$key]) || !is_array($s[$key])) {
        $s[$key] = array();
      }
    }

    // Make sure every pluggable class can be instantiated. A missing class
    // would otherwise end in a fatal error instead of a reported error.
    foreach (array('filter', 'reader', 'hashes') as $component) {
      if (empty($s[$component]['class']) || !class_exists($s[$component]['class'])) {
        $fi->addError(sprintf('The %s class is missing or does not exist', $component));
        return $fi->getErrors();
      }
    }

    // Each step runs only when all previous steps succeeded.
    $ok = $fi->setEntityInfo(static::getEntityInfo($feed->entity));
    $ok = $ok && $fi->setOptions($s['processor']['options']);
    if ($ok) {
      $filter_class = $s['filter']['class'];
      $ok = $fi->setFilter(new $filter_class($filter_dir, $s['filter']['options']), $s['functions']);
    }
    if ($ok) {
      $reader_class = $s['reader']['class'];
      $ok = $fi->setReader(new $reader_class($s['reader']['options']));
    }
    if ($ok) {
      // The hash manager is created only when it is going to be used.
      $hashes_class = $s['hashes']['class'];
      $hm = new $hashes_class($feed->entity, $feed->machine_name);
      $hm->setOptions($s['hashes']['options']);
      $ok = $fi->setHashManager($hm);
    }
    $ok = $ok && $fi->setUniq($s['uniq_path']);
    $ok = $ok && $fi->setFields(
      $s['fields'],
      $s['static_fields'],
      static::getFieldsCompareFunctions(),
      static::getMergeFieldClasses(),
      static::$defaultFieldCompareFunction
    );

    // Everything that is not a base setting is handed over as custom settings.
    $base_settings = array(
      'uniq_path' => 1,
      'processor' => 1,
      'reader' => 1,
      'hashes' => 1,
      'filter' => 1,
      'fields' => 1,
      'static_fields' => 1,
      'feed' => 1,
      'functions' => 1,
    );
    $ok = $ok && $fi->setCustomSettings(array_diff_key($s, $base_settings));

    return $ok ? TRUE : $fi->getErrors();
  }

  /**
   * Returns field compare functions.
   */
  public static function getFieldsCompareFunctions() {
    $funcs = array();
    $declared = module_invoke_all('feed_import_field_compare_functions');
    foreach ($declared as $type => $candidates) {
      // A type can have a single function or a list of candidate functions
      // (for example when several modules declare the same type).
      if (!is_array($candidates)) {
        $candidates = array($candidates);
      }

      // Keep the first callable candidate. Iterating over values (instead of
      // numeric offsets) also works for lists with gaps or string keys.
      // Types without a callable candidate are left out.
      foreach ($candidates as $candidate) {
        if (is_callable($candidate)) {
          $funcs[$type] = $candidate;
          break;
        }
      }
    }
    return $funcs;
  }

  /**
   * Gets the classes used to merge field values.
   *
   * @return array
   *    An array of classes keyed by merge mode.
   */
  public static function getMergeFieldClasses() {
    $merge_classes = array();

    // Keep only the class name of each declared merge mode; declarations
    // without a 'class' entry are dropped.
    foreach (module_invoke_all('feed_import_field_merge_classes') as $mode => $definition) {
      if (isset($definition['class'])) {
        $merge_classes[$mode] = $definition['class'];
      }
    }
    return $merge_classes;
  }

  // A list of all feeds configuration.
  protected static $feedsList = array();

  /**
   * Get a list with all feeds.
   */
  public static function loadAllFeeds() {
    if (!static::$feedsList) {
      // Newest feeds first.
      $query = db_select('feed_import_settings', 'f');
      $query->fields('f');
      $query->orderBy('id', 'DESC');

      // Index feeds by machine name, with settings unserialized.
      $feeds_by_name = array();
      foreach ($query->execute()->fetchAll() as $row) {
        $row->settings = unserialize($row->settings);
        $feeds_by_name[$row->machine_name] = $row;
      }
      static::$feedsList = $feeds_by_name;
    }
    return static::$feedsList;
  }

  /**
   * Loads a feed from database
   *
   * @param int|string $name
   *    Feed machine name or id
   *
   * @return object
   *    Feed object or FALSE
   */
  public static function loadFeed($name) {
    // Decide which column identifies the feed. A string is treated as an id
    // only when it consists of digits alone; a plain (int) cast would also
    // match machine names that merely start with digits (e.g. "2012_feed").
    if (is_string($name)) {
      $by_id = preg_match('/^[0-9]+$/', $name) && (int) $name;
    }
    else {
      $by_id = (bool) (int) $name;
    }

    $feed = db_select('feed_import_settings', 'f')
      ->fields('f')
      ->condition($by_id ? 'id' : 'machine_name', $name, '=')
      ->range(0, 1)
      ->execute()
      ->fetchObject();
    if (!$feed) {
      return FALSE;
    }

    // Settings are stored serialized.
    $feed->settings = unserialize($feed->settings);
    return $feed;
  }

  /**
   * Deletes a feed.
   *
   * @param object $feed
   *    Feed info
   * @param bool $hashes
   *    Also remove hashes
   */
  public static function deleteFeed($feed, $hashes = TRUE) {
    db_delete('feed_import_settings')
      ->condition('machine_name', $feed->machine_name)
      ->execute();

    // Drop the cached feed list so loadAllFeeds() no longer returns the
    // deleted feed during this request.
    static::$feedsList = array();

    // Let the feed's hash manager remove the hashes of this feed.
    if ($hashes && isset($feed->settings['hashes']['class'])) {
      $class = $feed->settings['hashes']['class'];
      $class::deleteByFeed($feed->machine_name);
    }
  }

  /**
   * Gets a new empty feed configuration.
   *
   * @return array
   *     An empty feed configuration.
   */
  public static function getEmptyFeed() {
    // Source protection settings.
    $protection = array(
      'protect_on_invalid_source' => FALSE,
      'protect_on_fewer_items' => 0,
    );

    // Default processor.
    $processor = array(
      'name' => 'default',
      'class' => 'FeedImportProcessor',
      'options' => array(
        'items_count' => 0,
        'skip_imported' => FALSE,
        'reset_cache' => 100,
        'break_on_undefined_filter' => TRUE,
        'skip_defined_functions_check' => FALSE,
        'updates_only' => FALSE,
      ),
    );

    // Default reader.
    $reader = array(
      'name' => 'xml',
      'class' => 'SimpleXMLFIReader',
      'options' => array(),
    );

    // Default hash manager.
    $hashes = array(
      'name' => 'sql',
      'class' => 'FeedImportSQLHashes',
      'options' => array(
        'ttl' => 0,
        'insert_chunk' => 300,
        'update_chunk' => 300,
        'group' => '',
      ),
    );

    // Default filter.
    $filter = array(
      'name' => 'default',
      'class' => 'FeedImportMultiFilter',
      'options' => array(
        'param' => '[field]',
        'include' => NULL,
      ),
    );

    $settings = array(
      'uniq_path' => NULL,
      'preprocess' => NULL,
      'feed' => $protection,
      'processor' => $processor,
      'reader' => $reader,
      'hashes' => $hashes,
      'filter' => $filter,
      'fields' => array(),
      'static_fields' => array(),
      'functions' => array(),
    );

    return array(
      'name' => NULL,
      'machine_name' => NULL,
      'entity' => NULL,
      'cron_import' => 0,
      'last_run' => 0,
      'last_run_duration' => 0,
      'last_run_items' => 0,
      'settings' => $settings,
    );
  }

  /**
   * Saves a feed in database
   *
   * @param object $feed
   *    Feed info
   */
  public static function saveFeed($feed) {
    // A feed needs a name, a machine name, an entity and settings.
    if (empty($feed->name) || empty($feed->machine_name) || empty($feed->entity) || empty($feed->settings)) {
      return FALSE;
    }
    if (!isset($feed->cron_import)) {
      $feed->cron_import = 0;
    }
    $fields = array(
      'name' => $feed->name,
      'machine_name' => $feed->machine_name,
      'entity' => $feed->entity,
      'cron_import' => (int) $feed->cron_import,
      'settings' => serialize($feed->settings),
    );
    if (isset($feed->id)) {
      // Existing feed: update it by id.
      db_update('feed_import_settings')
        ->fields($fields)
        ->condition('id', $feed->id)
        ->execute();
    }
    else {
      // New feed: start with empty run statistics.
      $fields += array(
        'last_run' => 0,
        'last_run_duration' => 0,
        'last_run_items' => 0,
      );
      db_insert('feed_import_settings')
        ->fields($fields)
        ->execute();
    }

    // Drop the cached feed list so loadAllFeeds() reflects the saved feed
    // during this request.
    static::$feedsList = array();
    return TRUE;
  }

  /**
   * Saves feed import status
   */
  public static function saveFeedImportStatus($feed) {
    // Run statistics are stored as integers.
    $status = array(
      'last_run' => (int) $feed->last_run,
      'last_run_duration' => (int) $feed->last_run_duration,
      'last_run_items' => (int) $feed->last_run_items,
    );

    // The feed is identified by its machine name.
    $query = db_update('feed_import_settings');
    $query->fields($status);
    $query->condition('machine_name', $feed->machine_name);
    $query->execute();
  }

  // Active import
  public static $activeImport = NULL;

  /**
   * Imports a feed.
   *
   * @param object $feed
   *    Feed info
   * @param string $filters_dir
   *    Path to filters dir
   */
  public static function import($feed, $filters_dir) {
    if (!empty($feed->settings['preprocess']) && function_exists($feed->settings['preprocess'])) {

      // Preprocess feed before import.
      call_user_func($feed->settings['preprocess'], $feed);
    }
    $class = $feed->settings['processor']['class'];
    $fi = new $class($feed->machine_name);
    $fi->setErrorHandler(TRUE);

    // Remember the import that was active before this one (imports may nest).
    $lastimport = static::$activeImport;

    // The processor's error handler can turn PHP errors into exceptions, so
    // both setup and processing are guarded: whatever happens, the previous
    // error handler and the previous active import are restored.
    try {
      $f = static::setProcessorSettings($fi, $feed, $filters_dir);
      if ($f !== TRUE) {
        $fi->setErrorHandler(FALSE);
        return array(
          'init_error' => TRUE,
          'errors' => $f,
        );
      }
      static::$activeImport = $fi;
      $f = $fi->process();
    }
    catch (Exception $e) {
      static::$activeImport = $lastimport;
      $fi->setErrorHandler(FALSE);
      // Let the caller deal with the failure, exactly as before.
      throw $e;
    }

    static::$activeImport = $lastimport;
    $fi->setErrorHandler(FALSE);
    return $f;
  }

  /**
   * Gets all hash manager classes used by feeds.
   *
   * @return array
   *    An array of classes
   */
  public static function getHashManagers() {
    // Result of the first successful lookup, kept for the rest of the request.
    static $hm = array();
    if ($hm) {
      return $hm;
    }

    // Collect the hash manager class of every feed that declares one.
    $found = array();
    foreach (static::loadAllFeeds() as $feed) {
      if (!empty($feed->settings['hashes']['class'])) {
        $found[] = $feed->settings['hashes']['class'];
      }
    }
    return $hm = array_unique($found);
  }

  /**
   * Delete all expired items.
   *
   * @param int $max
   *    Max number of entity ids to delete from
   *    a hash manager
   *
   * @return int
   *    Number of deleted entities
   */
  public static function deleteExpired($max = 0) {
    $deleted = 0;
    foreach (static::loadAllFeeds() as $feed) {
      if (empty($feed->settings['hashes']['class'])) {
        continue;
      }
      $class = $feed->settings['hashes']['class'];

      // Expired items come grouped by entity type; the keys of each group are
      // passed to the hash manager's delete() below.
      $items = $class::getExpired($feed->machine_name, $max);
      foreach ($items as $entity_type => $ids) {
        $entity = static::getEntityInfo($entity_type);
        if (!$entity) {
          // Unknown entity type: nothing can be deleted, so leave the hashes
          // untouched instead of failing on a missing description.
          continue;
        }

        // Delete entities.
        if ($entity->deleteCallback) {
          // Prefer the CALLBACK_multiple() variant when it exists.
          $multiple = $entity->deleteCallback . '_multiple';
          if (function_exists($multiple)) {
            $multiple($ids);
          }
          else {
            array_map($entity->deleteCallback, $ids);
          }
        }
        elseif (!empty($entity->controller->canDelete)) {
          $entity->controller->delete($ids);
        }
        else {
          // There is no way to delete entities of this type; skip the group
          // rather than calling a method the controller does not have.
          continue;
        }

        // Delete hashes.
        $class::delete(array_keys($ids));
        $deleted += count($ids);
      }
    }
    return $deleted;
  }

  // Deleted entities.
  protected static $deletedEntities = array();

  /**
   * Schedule a deleted entity to be removed from hashes.
   *
   * @param string $entity_type
   *    Entity type
   * @param int $id
   *    Entity id
   */
  public static function addDeletedEntity($entity_type, $id) {
    // TRUE until the shutdown callback has been registered.
    static $unregistered = TRUE;

    // Queue the entity id under its type.
    static::$deletedEntities[$entity_type][] = $id;

    if (!$unregistered) {
      return;
    }

    // First call: register removeEntityHashes() to run at shutdown.
    $unregistered = FALSE;
    $callback = array(__CLASS__, 'removeEntityHashes');
    register_shutdown_function($callback);
  }

  /**
   * Removes entity hashes.
   */
  public static function removeEntityHashes($entities = NULL) {
    // Without explicit entities, work on (and later reset) the list of
    // entities scheduled with addDeletedEntity().
    if ($entities == NULL) {
      $entities =& static::$deletedEntities;
    }

    // Only touch the hash managers when there is something to remove; this
    // also avoids walking a non-array value.
    if (!empty($entities) && is_array($entities)) {
      foreach (static::getHashManagers() as $hm) {
        // Each hash manager receives the ids and the entity type of a group.
        array_walk($entities, array(
          $hm,
          'deleteEntities',
        ));
      }
    }

    // Reset to an empty array (not NULL) so the scheduled list stays a valid
    // array for later calls.
    $entities = array();
  }

}

/**
 * Class that processes the import.
 */
class FeedImportProcessor extends FeedImportConfigurable {

  // No duplicates
  const UPDATE_COMBINE = 0;

  // Duplicates
  const UPDATE_MERGE = 1;

  // Overwrite
  const UPDATE_OVERWRITE = 2;

  // Overwrite fast
  const UPDATE_OVERWRITE_FAST = 3;

  // No field value actions.
  const ACTION_DEFAULT_VALUE = 0;
  const ACTION_DEFAULT_FILTERED_VALUE = 1;
  const ACTION_IGNORE_FIELD = 2;
  const ACTION_SKIP_ITEM = 3;

  // Action before/after combine entities
  const ENTITY_CONTINUE = 0;
  const ENTITY_RESCHEDULE = 1;
  const ENTITY_SKIP = 2;
  const ENTITY_NO_COMBINE = 3;
  const ENTITY_MARK_PROTECTED = 4;

  // Action before/after create/save entity
  const ENTITY_SKIP_CREATE = 5;
  const ENTITY_SKIP_SAVE = 6;

  // Hash property name.
  const TEMP_HASH = '$_FI~HASH_$';

  // Fields imported.
  protected $fields = array();

  // Static fields.
  protected $staticFields = array();

  // Base entity.
  protected $baseEntity = array();

  // Fields info.
  protected $fieldsInfo = array();

  // Uniq path.
  protected $uniq;

  // Feed group.
  protected $group;

  // Feed machine name.
  protected $machineName;

  // Flag to skip imported items.
  protected $skipImportedItems = FALSE;

  // Flag to only apply updates.
  protected $updatesOnly = FALSE;

  // Entity info.
  protected $entityInfo;

  // Current reader.
  protected $reader;

  // Current hash manager.
  protected $hashes;

  // Filters
  protected $filters;

  // After how many of entities to save.
  protected $itemsCount = 0;

  // After how many items to reset entity static cache.
  protected $resetStaticCache = 0;

  // Number entities in static cache.
  protected $staticCacheEntities = 0;

  // Callbacks for before/after combine entities
  protected $beforeCombine = FALSE;
  protected $afterCombine = FALSE;

  // Callbacks for before/after create/save entities
  protected $beforeCreate = FALSE;
  protected $beforeSave = FALSE;
  protected $afterSave = FALSE;

  // List of missing entities (as hash ids)
  protected $orphanHashes = array();

  // A null value.
  protected $NULL = NULL;

  // Flag indicating to skip creating dynamic functions if already exists.
  protected $skipDefFunChk = FALSE;

  // This will break import if a filter function is missing.
  protected $breakOnUndefinedFilter = TRUE;

  // Reports.
  protected $report = array(
    'total' => 0,
    'updated' => 0,
    'new' => 0,
    'rescheduled' => 0,
    'skipped' => 0,
    'protected' => 0,
    'protected_skipped' => 0,
    'missing' => 0,
    'started' => 0,
    'finished' => 0,
    'errors' => array(),
  );

  // Current entities in work.
  public $current;

  // Current prefiltered values.
  public $prefilteredValue;

  // Callback to alter uniq value
  protected $uniqAlter = FALSE;

  /**
   * Constructor.
   */
  public function __construct($machine_name) {
    // Remember the machine name of the feed this processor works for.
    $this->machineName = $machine_name;
  }

  // Error handler status.
  protected $errHandlerEnabled = FALSE;

  /**
   * Enables/disables error handler.
   *
   * @param bool $enable
   *    Whether to enable or disable the error handler
   */
  public function setErrorHandler($enable) {
    // Nothing to do when the handler is already in the requested state.
    if ((bool) $enable == (bool) $this->errHandlerEnabled) {
      return;
    }

    if ($enable) {
      // Install errorHandler() as the PHP error handler.
      $this->errHandlerEnabled = TRUE;
      $callback = array($this, 'errorHandler');
      set_error_handler($callback);
      return;
    }

    // Restore the previous error handler.
    $this->errHandlerEnabled = FALSE;
    restore_error_handler();
  }

  // Max number of errors to report.
  protected $errorsLeft = 100;
  protected $throwException = TRUE;

  /**
   * Error handler callback.
   * This is registered with set_error_handler().
   */
  public function errorHandler($errno, $errmsg, $file, $line) {

    // Ignore errors silenced with @ and errors excluded by the current
    // error_reporting level. A bitmask test is required because, since PHP 8,
    // error_reporting() is no longer 0 while the @ operator is in effect.
    if (!(error_reporting() & $errno)) {
      return FALSE;
    }

    // Add error to reports while the reporting budget is not exhausted.
    if ($this->errorsLeft) {
      $this->report['errors'][] = array(
        'error' => $errmsg,
        'error number' => $errno,
        'line' => $line,
        'file' => $file,
      );
      $this->errorsLeft--;
    }

    // Throw an exception to be caught by a try-catch statement.
    if ($this->throwException) {
      throw new Exception("(Uncaught Feed Import Exception) [{$errmsg}; file: {$file}; line: {$line}]", $errno);
    }
  }

  /**
   * Gets an array with errors.
   *
   * @return array
   *    An array of errors
   */
  public function getErrors() {
    // Errors are collected in the report by errorHandler() and addError().
    return $this->report['errors'];
  }

  /**
   * Adds an error to report
   *
   * @param string $errmsg
   *    Error message
   * @param int $errno
   *    Error number
   * @param int $line
   *    Error line
   * @param string $file
   *    Error file
   */
  public function addError($errmsg, $errno = 0, $line = NULL, $file = NULL) {
    // Same entry layout as the one produced by errorHandler().
    $entry = array();
    $entry['error'] = $errmsg;
    $entry['error number'] = $errno;
    $entry['line'] = $line;
    $entry['file'] = $file;
    $this->report['errors'][] = $entry;
  }

  /**
   * {@inheritdoc}
   */
  public function setOptions(array $options, $overwrite = FALSE) {
    // Integer options that are accepted only when they are not negative.
    $counters = array(
      'items_count' => 'itemsCount',
      'reset_cache' => 'resetStaticCache',
    );
    foreach ($counters as $option => $property) {
      if (isset($options[$option]) && $options[$option] >= 0) {
        $this->{$property} = (int) $options[$option];
      }
    }

    // Maximum number of errors kept in the report.
    if (isset($options['max_reported_errors'])) {
      $this->errorsLeft = (int) $options['max_reported_errors'];
    }

    // Boolean flags.
    $flags = array(
      'skip_imported' => 'skipImportedItems',
      'throw_exception' => 'throwException',
      'skip_defined_functions_check' => 'skipDefFunChk',
      'break_on_undefined_filter' => 'breakOnUndefinedFilter',
      'updates_only' => 'updatesOnly',
    );
    foreach ($flags as $option => $property) {
      if (isset($options[$option])) {
        $this->{$property} = (bool) $options[$option];
      }
    }

    // Callbacks: only non-empty values replace the current ones.
    $callbacks = array(
      'before_combine' => 'beforeCombine',
      'after_combine' => 'afterCombine',
      'before_create' => 'beforeCreate',
      'before_save' => 'beforeSave',
      'after_save' => 'afterSave',
      'uniq_callback' => 'uniqAlter',
    );
    foreach ($callbacks as $option => $property) {
      if (!empty($options[$option])) {
        $this->{$property} = $options[$option];
      }
    }
    return TRUE;
  }

  /**
   * Sets the entity info object.
   */
  public function setEntityInfo($entity_info) {
    // Accept any non-empty entity description.
    if ($entity_info) {
      $this->entityInfo = $entity_info;
      return TRUE;
    }

    // Report the missing description.
    $this->addError('No entity info provided');
    return FALSE;
  }

  /**
   * Sets the filters object.
   *
   * @param FeedImportMultiFilter $filter
   *    An instance of FeedImportMultiFilter
   * @param array $functions
   *    An array of functions to create
   * @return bool
   *    TRUE on success
   */
  public function setFilter(FeedImportMultiFilter $filter, $functions = array()) {
    $this->filters = $filter;

    // No dynamic functions to create.
    if (!$functions) {
      return TRUE;
    }

    foreach ($functions as $definition) {
      // Optionally skip functions that are already defined.
      $already_defined = $this->skipDefFunChk && function_exists($definition['name']);
      if ($already_defined) {
        continue;
      }

      // createFunction() returns TRUE or an error message.
      $result = $this->filters->createFunction($definition['name'], $definition['args'], $definition['body']);
      if ($result !== TRUE) {
        $this->addError($result);
        return FALSE;
      }
    }
    return TRUE;
  }

  /**
   * Sets the reader and inits it
   *
   * @param FeedImportReader $reader
   *     An instance of reader
   *
   * @return boolean
   *     TRUE if success
   */
  public function setReader(FeedImportReader $reader) {
    $this->reader = $reader;

    // Errors raised while the reader initializes must not be converted to
    // exceptions. The previous setting is remembered and restored afterwards,
    // so a 'throw_exception' option applied by setOptions() is not lost.
    $throw = $this->throwException;
    $this->throwException = FALSE;
    try {
      $ok = $this->reader->init();
    }
    catch (Exception $e) {
      // The reader threw on its own: restore the setting and pass it on.
      $this->throwException = $throw;
      throw $e;
    }
    $this->throwException = $throw;
    return $ok;
  }

  /**
   * Sets uniq path for monitoring items.
   *
   * @param string $uniq
   *     A path.
   *
   * @return boolean
   *     TRUE if success
   */
  public function setUniq($uniq) {
    // No unique path requested.
    if ($uniq == NULL) {
      return TRUE;
    }

    // The path is formatted by the reader, so a reader is required.
    if (!$this->reader) {
      $this->addError('You must have a reader to add unique path');
      return FALSE;
    }

    $path = $this->reader->formatPath($uniq);

    // A formatted path without content (empty string for scalars, empty
    // value otherwise) means there is no unique path.
    if (is_scalar($path)) {
      $blank = (string) $path === '';
    }
    else {
      $blank = empty($path);
    }
    $this->uniq = $blank ? NULL : $path;
    return TRUE;
  }

  /**
   * Sets the hash manager
   *
   * @param FeedImportHashManager $hm
   *     An instance of FeedImportHashManager
   */
  public function setHashManager(FeedImportHashManager $hm) {
    // Store the hash manager; this step always succeeds.
    $this->hashes = $hm;
    return TRUE;
  }

  /**
   * Sets entity fields.
   *
   * @param array $fields
   *     List of dynamic fields
   * @param array $static_fields
   *     List of static fields
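   * @param array $compare_functions
   *     List of compare functions
   * @param array $merge_classes
   *     List of merge field classes (class names or instances) keyed by
   *     update mode
   * @param string $default_compare
   *     Default compare function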
   *
   * @return boolean
   *     TRUE if fields were set
   */
  public function setFields(array $fields = array(), array $static_fields = array(), array $compare_functions = array(), array $merge_classes = array(), $default_compare = '_feed_import_base_compare_other_fields') {

    // Check for requirements: entity info, reader and filters must be set and
    // fields can be set only once (the base entity must still be empty).
    if (!$this->entityInfo || !$this->reader || !$this->filters || !empty($this->baseEntity)) {
      $this->addError('Could not set fields because there are missing dependencies or fields were already added');
      return FALSE;
    }

    // Create default merge. The UPDATE_COMBINE class is mandatory because it
    // is used for static fields that have no field settings.
    if (!isset($merge_classes[static::UPDATE_COMBINE])) {
      $this->addError('Merge field default class was not found!');
      return FALSE;
    }
    $class = $merge_classes[static::UPDATE_COMBINE];
    if (!class_exists($class)) {
      $this->addError(sprintf('Merge field default class %s was not found!', $class));
      return FALSE;
    }
    elseif (!is_subclass_of($class, 'FeedImportMergeField')) {
      $this->addError(sprintf('Merge field default class %s must extend %s class!', $class, 'FeedImportMergeField'));
      return FALSE;
    }
    $merge_classes[static::UPDATE_COMBINE] = new $class();

    // Create format paths callback.
    $fp = array(
      $this->reader,
      'formatPath',
    );

    // Get language key.
    $lk = $this->entityInfo->langKey;

    // Set static fields.
    $this->staticFields =& $static_fields;

    // Parse all fields.
    foreach ($fields as &$f) {

      // Set field info.
      $this->fieldsInfo[$f['field']] =& $f;
      if (!isset($merge_classes[$f['update_mode']])) {
        $this->addError(sprintf('No field merge class found for field %s!', $f['field']));
        return FALSE;
      }

      // Merge classes are instantiated on first use and then shared by all
      // fields having the same update mode.
      if (is_string($merge_classes[$f['update_mode']])) {
        $class = $merge_classes[$f['update_mode']];
        if (!class_exists($class)) {
          $this->addError(sprintf('Merge field class %s for field %s was not found!', $class, $f['field']));
          return FALSE;
        }
        elseif (!is_subclass_of($class, 'FeedImportMergeField')) {
          $this->addError(sprintf('Merge field class %s for field %s must extend %s class!', $class, $f['field'], 'FeedImportMergeField'));
          return FALSE;
        }
        $merge_classes[$f['update_mode']] = new $class();
      }
      $f['update_mode'] = $merge_classes[$f['update_mode']];
      $f['overwrite'] = $f['update_mode']->overwriteEmpty();

      // Add cardinality
      $f['cardinality'] = isset($this->entityInfo->fields[$f['field']]['cardinality']) ? $this->entityInfo->fields[$f['field']]['cardinality'] : 1;

      // Add compare function. The function is looked up by "module:type" of
      // the field; the default is used when the field is not described in
      // the entity info or no specific function is registered.
      if ($f['column']) {
        $f['compare'] = $default_compare;
        if (isset($this->entityInfo->fields[$f['field']])) {
          $field_info = $this->entityInfo->fields[$f['field']];
          $compare_key = $field_info['module'] . ':' . $field_info['type'];
          if (isset($compare_functions[$compare_key])) {
            $f['compare'] = $compare_functions[$compare_key];
          }
        }
      }

      // Check if column must be guessed.
      if ($f['column'] === TRUE && isset($this->entityInfo->fields[$f['field']])) {
        $f['column'] = $this->entityInfo->fields[$f['field']]['column'];
      }

      // Get paths count
      $f['paths_count'] = count($f['paths']);

      // Check if can be used as static field to improve performance
      if (!$f['paths_count'] && $f['default_action'] == 'default_value') {
        if ($f['column']) {

          // Its a field, add it to static fields.
          $this->staticFields[$f['field']][$f['column']] = $f['default_value'];
        }
        else {

          // Its a property.
          $this->staticFields[$f['field']] = $f['default_value'];
        }

        // Ignore filters.
        $f['prefilters'] = $f['filters'] = FALSE;

        // Next field.
        continue;
      }

      // Check (pre)filters
      foreach (array(
        'prefilters',
        'filters',
      ) as $fgroup) {
        if ($f[$fgroup]) {
          $ok = TRUE;

          // Add filters
          foreach ($f[$fgroup] as &$v) {
            if (!$this->filters->add($fgroup, $f['field'], $v['function'], $v['params'])) {
              $ok = FALSE;
              $this->addError(sprintf('Could not add function %s in %s for %s field because is not defined', $v['function'], $fgroup, $f['field']));
              if ($this->breakOnUndefinedFilter) {
                $this->addError('Import stopped because a filter function is missing!');
                return FALSE;
              }
            }
          }

          // Remove reference to the last filter.
          unset($v);

          // From now on the group only tells whether filters must be applied.
          $f[$fgroup] = $ok;
        }
        else {

          // No filters
          $f[$fgroup] = FALSE;
        }
      }

      // Format paths
      $f['paths'] = array_map($fp, $f['paths']);

      // Add to fields
      $this->fields[$f['field']] =& $f;
    }

    // Remove reference to $f.
    unset($f);

    // Set all static fields properties.
    // In the end just fields remain in static fields (no properties).
    foreach ($this->staticFields as $f => &$value) {
      if (is_scalar($value)) {

        // Scalar value: it is a property, so it goes in the base entity.
        if (!isset($this->fieldsInfo[$f])) {
          $this->fieldsInfo[$f] = array(
            'field' => $f,
            'column' => FALSE,
            'update_mode' => $merge_classes[static::UPDATE_COMBINE],
            'overwrite' => $merge_classes[static::UPDATE_COMBINE]->overwriteEmpty(),
            'cardinality' => 1,
          );
        }
        $this->baseEntity[$f] = $value;
        unset($this->staticFields[$f]);
      }
      elseif (!isset($this->fieldsInfo[$f])) {

        // Static field without settings: describe it with defaults.
        $this->fieldsInfo[$f] = array(
          'field' => $f,
          'column' => isset($this->entityInfo->fields[$f]) ? $this->entityInfo->fields[$f]['column'] : 'value',
          'update_mode' => $merge_classes[static::UPDATE_COMBINE],
          'overwrite' => $merge_classes[static::UPDATE_COMBINE]->overwriteEmpty(),
          'cardinality' => isset($this->entityInfo->fields[$f]['cardinality']) ? $this->entityInfo->fields[$f]['cardinality'] : 1,
        );
      }
    }

    // There isn't lang in base entity nor in fields so use default.
    if (!isset($this->baseEntity[$lk]) && !isset($this->fields[$lk])) {
      $this->baseEntity[$lk] = LANGUAGE_NONE;
    }

    // Check for language in base entity.
    if (isset($this->baseEntity[$lk])) {

      // We have lang so set static fields in base entity.
      $lang = $this->baseEntity[$lk];
      foreach ($this->staticFields as $f => &$value) {
        if (!isset($this->fields[$f])) {

          // This field is not dynamically generated so use it in base entity.
          $this->baseEntity[$f][$lang][] = $value;

          // Remove it from static fields because now is part of base entity.
          unset($this->staticFields[$f]);
        }
      }
    }
    else {

      // No language, it means language is dynamically created.
      // Move language on top to be available for other fields.
      $this->fields = array_merge(array(
        $lk => NULL,
      ), $this->fields);

      // However, set it to NONE, and if used will be overwritten.
      $this->baseEntity[$lk] = LANGUAGE_NONE;
    }
    return TRUE;
  }

  /**
   * Sets custom settings, after base settings were set.
   * Can be used by child classes.
   *
   * @param array $settings
   *    An array of settings.
   *
   * @return bool
   *    TRUE if success. If not, the import will be stopped.
   */
  public function setCustomSettings(array $settings) {
    // The base implementation ignores $settings and always reports success;
    // it exists only as an extension point for child classes.
    return TRUE;
  }

  /**
   * Checks if a variable has content
   *
   * @param mixed &$var
   *     Variable to check
   *
   * @return bool
   *     TRUE if there is content FALSE otherwise
   */
  protected function hasContent(&$var) {
    // Scalars are judged by their string form, so 0 and '0' count as content
    // while '' and FALSE do not. Everything else (arrays, objects, NULL) is
    // judged by empty().
    return is_scalar($var) ? (string) $var !== '' : !empty($var);
  }

  /**
   * Attaches a field value to entity.
   *
   * @param array &$entity
   *     Entity where to attach field.
   * @param array &$field
   *     Field settings.
   * @param mixed &$value
   *     Field value.
   */
  protected function attachField(array &$entity, &$field, &$value) {
    // Properties (no column) hold a single value directly on the entity.
    if (!$field['column']) {
      if (is_array($value) || is_object($value)) {

        // Keep only the first value.
        // This still can be array but if so then problem is elsewhere.
        $value = reset($value);
      }
      $entity[$field['field']] = $value;
      return;
    }

    // From here on we deal with a field: values are stored as a list of
    // items under $entity[field name][language].
    $fn = $field['field'];
    $fc = $field['column'];
    $lang =& $entity[$this->entityInfo->langKey];

    // Make sure the item list exists, then work on it by reference.
    if (!isset($entity[$fn][$lang])) {
      $entity[$fn][$lang] = array();
    }
    $ef =& $entity[$fn][$lang];

    // Static column values configured for this field (if any) are added to
    // every item; values coming from the item itself take precedence.
    $st = FALSE;
    if (!empty($this->staticFields[$fn])) {
      $st = $this->staticFields[$fn];
    }

    // Make it look multivalued.
    if (!is_array($value)) {
      $value = array(
        $value,
      );
    }
    elseif ($field['cardinality'] != -1 && count($value) > $field['cardinality']) {

      // Remove the values that exceed the field cardinality.
      array_splice($value, $field['cardinality']);
    }

    // Build one item per value. A fresh array is created for each item on
    // purpose: reusing a single template array whose column slot is a PHP
    // reference would make every appended item share that slot, so all
    // items would end up holding the last value.
    foreach ($value as $v) {
      $item = is_object($v) ? (array) $v : array(
        $fc => $v,
      );
      $ef[] = $st ? $item + $st : $item;
    }
  }

  /**
   * Creates a new entity.
   *
   * @param mixed $item
   *     Item to be mapped to entity
   *
   * @return array|null
   *     The entity as an array (returned by reference), or NULL when the item
   *     is skipped
   */
  protected function &createEntity(&$item) {

    // Start from a copy of the base entity; mapped fields are added below.
    $entity = $this->baseEntity;

    // Alias the entity being built as $this->current. Because this is a
    // reference, writing to $this->current also writes to $entity.
    // NOTE(review): presumably exposed so filter callbacks can inspect the
    // entity under construction - confirm against the filter classes.
    $this->current =& $entity;

    // Check for unique id of item.
    if ($this->uniq === NULL) {

      // No unique path configured, so there is no hash and the item is not
      // monitored.
      $entity[static::TEMP_HASH] = NULL;
    }
    else {

      // Map the unique value from the item.
      $uniq = $this->reader
        ->map($item, $this->uniq);
      if (is_array($uniq)) {

        // Several values were mapped: use index 0, or the first element
        // when the array is not zero-indexed.
        $uniq = isset($uniq[0]) ? $uniq[0] : reset($uniq);
      }

      // Check if uniq alter callback exists.
      if ($this->uniqAlter) {
        $uniq = call_user_func($this->uniqAlter, $uniq);
      }

      // Create the hash and keep it temporarily inside the entity array.
      $entity[static::TEMP_HASH] = $this->hashes
        ->hash($uniq);
    }

    // Set entity fields.
    foreach ($this->fields as $name => &$field) {

      // Try each configured path in order until one yields content.
      $val = NULL;
      $i = -1;
      while (++$i < $field['paths_count']) {

        // Get value for specified path.
        $val = $this->reader
          ->map($item, $field['paths'][$i]);

        // Check if passes prefilter. The prefilter result is stored in
        // $this->prefilteredValue; $val itself keeps the raw mapped value.
        if ($field['prefilters']) {
          $this->prefilteredValue = $this->filters
            ->apply("prefilters", $name, $val);
          if (!$this
            ->hasContent($this->prefilteredValue)) {
            $val = $this->prefilteredValue = NULL;

            // If item doesn't pass prefilter then go to next path.
            continue;
          }
        }
        if ($this
          ->hasContent($val)) {

          // We have content, check for filters.
          if ($field['filters']) {
            $val = $this->filters
              ->apply("filters", $name, $val);
            if (!$this
              ->hasContent($val)) {

              // Filters removed all content: fall through to the default
              // action below.
              $val = NULL;
            }
          }

          // We matched one path, so the remaining paths are not tried.
          break;
        }
        else {
          $val = NULL;
        }
      }

      // Check if default action is needed.
      if ($val === NULL) {
        switch ($field['default_action']) {

          // Provide default value.
          // This is also default action.
          case static::ACTION_DEFAULT_VALUE:
          default:
            $val = $field['default_value'];
            break;

          // Provide filtered default value.
          case static::ACTION_DEFAULT_FILTERED_VALUE:
            $val = $this->filters
              ->apply("filters", $name, $field['default_value']);
            break;

          // Skip this item: count it as skipped and return the $this->NULL
          // property by reference. Assigning NULL to $this->current also
          // clears the local $entity through the alias set above.
          case static::ACTION_SKIP_ITEM:
            $this->report['skipped']++;
            $this->current = $this->prefilteredValue = NULL;
            return $this->NULL;

          // Don't add this field to entity. "continue 2" is needed because
          // switch counts as a loop structure for continue; this moves on to
          // the next field.
          case static::ACTION_IGNORE_FIELD:
            $this->prefilteredValue = NULL;
            continue 2;
        }
      }

      // Remove prefiltered value.
      $this->prefilteredValue = NULL;

      // Set field value in entity.
      $this
        ->attachField($entity, $field, $val);

      // attachField() receives $val by reference and may have changed it, so
      // drop it before the next field.
      unset($val);
    }

    // Break the alias first (unset) and only then store NULL, so the entity
    // being returned is not overwritten.
    unset($this->current);
    $this->current = NULL;

    // Return the entity array.
    return $entity;
  }

  /**
   * Combines two entities.
   *
   * @param array $new
   *     Created entity
   * @param object $curr
   *     Current entity
   *
   * @return boolean
   *     TRUE if current entity was changed
   */
  protected function combineEntities(array &$new, $curr) {
    // Field values are keyed by the language of the newly created entity.
    $language =& $new[$this->entityInfo->langKey];

    // Becomes TRUE as soon as anything on the current entity is modified.
    $modified = FALSE;

    foreach ($this->fieldsInfo as &$info) {
      $name =& $info['field'];

      if (!$info['column']) {

        // No column means this is a property stored directly on the entity.
        if (!isset($new[$name])) {

          // The import produced no value; remove the existing one only when
          // forced overwrite is enabled.
          if ($info['overwrite'] && isset($curr->{$name})) {
            unset($curr->{$name});
            $modified = TRUE;
          }
          continue;
        }

        // Same value (loose comparison): leave the property alone.
        if (isset($curr->{$name}) && $curr->{$name} == $new[$name]) {
          continue;
        }

        // Copy the new value over and release it from the new entity.
        $curr->{$name} = $new[$name];
        $modified = TRUE;
        unset($new[$name]);
        continue;
      }

      // Otherwise it is a field holding per-language item lists.
      if (!isset($new[$name][$language])) {

        // The import produced no items; with forced overwrite the existing
        // items for this language are dropped.
        if ($info['overwrite'] && isset($curr->{$name}[$language])) {
          unset($curr->{$name}[$language]);

          // Don't leave an empty property behind.
          if (empty($curr->{$name})) {
            unset($curr->{$name});
          }
          $modified = TRUE;
        }
        continue;
      }

      if (!isset($curr->{$name}[$language])) {

        // Nothing stored yet for this language, so take the new items as is.
        $curr->{$name}[$language] = $new[$name][$language];
        $modified = TRUE;
        continue;
      }

      // Both sides have items: the field's update mode decides how they are
      // merged and reports whether the current entity was altered.
      if ($info['update_mode']->merge($curr->{$name}[$language], $new[$name][$language], $info)) {
        $modified = TRUE;
      }
    }

    return $modified;
  }

  /**
   * Saves created entities.
   *
   * @param array &$entities
   *   An array of entities to be saved
   */
  protected function saveEntities(array &$entities) {
    // One-element id list reused for every entity_load() call; $loadId below
    // aliases its only slot, so no new array is built per entity.
    static $loadArr = array(
      NULL,
    );

    // Number of entities (skipped NULL entries included).
    $total = count($entities);
    $this->report['total'] += $total;

    // Get the already stored hashes for the entities created so far.
    $ids = $this->hashes->get();

    // Entity create callback.
    $createCallback =& $this->entityInfo->createCallback;

    // Entity save callback.
    $saveCallback =& $this->entityInfo->saveCallback;

    // Entity before combine callback
    $beforeCombine =& $this->beforeCombine;

    // Entity after combine callback
    $afterCombine =& $this->afterCombine;

    // Entity before create callback
    $beforeCreate =& $this->beforeCreate;

    // Entity before save callback
    $beforeSave =& $this->beforeSave;

    // Entity after save callback
    $afterSave =& $this->afterSave;

    // Entity load id.
    $loadId =& $loadArr[0];
    $i = -1;

    // Save each entity.
    while (++$i < $total) {

      // Take the entity out of the list so its memory can be released as
      // soon as it was processed.
      $entity =& $entities[$i];
      unset($entities[$i]);
      if ($entity == NULL) {

        // The item was skipped when the entity was created.
        continue;
      }

      // Save hash.
      $hash =& $entity[static::TEMP_HASH];

      // The temporary hash key must not reach the saved entity.
      unset($entity[static::TEMP_HASH]);

      // Check if item is already imported or is not monitored.
      if ($hash !== NULL && isset($ids[$hash])) {

        // Check if is used option to skip item if already imported
        // or entity is protected (import cannot make changes).
        if ($this->skipImportedItems || $ids[$hash]->expire == FeedImportHashManager::MARK_PROTECTED) {
          $this->report[$this->skipImportedItems ? 'skipped' : 'protected_skipped']++;
          unset($ids[$hash]);
          continue;
        }

        // Current entity id.
        $loadId = $ids[$hash]->entity_id;

        // Load current entity.
        $current = entity_load($this->entityInfo->name, $loadArr);

        // If entity is missing then skip and remember the orphan hash.
        if (!isset($current[$loadId])) {
          $this->report['missing']++;
          $this->orphanHashes[] = $ids[$hash]->id;
          unset($ids[$hash], $current);
          continue;
        }

        // Get current entity value.
        $current =& $current[$loadId];

        // Entity might be in static cache.
        $this->staticCacheEntities++;

        // Merge status of entity.
        $changed = FALSE;

        // Check for before combine callback.
        if ($beforeCombine) {
          switch ($beforeCombine($entity, $current, $changed)) {
            case static::ENTITY_RESCHEDULE:
              $this->report['rescheduled']++;
              goto ADD_HASH_UPDATE;
            case static::ENTITY_SKIP:
              $this->report['skipped']++;
              goto UNSET_VARS;
            case static::ENTITY_NO_COMBINE:
              goto COMBINE_FINISHED;
            case static::ENTITY_MARK_PROTECTED:
              $this->report['protected']++;
              $this->hashes->protect($ids[$hash]->id);
              goto UNSET_VARS;
          }
        }

        // Check for diff.
        $changed = $this->combineEntities($entity, $current);

        // Check for after combine callback.
        if ($afterCombine) {
          switch ($afterCombine($entity, $current, $changed)) {
            case static::ENTITY_RESCHEDULE:
              $this->report['rescheduled']++;
              goto ADD_HASH_UPDATE;
            case static::ENTITY_SKIP:
              $this->report['skipped']++;
              goto UNSET_VARS;
            case static::ENTITY_MARK_PROTECTED:
              $this->report['protected']++;
              $this->hashes->protect($ids[$hash]->id);
              goto UNSET_VARS;
          }
        }
        COMBINE_FINISHED:

        // Not needed anymore.
        unset($entity);

        // Check if entity is changed and save changes.
        if ($changed) {

          // Save entity: prefer the entity type's save callback, then the
          // entity's own save() method, then the controller.
          if ($saveCallback) {
            $saveCallback($current);
          }
          elseif (method_exists($current, 'save')) {
            if (!$current->save()) {
              unset($current);
              continue;
            }
          }
          elseif ($this->entityInfo->controller->canSave) {
            if (!$this->entityInfo->controller->save($current)) {
              unset($current);
              continue;
            }
          }
          else {

            // Don't know how to save entity...
            unset($current);
            continue;
          }

          // Set report about updated items.
          $this->report['updated']++;

          // Check after save callback
          $afterSave && $afterSave($current, FALSE);
        }
        else {

          // Set report about rescheduled items.
          $this->report['rescheduled']++;
        }
        ADD_HASH_UPDATE:

        // Add to update ids.
        $this->hashes->update($ids[$hash]->id);
        UNSET_VARS:

        // Free some memory.
        unset($ids[$hash], $current, $entity, $hash);
      }
      else {

        // Check if this imports accepts only updates.
        if ($this->updatesOnly) {
          $this->report['skipped']++;
          goto UNSET_E_VARS;
        }

        // Check before create callback
        if ($beforeCreate) {
          switch ($beforeCreate($entity)) {
            case static::ENTITY_SKIP:
              $this->report['skipped']++;
              goto UNSET_E_VARS;
            case static::ENTITY_SKIP_CREATE:
              goto ENTITY_SAVE;
            case static::ENTITY_SKIP_SAVE:
              goto ENTITY_SAVED;
          }
        }

        // Create a new entity: creation callback first, then the controller,
        // otherwise a plain object cast of the array.
        if ($createCallback) {
          $entity = $createCallback($entity, $this->entityInfo->name);
        }
        elseif ($this->entityInfo->controller->canCreate) {
          $entity = $this->entityInfo->controller->create($entity);
        }
        else {
          $entity = (object) $entity;
        }
        ENTITY_SAVE:

        // Check before save callback
        if ($beforeSave) {
          switch ($beforeSave($entity)) {
            case static::ENTITY_SKIP:
              $this->report['skipped']++;
              goto UNSET_E_VARS;
            case static::ENTITY_SKIP_SAVE:
              goto ENTITY_SAVED;
          }
        }

        // No entity to save.
        if (!$entity) {
          unset($entity);
          continue;
        }
        elseif ($saveCallback) {
          $saveCallback($entity);
        }
        elseif (method_exists($entity, 'save')) {
          if (!$entity->save()) {
            unset($entity);
            continue;
          }
        }
        elseif ($this->entityInfo->controller->canSave) {
          if (!$this->entityInfo->controller->save($entity)) {
            unset($entity);
            continue;
          }
        }
        else {

          // Don't know how to save entity...
          unset($entity);
          continue;
        }
        ENTITY_SAVED:

        // Entity might be added in static cache.
        $this->staticCacheEntities++;

        // Check after save callback
        $afterSave && $afterSave($entity, TRUE);

        // Check if is monitored.
        if ($hash !== NULL) {

          // Insert into feed import hash table.
          $this->hashes->insert($entity->{$this->entityInfo->idKey}, $hash);
        }

        // Set report about new items.
        $this->report['new']++;
        UNSET_E_VARS:

        // Not needed anymore.
        unset($entity, $hash);
      }

      // Remove entity cache once enough entities went through it. The
      // comparison is ">=" rather than "==" because the failed-save paths
      // above leave the loop with "continue" after the counter was already
      // incremented; with "==" the counter could step over the threshold and
      // the cache would never be reset again. A threshold of 0 disables the
      // periodic reset.
      if ($this->resetStaticCache && $this->staticCacheEntities >= $this->resetStaticCache) {
        $this->entityInfo->controller->canResetCache && $this->entityInfo->controller->resetCache();
        $this->staticCacheEntities = 0;
      }
    }
  }

  /**
   * Processes the import.
   */
  public function process() {

    // Large imports can take a while, so lift the execution time limit.
    // This is not possible when safe mode is on, in which case a long
    // import may be interrupted.
    if (!ini_get('safe_mode')) {
      set_time_limit(0);
    }

    // Remember when the import started.
    $this->report['started'] = time();

    $reader = $this->reader;
    $entities = array();

    // When an items count is configured, entities are saved in chunks of
    // that size; otherwise they are all collected and saved at the end.
    // The decision is taken once, before reading any item.
    $chunked = (bool) $this->itemsCount;
    $pending = 0;

    // Get next item from reader until it runs out.
    while ($item = $reader->get()) {

      // Create the entity (kept by reference, as returned).
      $entities[] =& $this->createEntity($item);

      // A full chunk is saved right away.
      if ($chunked && ++$pending == $this->itemsCount) {
        unset($item);
        $this->saveEntities($entities);
        $pending = 0;
        $entities = array();
      }
    }

    // Save whatever is left over.
    if ($entities) {
      $this->saveEntities($entities);
    }

    // Not needed anymore.
    unset($entities, $reader);

    // Reset the static cache.
    if ($this->staticCacheEntities && $this->resetStaticCache) {
      if ($this->entityInfo->controller->canResetCache) {
        $this->entityInfo->controller->resetCache();
      }
      $this->staticCacheEntities = 0;
    }

    // Commit left over hash updates, then the new hash inserts.
    $this->hashes->updateCommit();
    $this->hashes->insertCommit();

    // Remove orphan hashes.
    if ($this->orphanHashes) {
      $this->hashes->delete($this->orphanHashes);
      $this->orphanHashes = array();
    }

    // Take a snapshot of the finished report before clearing it.
    $this->report['finished'] = time();
    $result = $this->report;

    // Reset report
    $this->report = array(
      'total' => 0,
      'updated' => 0,
      'new' => 0,
      'rescheduled' => 0,
      'skipped' => 0,
      'protected' => 0,
      'protected_skipped' => 0,
      'missing' => 0,
      'started' => 0,
      'finished' => 0,
      'errors' => array(),
    );

    return $result;
  }

}

/**
 * This class implements SQL hash storage
 */
class FeedImportSQLHashes extends FeedImportHashManager {

  // Feed machine name.
  protected $feedName;

  // Entity name.
  protected $entity;

  // Positional row template for the insert query. Slots 3, 4 and 5
  // (entity id, hash, expire) are filled in by insert().
  protected $insertFormat;

  // Field map for the update query; 'expire' is filled in by _update().
  protected $updateFormat;

  // Reusable db objects, built in setOptions().
  protected $select;
  protected $insert;
  protected $update;

  // Options
  protected $updateChunkSize = 500;
  protected $insertChunkSize = 500;
  protected $group = '';

  // Items left to insert.
  protected $toInsert = 0;

  // Ids that need to be updated.
  protected $updateIds = array();
  protected $toUpdate = 0;

  // Ids that need to be protected.
  protected $protectIds = array();
  protected $toProtect = 0;

  /**
   * {@inheritdoc}
   */
  public function __construct($entity_name, $feed_machine_name) {
    $this->entity = $entity_name;
    $this->feedName = $feed_machine_name;
  }

  /**
   * {@inheritdoc}
   *
   * Recognized options: 'ttl' (>= 0), 'update_chunk' (> 0),
   * 'insert_chunk' (> 0) and 'group'. The reusable select, update and insert
   * query objects are (re)built on every call. $overwrite is not used here.
   */
  public function setOptions(array $options, $overwrite = FALSE) {
    if (isset($options['ttl']) && $options['ttl'] >= 0) {
      $this->ttl = (int) $options['ttl'];
    }
    if (isset($options['update_chunk']) && $options['update_chunk'] > 0) {
      $this->updateChunkSize = (int) $options['update_chunk'];
    }
    if (isset($options['insert_chunk']) && $options['insert_chunk'] > 0) {
      $this->insertChunkSize = (int) $options['insert_chunk'];
    }
    if (isset($options['group'])) {
      $this->group = $options['group'];
    }

    // Create insert format.
    $this->insertFormat = array(
      $this->feedName,
      $this->group,
      $this->entity,
      0,
      // holds id
      NULL,
      // holds hash
      0,
    );

    // Create select object. The trailing (empty) IN condition on hash is a
    // placeholder that get() replaces before every execution.
    $this->select = db_select('feed_import_hashes', 'f')
      ->fields('f', array(
      'hash',
      'id',
      'entity_id',
      'expire',
    ))
      ->condition('feed_group', $this->group)
      ->condition('entity', $this->entity)
      ->condition('hash', array(), 'IN');

    // Create update object.
    $this->update = db_update('feed_import_hashes')
      ->condition('feed_group', $this->group)
      ->condition('entity', $this->entity);

    // Create update format.
    $this->updateFormat = array(
      'expire' => 0,
    );

    // Create insert object.
    $this->insert = db_insert('feed_import_hashes')
      ->fields(array(
      'feed_machine_name',
      'feed_group',
      'entity',
      'entity_id',
      'hash',
      'expire',
    ));
  }

  /**
   * {@inheritdoc}
   *
   * The hash is the md5 of the unique value; it is also remembered so that
   * get() can look it up later.
   */
  public function hash(&$uniq) {
    return $this->generatedHashes[] = md5($uniq);
  }

  /**
   * {@inheritdoc}
   *
   * Returns the stored rows (keyed by hash) for all hashes generated since
   * the previous call, then forgets those generated hashes.
   */
  public function get() {
    if (!$this->generatedHashes) {
      return array();
    }

    // Get select conditions (reuse same select object).
    $cond =& $this->select
      ->conditions();

    // Remove last condition (the previous IN list on hash).
    array_pop($cond);

    // Return hashes.
    $hashes = $this->select
      ->condition('hash', $this->generatedHashes, 'IN')
      ->execute()
      ->fetchAllAssoc('hash');
    $this->generatedHashes = array();
    return $hashes;
  }

  /**
   * {@inheritdoc}
   *
   * Rows are queued on the insert query and executed once the insert chunk
   * size is reached (or when insertCommit() is called).
   */
  public function insert($id, $hash) {
    $this->insertFormat[3] = $id;
    $this->insertFormat[4] = $hash;

    // A ttl of 0 is stored as expire 0.
    $this->insertFormat[5] = $this->ttl ? $this->ttl + time() : 0;
    $this->insert
      ->values($this->insertFormat);

    // Commit insert.
    if (++$this->toInsert == $this->insertChunkSize) {
      $this
        ->insertCommit();
    }
  }

  /**
   * {@inheritdoc}
   */
  public function insertCommit() {
    if ($this->toInsert) {
      $this->insert
        ->execute();
      $this->toInsert = 0;
    }
  }

  /**
   * {@inheritdoc}
   *
   * Ids are collected and written once the update chunk size is reached
   * (or when updateCommit() is called).
   */
  public function update($id) {
    $this->updateIds[] = $id;
    if (++$this->toUpdate == $this->updateChunkSize) {
      $this
        ->_update($this->updateIds, $this->ttl ? $this->ttl + time() : 0);
      $this->toUpdate = 0;
    }
  }

  /**
   * {@inheritdoc}
   *
   * Works like update(), but the expire column is set to MARK_PROTECTED.
   */
  public function protect($id) {
    $this->protectIds[] = $id;
    if (++$this->toProtect == $this->updateChunkSize) {
      $this
        ->_update($this->protectIds, static::MARK_PROTECTED);
      $this->toProtect = 0;
    }
  }

  /**
   * {@inheritdoc}
   */
  public function updateCommit() {
    if ($this->toUpdate) {
      $this
        ->_update($this->updateIds, $this->ttl ? $this->ttl + time() : 0);
      $this->toUpdate = 0;
    }
    if ($this->toProtect) {
      $this
        ->_update($this->protectIds, static::MARK_PROTECTED);
      $this->toProtect = 0;
    }
  }

  /**
   * Makes an update.
   *
   * @param array &$ids
   *   Hash row ids to update; the array is emptied once they were written.
   * @param int $expire
   *   Value to store in the expire column.
   */
  protected function _update(array &$ids, $expire) {
    // Nothing to write; an empty id list would produce an empty IN () list.
    if (!$ids) {
      return;
    }
    $this->updateFormat['expire'] = $expire;
    $this->update
      ->fields($this->updateFormat);

    // Get update conditions (by reference, so the temporary IN condition
    // can be removed again and the query object reused).
    $conditions =& $this->update
      ->conditions();
    if (count($ids) <= $this->updateChunkSize) {

      // Update hashes.
      $this->update
        ->condition('id', $ids, 'IN')
        ->execute();

      // Remove last IN condition.
      array_pop($conditions);

      // Clear array.
      $ids = array();
    }
    else {

      // Split ids in chunks.
      $ids = array_chunk($ids, $this->updateChunkSize);
      for ($i = 0, $m = count($ids); $i < $m; $i++) {

        // Update hashes.
        $this->update
          ->condition('id', $ids[$i], 'IN')
          ->execute();
        unset($ids[$i]);

        // Remove last IN condition.
        array_pop($conditions);
      }
    }
  }

  /**
   * {@inheritdoc}
   */
  public static function delete(array $ids) {
    // An empty id list would compile to an empty IN () list, which is not
    // valid SQL, so there is nothing to do.
    if (!$ids) {
      return;
    }
    db_delete('feed_import_hashes')
      ->condition('id', $ids)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public static function deleteByFeed($name) {
    db_delete('feed_import_hashes')
      ->condition('feed_machine_name', $name)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public static function deleteByGroup($group) {
    db_delete('feed_import_hashes')
      ->condition('feed_group', $group)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public static function deleteEntities($ids, $entity_type) {
    // Same guard as delete(): an empty array cannot be used as an IN list.
    if (is_array($ids) && !$ids) {
      return;
    }
    db_delete('feed_import_hashes')
      ->condition('entity', $entity_type)
      ->condition('entity_id', $ids)
      ->execute();
  }

  /**
   * {@inheritdoc}
   *
   * The result is keyed by entity name, then by hash row id, and holds the
   * entity id: array(entity => array(id => entity_id)).
   */
  public static function getExpired($name, $max = 0) {
    // Only rows whose expire lies between MARK_PROTECTED + 1 and now are
    // selected.
    $q = db_select('feed_import_hashes', 'f')
      ->fields('f', array(
      'entity',
      'id',
      'entity_id',
    ))
      ->condition('feed_machine_name', $name)
      ->condition('expire', array(
      static::MARK_PROTECTED + 1,
      time(),
    ), 'BETWEEN');
    if ($max) {
      $q
        ->range(0, $max);
    }
    $q = $q
      ->execute();
    $ret = array();
    while ($r = $q
      ->fetch(PDO::FETCH_ASSOC)) {
      $ret[$r['entity']][$r['id']] = $r['entity_id'];
    }
    return $ret;
  }

  /**
   * {@inheritdoc}
   *
   * Only rows with expire greater than MARK_PROTECTED are changed. A
   * non-zero $ttl is relative to the current time.
   */
  public static function rescheduleAll($name, $ttl) {
    if ($ttl) {
      $ttl += time();
    }
    db_update('feed_import_hashes')
      ->condition('feed_machine_name', $name)
      ->condition('expire', static::MARK_PROTECTED, '>')
      ->fields(array(
      'expire' => $ttl,
    ))
      ->execute();
  }

  /**
   * {@inheritdoc}
   *
   * With a scalar $name the count for that feed is returned (0 when it has
   * no hashes); otherwise an array of counts keyed by feed machine name.
   */
  public static function totalHashes($name = NULL) {
    $q = db_select('feed_import_hashes', 'f')
      ->fields('f', array(
      'feed_machine_name',
    ));
    if ($name) {
      $q
        ->condition('feed_machine_name', $name);
    }
    $q
      ->addExpression('COUNT(*)', 'cnt');
    $q = $q
      ->groupBy('feed_machine_name')
      ->execute()
      ->fetchAllKeyed();
    if ($name && is_scalar($name)) {
      return isset($q[$name]) ? $q[$name] : 0;
    }
    return $q;
  }

}

/**
 * This class implements SQL hash storage compatible with version 2.x
 */
class FeedImportSQLHashesv2Compatible extends FeedImportSQLHashes {

  // Suffix appended to every unique value before hashing. It is built in
  // setOptions() from the feed group and the entity name.
  protected $hashPart;

  /**
   * {@inheritdoc}
   *
   * Besides the parent options, this prepares the "/group/entity" suffix
   * used by hash().
   */
  public function setOptions(array $options, $overwrite = FALSE) {
    parent::setOptions($options, $overwrite);
    $suffix = '/' . $this->group . '/' . $this->entity;
    $this->hashPart = $suffix;
  }

  /**
   * {@inheritdoc}
   *
   * The hash is the md5 of the unique value followed by the suffix; it is
   * also recorded in the list of generated hashes.
   */
  public function hash(&$uniq) {
    $hash = md5($uniq . $this->hashPart);
    $this->generatedHashes[] = $hash;
    return $hash;
  }

}

Classes

Namesort descending Description
FeedImport This class provides helper functions for feed import.
FeedImportProcessor Class that processes the import.
FeedImportSQLHashes This class implements SQL hash storage
FeedImportSQLHashesv2Compatible This class implements SQL hash storage compatible with version 2.x