View source
<?php
namespace Drupal\feed_import_base;
class FeedImportProcessor extends FeedImportConfigurable {
// Field update modes. setFields() uses these values as keys of its
// $merge_classes argument; UPDATE_COMBINE is mandatory there and is also the
// mode given to fields that only appear as static fields.
const UPDATE_COMBINE = 0;
const UPDATE_MERGE = 1;
const UPDATE_OVERWRITE = 2;
const UPDATE_OVERWRITE_FAST = 3;
// Default actions: what createEntity() does with a field when none of its
// paths produced a value with content.
// Use the field's 'default_value' unchanged.
const ACTION_DEFAULT_VALUE = 0;
// Run the field's 'default_value' through the field's "filters" group.
const ACTION_DEFAULT_FILTERED_VALUE = 1;
// Leave the field out of the entity.
const ACTION_IGNORE_FIELD = 2;
// Drop the whole item and count it as skipped.
const ACTION_SKIP_ITEM = 3;
// Return codes that saveEntities() understands from the before/after
// callbacks. Any other return value lets processing continue normally.
const ENTITY_CONTINUE = 0;
// Count the item as rescheduled and refresh its hash without saving.
const ENTITY_RESCHEDULE = 1;
// Count the item as skipped and do nothing else with it.
const ENTITY_SKIP = 2;
// beforeCombine only: do not call combineEntities() for this item.
const ENTITY_NO_COMBINE = 3;
// Ask the hash manager to protect the hash of this item.
const ENTITY_MARK_PROTECTED = 4;
// beforeCreate only: skip the create step and go straight to saving.
const ENTITY_SKIP_CREATE = 5;
// beforeCreate/beforeSave: skip saving but still run the post-save steps.
const ENTITY_SKIP_SAVE = 6;
// Temporary key under which createEntity() stores the item hash inside the
// entity array; saveEntities() removes it before the entity is used.
const TEMP_HASH = '$_FI~HASH_$';
// Configuration of the fields that are read from paths, keyed by field name.
protected $fields = array();
// Static (fixed) field values; setFields() moves most of them to $baseEntity.
protected $staticFields = array();
// Template array that createEntity() copies for every item.
protected $baseEntity = array();
// Info about every known field (configured or static), keyed by field name.
// combineEntities() iterates over it.
protected $fieldsInfo = array();
// Formatted path of the unique value of an item, or NULL when not used.
protected $uniq;
// Not referenced anywhere in this class.
protected $group;
// Machine name received by the constructor.
protected $machineName;
// When TRUE, items whose hash is already known are counted as skipped.
protected $skipImportedItems = FALSE;
// When TRUE, items without a known hash are skipped instead of created.
protected $updatesOnly = FALSE;
// Entity info object (langKey, idKey, name, fields, controller, callbacks).
protected $entityInfo;
// Reader that supplies items and maps paths to values.
protected $reader;
// Hash manager used to look up, insert, update and delete item hashes.
protected $hashes;
// Filter container that applies the "prefilters" and "filters" groups.
protected $filters;
// Number of items after which process() saves the collected entities;
// 0 saves everything at the end.
protected $itemsCount = 0;
// Number of handled entities after which the controller cache is reset.
protected $resetStaticCache = 0;
// Counter of entities loaded or saved since the last cache reset.
protected $staticCacheEntities = 0;
// Optional callbacks invoked by saveEntities(); FALSE means "not set".
protected $beforeCombine = FALSE;
protected $afterCombine = FALSE;
protected $beforeCreate = FALSE;
protected $beforeSave = FALSE;
protected $afterSave = FALSE;
// Ids of hashes whose entity could not be loaded; deleted by process().
protected $orphanHashes = array();
// Variable holding NULL, needed because createEntity() returns by reference.
protected $NULL = NULL;
// Option 'skip_defined_functions_check', read by setFilter().
protected $skipDefFunChk = FALSE;
// When TRUE, setFields() fails as soon as a filter function cannot be added.
protected $breakOnUndefinedFilter = TRUE;
// Counters, timestamps and errors of the current run; process() returns a
// copy of it and then resets it.
protected $report = array(
'total' => 0,
'updated' => 0,
'new' => 0,
'rescheduled' => 0,
'skipped' => 0,
'protected' => 0,
'protected_skipped' => 0,
'missing' => 0,
'started' => 0,
'finished' => 0,
'errors' => array(),
);
// Reference to the entity array being built while createEntity() runs,
// NULL otherwise.
public $current;
// Result of the "prefilters" group for the value currently being processed
// by createEntity(), NULL otherwise.
public $prefilteredValue;
// Optional callback that alters the unique value before it is hashed.
protected $uniqAlter = FALSE;
/**
 * Creates a processor.
 *
 * @param mixed $machine_name
 *   Value stored as the machine name of this processor.
 */
public function __construct($machine_name) {
$this->machineName = $machine_name;
}
/**
 * Tells whether errorHandler() is currently installed by this object.
 */
protected $errHandlerEnabled = FALSE;

/**
 * Installs or removes errorHandler() as the PHP error handler.
 *
 * Nothing happens when the requested state is already the current one, so
 * the handler is never stacked twice nor restored twice.
 *
 * @param mixed $enable
 *   Truthy to install the handler, falsy to restore the previous one.
 */
public function setErrorHandler($enable) {
  $wanted = (bool) $enable;
  if ($wanted === (bool) $this->errHandlerEnabled) {
    return;
  }
  $this->errHandlerEnabled = $wanted;
  if ($wanted) {
    set_error_handler(array(
      $this,
      'errorHandler',
    ));
  }
  else {
    restore_error_handler();
  }
}
/**
 * Number of errors errorHandler() may still add to the report.
 */
protected $errorsLeft = 100;

/**
 * Whether errorHandler() turns handled errors into exceptions.
 */
protected $throwException = TRUE;

/**
 * PHP error handler callback (see setErrorHandler()).
 *
 * Records the error in the report while the error budget lasts and, when
 * exceptions are enabled, rethrows it as an exception.
 *
 * @param int $errno
 *   Level of the raised error.
 * @param string $errmsg
 *   Error message.
 * @param string $file
 *   File in which the error was raised.
 * @param int $line
 *   Line on which the error was raised.
 *
 * @return bool|null
 *   FALSE when the error is not handled here (PHP's own handling continues),
 *   NULL when it was handled.
 *
 * @throws \Exception
 *   When exceptions are enabled and the error is not masked.
 */
public function errorHandler($errno, $errmsg, $file, $line) {
  // A custom handler is called regardless of error_reporting, so honour the
  // mask here. The bitmask test also covers the @ operator: it sets the level
  // to 0 before PHP 8 and to a mask of fatal levels only since PHP 8, where a
  // plain "== 0" test no longer matches.
  if (!(error_reporting() & $errno)) {
    return FALSE;
  }
  if ($this->errorsLeft) {
    $this->report['errors'][] = array(
      'error' => $errmsg,
      'error number' => $errno,
      'line' => $line,
      'file' => $file,
    );
    $this->errorsLeft--;
  }
  if ($this->throwException) {
    throw new \Exception("(Uncaught Feed Import Exception) [{$errmsg}; file: {$file}; line: {$line}]", $errno);
  }
}
/**
 * Gets the errors collected so far.
 *
 * @return array
 *   The 'errors' entry of the current report.
 */
public function getErrors() {
return $this->report['errors'];
}
/**
 * Appends an error to the report.
 *
 * @param mixed $errmsg
 *   Error message.
 * @param mixed $errno
 *   Error number.
 * @param mixed $line
 *   Line the error refers to.
 * @param mixed $file
 *   File the error refers to.
 */
public function addError($errmsg, $errno = 0, $line = NULL, $file = NULL) {
  $entry = array();
  $entry['error'] = $errmsg;
  $entry['error number'] = $errno;
  $entry['line'] = $line;
  $entry['file'] = $file;
  $this->report['errors'][] = $entry;
}
/**
 * Applies processor options.
 *
 * Only the options present in $options are changed.
 *
 * @param array $options
 *   Options keyed by name: items_count, reset_cache, max_reported_errors,
 *   skip_imported, throw_exception, skip_defined_functions_check,
 *   break_on_undefined_filter, updates_only, before_combine, after_combine,
 *   before_create, before_save, after_save, uniq_callback.
 * @param bool $overwrite
 *   Not used by this implementation.
 *
 * @return bool
 *   Always TRUE.
 */
public function setOptions(array $options, $overwrite = FALSE) {
  // Integer options that are only accepted when not negative.
  $counters = array(
    'items_count' => 'itemsCount',
    'reset_cache' => 'resetStaticCache',
  );
  foreach ($counters as $key => $property) {
    if (isset($options[$key]) && $options[$key] >= 0) {
      $this->{$property} = (int) $options[$key];
    }
  }

  // The error budget is taken as is.
  if (isset($options['max_reported_errors'])) {
    $this->errorsLeft = (int) $options['max_reported_errors'];
  }

  // Boolean switches.
  $switches = array(
    'skip_imported' => 'skipImportedItems',
    'throw_exception' => 'throwException',
    'skip_defined_functions_check' => 'skipDefFunChk',
    'break_on_undefined_filter' => 'breakOnUndefinedFilter',
    'updates_only' => 'updatesOnly',
  );
  foreach ($switches as $key => $property) {
    if (isset($options[$key])) {
      $this->{$property} = (bool) $options[$key];
    }
  }

  // Callbacks: an empty value keeps whatever is currently set.
  $callbacks = array(
    'before_combine' => 'beforeCombine',
    'after_combine' => 'afterCombine',
    'before_create' => 'beforeCreate',
    'before_save' => 'beforeSave',
    'after_save' => 'afterSave',
    'uniq_callback' => 'uniqAlter',
  );
  foreach ($callbacks as $key => $property) {
    if (!empty($options[$key])) {
      $this->{$property} = $options[$key];
    }
  }

  return TRUE;
}
/**
 * Sets the entity info used by this processor.
 *
 * @param mixed $entity_info
 *   Entity info; an empty value is rejected.
 *
 * @return bool
 *   TRUE when stored, FALSE (with an error added) when empty.
 */
public function setEntityInfo($entity_info) {
  if ($entity_info) {
    $this->entityInfo = $entity_info;
    return TRUE;
  }
  $this->addError('No entity info provided');
  return FALSE;
}
/**
 * Sets the filter container and registers dynamic filter functions.
 *
 * @param FeedImportMultiFilter $filter
 *   Filter container.
 * @param array $functions
 *   Function definitions, each with 'name', 'args' and 'body'.
 *
 * @return bool
 *   TRUE on success. FALSE when a function could not be created; the message
 *   returned by the filter container is added as an error.
 */
public function setFilter(FeedImportMultiFilter $filter, $functions = array()) {
  $this->filters = $filter;
  if (!$functions) {
    return TRUE;
  }
  foreach ($functions as &$definition) {
    // With the option enabled, a function that already exists is left alone.
    $leave_alone = $this->skipDefFunChk && function_exists($definition['name']);
    if (!$leave_alone) {
      $outcome = $this->filters->createFunction($definition['name'], $definition['args'], $definition['body']);
      if ($outcome !== TRUE) {
        $this->addError($outcome);
        return FALSE;
      }
    }
  }
  unset($definition);
  return TRUE;
}
/**
 * Sets the reader and initializes it.
 *
 * While the reader initializes, errors caught by errorHandler() are only
 * recorded and not rethrown as exceptions.
 *
 * @param FeedImportReader $reader
 *   Reader to use.
 *
 * @return mixed
 *   Whatever the reader's init() returns.
 */
public function setReader(FeedImportReader $reader) {
  $this->reader = $reader;
  // Remember the configured value instead of forcing TRUE afterwards, so a
  // 'throw_exception' => FALSE option set earlier survives this call.
  $previous = $this->throwException;
  $this->throwException = FALSE;
  try {
    $ok = $this->reader->init();
  }
  finally {
    // Also restored when init() itself throws.
    $this->throwException = $previous;
  }
  return $ok;
}
/**
 * Sets the path of the unique value of an item.
 *
 * @param mixed $uniq
 *   Path to the unique value. A value loosely equal to NULL changes nothing.
 *
 * @return bool
 *   TRUE when accepted or ignored, FALSE (with an error added) when no reader
 *   is available to format the path.
 */
public function setUniq($uniq) {
  if ($uniq == NULL) {
    return TRUE;
  }
  if (!$this->reader) {
    $this->addError('You must have a reader to add unique path');
    return FALSE;
  }
  $formatted = $this->reader->formatPath($uniq);
  // A formatted path without content means "no unique path".
  $blank = is_scalar($formatted) ? (string) $formatted === '' : empty($formatted);
  $this->uniq = $blank ? NULL : $formatted;
  return TRUE;
}
/**
 * Sets the hash manager.
 *
 * @param FeedImportHashManager $hm
 *   Hash manager to use.
 *
 * @return bool
 *   Always TRUE.
 */
public function setHashManager(FeedImportHashManager $hm) {
$this->hashes = $hm;
return TRUE;
}
/**
 * Sets the fields to import and prepares the base entity.
 *
 * Needs entity info, reader and filters to be set, and can run only once.
 *
 * @param array $fields
 *   Field configurations. The keys read here are: field, column, paths,
 *   update_mode, default_action, default_value, prefilters, filters.
 * @param array $static_fields
 *   Fixed values keyed by field name.
 * @param array $compare_functions
 *   Compare functions keyed by "module:type" of the field.
 * @param array $merge_classes
 *   Merge classes (class names or instances) keyed by update mode. The
 *   UPDATE_COMBINE entry is mandatory and must be a class name.
 * @param string $default_compare
 *   Compare function for fields without an entry in $compare_functions.
 *
 * @return bool
 *   TRUE on success, FALSE (with an error added) otherwise.
 */
public function setFields(array $fields = array(), array $static_fields = array(), array $compare_functions = array(), array $merge_classes = array(), $default_compare = '_feed_import_base_compare_other_fields') {
  if (!$this->entityInfo || !$this->reader || !$this->filters || !empty($this->baseEntity)) {
    $this->addError('Could not set fields because there are missing dependencies or fields were already added');
    return FALSE;
  }
  if (!isset($merge_classes[static::UPDATE_COMBINE])) {
    // The message used to read "... class found!", the opposite of the error.
    $this->addError('Merge field default class not found!');
    return FALSE;
  }
  $class = $merge_classes[static::UPDATE_COMBINE];
  if (!class_exists($class)) {
    $this->addError(sprintf('Merge field default class %s was not found!', $class));
    return FALSE;
  }
  // NOTE(review): the parent class is given as an unqualified string, which
  // PHP resolves in the global namespace - confirm where FeedImportMergeField
  // is declared.
  elseif (!is_subclass_of($class, 'FeedImportMergeField')) {
    // "%s class": the former "% class" was parsed as a "%c" conversion with a
    // space flag and garbled the message.
    $this->addError(sprintf('Merge field default class %s must extend %s class!', $class, 'FeedImportMergeField'));
    return FALSE;
  }
  $merge_classes[static::UPDATE_COMBINE] = new $class();
  $fp = array(
    $this->reader,
    'formatPath',
  );
  $lk = $this->entityInfo->langKey;
  $this->staticFields =& $static_fields;
  foreach ($fields as &$f) {
    $this->fieldsInfo[$f['field']] =& $f;
    if (!isset($merge_classes[$f['update_mode']])) {
      $this->addError(sprintf('No field merge class found for field %s!', $f['field']));
      return FALSE;
    }
    // Merge classes are instantiated lazily, once per update mode.
    if (is_string($merge_classes[$f['update_mode']])) {
      $class = $merge_classes[$f['update_mode']];
      if (!class_exists($class)) {
        $this->addError(sprintf('Merge field class %s for field %s was not found!', $class, $f['field']));
        return FALSE;
      }
      elseif (!is_subclass_of($class, 'FeedImportMergeField')) {
        $this->addError(sprintf('Merge field class %s for field %s must extend %s class!', $class, $f['field'], 'FeedImportMergeField'));
        return FALSE;
      }
      $merge_classes[$f['update_mode']] = new $class();
    }
    $f['update_mode'] = $merge_classes[$f['update_mode']];
    $f['overwrite'] = $f['update_mode']->overwriteEmpty();
    $f['cardinality'] = isset($this->entityInfo->fields[$f['field']]['cardinality']) ? $this->entityInfo->fields[$f['field']]['cardinality'] : 1;
    if ($f['column']) {
      $f['compare'] = $this->entityInfo->fields[$f['field']]['module'] . ':' . $this->entityInfo->fields[$f['field']]['type'];
      if (isset($compare_functions[$f['compare']])) {
        $f['compare'] = $compare_functions[$f['compare']];
      }
      else {
        $f['compare'] = $default_compare;
      }
    }
    // A column of TRUE means "use the column declared by the entity info".
    if ($f['column'] === TRUE && isset($this->entityInfo->fields[$f['field']])) {
      $f['column'] = $this->entityInfo->fields[$f['field']]['column'];
    }
    $f['paths_count'] = count($f['paths']);
    // A field without paths that falls back to its default value is really a
    // static field. createEntity() treats default_action as one of the
    // ACTION_* integers, so compare against the constant: the former
    // == 'default_value' only matched code 0 through PHP 7 loose comparison
    // and never matches on PHP 8. The literal string is still accepted.
    if (!$f['paths_count'] && ($f['default_action'] === 'default_value' || (is_numeric($f['default_action']) && (int) $f['default_action'] === static::ACTION_DEFAULT_VALUE))) {
      if ($f['column']) {
        $this->staticFields[$f['field']][$f['column']] = $f['default_value'];
      }
      else {
        $this->staticFields[$f['field']] = $f['default_value'];
      }
      $f['prefilters'] = $f['filters'] = FALSE;
      continue;
    }
    // Register the filters; afterwards each group only holds a flag telling
    // whether it has to be applied.
    foreach (array(
      'prefilters',
      'filters',
    ) as $fgroup) {
      if ($f[$fgroup]) {
        $ok = TRUE;
        foreach ($f[$fgroup] as &$v) {
          if (!$this->filters->add($fgroup, $f['field'], $v['function'], $v['params'])) {
            $ok = FALSE;
            $this->addError(sprintf('Could not add function %s in %s for %s field because is not defined', $v['function'], $fgroup, $f['field']));
            if ($this->breakOnUndefinedFilter) {
              $this->addError('Import stopped because a filter function is missing!');
              return FALSE;
            }
          }
        }
        unset($v);
        $f[$fgroup] = $ok;
      }
      else {
        $f[$fgroup] = FALSE;
      }
    }
    $f['paths'] = array_map($fp, $f['paths']);
    $this->fields[$f['field']] =& $f;
  }
  unset($f);
  // Scalar static values go straight into the base entity; the others stay
  // static fields, and every static field gets an info entry.
  foreach ($this->staticFields as $f => &$value) {
    if (is_scalar($value)) {
      if (!isset($this->fieldsInfo[$f])) {
        $this->fieldsInfo[$f] = array(
          'field' => $f,
          'column' => FALSE,
          'update_mode' => $merge_classes[static::UPDATE_COMBINE],
          'overwrite' => $merge_classes[static::UPDATE_COMBINE]->overwriteEmpty(),
          'cardinality' => 1,
        );
      }
      $this->baseEntity[$f] = $value;
      unset($this->staticFields[$f]);
    }
    elseif (!isset($this->fieldsInfo[$f])) {
      $this->fieldsInfo[$f] = array(
        'field' => $f,
        'column' => isset($this->entityInfo->fields[$f]) ? $this->entityInfo->fields[$f]['column'] : 'value',
        'update_mode' => $merge_classes[static::UPDATE_COMBINE],
        'overwrite' => $merge_classes[static::UPDATE_COMBINE]->overwriteEmpty(),
        'cardinality' => isset($this->entityInfo->fields[$f]['cardinality']) ? $this->entityInfo->fields[$f]['cardinality'] : 1,
      );
    }
  }
  unset($value);
  if (!isset($this->baseEntity[$lk]) && !isset($this->fields[$lk])) {
    $this->baseEntity[$lk] = \Drupal\Core\Language\Language::LANGCODE_NOT_SPECIFIED;
  }
  if (isset($this->baseEntity[$lk])) {
    // The language is fixed, so static fields that are not also read from
    // paths can be attached to the base entity once and for all.
    $lang = $this->baseEntity[$lk];
    foreach ($this->staticFields as $f => &$value) {
      if (!isset($this->fields[$f])) {
        $this->baseEntity[$f][$lang][] = $value;
        unset($this->staticFields[$f]);
      }
    }
    unset($value);
  }
  else {
    // The language comes from the item: put its key first in the field list
    // and keep a default in the base entity.
    $this->fields = array_merge(array(
      $lk => NULL,
    ), $this->fields);
    $this->baseEntity[$lk] = \Drupal\Core\Language\Language::LANGCODE_NOT_SPECIFIED;
  }
  return TRUE;
}
/**
 * Accepts custom settings; this implementation ignores them.
 *
 * @param array $settings
 *   Custom settings.
 *
 * @return bool
 *   Always TRUE.
 */
public function setCustomSettings(array $settings) {
return TRUE;
}
/**
 * Tells whether a value has content.
 *
 * Scalars have content unless their string form is empty, so 0, '0' and
 * FALSE... note that FALSE casts to '' and therefore has no content, while 0
 * and '0' do. Anything else is checked with empty().
 *
 * @param mixed $var
 *   Value to check.
 *
 * @return bool
 *   TRUE when the value has content.
 */
protected function hasContent(&$var) {
  return is_scalar($var) ? (string) $var !== '' : !empty($var);
}
/**
 * Attaches a value to a field of the entity being built.
 *
 * Fields with a column receive one item per value under the entity language,
 * each merged with the static values of that field. Fields without a column
 * are set directly as a property.
 *
 * @param array $entity
 *   Entity array to change.
 * @param array $field
 *   Field info; 'field', 'column' and 'cardinality' are read.
 * @param mixed $value
 *   Value(s) to attach. It is normalized in place (wrapped in an array,
 *   truncated to the cardinality, or reduced to its first element).
 */
protected function attachField(array &$entity, &$field, &$value) {
  if ($field['column']) {
    $fn =& $field['field'];
    $fc =& $field['column'];
    $lang =& $entity[$this->entityInfo->langKey];
    if (!isset($entity[$fn][$lang])) {
      $entity[$fn][$lang] = array();
    }
    $ef =& $entity[$fn][$lang];
    $st = FALSE;
    if (!empty($this->staticFields[$fn])) {
      $st =& $this->staticFields[$fn];
    }
    if (!is_array($value)) {
      $value = array(
        $value,
      );
    }
    elseif ($field['cardinality'] != -1 && ($cnt = count($value)) > $field['cardinality']) {
      // Drop the values that exceed the cardinality of the field.
      array_splice($value, $field['cardinality'], $cnt - $field['cardinality'], NULL);
    }
    if ($st) {
      foreach ($value as $v) {
        if (is_object($v)) {
          $ef[] = (array) $v + $st;
        }
        else {
          // Build a fresh array for every value. The former code reused one
          // array whose column slot was bound by reference; array copies and
          // unions keep such references, so every item appended to the field
          // shared that slot and ended up holding the last value.
          $ef[] = array(
            $fc => $v,
          ) + $st;
        }
      }
    }
    else {
      foreach ($value as $v) {
        if (is_object($v)) {
          $ef[] = (array) $v;
        }
        else {
          $ef[][$fc] = $v;
        }
      }
    }
  }
  else {
    // Properties hold a single value: keep the first element only.
    if (is_array($value) || is_object($value)) {
      $value = reset($value);
    }
    $entity[$field['field']] = $value;
  }
}
/**
 * Builds an entity array from a source item.
 *
 * Starts from a copy of the base entity, stores the hash of the item's
 * unique value under the TEMP_HASH key and then fills every configured
 * field from the first of its paths that yields content.
 *
 * @param mixed $item
 *   Item returned by the reader.
 *
 * @return array|null
 *   The entity array, or NULL (returned by reference through $this->NULL)
 *   when a field with the ACTION_SKIP_ITEM default action has no value.
 */
protected function &createEntity(&$item) {
$entity = $this->baseEntity;
// Expose the entity under construction through the public property.
// NOTE(review): presumably so filter functions can inspect it - confirm.
$this->current =& $entity;
if ($this->uniq === NULL) {
$entity[static::TEMP_HASH] = NULL;
}
else {
$uniq = $this->reader
->map($item, $this->uniq);
// Only the first mapped value is used as the unique value.
if (is_array($uniq)) {
$uniq = isset($uniq[0]) ? $uniq[0] : reset($uniq);
}
if ($this->uniqAlter) {
$uniq = call_user_func($this->uniqAlter, $uniq);
}
$entity[static::TEMP_HASH] = $this->hashes
->hash($uniq);
}
foreach ($this->fields as $name => &$field) {
$val = NULL;
$i = -1;
// Try the paths in order until one gives a value with content.
while (++$i < $field['paths_count']) {
$val = $this->reader
->map($item, $field['paths'][$i]);
if ($field['prefilters']) {
// The prefiltered result is kept apart in $this->prefilteredValue; $val
// itself stays the raw mapped value and is what the filters receive.
$this->prefilteredValue = $this->filters
->apply("prefilters", $name, $val);
if (!$this
->hasContent($this->prefilteredValue)) {
// Prefilters rejected this path: go on with the next one.
$val = $this->prefilteredValue = NULL;
continue;
}
}
if ($this
->hasContent($val)) {
if ($field['filters']) {
$val = $this->filters
->apply("filters", $name, $val);
if (!$this
->hasContent($val)) {
$val = NULL;
}
}
// The first path with content ends the search, even when the filters
// emptied the value (the default action applies then).
break;
}
else {
$val = NULL;
}
}
if ($val === NULL) {
switch ($field['default_action']) {
// Unknown actions behave like ACTION_DEFAULT_VALUE.
case static::ACTION_DEFAULT_VALUE:
default:
$val = $field['default_value'];
break;
case static::ACTION_DEFAULT_FILTERED_VALUE:
$val = $this->filters
->apply("filters", $name, $field['default_value']);
break;
case static::ACTION_SKIP_ITEM:
$this->report['skipped']++;
$this->current = $this->prefilteredValue = NULL;
// A function returning by reference has to return a variable.
return $this->NULL;
case static::ACTION_IGNORE_FIELD:
$this->prefilteredValue = NULL;
// Continue with the next field.
continue 2;
}
}
$this->prefilteredValue = NULL;
$this
->attachField($entity, $field, $val);
// attachField() receives $val by reference; start the next field clean.
unset($val);
}
// Break the reference to the local entity before clearing the property.
unset($this->current);
$this->current = NULL;
return $entity;
}
/**
 * Merges a freshly built entity array into an existing entity.
 *
 * Every known field is handled according to its info: fields with a column
 * are merged per language by the field's merge object, fields without one
 * are copied as plain properties. Fields absent from the new entity are
 * removed from the current one when the field is flagged 'overwrite'.
 *
 * @param array $new
 *   New entity array. Properties copied to the current entity are removed
 *   from it.
 * @param object $curr
 *   Existing entity, changed in place.
 *
 * @return bool
 *   TRUE when the current entity was changed.
 */
protected function combineEntities(array &$new, $curr) {
  $dirty = FALSE;
  $langcode =& $new[$this->entityInfo->langKey];
  foreach ($this->fieldsInfo as &$info) {
    $name =& $info['field'];
    $column =& $info['column'];

    // Plain property.
    if (!$column) {
      if (isset($new[$name])) {
        if (!isset($curr->{$name}) || $curr->{$name} != $new[$name]) {
          $curr->{$name} = $new[$name];
          $dirty = TRUE;
          unset($new[$name]);
        }
      }
      elseif ($info['overwrite'] && isset($curr->{$name})) {
        unset($curr->{$name});
        $dirty = TRUE;
      }
      continue;
    }

    // Field with a column, missing from the new entity.
    if (!isset($new[$name][$langcode])) {
      if ($info['overwrite'] && isset($curr->{$name}[$langcode])) {
        unset($curr->{$name}[$langcode]);
        if (empty($curr->{$name})) {
          unset($curr->{$name});
        }
        $dirty = TRUE;
      }
      continue;
    }

    // Field with a column, missing from the current entity.
    if (!isset($curr->{$name}[$langcode])) {
      $curr->{$name}[$langcode] = $new[$name][$langcode];
      $dirty = TRUE;
      continue;
    }

    // Present on both sides: the merge object decides.
    if ($info['update_mode']->merge($curr->{$name}[$langcode], $new[$name][$langcode], $info)) {
      $dirty = TRUE;
    }
  }
  return $dirty;
}
/**
 * Saves a batch of entity arrays built by createEntity().
 *
 * Items whose hash is already known are combined with the stored entity and
 * saved when something changed; the other items are created and saved. The
 * report counters and the hash manager are updated along the way.
 *
 * @param array $entities
 *   Entity arrays indexed from 0; NULL entries are ignored. Every entry is
 *   removed from the array as it is processed.
 */
protected function saveEntities(array &$entities) {
  // One-element id list reused for every load; $loadId aliases its element.
  static $loadArr = array(
    NULL,
  );
  $total = count($entities);
  $this->report['total'] += $total;
  // NOTE(review): assumed to map hash => record with id, entity_id and
  // expire properties, as used below - confirm against the hash manager.
  $ids = $this->hashes->get();
  $createCallback =& $this->entityInfo->createCallback;
  $saveCallback =& $this->entityInfo->saveCallback;
  $beforeCombine =& $this->beforeCombine;
  $afterCombine =& $this->afterCombine;
  $beforeCreate =& $this->beforeCreate;
  $beforeSave =& $this->beforeSave;
  $afterSave =& $this->afterSave;
  $loadId =& $loadArr[0];
  $i = -1;
  while (++$i < $total) {
    $entity =& $entities[$i];
    unset($entities[$i]);
    if ($entity == NULL) {
      continue;
    }
    // Take the hash out of the entity; the reference keeps its value.
    $hash =& $entity[static::TEMP_HASH];
    unset($entity[static::TEMP_HASH]);
    if ($hash !== NULL && isset($ids[$hash])) {
      // Known item: update the stored entity.
      if ($this->skipImportedItems || $ids[$hash]->expire == FeedImportHashManager::MARK_PROTECTED) {
        $this->report[$this->skipImportedItems ? 'skipped' : 'protected_skipped']++;
        unset($ids[$hash]);
        continue;
      }
      $loadId = $ids[$hash]->entity_id;
      // Load the entity itself. The former code kept the storage handler
      // object and indexed it like an array of loaded entities, which cannot
      // work; loadMultiple() returns the entities keyed by id.
      $current = \Drupal::entityTypeManager()
        ->getStorage($this->entityInfo->name)
        ->loadMultiple($loadArr);
      if (!isset($current[$loadId])) {
        // The entity is gone: remember the hash so process() deletes it.
        $this->report['missing']++;
        $this->orphanHashes[] = $ids[$hash]->id;
        unset($ids[$hash], $current);
        continue;
      }
      $current =& $current[$loadId];
      $this->staticCacheEntities++;
      $changed = FALSE;
      if ($beforeCombine) {
        switch ($beforeCombine($entity, $current, $changed)) {
          case static::ENTITY_RESCHEDULE:
            $this->report['rescheduled']++;
            goto ADD_HASH_UPDATE;
          case static::ENTITY_SKIP:
            $this->report['skipped']++;
            goto UNSET_VARS;
          case static::ENTITY_NO_COMBINE:
            goto COMBINE_FINISHED;
          case static::ENTITY_MARK_PROTECTED:
            $this->report['protected']++;
            $this->hashes->protect($ids[$hash]->id);
            goto UNSET_VARS;
        }
      }
      $changed = $this->combineEntities($entity, $current);
      if ($afterCombine) {
        switch ($afterCombine($entity, $current, $changed)) {
          case static::ENTITY_RESCHEDULE:
            $this->report['rescheduled']++;
            goto ADD_HASH_UPDATE;
          case static::ENTITY_SKIP:
            $this->report['skipped']++;
            goto UNSET_VARS;
          case static::ENTITY_MARK_PROTECTED:
            $this->report['protected']++;
            $this->hashes->protect($ids[$hash]->id);
            goto UNSET_VARS;
        }
      }
      COMBINE_FINISHED:
      unset($entity);
      if ($changed) {
        // Save with the first available mechanism; a failed or impossible
        // save abandons the item without touching its hash.
        if ($saveCallback) {
          $saveCallback($current);
        }
        elseif (method_exists($current, 'save')) {
          if (!$current->save()) {
            unset($current);
            continue;
          }
        }
        elseif ($this->entityInfo->controller->canSave) {
          if (!$this->entityInfo->controller->save($current)) {
            unset($current);
            continue;
          }
        }
        else {
          unset($current);
          continue;
        }
        $this->report['updated']++;
        $afterSave && $afterSave($current, FALSE);
      }
      else {
        $this->report['rescheduled']++;
      }
      ADD_HASH_UPDATE:
      $this->hashes->update($ids[$hash]->id);
      UNSET_VARS:
      unset($ids[$hash], $current, $entity, $hash);
    }
    else {
      // Unknown item: create a new entity.
      if ($this->updatesOnly) {
        $this->report['skipped']++;
        goto UNSET_E_VARS;
      }
      if ($beforeCreate) {
        switch ($beforeCreate($entity)) {
          case static::ENTITY_SKIP:
            $this->report['skipped']++;
            goto UNSET_E_VARS;
          case static::ENTITY_SKIP_CREATE:
            goto ENTITY_SAVE;
          case static::ENTITY_SKIP_SAVE:
            goto ENTITY_SAVED;
        }
      }
      if ($createCallback) {
        $entity = $createCallback($entity, $this->entityInfo->name);
      }
      elseif ($this->entityInfo->controller->canCreate) {
        $entity = $this->entityInfo->controller->create($entity);
      }
      else {
        $entity = (object) $entity;
      }
      ENTITY_SAVE:
      if ($beforeSave) {
        switch ($beforeSave($entity)) {
          case static::ENTITY_SKIP:
            $this->report['skipped']++;
            goto UNSET_E_VARS;
          case static::ENTITY_SKIP_SAVE:
            goto ENTITY_SAVED;
        }
      }
      if (!$entity) {
        unset($entity);
        continue;
      }
      elseif ($saveCallback) {
        $saveCallback($entity);
      }
      elseif (method_exists($entity, 'save')) {
        if (!$entity->save()) {
          unset($entity);
          continue;
        }
      }
      elseif ($this->entityInfo->controller->canSave) {
        if (!$this->entityInfo->controller->save($entity)) {
          unset($entity);
          continue;
        }
      }
      else {
        unset($entity);
        continue;
      }
      ENTITY_SAVED:
      $this->staticCacheEntities++;
      $afterSave && $afterSave($entity, TRUE);
      if ($hash !== NULL) {
        $this->hashes->insert($entity->{$this->entityInfo->idKey}, $hash);
      }
      $this->report['new']++;
      UNSET_E_VARS:
      unset($entity, $hash);
    }
    // Reset the controller cache every resetStaticCache entities. The guard
    // keeps a disabled reset (0) from firing on 0 == 0, and >= still fires
    // when a "continue" above skipped this check at the exact count.
    if ($this->resetStaticCache && $this->staticCacheEntities >= $this->resetStaticCache) {
      $this->entityInfo->controller->canResetCache && $this->entityInfo->controller->resetCache();
      $this->staticCacheEntities = 0;
    }
  }
}
/**
 * Runs the import.
 *
 * Reads items until the reader returns a falsy value, builds an entity array
 * for each one and saves them, in batches of itemsCount when that option is
 * not 0 and all at once otherwise. Afterwards the pending hash changes are
 * committed, the hashes of missing entities are deleted and the report is
 * reset.
 *
 * @return array
 *   The report of this run.
 */
public function process() {
  if (!ini_get('safe_mode')) {
    set_time_limit(0);
  }
  $this->report['started'] = time();

  $source = $this->reader;
  $pending = array();
  $batched = (bool) $this->itemsCount;
  $collected = 0;
  while ($item = $source->get()) {
    $pending[] =& $this->createEntity($item);
    if (!$batched || ++$collected != $this->itemsCount) {
      continue;
    }
    // A full batch: save it and start over.
    unset($item);
    $this->saveEntities($pending);
    $collected = 0;
    $pending = array();
  }
  // Whatever is left (everything, when batching is off).
  if ($pending) {
    $this->saveEntities($pending);
  }
  unset($pending, $source);

  if ($this->staticCacheEntities && $this->resetStaticCache) {
    if ($this->entityInfo->controller->canResetCache) {
      $this->entityInfo->controller->resetCache();
    }
    $this->staticCacheEntities = 0;
  }

  $this->hashes->updateCommit();
  $this->hashes->insertCommit();
  if ($this->orphanHashes) {
    $this->hashes->delete($this->orphanHashes);
    $this->orphanHashes = array();
  }

  $this->report['finished'] = time();
  $summary = $this->report;

  // Start the next run with a clean report.
  $counters = array(
    'total',
    'updated',
    'new',
    'rescheduled',
    'skipped',
    'protected',
    'protected_skipped',
    'missing',
    'started',
    'finished',
  );
  $this->report = array_fill_keys($counters, 0) + array(
    'errors' => array(),
  );
  return $summary;
}
}