You are here

function migrate_content_process_import in Migrate 6

Import objects from the specified content set

Parameters

$mcsid: ID of the content set to import

$options: Keyed array of optional settings: itemlimit - Maximum number of items to process; timelimit - Unix timestamp after which to stop processing; idlist - Comma-separated list of source IDs to process, instead of proceeding through all unmigrated rows; feedback - Keyed array controlling status feedback to the caller, with the keys: function - PHP function to call, passing a message to be displayed; frequency - How often to call the function; frequency_unit - How to interpret frequency ('items' or 'seconds').

Return value

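Status of the migration process: MIGRATE_RESULT_IN_PROGRESS if the content set is not idle, MIGRATE_RESULT_FAILED if the import cannot be set up (for example, the view does not exist or its source key cannot be found), MIGRATE_RESULT_STOPPED if the content set's status was changed during the run, MIGRATE_RESULT_INCOMPLETE if the time limit or memory threshold was reached, or MIGRATE_RESULT_COMPLETED otherwise.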

2 calls to migrate_content_process_import()
migrate_content_process_batch in ./migrate.module
Process all enabled migration processes in a browser, using the Batch API to break it into manageable chunks.
_drush_migrate_do_import in ./migrate.drush.inc

File

./migrate.module, line 567
This module provides tools at "administer >> content >> migrate" for analyzing data from various sources and importing them into Drupal tables.

Code

/**
 * Import objects from the specified content set.
 *
 * Builds the content set's source view, left-joins it to the content set's
 * map and message tables so that only rows which are unimported (or flagged
 * needs_update) and have no logged messages are selected, and then invokes
 * the "import_<contenttype>" hooks for each row.
 *
 * @param $mcsid
 *   ID of the content set to import.
 * @param $options
 *   Keyed array of optional options:
 *   - itemlimit: Maximum number of items to process.
 *   - timelimit: Unix timestamp after which to stop processing.
 *   - idlist: Comma-separated list of source IDs to process, instead of
 *     proceeding through all unmigrated rows.
 *   - feedback: Keyed array controlling status feedback to the caller:
 *     - function: PHP function to call, passing a message to be displayed.
 *     - frequency: How often to call the function.
 *     - frequency_unit: How to interpret frequency ('items' or 'seconds').
 *
 * @return
 *   Status of the migration process:
 *   - MIGRATE_RESULT_IN_PROGRESS: the content set is not idle.
 *   - MIGRATE_RESULT_FAILED: the content set, its view, or the source key
 *     could not be found.
 *   - MIGRATE_RESULT_STOPPED: the content set's status changed during the run.
 *   - MIGRATE_RESULT_INCOMPLETE: the time limit or memory threshold was hit.
 *   - MIGRATE_RESULT_COMPLETED: every selected row was processed.
 */
function migrate_content_process_import($mcsid, &$options = array()) {
  // Resolve the feedback settings up front so that every code path, including
  // the early failure returns, sees defined variables.
  $feedback = NULL;
  $frequency = NULL;
  $frequency_unit = NULL;
  if (isset($options['feedback'])) {
    $feedback = isset($options['feedback']['function']) ? $options['feedback']['function'] : NULL;
    $frequency = isset($options['feedback']['frequency']) ? $options['feedback']['frequency'] : NULL;
    $frequency_unit = isset($options['feedback']['frequency_unit']) ? $options['feedback']['frequency_unit'] : NULL;
  }

  $tblinfo = db_fetch_object(db_query("SELECT *\n                                   FROM {migrate_content_sets}\n                                   WHERE mcsid=%d", $mcsid));
  if (!$tblinfo) {
    // db_fetch_object() returns FALSE when no such content set exists.
    if ($feedback) {
      $feedback(t('Content set !mcsid does not exist.', array(
        '!mcsid' => $mcsid,
      )));
    }
    return MIGRATE_RESULT_FAILED;
  }
  if ($tblinfo->status != MIGRATE_STATUS_IDLE) {
    return MIGRATE_RESULT_IN_PROGRESS;
  }

  // Claim the content set. From here on, every exit path must put the status
  // back to idle, otherwise later calls keep returning IN_PROGRESS.
  db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_IMPORTING, $mcsid);

  $itemlimit = isset($options['itemlimit']) ? $options['itemlimit'] : NULL;
  $timelimit = isset($options['timelimit']) ? $options['timelimit'] : NULL;
  $idlist = isset($options['idlist']) ? $options['idlist'] : NULL;
  $lastfeedback = time();
  $description = $tblinfo->description;
  $view_name = $tblinfo->view_name;
  $view_args = $tblinfo->view_args;
  $contenttype = $tblinfo->contenttype;
  $sourcekey = $tblinfo->sourcekey;
  $maptable = migrate_map_table_name($mcsid);
  $msgtablename = migrate_message_table_name($mcsid);
  $memory_limit = _migrate_memory_limit();

  // Assume success until proven otherwise
  $return = MIGRATE_RESULT_COMPLETED;
  $collist = db_query("SELECT srcfield, destfield, default_value\n                       FROM {migrate_content_mappings}\n                       WHERE mcsid=%d AND (srcfield <> '' OR default_value <> '')\n                       ORDER BY mcmid", $mcsid);
  $fields = array();
  while ($row = db_fetch_object($collist)) {
    $fields[$row->destfield]['srcfield'] = $row->srcfield;
    $fields[$row->destfield]['default_value'] = $row->default_value;
  }
  $tblinfo->fields = $fields;
  $tblinfo->maptable = $maptable;

  // We pick up everything in the input view that is not already imported, and
  // not already errored out
  // Emulate views execute(), so we can scroll through the results ourselves
  $view = views_get_view($view_name);
  if (!$view) {
    if ($feedback) {
      $feedback(t('View !view does not exist - either (re)create this view, or
        remove the content set using it.', array(
        '!view' => $view_name,
      )));
    }
    // Release the content set so it is not left marked as importing.
    db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_IDLE, $mcsid);
    return MIGRATE_RESULT_FAILED;
  }

  // Identify the content set being processed. Simplifies $view alterations.
  $view->migrate_content_set = $tblinfo;
  $view->is_cacheable = FALSE;
  if ($view_args) {
    $view
      ->set_arguments(explode('/', $view_args));
  }
  $view
    ->build();

  // Let modules modify the view just prior to executing it.
  foreach (module_implements('views_pre_execute') as $module) {
    $function = $module . '_views_pre_execute';
    $function($view);
  }
  $viewdb = isset($view->base_database) ? $view->base_database : 'default';

  // Views prepends <base_table>_ to column names other than the base table's
  // primary key - we need to strip that here for the join to work. But, it's
  // common for tables to have the tablename beginning field names (e.g.,
  // table cms with PK cms_id). Deal with that as well...
  $joinkey = $sourcekey;
  $baselen = drupal_strlen($view->base_table);
  if (!strncasecmp($sourcekey, $view->base_table . '_', $baselen + 1)) {

    // So, which case is it? Ask the schema module...
    db_set_active($viewdb);
    $inspect = schema_invoke('inspect', db_prefix_tables('{' . $view->base_table . '}'));
    db_set_active('default');
    $tableschema = isset($inspect[$view->base_table]) ? $inspect[$view->base_table] : array();
    if (empty($tableschema['fields'][$sourcekey])) {
      // The full name is not a real column, so try it without the prefix.
      $joinkey = drupal_substr($sourcekey, $baselen + 1);
      if (empty($tableschema['fields'][$joinkey])) {
        if ($feedback) {
          $feedback(t("In view !view, can't find key !key for table !table", array(
            '!view' => $view_name,
            '!key' => $sourcekey,
            '!table' => $view->base_table,
          )));
        }
        // Release the content set so it is not left marked as importing.
        db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_IDLE, $mcsid);
        return MIGRATE_RESULT_FAILED;
      }
    }
  }

  // Add a left join to the map table, and only include rows not in the map
  $join = new views_join();
  $join
    ->construct($maptable, $view->base_table, $joinkey, 'sourceid');
  $view->query
    ->add_relationship($maptable, $join, $view->base_table);

  // We want both unimported and unupdated content
  $where = "{$maptable}.sourceid IS NULL OR {$maptable}.needs_update = 1";

  // And as long as we have the map table, get the destination ID, the
  // import hook will need it to identify the existing destination object
  $view->query
    ->add_field($maptable, 'destid', 'destid');
  $view->query
    ->add_where(0, $where, $view->base_table);

  // Ditto for the errors table
  $join = new views_join();
  $join
    ->construct($msgtablename, $view->base_table, $joinkey, 'sourceid');
  $view->query
    ->add_relationship($msgtablename, $join, $view->base_table);
  $view->query
    ->add_where(0, "{$msgtablename}.sourceid IS NULL", $view->base_table);

  // If running over a selected list of IDs, pass those in to the query
  if ($idlist) {
    $where_args = $idlist_array = array_map('trim', explode(',', $idlist));
    // The placeholder type is chosen from the first ID only.
    if (is_numeric($idlist_array[0])) {
      $placeholders = db_placeholders($idlist_array, 'int');
    }
    else {
      $placeholders = db_placeholders($idlist_array, 'varchar');
    }
    array_unshift($where_args, $view->base_table);
    $view->query
      ->add_where($view->options['group'], $view->base_table . ".{$joinkey} IN ({$placeholders})", $where_args);
  }

  // We can't seem to get $view->build() to rebuild build_info, so go straight into the query object
  $query = $view->query
    ->query();
  $query = db_rewrite_sql($query, $view->base_table, $view->base_field, array(
    'view' => &$view,
  ));
  if ($idlist) {

    // Merge idlist into args since build_info hasn't been rebuilt.
    $args = array_merge($view->build_info['query_args'], $idlist_array);
  }
  else {
    $args = $view->build_info['query_args'];
  }
  $replacements = module_invoke_all('views_query_substitutions', $view);
  $query = str_replace(array_keys($replacements), $replacements, $query);
  if (is_array($args)) {
    foreach ($args as $id => $arg) {
      $args[$id] = str_replace(array_keys($replacements), $replacements, $arg);
    }
  }

  // Now, make the current db name explicit if content set is pulling tables from another DB
  if ($viewdb != 'default') {
    global $db_url;
    $url = parse_url(is_array($db_url) ? $db_url['default'] : $db_url);
    $currdb = drupal_substr($url['path'], 1);
    $query = str_replace('{' . $maptable . '}', $currdb . '.' . '{' . $maptable . '}', $query);
    $query = str_replace('{' . $msgtablename . '}', $currdb . '.' . '{' . $msgtablename . '}', $query);
    db_set_active($viewdb);
  }

  timer_start('execute view query');
  if ($itemlimit) {
    $importlist = db_query_range($query, $args, 0, $itemlimit);
  }
  else {
    $importlist = db_query($query, $args);
  }
  timer_stop('execute view query');
  if ($viewdb != 'default') {
    db_set_active('default');
  }
  $imported = 0;
  timer_start('db_fetch_object');
  while ($row = db_fetch_object($importlist)) {
    timer_stop('db_fetch_object');

    // Recheck status - permits dynamic interruption of cron jobs
    $sql = "SELECT status FROM {migrate_content_sets} WHERE mcsid=%d";
    $status = db_result(db_query($sql, $mcsid));
    if ($status != MIGRATE_STATUS_IMPORTING) {
      $return = MIGRATE_RESULT_STOPPED;
      break;
    }

    // Check for time out if there is time info present
    if (isset($timelimit) && time() >= $timelimit) {
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Check for closeness to memory limit. A non-positive limit is treated as
    // "no usable limit" to avoid dividing by zero.
    $usage = memory_get_usage();
    $pct_memory = $memory_limit > 0 ? $usage / $memory_limit : 0;
    if ($pct_memory > MIGRATE_MEMORY_THRESHOLD) {
      if ($feedback) {
        $feedback(t('Memory usage is !usage (!pct% of limit !limit), starting new batch', array(
          '!pct' => round($pct_memory * 100),
          '!usage' => $usage,
          '!limit' => $memory_limit,
        )));
      }
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }
    if ($feedback) {
      if ($frequency_unit == 'seconds' && time() - $lastfeedback >= $frequency || $frequency_unit == 'items' && $imported >= $frequency) {
        $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, MIGRATE_RESULT_INCOMPLETE);
        $feedback($message);
        // The counters restart after each progress report.
        $lastfeedback = time();
        $imported = 0;
      }
    }
    timer_start('import hooks');
    $errors = migrate_invoke_all("import_{$contenttype}", $tblinfo, $row);
    timer_stop('import hooks');

    // Log every message returned by the hooks. The row counts as imported
    // unless at least one message is more severe than informational.
    $success = TRUE;
    foreach ($errors as $error) {
      if (!isset($error['level'])) {
        $error['level'] = MIGRATE_MESSAGE_ERROR;
      }
      if ($error['level'] != MIGRATE_MESSAGE_INFORMATIONAL) {
        $success = FALSE;
      }
      db_query("INSERT INTO {" . $msgtablename . "}\n                  (sourceid, level, message)\n                  VALUES('%s', %d, '%s')", $row->{$sourcekey}, $error['level'], $error['message']);
    }
    if ($success) {
      $imported++;
    }
    timer_start('db_fetch_object');
  }
  timer_stop('db_fetch_object');
  $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $return);

  // Remember we're done
  $tblinfo->status = MIGRATE_STATUS_IDLE;
  if ($return == MIGRATE_RESULT_COMPLETED) {
    $tblinfo->lastimported = date('Y-m-d H:i:s');
  }
  if ($feedback) {
    $feedback($message);
  }
  watchdog('migrate', $message);
  drupal_write_record('migrate_content_sets', $tblinfo, 'mcsid');
  return $return;
}