class FeedImportProcessor in Feed Import 7.3
Class that processes the import.
Hierarchy
- class \FeedImportConfigurable
- class \FeedImportProcessor
Expanded class hierarchy of FeedImportProcessor
3 string references to 'FeedImportProcessor'
- FeedImport::getEmptyFeed in feed_import_base/
inc/ feed_import.inc - Gets a new empty feed configuration.
- feed_import_feed_import_processor_info in ./
feed_import.module - Implements hook_feed_import_processor_info().
- feed_import_feed_import_setting_types in ./
feed_import.module - Implements hook_feed_import_setting_types().
File
- feed_import_base/
inc/ feed_import.inc, line 568 - This file contains Feed Import helpers.
View source
class FeedImportProcessor extends FeedImportConfigurable {
// Field update modes. These values are also the keys looked up in the
// $merge_classes array given to setFields().
// No duplicates
const UPDATE_COMBINE = 0;
// Duplicates
const UPDATE_MERGE = 1;
// Overwrite
const UPDATE_OVERWRITE = 2;
// Overwrite fast
const UPDATE_OVERWRITE_FAST = 3;
// Actions taken by createEntity() when no value was found for a field.
const ACTION_DEFAULT_VALUE = 0;
const ACTION_DEFAULT_FILTERED_VALUE = 1;
const ACTION_IGNORE_FIELD = 2;
const ACTION_SKIP_ITEM = 3;
// Return codes understood from the before/after combine callbacks
// (see saveEntities()).
const ENTITY_CONTINUE = 0;
const ENTITY_RESCHEDULE = 1;
const ENTITY_SKIP = 2;
const ENTITY_NO_COMBINE = 3;
const ENTITY_MARK_PROTECTED = 4;
// Return codes understood from the before create/save callbacks
// (see saveEntities()).
const ENTITY_SKIP_CREATE = 5;
const ENTITY_SKIP_SAVE = 6;
// Key under which createEntity() temporarily stores the item hash inside
// the entity array; saveEntities() removes it again.
const TEMP_HASH = '$_FI~HASH_$';
// Dynamic fields (filled from reader paths), keyed by field name.
protected $fields = array();
// Static fields: fixed field values, keyed by field name.
protected $staticFields = array();
// Base entity: the array every created entity starts from.
protected $baseEntity = array();
// Info about every known field (dynamic and static), keyed by field name.
protected $fieldsInfo = array();
// Formatted path used to read the unique value of an item (NULL when items
// are not monitored).
protected $uniq;
// Feed group.
protected $group;
// Feed machine name.
protected $machineName;
// Flag to skip items that were already imported.
protected $skipImportedItems = FALSE;
// Flag to only apply updates (no new entities are created).
protected $updatesOnly = FALSE;
// Entity info.
protected $entityInfo;
// Current reader.
protected $reader;
// Current hash manager.
protected $hashes;
// Filters
protected $filters;
// Number of created entities after which they are saved; 0 means all
// entities are saved together once the reader is exhausted.
protected $itemsCount = 0;
// After how many items to reset entity static cache.
protected $resetStaticCache = 0;
// Number of entities that may currently be in the static cache.
protected $staticCacheEntities = 0;
// Callbacks for before/after combine entities
protected $beforeCombine = FALSE;
protected $afterCombine = FALSE;
// Callbacks for before/after create/save entities
protected $beforeCreate = FALSE;
protected $beforeSave = FALSE;
protected $afterSave = FALSE;
// List of hash ids whose entity could not be loaded anymore.
protected $orphanHashes = array();
// A NULL value that createEntity() can return by reference.
protected $NULL = NULL;
// Flag indicating to skip creating dynamic functions if already exists.
protected $skipDefFunChk = FALSE;
// This will break import if a filter function is missing.
protected $breakOnUndefinedFilter = TRUE;
// Report counters and collected errors; process() returns a copy and then
// resets it.
protected $report = array(
'total' => 0,
'updated' => 0,
'new' => 0,
'rescheduled' => 0,
'skipped' => 0,
'protected' => 0,
'protected_skipped' => 0,
'missing' => 0,
'started' => 0,
'finished' => 0,
'errors' => array(),
);
// Entity currently being built by createEntity() (NULL outside of it).
public $current;
// Value returned by the prefilters for the field currently processed.
public $prefilteredValue;
// Callback to alter uniq value
protected $uniqAlter = FALSE;
/**
 * Creates a processor bound to a feed.
 *
 * @param string $machine_name
 *   Machine name of the feed; kept in $this->machineName.
 */
public function __construct($machine_name) {
  $this->machineName = $machine_name;
}
// Tells whether this object's error handler is currently installed.
protected $errHandlerEnabled = FALSE;
/**
 * Installs or removes this object's error handler.
 *
 * Asking for the state that is already active does nothing, so the handler
 * is never installed or restored twice in a row.
 *
 * @param bool $enable
 *   TRUE to register errorHandler() with set_error_handler(), FALSE to
 *   restore the previously active handler.
 */
public function setErrorHandler($enable) {
  $wanted = (bool) $enable;
  if ($wanted == $this->errHandlerEnabled) {
    // Already in the requested state.
    return;
  }
  $this->errHandlerEnabled = $wanted;
  if ($wanted) {
    // Route PHP errors to errorHandler().
    set_error_handler(array($this, 'errorHandler'));
  }
  else {
    // Give control back to the previous handler.
    restore_error_handler();
  }
}
// Max number of errors to report.
protected $errorsLeft = 100;
// Whether errorHandler() turns handled errors into exceptions.
protected $throwException = TRUE;
/**
 * Error handler callback, registered by setErrorHandler().
 *
 * Records the error in the report (while the error budget lasts) and, when
 * exceptions are enabled, converts it into an Exception.
 *
 * @param int $errno
 *   Level of the raised error.
 * @param string $errmsg
 *   Error message.
 * @param string $file
 *   File in which the error was raised.
 * @param int $line
 *   Line at which the error was raised.
 *
 * @return bool
 *   FALSE when the error is excluded by the current error_reporting() mask
 *   (PHP's standard handler then takes over), TRUE when it was handled here.
 *
 * @throws Exception
 *   When $this->throwException is enabled.
 */
public function errorHandler($errno, $errmsg, $file, $line) {
  // Respect the @ operator and the configured reporting level. Testing
  // the mask instead of "== 0" is required since PHP 8, where @ no longer
  // sets error_reporting() to 0 but to a mask of fatal error levels.
  if (!(error_reporting() & $errno)) {
    return FALSE;
  }
  // Add error to reports while there is budget left.
  if ($this->errorsLeft) {
    $this->addError($errmsg, $errno, $line, $file);
    $this->errorsLeft--;
  }
  // Throw an exception to be caught by a try-catch statement.
  if ($this->throwException) {
    throw new Exception("(Uncaught Feed Import Exception) [{$errmsg}; file: {$file}; line: {$line}]", $errno);
  }
  // Handled: PHP's standard handler must not run.
  return TRUE;
}
/**
 * Returns the errors collected in the report so far.
 *
 * @return array
 *   List of error records, each an array with the keys 'error',
 *   'error number', 'line' and 'file'.
 */
public function getErrors() {
  return $this->report['errors'];
}
/**
 * Appends an error record to the report.
 *
 * @param string $errmsg
 *   Error message.
 * @param int $errno
 *   Error number.
 * @param int $line
 *   Line where the error occurred, if known.
 * @param string $file
 *   File where the error occurred, if known.
 */
public function addError($errmsg, $errno = 0, $line = NULL, $file = NULL) {
  $entry = array();
  $entry['error'] = $errmsg;
  $entry['error number'] = $errno;
  $entry['line'] = $line;
  $entry['file'] = $file;
  $this->report['errors'][] = $entry;
}
/**
 * {@inheritdoc}
 */
public function setOptions(array $options, $overwrite = FALSE) {
  // Counters are only accepted when they are not negative.
  $counters = array(
    'items_count' => 'itemsCount',
    'reset_cache' => 'resetStaticCache',
  );
  foreach ($counters as $key => $property) {
    if (isset($options[$key]) && $options[$key] >= 0) {
      $this->{$property} = (int) $options[$key];
    }
  }
  // Error budget used by errorHandler().
  if (isset($options['max_reported_errors'])) {
    $this->errorsLeft = (int) $options['max_reported_errors'];
  }
  // Plain on/off switches.
  $switches = array(
    'skip_imported' => 'skipImportedItems',
    'throw_exception' => 'throwException',
    'skip_defined_functions_check' => 'skipDefFunChk',
    'break_on_undefined_filter' => 'breakOnUndefinedFilter',
    'updates_only' => 'updatesOnly',
  );
  foreach ($switches as $key => $property) {
    if (isset($options[$key])) {
      $this->{$property} = (bool) $options[$key];
    }
  }
  // Callbacks are only replaced by non-empty values.
  $callbacks = array(
    'before_combine' => 'beforeCombine',
    'after_combine' => 'afterCombine',
    'before_create' => 'beforeCreate',
    'before_save' => 'beforeSave',
    'after_save' => 'afterSave',
    'uniq_callback' => 'uniqAlter',
  );
  foreach ($callbacks as $key => $property) {
    if (!empty($options[$key])) {
      $this->{$property} = $options[$key];
    }
  }
  return TRUE;
}
/**
 * Sets the entity info object.
 *
 * @param mixed $entity_info
 *   Entity info to use; an empty value is rejected.
 *
 * @return bool
 *   TRUE when the info was stored, FALSE (with an error added to the
 *   report) when nothing was provided.
 */
public function setEntityInfo($entity_info) {
  if ($entity_info) {
    $this->entityInfo = $entity_info;
    return TRUE;
  }
  $this->addError('No entity info provided');
  return FALSE;
}
/**
 * Sets the filters object and creates the requested dynamic functions.
 *
 * @param FeedImportMultiFilter $filter
 *   An instance of FeedImportMultiFilter.
 * @param array $functions
 *   Functions to create; each entry provides 'name', 'args' and 'body'.
 *
 * @return bool
 *   TRUE on success, FALSE as soon as one function could not be created
 *   (the returned message is added to the report).
 */
public function setFilter(FeedImportMultiFilter $filter, $functions = array()) {
  $this->filters = $filter;
  if (!$functions) {
    // Nothing to create.
    return TRUE;
  }
  foreach ($functions as &$definition) {
    // Optionally leave already defined functions alone.
    $already_defined = $this->skipDefFunChk && function_exists($definition['name']);
    if ($already_defined) {
      continue;
    }
    $result = $this->filters->createFunction($definition['name'], $definition['args'], $definition['body']);
    if ($result !== TRUE) {
      // Anything but TRUE is the failure message.
      $this->addError($result);
      return FALSE;
    }
  }
  return TRUE;
}
/**
 * Sets the reader and inits it.
 *
 * While the reader initializes, errorHandler() is prevented from throwing
 * exceptions; afterwards the previous setting is put back, so a
 * 'throw_exception' option given to setOptions() is not lost.
 *
 * @param FeedImportReader $reader
 *   An instance of reader.
 *
 * @return boolean
 *   Whatever the reader's init() returned (TRUE if success).
 *
 * @throws Exception
 *   Re-thrown unchanged if the reader's init() throws.
 */
public function setReader(FeedImportReader $reader) {
  $this->reader = $reader;
  // Remember the configured value instead of assuming it was TRUE.
  $previous = $this->throwException;
  $this->throwException = FALSE;
  try {
    $ok = $this->reader->init();
  }
  catch (Exception $e) {
    // Don't leave exceptions disabled when init() itself fails.
    $this->throwException = $previous;
    throw $e;
  }
  $this->throwException = $previous;
  return $ok;
}
/**
 * Sets the unique path used for monitoring items.
 *
 * The path is formatted by the reader; a formatted path without content
 * is stored as NULL.
 *
 * @param string $uniq
 *   A path. A value loosely equal to NULL is accepted and leaves the
 *   current setting untouched.
 *
 * @return boolean
 *   TRUE if success, FALSE when no reader was set yet.
 */
public function setUniq($uniq) {
  if ($uniq == NULL) {
    // Nothing to set.
    return TRUE;
  }
  if (!$this->reader) {
    $this->addError('You must have a reader to add unique path');
    return FALSE;
  }
  $path = $this->reader->formatPath($uniq);
  // Scalars are blank when they cast to an empty string, everything else
  // when empty() says so.
  $blank = is_scalar($path) ? ((string) $path === '') : empty($path);
  $this->uniq = $blank ? NULL : $path;
  return TRUE;
}
/**
 * Sets the hash manager.
 *
 * @param FeedImportHashManager $hm
 *   An instance of FeedImportHashManager.
 *
 * @return bool
 *   Always TRUE.
 */
public function setHashManager(FeedImportHashManager $hm) {
  $this->hashes = $hm;
  return TRUE;
}
/**
 * Sets entity fields.
 *
 * Requires entity info, reader and filters to be set and can only be
 * called once (it refuses to run when the base entity is already filled).
 * Fields without paths that only provide a default value are turned into
 * static fields; scalar static values become properties of the base entity.
 *
 * @param array $fields
 *   List of dynamic fields.
 * @param array $static_fields
 *   List of static fields.
 * @param array $compare_functions
 *   List of compare functions, keyed by "module:type" of the field.
 * @param array $merge_classes
 *   Merge classes keyed by update mode. Entries may be class names (they
 *   are instantiated here) or, for non-default modes, ready instances.
 *   The UPDATE_COMBINE entry is mandatory and must be the name of a class
 *   extending FeedImportMergeField.
 * @param string $default_compare
 *   Default compare function.
 *
 * @return boolean
 *   TRUE if fields were set, FALSE otherwise (an error is added to the
 *   report).
 */
public function setFields(array $fields = array(), array $static_fields = array(), array $compare_functions = array(), array $merge_classes = array(), $default_compare = '_feed_import_base_compare_other_fields') {
  // Check for requirements.
  if (!$this->entityInfo || !$this->reader || !$this->filters || !empty($this->baseEntity)) {
    $this->addError('Could not set fields because there are missing dependencies or fields were already added');
    return FALSE;
  }
  // Create default merge.
  if (!isset($merge_classes[static::UPDATE_COMBINE])) {
    $this->addError('Merge field default class not found!');
    return FALSE;
  }
  $class = $merge_classes[static::UPDATE_COMBINE];
  if (!class_exists($class)) {
    $this->addError(sprintf('Merge field default class %s was not found!', $class));
    return FALSE;
  }
  elseif (!is_subclass_of($class, 'FeedImportMergeField')) {
    $this->addError(sprintf('Merge field default class %s must extend %s class!', $class, 'FeedImportMergeField'));
    return FALSE;
  }
  $merge_classes[static::UPDATE_COMBINE] = new $class();
  // Create format paths callback.
  $fp = array(
    $this->reader,
    'formatPath',
  );
  // Get language key.
  $lk = $this->entityInfo->langKey;
  // Set static fields.
  $this->staticFields =& $static_fields;
  // Parse all fields.
  foreach ($fields as &$f) {
    // Set field info.
    $this->fieldsInfo[$f['field']] =& $f;
    if (!isset($merge_classes[$f['update_mode']])) {
      $this->addError(sprintf('No field merge class found for field %s!', $f['field']));
      return FALSE;
    }
    // Class names are instantiated once and then shared between fields.
    if (is_string($merge_classes[$f['update_mode']])) {
      $class = $merge_classes[$f['update_mode']];
      if (!class_exists($class)) {
        $this->addError(sprintf('Merge field class %s for field %s was not found!', $class, $f['field']));
        return FALSE;
      }
      elseif (!is_subclass_of($class, 'FeedImportMergeField')) {
        $this->addError(sprintf('Merge field class %s for field %s must extend %s class!', $class, $f['field'], 'FeedImportMergeField'));
        return FALSE;
      }
      $merge_classes[$f['update_mode']] = new $class();
    }
    $f['update_mode'] = $merge_classes[$f['update_mode']];
    $f['overwrite'] = $f['update_mode']->overwriteEmpty();
    // Add cardinality
    $f['cardinality'] = isset($this->entityInfo->fields[$f['field']]['cardinality']) ? $this->entityInfo->fields[$f['field']]['cardinality'] : 1;
    // Add compare function
    if ($f['column']) {
      $f['compare'] = $default_compare;
      // Only look for a specific compare function when the entity info
      // knows the field; unknown fields keep the default one.
      if (isset($this->entityInfo->fields[$f['field']])) {
        $info = $this->entityInfo->fields[$f['field']];
        $key = $info['module'] . ':' . $info['type'];
        if (isset($compare_functions[$key])) {
          $f['compare'] = $compare_functions[$key];
        }
      }
    }
    // Check if column must be guessed.
    if ($f['column'] === TRUE && isset($this->entityInfo->fields[$f['field']])) {
      $f['column'] = $this->entityInfo->fields[$f['field']]['column'];
    }
    // Get paths count
    $f['paths_count'] = count($f['paths']);
    // Check if can be used as static field to improve performance.
    // Both spellings of the action are accepted explicitly: the loose
    // comparison with 'default_value' used to match integer 0 only by
    // PHP < 8 string-to-number juggling.
    $provides_default = $f['default_action'] === 'default_value' || $f['default_action'] === static::ACTION_DEFAULT_VALUE;
    if (!$f['paths_count'] && $provides_default) {
      if ($f['column']) {
        // Its a field, add it to static fields.
        $this->staticFields[$f['field']][$f['column']] = $f['default_value'];
      }
      else {
        // Its a property.
        $this->staticFields[$f['field']] = $f['default_value'];
      }
      // Ignore filters.
      $f['prefilters'] = $f['filters'] = FALSE;
      // Next field.
      continue;
    }
    // Check (pre)filters
    foreach (array(
      'prefilters',
      'filters',
    ) as $fgroup) {
      if ($f[$fgroup]) {
        $ok = TRUE;
        // Add filters
        foreach ($f[$fgroup] as &$v) {
          if (!$this->filters->add($fgroup, $f['field'], $v['function'], $v['params'])) {
            $ok = FALSE;
            $this->addError(sprintf('Could not add function %s in %s for %s field because is not defined', $v['function'], $fgroup, $f['field']));
            if ($this->breakOnUndefinedFilter) {
              $this->addError('Import stopped because a filter function is missing!');
              return FALSE;
            }
          }
        }
        // Remove reference to the last filter.
        unset($v);
        // From now on only a flag is kept: TRUE when all filters were added.
        $f[$fgroup] = $ok;
      }
      else {
        // No filters
        $f[$fgroup] = FALSE;
      }
    }
    // Format paths
    $f['paths'] = array_map($fp, $f['paths']);
    // Add to fields
    $this->fields[$f['field']] =& $f;
  }
  // Remove reference to $f.
  unset($f);
  // Set all static fields properties.
  // In the end just fields remain in static fields (no properties).
  foreach ($this->staticFields as $f => &$value) {
    if (is_scalar($value)) {
      if (!isset($this->fieldsInfo[$f])) {
        $this->fieldsInfo[$f] = array(
          'field' => $f,
          'column' => FALSE,
          'update_mode' => $merge_classes[static::UPDATE_COMBINE],
          'overwrite' => $merge_classes[static::UPDATE_COMBINE]->overwriteEmpty(),
          'cardinality' => 1,
        );
      }
      $this->baseEntity[$f] = $value;
      unset($this->staticFields[$f]);
    }
    elseif (!isset($this->fieldsInfo[$f])) {
      $this->fieldsInfo[$f] = array(
        'field' => $f,
        'column' => isset($this->entityInfo->fields[$f]) ? $this->entityInfo->fields[$f]['column'] : 'value',
        'update_mode' => $merge_classes[static::UPDATE_COMBINE],
        'overwrite' => $merge_classes[static::UPDATE_COMBINE]->overwriteEmpty(),
        'cardinality' => isset($this->entityInfo->fields[$f]['cardinality']) ? $this->entityInfo->fields[$f]['cardinality'] : 1,
      );
    }
  }
  // Remove reference to the last static field.
  unset($value);
  // There isn't lang in base entity nor in fields so use default.
  if (!isset($this->baseEntity[$lk]) && !isset($this->fields[$lk])) {
    $this->baseEntity[$lk] = LANGUAGE_NONE;
  }
  // Check for language in base entity.
  if (isset($this->baseEntity[$lk])) {
    // We have lang so set static fields in base entity.
    $lang = $this->baseEntity[$lk];
    foreach ($this->staticFields as $f => &$value) {
      if (!isset($this->fields[$f])) {
        // This field is not dynamically generated so use it in base entity.
        $this->baseEntity[$f][$lang][] = $value;
        // Remove it from static fields because now is part of base entity.
        unset($this->staticFields[$f]);
      }
    }
    unset($value);
  }
  else {
    // No language, it means language is dynamically created.
    // Move language on top to be available for other fields.
    $this->fields = array_merge(array(
      $lk => NULL,
    ), $this->fields);
    // However, set it to NONE, and if used will be overwritten.
    $this->baseEntity[$lk] = LANGUAGE_NONE;
  }
  return TRUE;
}
/**
 * Sets custom settings, after base settings were set.
 *
 * This base implementation accepts anything and does nothing; it is an
 * extension point for child classes.
 *
 * @param array $settings
 *   An array of settings.
 *
 * @return bool
 *   TRUE if success. If not, the import will be stopped.
 */
public function setCustomSettings(array $settings) {
  return TRUE;
}
/**
 * Tells whether a variable holds any content.
 *
 * Scalars have content unless they cast to an empty string (so 0, '0' and
 * FALSE... note that FALSE casts to '' and therefore has no content);
 * every other type is judged with empty().
 *
 * @param mixed &$var
 *   Variable to check.
 *
 * @return bool
 *   TRUE if there is content, FALSE otherwise.
 */
protected function hasContent(&$var) {
  return is_scalar($var) ? ((string) $var !== '') : !empty($var);
}
/**
 * Attaches a field value to entity.
 *
 * For fields (a column is set) every value is appended as an item under
 * the entity language, merged with the static properties of the field if
 * there are any. For properties only the first value is used.
 *
 * @param array &$entity
 *   Entity where to attach field.
 * @param array &$field
 *   Field settings ('field', 'column' and 'cardinality' are read here).
 * @param mixed &$value
 *   Field value. It is normalized in place (wrapped in an array, trimmed
 *   to the cardinality or reduced to its first element).
 */
protected function attachField(array &$entity, &$field, &$value) {
  if ($field['column']) {
    // It is a field.
    $fn =& $field['field'];
    $fc =& $field['column'];
    $lang =& $entity[$this->entityInfo->langKey];
    // Check if field exists.
    if (!isset($entity[$fn][$lang])) {
      $entity[$fn][$lang] = array();
    }
    // Get field reference.
    $ef =& $entity[$fn][$lang];
    // Check for static field props.
    $st = FALSE;
    if (!empty($this->staticFields[$fn])) {
      $st = $this->staticFields[$fn];
    }
    // Make it look multivalued.
    if (!is_array($value)) {
      $value = array(
        $value,
      );
    }
    elseif ($field['cardinality'] != -1 && ($cnt = count($value)) > $field['cardinality']) {
      // Remove extra values.
      array_splice($value, $field['cardinality'], $cnt - $field['cardinality']);
    }
    // Append one item per value. Every item is built as a fresh array:
    // reusing one template array through a reference to its column slot
    // makes all appended copies share that slot (PHP keeps references
    // inside copied arrays), so they would all end with the last value.
    foreach ($value as $v) {
      $item = is_object($v) ? (array) $v : array($fc => $v);
      // Values win over static props having the same key.
      $ef[] = $st ? $item + $st : $item;
    }
  }
  else {
    // If this is a property then get only first value.
    if (is_object($value)) {
      // reset() on objects is deprecated since PHP 8.1.
      $value = (array) $value;
    }
    if (is_array($value)) {
      // This still can be array but if so then problem is elsewhere.
      $value = reset($value);
    }
    // Set property.
    $entity[$field['field']] = $value;
  }
}
/**
 * Creates a new entity array from a reader item.
 *
 * Starts from a copy of the base entity, stores the item hash under
 * TEMP_HASH (NULL when items are not monitored) and then resolves every
 * dynamic field: the first path yielding content wins, otherwise the
 * field's default action decides what happens.
 *
 * While this runs, $this->current references the entity being built and
 * $this->prefilteredValue holds the prefilter result of the current field.
 *
 * @param mixed $item
 *   Item to be mapped to entity
 *
 * @return array|null
 *   The entity array, or NULL (a reference to $this->NULL) when a field's
 *   default action is to skip the item. Returned by reference.
 */
protected function &createEntity(&$item) {
// Create the entity.
$entity = $this->baseEntity;
// Set a ref for current entity.
$this->current =& $entity;
// Check for unique id of item.
if ($this->uniq === NULL) {
// No hash, so not monitored.
$entity[static::TEMP_HASH] = NULL;
}
else {
// Get uniq.
$uniq = $this->reader
->map($item, $this->uniq);
// Several matches: use the first one.
if (is_array($uniq)) {
$uniq = isset($uniq[0]) ? $uniq[0] : reset($uniq);
}
// Check if uniq alter callback exists.
if ($this->uniqAlter) {
$uniq = call_user_func($this->uniqAlter, $uniq);
}
// Create hash.
$entity[static::TEMP_HASH] = $this->hashes
->hash($uniq);
}
// Set entity fields.
foreach ($this->fields as $name => &$field) {
// Get filtered field value.
$val = NULL;
$i = -1;
// Try the paths in order until one of them yields content.
while (++$i < $field['paths_count']) {
// Get value for specified path.
$val = $this->reader
->map($item, $field['paths'][$i]);
// Check if passes prefilter. The prefilter result only acts as a gate:
// it is kept in $this->prefilteredValue and $val itself is not replaced.
if ($field['prefilters']) {
$this->prefilteredValue = $this->filters
->apply("prefilters", $name, $val);
if (!$this
->hasContent($this->prefilteredValue)) {
$val = $this->prefilteredValue = NULL;
// If item doesn't pass prefilter than go to next path.
continue;
}
}
if ($this
->hasContent($val)) {
// We have content, check for filters.
if ($field['filters']) {
$val = $this->filters
->apply("filters", $name, $val);
// A value filtered down to nothing falls back to the default action.
if (!$this
->hasContent($val)) {
$val = NULL;
}
}
// We matched one path.
break;
}
else {
$val = NULL;
}
}
// Check if default action is needed.
if ($val === NULL) {
switch ($field['default_action']) {
// Provide default value.
// This is also default action.
case static::ACTION_DEFAULT_VALUE:
default:
$val = $field['default_value'];
break;
// Provide filtered default value.
case static::ACTION_DEFAULT_FILTERED_VALUE:
$val = $this->filters
->apply("filters", $name, $field['default_value']);
break;
// Skip this item by returning NULL.
case static::ACTION_SKIP_ITEM:
$this->report['skipped']++;
// $this->current still references $entity, so this also empties it.
$this->current = $this->prefilteredValue = NULL;
return $this->NULL;
// Don't add this field to entity.
case static::ACTION_IGNORE_FIELD:
$this->prefilteredValue = NULL;
// The switch counts as one level, so this continues the foreach.
continue 2;
}
}
// Remove prefiltered value.
$this->prefilteredValue = NULL;
// Set field value in entity.
$this
->attachField($entity, $field, $val);
// Not needed anymore.
unset($val);
}
// Remove current reference. The unset() breaks the reference first, so
// that assigning NULL does not wipe $entity.
unset($this->current);
$this->current = NULL;
// Return the entity array.
return $entity;
}
/**
 * Combines a freshly created entity into the stored one.
 *
 * Walks over all known fields. Fields (having a column) are merged per
 * language through the field's merge class; properties are replaced when
 * their value differs. A value missing in the new entity is removed from
 * the current one only when the field is set to overwrite.
 *
 * @param array $new
 *   Created entity. Properties copied to the current entity are removed
 *   from it.
 * @param object $curr
 *   Current entity.
 *
 * @return boolean
 *   TRUE if current entity was changed.
 */
protected function combineEntities(array &$new, $curr) {
  $modified = FALSE;
  // Language of the new entity.
  $language =& $new[$this->entityInfo->langKey];
  foreach ($this->fieldsInfo as &$info) {
    $name =& $info['field'];
    $column =& $info['column'];

    if (!$column) {
      // Property.
      if (isset($new[$name])) {
        if (!isset($curr->{$name}) || $curr->{$name} != $new[$name]) {
          // Different (or not set yet): take the new value.
          $curr->{$name} = $new[$name];
          $modified = TRUE;
          // Remove from memory.
          unset($new[$name]);
        }
      }
      elseif ($info['overwrite'] && isset($curr->{$name})) {
        // Forced update mode: no new value means no value.
        unset($curr->{$name});
        $modified = TRUE;
      }
      continue;
    }

    // Field without new values.
    if (!isset($new[$name][$language])) {
      if ($info['overwrite'] && isset($curr->{$name}[$language])) {
        // Forced update mode.
        unset($curr->{$name}[$language]);
        // Don't leave an empty property.
        if (empty($curr->{$name})) {
          unset($curr->{$name});
        }
        $modified = TRUE;
      }
      continue;
    }

    // Field not present in the current entity: just copy the values.
    if (!isset($curr->{$name}[$language])) {
      $curr->{$name}[$language] = $new[$name][$language];
      $modified = TRUE;
      continue;
    }

    // Both have values: let the merge class decide.
    if ($info['update_mode']->merge($curr->{$name}[$language], $new[$name][$language], $info)) {
      $modified = TRUE;
    }
  }
  return $modified;
}
/**
 * Saves created entities.
 *
 * Every entity array is either combined into an already imported entity
 * (its hash is known) or turned into a new entity and saved. The
 * before/after callbacks can redirect this flow through their ENTITY_*
 * return codes. Report counters are updated along the way and the list is
 * emptied while it is processed.
 *
 * @param array &$entities
 *   An array of entities to be saved, keyed 0..n-1. Each entry is an
 *   entity array as returned by createEntity() or NULL for skipped items.
 */
protected function saveEntities(array &$entities) {
  // One-element id list handed to entity_load(); $loadId below aliases
  // its only slot.
  static $loadArr = array(
    NULL,
  );
  // Number of entities.
  $total = count($entities);
  $this->report['total'] += $total;
  // Get ids from hashes.
  $ids = $this->hashes->get();
  // Entity create callback.
  $createCallback =& $this->entityInfo->createCallback;
  // Entity save callback.
  $saveCallback =& $this->entityInfo->saveCallback;
  // Entity before combine callback
  $beforeCombine =& $this->beforeCombine;
  // Entity after combine callback
  $afterCombine =& $this->afterCombine;
  // Entity before create callback
  $beforeCreate =& $this->beforeCreate;
  // Entity before save callback
  $beforeSave =& $this->beforeSave;
  // Entity after save callback
  $afterSave =& $this->afterSave;
  // Entity load id.
  $loadId =& $loadArr[0];
  $i = -1;
  // Save each entity.
  while (++$i < $total) {
    $entity =& $entities[$i];
    unset($entities[$i]);
    if ($entity == NULL) {
      continue;
    }
    // Save hash.
    $hash =& $entity[static::TEMP_HASH];
    // Not needed anymore (the value stays reachable through $hash).
    unset($entity[static::TEMP_HASH]);
    // Check if item is already imported or is not monitored.
    if ($hash !== NULL && isset($ids[$hash])) {
      // Check if is used option to skip item if already imported
      // or entity is protected (import cannot make changes).
      if ($this->skipImportedItems || $ids[$hash]->expire == FeedImportHashManager::MARK_PROTECTED) {
        $this->report[$this->skipImportedItems ? 'skipped' : 'protected_skipped']++;
        unset($ids[$hash]);
        continue;
      }
      // Current entity id.
      $loadId = $ids[$hash]->entity_id;
      // Load current entity.
      $current = entity_load($this->entityInfo->name, $loadArr);
      // If entity is missing then skip.
      if (!isset($current[$loadId])) {
        $this->report['missing']++;
        $this->orphanHashes[] = $ids[$hash]->id;
        unset($ids[$hash], $current);
        continue;
      }
      // Get current entity value.
      $current =& $current[$loadId];
      // Entity might be in static cache.
      $this->staticCacheEntities++;
      // Merge status of entity.
      $changed = FALSE;
      // Check for before combine callback.
      if ($beforeCombine) {
        switch ($beforeCombine($entity, $current, $changed)) {
          case static::ENTITY_RESCHEDULE:
            $this->report['rescheduled']++;
            goto ADD_HASH_UPDATE;

          case static::ENTITY_SKIP:
            $this->report['skipped']++;
            goto UNSET_VARS;

          case static::ENTITY_NO_COMBINE:
            goto COMBINE_FINISHED;

          case static::ENTITY_MARK_PROTECTED:
            $this->report['protected']++;
            $this->hashes->protect($ids[$hash]->id);
            goto UNSET_VARS;
        }
      }
      // Check for diff.
      $changed = $this->combineEntities($entity, $current);
      // Check for after combine callback.
      if ($afterCombine) {
        switch ($afterCombine($entity, $current, $changed)) {
          case static::ENTITY_RESCHEDULE:
            $this->report['rescheduled']++;
            goto ADD_HASH_UPDATE;

          case static::ENTITY_SKIP:
            $this->report['skipped']++;
            goto UNSET_VARS;

          case static::ENTITY_MARK_PROTECTED:
            $this->report['protected']++;
            $this->hashes->protect($ids[$hash]->id);
            goto UNSET_VARS;
        }
      }
      COMBINE_FINISHED:
      // Not needed anymore.
      unset($entity);
      // Check if entity is changed and save changes.
      if ($changed) {
        // Save entity.
        if ($saveCallback) {
          $saveCallback($current);
        }
        elseif (method_exists($current, 'save')) {
          if (!$current->save()) {
            unset($current);
            continue;
          }
        }
        elseif ($this->entityInfo->controller->canSave) {
          if (!$this->entityInfo->controller->save($current)) {
            unset($current);
            continue;
          }
        }
        else {
          // Don't know how to save entity...
          unset($current);
          continue;
        }
        // Set report about updated items.
        $this->report['updated']++;
        // Check after save callback
        $afterSave && $afterSave($current, FALSE);
      }
      else {
        // Set report about rescheduled items.
        $this->report['rescheduled']++;
      }
      ADD_HASH_UPDATE:
      // Add to update ids.
      $this->hashes->update($ids[$hash]->id);
      UNSET_VARS:
      // Free some memory.
      unset($ids[$hash], $current, $entity, $hash);
    }
    else {
      // Check if this imports accepts only updates.
      if ($this->updatesOnly) {
        $this->report['skipped']++;
        goto UNSET_E_VARS;
      }
      // Check before create callback
      if ($beforeCreate) {
        switch ($beforeCreate($entity)) {
          case static::ENTITY_SKIP:
            $this->report['skipped']++;
            goto UNSET_E_VARS;

          case static::ENTITY_SKIP_CREATE:
            goto ENTITY_SAVE;

          case static::ENTITY_SKIP_SAVE:
            goto ENTITY_SAVED;
        }
      }
      // Create a new entity.
      if ($createCallback) {
        $entity = $createCallback($entity, $this->entityInfo->name);
      }
      elseif ($this->entityInfo->controller->canCreate) {
        $entity = $this->entityInfo->controller->create($entity);
      }
      else {
        $entity = (object) $entity;
      }
      ENTITY_SAVE:
      // Check before save callback
      if ($beforeSave) {
        switch ($beforeSave($entity)) {
          case static::ENTITY_SKIP:
            $this->report['skipped']++;
            goto UNSET_E_VARS;

          case static::ENTITY_SKIP_SAVE:
            goto ENTITY_SAVED;
        }
      }
      // No entity to save.
      if (!$entity) {
        unset($entity);
        continue;
      }
      elseif ($saveCallback) {
        $saveCallback($entity);
      }
      elseif (method_exists($entity, 'save')) {
        if (!$entity->save()) {
          unset($entity);
          continue;
        }
      }
      elseif ($this->entityInfo->controller->canSave) {
        if (!$this->entityInfo->controller->save($entity)) {
          unset($entity);
          continue;
        }
      }
      else {
        // Don't know how to save entity...
        unset($entity);
        continue;
      }
      ENTITY_SAVED:
      // Entity might be added in static cache.
      $this->staticCacheEntities++;
      // Check after save callback
      $afterSave && $afterSave($entity, TRUE);
      // Check if is monitored.
      if ($hash !== NULL) {
        // Insert into feed import hash table.
        $this->hashes->insert($entity->{$this->entityInfo->idKey}, $hash);
      }
      // Set report about new items.
      $this->report['new']++;
      UNSET_E_VARS:
      // Not needed anymore.
      unset($entity, $hash);
    }
    // Remove entity cache. The counter can be incremented on iterations
    // that leave through "continue" and never reach this check, so it may
    // jump over the threshold: compare with >= instead of ==. A threshold
    // of 0 disables resetting, like in process().
    if ($this->resetStaticCache && $this->staticCacheEntities >= $this->resetStaticCache) {
      $this->entityInfo->controller->canResetCache && $this->entityInfo->controller->resetCache();
      $this->staticCacheEntities = 0;
    }
  }
}
/**
 * Processes the import.
 *
 * Reads items from the reader until it returns a falsy value, creates an
 * entity for each one and saves them, either in batches of
 * $this->itemsCount entities or all together at the end. Afterwards the
 * static cache is reset (if enabled), pending hash updates and inserts
 * are committed and orphan hashes are deleted.
 *
 * @return array
 *   The report of this run (counters, 'started'/'finished' timestamps and
 *   'errors'). The internal report is reset before returning.
 */
public function process() {
  // Give import time (for large imports).
  // Well, if safe mode is on this cannot be done so it may break import.
  if (!ini_get('safe_mode')) {
    set_time_limit(0);
  }
  $this->report['started'] = time();

  $entities = array();
  $reader = $this->reader;
  // Decide once whether entities are saved in batches.
  $batched = (bool) $this->itemsCount;
  $pending = 0;
  // Get next item from reader.
  while ($item = $reader->get()) {
    // Create the entity.
    $entities[] =& $this->createEntity($item);
    // Check for save.
    if ($batched && ++$pending == $this->itemsCount) {
      unset($item);
      $this->saveEntities($entities);
      $pending = 0;
      $entities = array();
    }
  }
  // Check for left over entities.
  if ($entities) {
    $this->saveEntities($entities);
  }
  // Not needed anymore.
  unset($entities, $reader);

  // Reset the static cache.
  if ($this->staticCacheEntities && $this->resetStaticCache) {
    $controller = $this->entityInfo->controller;
    if ($controller->canResetCache) {
      $controller->resetCache();
    }
    $this->staticCacheEntities = 0;
  }

  // Commit left over hash updates.
  $this->hashes->updateCommit();
  // Also the new hash inserts.
  $this->hashes->insertCommit();
  // Remove orphan hashes.
  if ($this->orphanHashes) {
    $this->hashes->delete($this->orphanHashes);
    $this->orphanHashes = array();
  }

  $this->report['finished'] = time();
  $report = $this->report;
  // Reset report: all counters and timestamps to 0, no errors.
  $counters = array(
    'total',
    'updated',
    'new',
    'rescheduled',
    'skipped',
    'protected',
    'protected_skipped',
    'missing',
    'started',
    'finished',
  );
  $this->report = array_fill_keys($counters, 0) + array(
    'errors' => array(),
  );
  return $report;
}
}