function migrate_content_process_import in Migrate 6
Import objects from the specified content set
Parameters
$mcsid: ID of the content set to import
$options: Keyed array of optional settings: itemlimit - Maximum number of items to process; timelimit - Unix timestamp after which to stop processing; idlist - Comma-separated list of source IDs to process, instead of proceeding through all unmigrated rows; feedback - Keyed array controlling status feedback to the caller, with the keys: function - PHP function to call, passing a message to be displayed; frequency - How often to call the function; frequency_unit - How to interpret frequency (items or seconds).
Return value
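Status of the migration process: one of MIGRATE_RESULT_COMPLETED, MIGRATE_RESULT_INCOMPLETE, MIGRATE_RESULT_STOPPED, MIGRATE_RESULT_IN_PROGRESS or MIGRATE_RESULT_FAILED.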
2 calls to migrate_content_process_import()
- migrate_content_process_batch in ./
migrate.module - Process all enabled migration processes in a browser, using the Batch API to break it into manageable chunks.
- _drush_migrate_do_import in ./
migrate.drush.inc
File
- ./
migrate.module, line 567 - This module provides tools at "administer >> content >> migrate" for analyzing data from various sources and importing them into Drupal tables.
Code
/**
 * Import objects from the specified content set.
 *
 * Builds the content set's source view, left-joins it to the content set's
 * map and message tables so that only rows which are unmapped (or flagged
 * needs_update) and have no recorded messages are selected, then invokes the
 * "import_<contenttype>" migrate hook for each row until the rows run out or a
 * stop condition (status change, time limit, memory threshold) is hit.
 *
 * @param $mcsid
 *   ID of the content set to import.
 * @param $options
 *   Keyed array of optional options:
 *   - itemlimit: Maximum number of items to process.
 *   - timelimit: Unix timestamp after which to stop processing.
 *   - idlist: Comma-separated list of source IDs to process, instead of
 *     proceeding through all unmigrated rows.
 *   - feedback: Keyed array controlling status feedback to the caller:
 *     - function: PHP function to call, passing a message to be displayed.
 *     - frequency: How often to call the function.
 *     - frequency_unit: How to interpret frequency ('items' or 'seconds').
 *
 * @return
 *   Status of the migration process:
 *   - MIGRATE_RESULT_COMPLETED: every selected row was processed.
 *   - MIGRATE_RESULT_INCOMPLETE: stopped on the time limit or memory threshold.
 *   - MIGRATE_RESULT_STOPPED: the content set's status was changed externally
 *     while the import was running.
 *   - MIGRATE_RESULT_IN_PROGRESS: the content set was not idle on entry.
 *   - MIGRATE_RESULT_FAILED: the content set, its view, or the source key
 *     could not be found.
 */
function migrate_content_process_import($mcsid, &$options = array()) {
  // Unpack the caller's options first (no side effects), so feedback is
  // available to every failure path below.
  $itemlimit = isset($options['itemlimit']) ? $options['itemlimit'] : NULL;
  $timelimit = isset($options['timelimit']) ? $options['timelimit'] : NULL;
  $idlist = isset($options['idlist']) ? $options['idlist'] : NULL;
  $lastfeedback = time();
  // Always define the feedback variables; they are tested on paths that run
  // whether or not the caller asked for feedback.
  $feedback = NULL;
  $frequency = NULL;
  $frequency_unit = NULL;
  if (isset($options['feedback'])) {
    $feedback = isset($options['feedback']['function']) ? $options['feedback']['function'] : NULL;
    $frequency = isset($options['feedback']['frequency']) ? $options['feedback']['frequency'] : NULL;
    $frequency_unit = isset($options['feedback']['frequency_unit']) ? $options['feedback']['frequency_unit'] : NULL;
  }

  $tblinfo = db_fetch_object(db_query("SELECT *\n FROM {migrate_content_sets}\n WHERE mcsid=%d", $mcsid));
  if (!$tblinfo) {
    // No such content set: nothing to lock, nothing to import.
    if ($feedback) {
      $feedback(t('Content set !mcsid does not exist.', array(
        '!mcsid' => $mcsid,
      )));
    }
    return MIGRATE_RESULT_FAILED;
  }
  if ($tblinfo->status != MIGRATE_STATUS_IDLE) {
    return MIGRATE_RESULT_IN_PROGRESS;
  }
  // Mark the content set as busy. Every exit path from here on must put the
  // status back to idle, or later calls would report "in progress" forever.
  db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_IMPORTING, $mcsid);

  $description = $tblinfo->description;
  $view_name = $tblinfo->view_name;
  $view_args = $tblinfo->view_args;
  $contenttype = $tblinfo->contenttype;
  $sourcekey = $tblinfo->sourcekey;
  $maptable = migrate_map_table_name($mcsid);
  $msgtablename = migrate_message_table_name($mcsid);
  $memory_limit = _migrate_memory_limit();

  // Assume success until proven otherwise
  $return = MIGRATE_RESULT_COMPLETED;

  // Collect the field mappings (keyed by destination field) and attach them
  // to the content set object handed to the import hooks.
  $collist = db_query("SELECT srcfield, destfield, default_value\n FROM {migrate_content_mappings}\n WHERE mcsid=%d AND (srcfield <> '' OR default_value <> '')\n ORDER BY mcmid", $mcsid);
  $fields = array();
  while ($row = db_fetch_object($collist)) {
    $fields[$row->destfield]['srcfield'] = $row->srcfield;
    $fields[$row->destfield]['default_value'] = $row->default_value;
  }
  $tblinfo->fields = $fields;
  $tblinfo->maptable = $maptable;

  // We pick up everything in the input view that is not already imported, and
  // not already errored out
  // Emulate views execute(), so we can scroll through the results ourselves
  $view = views_get_view($view_name);
  if (!$view) {
    // Release the content set before bailing out.
    db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_IDLE, $mcsid);
    if ($feedback) {
      // The literal below spans two lines; its text is kept byte-for-byte so
      // the message is unchanged.
      $feedback(t('View !view does not exist - either (re)create this view, or
remove the content set using it.', array(
        '!view' => $view_name,
      )));
    }
    return MIGRATE_RESULT_FAILED;
  }

  // Identify the content set being processed. Simplifies $view alterations.
  $view->migrate_content_set = $tblinfo;
  $view->is_cacheable = FALSE;
  if ($view_args) {
    $view->set_arguments(explode('/', $view_args));
  }
  $view->build();

  // Let modules modify the view just prior to executing it.
  foreach (module_implements('views_pre_execute') as $module) {
    $function = $module . '_views_pre_execute';
    $function($view);
  }

  // Database connection key the view's base table lives in.
  $viewdb = isset($view->base_database) ? $view->base_database : 'default';

  // Add a left join to the map table, and only include rows not in the map
  $join = new views_join();
  // Views prepends <base_table>_ to column names other than the base table's
  // primary key - we need to strip that here for the join to work. But, it's
  // common for tables to have the tablename beginning field names (e.g.,
  // table cms with PK cms_id). Deal with that as well...
  $joinkey = $sourcekey;
  $baselen = drupal_strlen($view->base_table);
  if (!strncasecmp($sourcekey, $view->base_table . '_', $baselen + 1)) {
    // So, which case is it? Ask the schema module...
    db_set_active($viewdb);
    $inspect = schema_invoke('inspect', db_prefix_tables('{' . $view->base_table . '}'));
    db_set_active('default');
    $tableschema = $inspect[$view->base_table];
    $sourcefield = $tableschema['fields'][$sourcekey];
    if (!$sourcefield) {
      // The full key is not a real column; try it with the prefix stripped.
      $joinkey = drupal_substr($sourcekey, $baselen + 1);
      $sourcefield = $tableschema['fields'][$joinkey];
      if (!$sourcefield) {
        // Release the content set before bailing out.
        db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_IDLE, $mcsid);
        if ($feedback) {
          $feedback(t("In view !view, can't find key !key for table !table", array(
            '!view' => $view_name,
            '!key' => $sourcekey,
            '!table' => $view->base_table,
          )));
        }
        return MIGRATE_RESULT_FAILED;
      }
    }
  }

  $join->construct($maptable, $view->base_table, $joinkey, 'sourceid');
  $view->query->add_relationship($maptable, $join, $view->base_table);
  // We want both unimported and unupdated content
  $where = "{$maptable}.sourceid IS NULL OR {$maptable}.needs_update = 1";
  // And as long as we have the map table, get the destination ID, the
  // import hook will need it to identify the existing destination object
  $view->query->add_field($maptable, 'destid', 'destid');
  $view->query->add_where(0, $where, $view->base_table);

  // Ditto for the errors table
  $join = new views_join();
  $join->construct($msgtablename, $view->base_table, $joinkey, 'sourceid');
  $view->query->add_relationship($msgtablename, $join, $view->base_table);
  $view->query->add_where(0, "{$msgtablename}.sourceid IS NULL", $view->base_table);

  // If running over a selected list of IDs, pass those in to the query
  if ($idlist) {
    $where_args = $idlist_array = array_map('trim', explode(',', $idlist));
    // The placeholder type is chosen from the first ID only.
    if (is_numeric($idlist_array[0])) {
      $placeholders = db_placeholders($idlist_array, 'int');
    }
    else {
      $placeholders = db_placeholders($idlist_array, 'varchar');
    }
    array_unshift($where_args, $view->base_table);
    $view->query->add_where($view->options['group'], $view->base_table . ".{$joinkey} IN ({$placeholders})", $where_args);
  }

  // We can't seem to get $view->build() to rebuild build_info, so go straight into the query object
  $query = $view->query->query();
  $query = db_rewrite_sql($query, $view->base_table, $view->base_field, array(
    'view' => &$view,
  ));
  if ($idlist) {
    // Merge idlist into args since build_info hasn't been rebuilt.
    $args = array_merge($view->build_info['query_args'], $idlist_array);
  }
  else {
    $args = $view->build_info['query_args'];
  }

  // Apply the substitutions modules supply to both the SQL and its arguments.
  $replacements = module_invoke_all('views_query_substitutions', $view);
  $query = str_replace(array_keys($replacements), $replacements, $query);
  if (is_array($args)) {
    foreach ($args as $id => $arg) {
      $args[$id] = str_replace(array_keys($replacements), $replacements, $arg);
    }
  }

  // Now, make the current db name explicit if content set is pulling tables from another DB
  if ($viewdb != 'default') {
    global $db_url;
    $url = parse_url(is_array($db_url) ? $db_url['default'] : $db_url);
    $currdb = drupal_substr($url['path'], 1);
    $query = str_replace('{' . $maptable . '}', $currdb . '.' . '{' . $maptable . '}', $query);
    $query = str_replace('{' . $msgtablename . '}', $currdb . '.' . '{' . $msgtablename . '}', $query);
    db_set_active($viewdb);
  }

  timer_start('execute view query');
  if ($itemlimit) {
    $importlist = db_query_range($query, $args, 0, $itemlimit);
  }
  else {
    $importlist = db_query($query, $args);
  }
  timer_stop('execute view query');

  if ($viewdb != 'default') {
    db_set_active('default');
  }

  // Counts successful imports since the last progress message (it is reset
  // whenever a periodic message is sent).
  $imported = 0;
  timer_start('db_fetch_object');
  while ($row = db_fetch_object($importlist)) {
    timer_stop('db_fetch_object');

    // Recheck status - permits dynamic interruption of cron jobs
    $sql = "SELECT status FROM {migrate_content_sets} WHERE mcsid=%d";
    $status = db_result(db_query($sql, $mcsid));
    if ($status != MIGRATE_STATUS_IMPORTING) {
      $return = MIGRATE_RESULT_STOPPED;
      break;
    }

    // Check for time out if there is time info present
    if (isset($timelimit) && time() >= $timelimit) {
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Check for closeness to memory limit. A non-positive limit is treated
    // as "no limit", which also avoids dividing by zero.
    $usage = memory_get_usage();
    $pct_memory = $memory_limit > 0 ? $usage / $memory_limit : 0;
    if ($pct_memory > MIGRATE_MEMORY_THRESHOLD) {
      if ($feedback) {
        $feedback(t('Memory usage is !usage (!pct% of limit !limit), starting new batch', array(
          '!pct' => round($pct_memory * 100),
          '!usage' => $usage,
          '!limit' => $memory_limit,
        )));
      }
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Periodic progress feedback, by elapsed seconds or by item count.
    if ($feedback) {
      if (($frequency_unit == 'seconds' && time() - $lastfeedback >= $frequency) || ($frequency_unit == 'items' && $imported >= $frequency)) {
        $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, MIGRATE_RESULT_INCOMPLETE);
        $feedback($message);
        $lastfeedback = time();
        $imported = 0;
      }
    }

    timer_start('import hooks');
    $errors = migrate_invoke_all("import_{$contenttype}", $tblinfo, $row);
    timer_stop('import hooks');

    // Record every message the hooks returned. The row still counts as
    // imported when all of its messages are informational.
    $success = TRUE;
    if (count($errors)) {
      foreach ($errors as $error) {
        if (!isset($error['level'])) {
          $error['level'] = MIGRATE_MESSAGE_ERROR;
        }
        if ($error['level'] != MIGRATE_MESSAGE_INFORMATIONAL) {
          $success = FALSE;
        }
        db_query("INSERT INTO {" . $msgtablename . "}\n (sourceid, level, message)\n VALUES('%s', %d, '%s')", $row->{$sourcekey}, $error['level'], $error['message']);
      }
    }
    if ($success) {
      $imported++;
    }
    timer_start('db_fetch_object');
  }
  timer_stop('db_fetch_object');

  $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $return);

  // Remember we're done
  $tblinfo->status = MIGRATE_STATUS_IDLE;
  if ($return == MIGRATE_RESULT_COMPLETED) {
    $tblinfo->lastimported = date('Y-m-d H:i:s');
  }
  if ($feedback) {
    $feedback($message);
  }
  watchdog('migrate', $message);
  drupal_write_record('migrate_content_sets', $tblinfo, 'mcsid');
  return $return;
}