You are here

migrate.module in Migrate 6

Same filename and directory in other branches
  1. 6.2 migrate.module
  2. 7.2 migrate.module

This module provides tools at "administer >> content >> migrate" for analyzing data from various sources and importing it into Drupal tables.

File

migrate.module
View source
<?php

// Version number of the migrate API.
define('MIGRATE_API_VERSION', 1);
// Access-level identifiers (presumably permission names - TODO confirm
// against the module's permission hook, which is not visible here).
define('MIGRATE_ACCESS_BASIC', 'basic migration tools');
define('MIGRATE_ACCESS_ADVANCED', 'advanced migration tools');
// Message severity levels, stored in the 'level' column of the per-content-set
// message tables and used as the 'level' key of migrate_message() arrays.
define('MIGRATE_MESSAGE_ERROR', 1);
define('MIGRATE_MESSAGE_WARNING', 2);
define('MIGRATE_MESSAGE_NOTICE', 3);
define('MIGRATE_MESSAGE_INFORMATIONAL', 4);
// Values of the 'status' column in {migrate_content_sets}.
define('MIGRATE_STATUS_IDLE', 0);
define('MIGRATE_STATUS_IMPORTING', 1);
define('MIGRATE_STATUS_CLEARING', 2);
// Return values of the clear/import processing functions.
define('MIGRATE_RESULT_COMPLETED', 1);
define('MIGRATE_RESULT_INCOMPLETE', 2);
define('MIGRATE_RESULT_STOPPED', 3);
define('MIGRATE_RESULT_FAILED', 4);
define('MIGRATE_RESULT_IN_PROGRESS', 5);
// Fraction of the PHP memory limit above which a processing run stops and
// reports MIGRATE_RESULT_INCOMPLETE so that a new batch can be started.
define('MIGRATE_MEMORY_THRESHOLD', 0.8);

/**
 * @file
 * This module provides tools at "administer >> content >> migrate"
 * for analyzing data from various sources and importing it into Drupal tables.
 */

/**
 * Invoke a migrate hook in every module that implements it.
 *
 * Works like module_invoke_all(), except that "migrate_" is prepended to the
 * hook name and hook_migrate_init() is fired once (per request) beforehand so
 * modules can do one-time setup such as including support files.
 *
 * @param $hook
 *  Hook to invoke, without the "migrate_" prefix (e.g., 'types',
 *  'fields_node'). Any further arguments are forwarded to the implementations.
 * @return
 *  Merged array of results: array results are merged recursively, any other
 *  non-NULL result is appended.
 */
function migrate_invoke_all($hook) {
  global $_migrate_inited;

  // One-time initialization, tracked through a global flag
  if (!isset($_migrate_inited)) {
    module_invoke_all('migrate_init');
    $_migrate_inited = TRUE;
  }

  // Everything after the hook name gets passed through to the implementations
  $hook_args = array_slice(func_get_args(), 1);
  $full_hook = "migrate" . "_{$hook}";

  $merged = array();
  foreach (module_implements($full_hook) as $module) {
    $result = call_user_func_array($module . '_' . $full_hook, $hook_args);
    if (!isset($result)) {
      continue;
    }
    if (is_array($result)) {
      $merged = array_merge_recursive($merged, $result);
    }
    else {
      $merged[] = $result;
    }
  }
  return $merged;
}

/**
 * Invoke a destination hook (e.g., hook_migrate_prepare_node) in all modules.
 *
 * Use this instead of migrate_invoke_all() for hooks with exactly the
 * signature below: the implementations are called directly, so the
 * destination object really is passed by reference and can be modified.
 *
 * @param $hook
 *  Hook to invoke, without the "migrate_" prefix.
 * @param $object
 *  Destination object being built - passed by reference, so hooks can modify it
 * @param $tblinfo
 *  Metadata about the content set
 * @param $row
 *  The raw source row.
 * @return
 *  Merged array of whatever the implementations returned (each return value is
 *  cast to an array before merging).
 */
function migrate_destination_invoke_all($hook, &$object, $tblinfo, $row) {
  global $_migrate_inited;

  // One-time initialization, tracked through a global flag
  if (!isset($_migrate_inited)) {
    module_invoke_all('migrate_init');
    $_migrate_inited = TRUE;
  }

  // module_invoke_all() would pass every argument by value, hence the
  // hand-rolled dispatch.
  $full_hook = 'migrate_' . $hook;
  $collected = array();
  foreach (module_implements($full_hook) as $module_name) {
    $callback = $module_name . '_' . $full_hook;
    if (!function_exists($callback)) {
      continue;
    }

    // Each implementation is timed under its own function name
    timer_start($callback);
    $outcome = (array) $callback($object, $tblinfo, $row);
    $collected = array_merge($collected, $outcome);
    timer_stop($callback);
  }
  return $collected;
}

/**
 * Save a new or updated content set, and keep its map/message tables in sync.
 *
 * Besides writing the {migrate_content_sets} record, this renames the map and
 * message tables when the machine name changed, creates them when they do not
 * exist yet, and adjusts their sourceid column when the source key definition
 * changed.
 *
 * @param $content_set
 *  An array or object representing the content set. This is passed by reference (so
 *  when adding a new content set the ID can be set). An array argument is
 *  handed back as an array.
 * @param $options
 *  Array of additional options for saving the content set. Currently:
 *    base_table: The base table of the view - if provided, we don't need
 *                to load the view.
 *    base_database: The database of the base table - if base_table is present
 *                and base_database omitted, it defaults to 'default'
 * @return
 *  The ID of the content set that was saved, or NULL if the content set's
 *  view could not be loaded (the content set record itself has been written
 *  by then, but no map/message tables are created or modified).
 */
function migrate_save_content_set(&$content_set, $options = array()) {

  // Deal with objects internally (but remember if we need to put the parameter
  // back to an array)
  $was_array = is_array($content_set);
  if ($was_array) {
    $content_set = (object) $content_set;
  }
  $schema_change = FALSE;

  // Update or insert the content set record as appropriate
  if (isset($content_set->mcsid)) {

    // If machine_name changes, need to rename the map/message tables.
    // db_query() only returns a result resource - db_result() is needed to
    // get at the stored name, otherwise the comparison below is meaningless.
    $old_machine_name = db_result(db_query("SELECT machine_name FROM {migrate_content_sets}\n                                  WHERE mcsid=%d", $content_set->mcsid));
    if ($old_machine_name != $content_set->machine_name) {
      // Table names must be captured before the record is rewritten
      $old_maptablename = migrate_map_table_name($content_set->mcsid);
      $old_msgtablename = migrate_message_table_name($content_set->mcsid);
    }
    drupal_write_record('migrate_content_sets', $content_set, 'mcsid');
    if (isset($old_maptablename) && db_table_exists($old_maptablename)) {
      $ret = array();
      $new_maptablename = migrate_map_table_name($content_set->mcsid);
      db_rename_table($ret, $old_maptablename, $new_maptablename);
      $schema_change = TRUE;
    }
    if (isset($old_msgtablename) && db_table_exists($old_msgtablename)) {
      $ret = array();
      $new_msgtablename = migrate_message_table_name($content_set->mcsid);
      db_rename_table($ret, $old_msgtablename, $new_msgtablename);
      $schema_change = TRUE;
    }
  }
  else {
    drupal_write_record('migrate_content_sets', $content_set);
  }

  // Create or modify map and message tables
  $maptablename = migrate_map_table_name($content_set->mcsid);
  $msgtablename = migrate_message_table_name($content_set->mcsid);

  // TODO: For now, PK must be in base_table
  // If the caller tells us the base table of the view, we don't need
  // to load the view (which would not work when called from hook_install())
  if (isset($options['base_table'])) {
    $tablename = $options['base_table'];
    $tabledb = isset($options['base_database']) ? $options['base_database'] : 'default';
  }
  else {

    // Get the proper field definition for the sourcekey
    $view = views_get_view($content_set->view_name);
    if (!$view) {
      drupal_set_message(t('View !view does not exist - either (re)create this view, or
        remove the content set using it.', array(
        '!view' => $content_set->view_name,
      )));

      // Hand the parameter back in the form the caller passed it in
      if ($was_array) {
        $content_set = (array) $content_set;
      }
      return NULL;
    }

    // Must do this to load the database
    if (views_api_version() >= '3') {
      $view->init_display('default');
    }
    $view->init_query();
    $tabledb = isset($view->base_database) ? $view->base_database : 'default';
    $tablename = $view->base_table;
  }
  $sourceschema = _migrate_inspect_schema($tablename, $tabledb);

  // If the PK of the content set is defined, make sure we have a mapping table
  if (isset($content_set->sourcekey) && $content_set->sourcekey) {
    $sourcefields = isset($sourceschema['fields']) ? $sourceschema['fields'] : array();
    $sourcefield = isset($sourcefields[$content_set->sourcekey]) ? $sourcefields[$content_set->sourcekey] : NULL;

    // The field name might be <table>_<column>...
    if (!$sourcefield) {
      $sourcekey = drupal_substr($content_set->sourcekey, drupal_strlen($tablename) + 1);
      $sourcefield = isset($sourcefields[$sourcekey]) ? $sourcefields[$sourcekey] : NULL;
    }

    // But - we don't want serial fields to behave serially, so change to int
    if (isset($sourcefield['type']) && $sourcefield['type'] == 'serial') {
      $sourcefield['type'] = 'int';
    }
    if (!db_table_exists($maptablename)) {
      $ret = array();
      $schema = _migrate_map_table_schema($sourcefield);
      db_create_table($ret, $maptablename, $schema);

      // Expose map table to views
      if (module_exists('tw')) {
        tw_add_tables(array(
          $maptablename,
        ));
        tw_add_fk($maptablename, 'destid');
      }
      $schema = _migrate_message_table_schema($sourcefield);
      db_create_table($ret, $msgtablename, $schema);

      // Expose messages table to views
      if (module_exists('tw')) {
        tw_add_tables(array(
          $msgtablename,
        ));
        tw_add_fk($msgtablename, 'sourceid');
      }
      $schema_change = TRUE;
    }
    else {

      // TODO: Deal with varchar->int case where there is existing non-int data
      $desired_schema = _migrate_map_table_schema($sourcefield);
      $actual_schema = _migrate_inspect_schema($maptablename);
      if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
        $ret = array();
        db_drop_primary_key($ret, $maptablename);
        db_change_field($ret, $maptablename, 'sourceid', 'sourceid', $sourcefield, array(
          'primary key' => array(
            'sourceid',
          ),
        ));
        if (module_exists('tw')) {
          tw_perform_analysis($maptablename);
        }
        $schema_change = TRUE;
      }
      $desired_schema = _migrate_message_table_schema($sourcefield);
      $actual_schema = _migrate_inspect_schema($msgtablename);
      if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
        $ret = array();
        db_drop_index($ret, $msgtablename, 'sourceid');
        db_change_field($ret, $msgtablename, 'sourceid', 'sourceid', $sourcefield, array(
          'indexes' => array(
            'sourceid' => array(
              'sourceid',
            ),
          ),
        ));

        // It is the message table that was just altered, so re-analyze that one
        if (module_exists('tw')) {
          tw_perform_analysis($msgtablename);
        }
        $schema_change = TRUE;
      }
    }
  }

  // Make sure the schema gets updated to reflect changes (including table
  // renames, which can happen even when no source key is set)
  if ($schema_change) {
    cache_clear_all('schema', 'cache');
  }
  if ($was_array) {
    $content_set = (array) $content_set;
    return $content_set['mcsid'];
  }
  return $content_set->mcsid;
}

/**
 * Save a new or updated content mapping
 *
 * @param $mapping
 *  An object representing the mapping. This is passed by reference (so
 *  when adding a new mapping the ID can be set). A mapping with a non-empty
 *  mcmid is updated; anything else is inserted.
 * @return
 *  The mcmid property of the mapping after the write.
 */
function migrate_save_content_mapping(&$mapping) {
  // empty() rather than a bare property read: a brand-new mapping normally has
  // no mcmid property at all, and reading it would raise a notice/warning.
  if (!empty($mapping->mcmid)) {
    drupal_write_record('migrate_content_mappings', $mapping, 'mcmid');
  }
  else {
    drupal_write_record('migrate_content_mappings', $mapping);
  }
  return $mapping->mcmid;
}

/**
 * Delete the specified content set, including map and message tables.
 *
 * Each of the two tables is dropped only if it exists. When the Table Wizard
 * (tw) module is enabled, the tables are removed from it first.
 *
 * @param $mcsid
 *  Unique identifier of the content set to delete.
 */
function migrate_delete_content_set($mcsid) {

  // Work out which of the map/message tables are actually present. They are
  // checked independently so a leftover message table is not orphaned when
  // the map table is already gone.
  $existing = array();
  foreach (array(migrate_map_table_name($mcsid), migrate_message_table_name($mcsid)) as $table) {
    if (db_table_exists($table)) {
      $existing[] = $table;
    }
  }

  // First, remove the tables from the Table Wizard (only if it is enabled -
  // the rest of this module treats tw as optional), and drop them
  if ($existing) {
    if (module_exists('tw')) {
      tw_remove_tables($existing);
    }
    $ret = array();
    foreach ($existing as $table) {
      db_drop_table($ret, $table);
    }
  }

  // Then, delete the content set data
  $sql = "DELETE FROM {migrate_content_mappings} WHERE mcsid=%d";
  db_query($sql, $mcsid);
  $sql = "DELETE FROM {migrate_content_sets} WHERE mcsid=%d";
  db_query($sql, $mcsid);
}

/**
 * Remove a single content mapping row.
 *
 * @param $mcmid
 *  Unique identifier of the mapping to delete.
 */
function migrate_delete_content_mapping($mcmid) {
  db_query("DELETE FROM {migrate_content_mappings} WHERE mcmid=%d", $mcmid);
}

/**
 * Build the structured message array that import hooks return.
 *
 * @param $message
 *  Text describing the error condition
 * @param $type
 *  One of the MIGRATE_MESSAGE constants, identifying the level of error.
 *  Defaults to MIGRATE_MESSAGE_ERROR.
 * @return
 *  Array with keys 'level' (the $type) and 'message' (the $message).
 */
function migrate_message($message, $type = MIGRATE_MESSAGE_ERROR) {
  return array(
    'level' => $type,
    'message' => $message,
  );
}

/**
 * Add a mapping from source ID to destination ID for the specified content set
 *
 * If the source ID has no row in the map table yet, one is inserted. If a row
 * exists and is flagged needs_update=1, the flag is cleared. A row that exists
 * and is not flagged is left untouched.
 *
 * @param $mcsid
 *  ID of the content set being processed
 * @param $sourceid
 *  Primary key value from the source
 * @param $destid
 *  Primary key value from the destination
 */
function migrate_add_mapping($mcsid, $sourceid, $destid) {
  // Map table names are cached per content set for the rest of the request
  static $maptables = array();
  if (!isset($maptables[$mcsid])) {
    $maptables[$mcsid] = migrate_map_table_name($mcsid);
  }
  $needs_update = db_result(db_query('SELECT needs_update
                                      FROM {' . $maptables[$mcsid] . "}\n                                      WHERE sourceid='%s'", $sourceid));

  // db_result() yields FALSE when there is no matching row, and otherwise the
  // column value as the driver returns it (typically the string '0' or '1').
  // Testing "!== 0" can therefore never detect an existing up-to-date row, so
  // check explicitly for the no-row case instead.
  if ($needs_update === FALSE) {
    db_query('INSERT INTO {' . $maptables[$mcsid] . "}\n              (sourceid, destid, needs_update)\n              VALUES('%s', %d, 0)", $sourceid, $destid);
  }
  elseif ($needs_update == 1) {
    db_query('UPDATE {' . $maptables[$mcsid] . "}\n              SET needs_update=0\n              WHERE sourceid='%s'", $sourceid);
  }
}

/**
 * Clear migrated objects from the specified content set
 *
 * Walks the content set's map table, invokes the delete hooks for each mapped
 * destination object (unless the content set updates existing content), and
 * removes the corresponding map and informational message rows.
 *
 * @param $mcsid
 *  ID of the content set to clear
 * @param $options
 *  Keyed array of optional options:
 *    itemlimit - Maximum number of items to process
 *    timelimit - Unix timestamp after which to stop processing
 *    idlist - Comma-separated list of source IDs to process, instead of proceeding through
 *      all unmigrated rows
 *    feedback - Keyed array controlling status feedback to the caller
 *      function - PHP function to call, passing a message to be displayed
 *      frequency - How often to call the function
 *      frequency_unit - How to interpret frequency (items or seconds)
 *
 * @return
 *  Status of the migration process, one of the MIGRATE_RESULT constants:
 *    MIGRATE_RESULT_COMPLETED - every selected row was processed
 *    MIGRATE_RESULT_INCOMPLETE - stopped on the time or memory limit
 *    MIGRATE_RESULT_STOPPED - the content set's status was changed mid-run
 *    MIGRATE_RESULT_IN_PROGRESS - the content set was not idle; nothing done
 *    MIGRATE_RESULT_FAILED - the content set does not exist
 */
function migrate_content_process_clear($mcsid, &$options = array()) {
  $itemlimit = isset($options['itemlimit']) ? $options['itemlimit'] : NULL;
  $timelimit = isset($options['timelimit']) ? $options['timelimit'] : NULL;
  $idlist = isset($options['idlist']) ? $options['idlist'] : NULL;
  $lastfeedback = time();
  if (isset($options['feedback'])) {
    $feedback = $options['feedback']['function'];
    $frequency = isset($options['feedback']['frequency']) ? $options['feedback']['frequency'] : NULL;
    $frequency_unit = isset($options['feedback']['frequency_unit']) ? $options['feedback']['frequency_unit'] : NULL;
  }
  $result = db_query("SELECT *\n                      FROM {migrate_content_sets}\n                      WHERE mcsid=%d", $mcsid);
  $tblinfo = db_fetch_object($result);

  // Nothing to clear if the content set does not exist
  if (!$tblinfo) {
    return MIGRATE_RESULT_FAILED;
  }
  if ($tblinfo->status != MIGRATE_STATUS_IDLE) {
    return MIGRATE_RESULT_IN_PROGRESS;
  }
  db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_CLEARING, $mcsid);

  // The table names must be known before they are attached to $tblinfo, which
  // is what gets handed to the delete hooks below
  $maptable = migrate_map_table_name($mcsid);
  $msgtablename = migrate_message_table_name($mcsid);
  $tblinfo->maptable = $maptable;
  $description = $tblinfo->description;
  $contenttype = $tblinfo->contenttype;
  $memory_limit = _migrate_memory_limit();

  // If this content set is set up to update existing content, we don't
  // want to delete the content on clear, just the map/message tables
  $sql = "SELECT srcfield FROM {migrate_content_mappings}\n          WHERE mcsid=%d AND primary_key=1";
  $srcfield = db_result(db_query($sql, $mcsid));
  $full_clear = $srcfield ? FALSE : TRUE;

  // Assume success until proven otherwise
  $return = MIGRATE_RESULT_COMPLETED;
  $deleted = 0;
  $args = array();
  if ($idlist) {
    $args = array_map('trim', explode(',', $idlist));

    // The placeholder type is picked from the first ID in the list
    if (is_numeric($args[0])) {
      $placeholders = db_placeholders($args, 'int');
    }
    else {
      $placeholders = db_placeholders($args, 'varchar');
    }
    $sql = "SELECT sourceid,destid FROM {" . $maptable . "} WHERE sourceid IN ({$placeholders})";
  }
  else {
    $sql = "SELECT sourceid,destid FROM {" . $maptable . "}";
  }
  timer_start('delete query');
  if ($itemlimit) {
    $deletelist = db_query_range($sql, $args, 0, $itemlimit);
  }
  else {
    $deletelist = db_query($sql, $args);
  }
  timer_stop('delete query');
  while ($row = db_fetch_object($deletelist)) {

    // Recheck status - permits dynamic interruption of jobs
    $sql = "SELECT status FROM {migrate_content_sets} WHERE mcsid=%d";
    $status = db_result(db_query($sql, $mcsid));
    if ($status != MIGRATE_STATUS_CLEARING) {
      $return = MIGRATE_RESULT_STOPPED;
      break;
    }

    // Check for time out if there is time info present
    if (isset($timelimit) && time() >= $timelimit) {
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Check for closeness to memory limit
    $usage = memory_get_usage();
    $pct_memory = $usage / $memory_limit;
    if ($pct_memory > MIGRATE_MEMORY_THRESHOLD) {
      if (isset($feedback)) {
        $feedback(t('Memory usage is !usage (!pct% of limit !limit), starting new batch', array(
          '!pct' => round($pct_memory * 100),
          '!usage' => $usage,
          '!limit' => $memory_limit,
        )));
      }
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Periodic progress report; the item counter restarts after each report
    if (isset($feedback)) {
      if ($frequency_unit == 'seconds' && time() - $lastfeedback >= $frequency || $frequency_unit == 'items' && $deleted >= $frequency) {
        $message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, MIGRATE_RESULT_INCOMPLETE);
        $feedback($message);
        $lastfeedback = time();
        $deleted = 0;
      }
    }

    // @TODO: Should return success/failure. Problem: node_delete doesn't return anything...
    if ($full_clear) {
      timer_start('delete hooks');
      migrate_invoke_all("delete_{$contenttype}", $tblinfo, $row->destid);
      timer_stop('delete hooks');
    }
    timer_start('clear map/msg');
    db_query("DELETE FROM {" . $maptable . "} WHERE sourceid='%s'", $row->sourceid);
    db_query("DELETE FROM {" . $msgtablename . "} WHERE sourceid='%s' AND level=%d", $row->sourceid, MIGRATE_MESSAGE_INFORMATIONAL);
    timer_stop('clear map/msg');
    $deleted++;
  }

  // Mark that we're done
  $sql = "UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d";
  db_query($sql, MIGRATE_STATUS_IDLE, $mcsid);

  // If we've completed a total clear, make sure all messages are gone
  if ($return == MIGRATE_RESULT_COMPLETED && !$idlist && !$itemlimit) {
    db_query('TRUNCATE TABLE {' . $msgtablename . '}');
  }
  elseif ($return != MIGRATE_RESULT_INCOMPLETE) {

    // Remove old messages before beginning new import process
    db_query("DELETE FROM {" . $msgtablename . "} WHERE level <> %d", MIGRATE_MESSAGE_INFORMATIONAL);
  }
  $message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, $return);
  if (isset($feedback)) {
    $feedback($message);
  }
  return $return;
}

/**
 * Import objects from the specified content set
 *
 * Builds the content set's view, joins it against the map and message tables
 * so that only not-yet-imported (or needs-update) rows without recorded
 * messages are selected, and invokes the import hooks for each row.
 *
 * @param $mcsid
 *  ID of the content set to import
 * @param $options
 *  Keyed array of optional options:
 *    itemlimit - Maximum number of items to process
 *    timelimit - Unix timestamp after which to stop processing
 *    idlist - Comma-separated list of source IDs to process, instead of proceeding through
 *      all unmigrated rows
 *    feedback - Keyed array controlling status feedback to the caller
 *      function - PHP function to call, passing a message to be displayed
 *      frequency - How often to call the function
 *      frequency_unit - How to interpret frequency (items or seconds)
 *
 * @return
 *  Status of the migration process, one of the MIGRATE_RESULT constants:
 *    MIGRATE_RESULT_COMPLETED - every selected row was processed
 *    MIGRATE_RESULT_INCOMPLETE - stopped on the time or memory limit
 *    MIGRATE_RESULT_STOPPED - the content set's status was changed mid-run
 *    MIGRATE_RESULT_IN_PROGRESS - the content set was not idle; nothing done
 *    MIGRATE_RESULT_FAILED - the content set, its view, or the source key
 *      could not be found
 */
function migrate_content_process_import($mcsid, &$options = array()) {
  $tblinfo = db_fetch_object(db_query("SELECT *\n                                   FROM {migrate_content_sets}\n                                   WHERE mcsid=%d", $mcsid));

  // Nothing to import if the content set does not exist
  if (!$tblinfo) {
    return MIGRATE_RESULT_FAILED;
  }
  if ($tblinfo->status != MIGRATE_STATUS_IDLE) {
    return MIGRATE_RESULT_IN_PROGRESS;
  }
  db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_IMPORTING, $mcsid);

  $itemlimit = isset($options['itemlimit']) ? $options['itemlimit'] : NULL;
  $timelimit = isset($options['timelimit']) ? $options['timelimit'] : NULL;
  $idlist = isset($options['idlist']) ? $options['idlist'] : NULL;
  $lastfeedback = time();
  if (isset($options['feedback'])) {
    $feedback = $options['feedback']['function'];
    $frequency = isset($options['feedback']['frequency']) ? $options['feedback']['frequency'] : NULL;
    $frequency_unit = isset($options['feedback']['frequency_unit']) ? $options['feedback']['frequency_unit'] : NULL;
  }
  $description = $tblinfo->description;
  $view_name = $tblinfo->view_name;
  $view_args = $tblinfo->view_args;
  $contenttype = $tblinfo->contenttype;
  $sourcekey = $tblinfo->sourcekey;
  $maptable = migrate_map_table_name($mcsid);
  $msgtablename = migrate_message_table_name($mcsid);
  $memory_limit = _migrate_memory_limit();

  // Assume success until proven otherwise
  $return = MIGRATE_RESULT_COMPLETED;
  $collist = db_query("SELECT srcfield, destfield, default_value\n                       FROM {migrate_content_mappings}\n                       WHERE mcsid=%d AND (srcfield <> '' OR default_value <> '')\n                       ORDER BY mcmid", $mcsid);
  $fields = array();
  while ($row = db_fetch_object($collist)) {
    $fields[$row->destfield]['srcfield'] = $row->srcfield;
    $fields[$row->destfield]['default_value'] = $row->default_value;
  }
  $tblinfo->fields = $fields;
  $tblinfo->maptable = $maptable;

  // We pick up everything in the input view that is not already imported, and
  // not already errored out
  // Emulate views execute(), so we can scroll through the results ourselves
  $view = views_get_view($view_name);
  if (!$view) {
    if (isset($feedback)) {
      $feedback(t('View !view does not exist - either (re)create this view, or
        remove the content set using it.', array(
        '!view' => $view_name,
      )));
    }

    // Release the content set - otherwise it stays flagged as importing and
    // every later clear/import reports MIGRATE_RESULT_IN_PROGRESS
    db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_IDLE, $mcsid);
    return MIGRATE_RESULT_FAILED;
  }

  // Identify the content set being processed. Simplifies $view alterations.
  $view->migrate_content_set = $tblinfo;
  $view->is_cacheable = FALSE;
  if ($view_args) {
    $view->set_arguments(explode('/', $view_args));
  }
  $view->build();

  // Let modules modify the view just prior to executing it.
  foreach (module_implements('views_pre_execute') as $module) {
    $function = $module . '_views_pre_execute';
    $function($view);
  }
  if (isset($view->base_database)) {
    $viewdb = $view->base_database;
  }
  else {
    $viewdb = 'default';
  }

  // Add a left join to the map table, and only include rows not in the map
  $join = new views_join();

  // Views prepends <base_table>_ to column names other than the base table's
  // primary key - we need to strip that here for the join to work. But, it's
  // common for tables to have the tablename beginning field names (e.g.,
  // table cms with PK cms_id). Deal with that as well...
  $baselen = drupal_strlen($view->base_table);
  if (!strncasecmp($sourcekey, $view->base_table . '_', $baselen + 1)) {

    // So, which case is it? Ask the schema module...
    db_set_active($viewdb);
    $inspect = schema_invoke('inspect', db_prefix_tables('{' . $view->base_table . '}'));
    db_set_active('default');
    $tableschema = $inspect[$view->base_table];
    $sourcefield = $tableschema['fields'][$sourcekey];
    if (!$sourcefield) {
      $joinkey = drupal_substr($sourcekey, $baselen + 1);
      $sourcefield = $tableschema['fields'][$joinkey];
      if (!$sourcefield) {
        if (isset($feedback)) {
          $feedback(t("In view !view, can't find key !key for table !table", array(
            '!view' => $view_name,
            '!key' => $sourcekey,
            '!table' => $view->base_table,
          )));
        }

        // Release the content set before bailing out (see above)
        db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_IDLE, $mcsid);
        return MIGRATE_RESULT_FAILED;
      }
    }
    else {
      $joinkey = $sourcekey;
    }
  }
  else {
    $joinkey = $sourcekey;
  }
  $join->construct($maptable, $view->base_table, $joinkey, 'sourceid');
  $view->query->add_relationship($maptable, $join, $view->base_table);

  // We want both unimported and unupdated content
  $where = "{$maptable}.sourceid IS NULL OR {$maptable}.needs_update = 1";

  // And as long as we have the map table, get the destination ID, the
  // import hook will need it to identify the existing destination object
  $view->query->add_field($maptable, 'destid', 'destid');
  $view->query->add_where(0, $where, $view->base_table);

  // Ditto for the errors table
  $join = new views_join();
  $join->construct($msgtablename, $view->base_table, $joinkey, 'sourceid');
  $view->query->add_relationship($msgtablename, $join, $view->base_table);
  $view->query->add_where(0, "{$msgtablename}.sourceid IS NULL", $view->base_table);

  // If running over a selected list of IDs, pass those in to the query
  if ($idlist) {
    $where_args = $idlist_array = array_map('trim', explode(',', $idlist));

    // The placeholder type is picked from the first ID in the list
    if (is_numeric($idlist_array[0])) {
      $placeholders = db_placeholders($idlist_array, 'int');
    }
    else {
      $placeholders = db_placeholders($idlist_array, 'varchar');
    }
    array_unshift($where_args, $view->base_table);
    $view->query->add_where($view->options['group'], $view->base_table . ".{$joinkey} IN ({$placeholders})", $where_args);
  }

  // We can't seem to get $view->build() to rebuild build_info, so go straight into the query object
  $query = $view->query->query();
  $query = db_rewrite_sql($query, $view->base_table, $view->base_field, array(
    'view' => &$view,
  ));
  if ($idlist) {

    // Merge idlist into args since build_info hasn't been rebuilt.
    $args = array_merge($view->build_info['query_args'], $idlist_array);
  }
  else {
    $args = $view->build_info['query_args'];
  }
  $replacements = module_invoke_all('views_query_substitutions', $view);
  $query = str_replace(array_keys($replacements), $replacements, $query);
  if (is_array($args)) {
    foreach ($args as $id => $arg) {
      $args[$id] = str_replace(array_keys($replacements), $replacements, $arg);
    }
  }

  // Now, make the current db name explicit if content set is pulling tables from another DB
  if ($viewdb != 'default') {
    global $db_url;
    $url = parse_url(is_array($db_url) ? $db_url['default'] : $db_url);
    $currdb = drupal_substr($url['path'], 1);
    $query = str_replace('{' . $maptable . '}', $currdb . '.' . '{' . $maptable . '}', $query);
    $query = str_replace('{' . $msgtablename . '}', $currdb . '.' . '{' . $msgtablename . '}', $query);
    db_set_active($viewdb);
  }

  //drupal_set_message($query);
  timer_start('execute view query');
  if ($itemlimit) {
    $importlist = db_query_range($query, $args, 0, $itemlimit);
  }
  else {
    $importlist = db_query($query, $args);
  }
  timer_stop('execute view query');
  if ($viewdb != 'default') {
    db_set_active('default');
  }
  $imported = 0;

  // The 'db_fetch_object' timer brackets each fetch, so it is stopped at the
  // top of the loop body and restarted at the bottom
  timer_start('db_fetch_object');
  while ($row = db_fetch_object($importlist)) {
    timer_stop('db_fetch_object');

    // Recheck status - permits dynamic interruption of cron jobs
    $sql = "SELECT status FROM {migrate_content_sets} WHERE mcsid=%d";
    $status = db_result(db_query($sql, $mcsid));
    if ($status != MIGRATE_STATUS_IMPORTING) {
      $return = MIGRATE_RESULT_STOPPED;
      break;
    }

    // Check for time out if there is time info present
    if (isset($timelimit) && time() >= $timelimit) {
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Check for closeness to memory limit
    $usage = memory_get_usage();
    $pct_memory = $usage / $memory_limit;
    if ($pct_memory > MIGRATE_MEMORY_THRESHOLD) {
      if (isset($feedback)) {
        $feedback(t('Memory usage is !usage (!pct% of limit !limit), starting new batch', array(
          '!pct' => round($pct_memory * 100),
          '!usage' => $usage,
          '!limit' => $memory_limit,
        )));
      }
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Periodic progress report; the item counter restarts after each report
    if (isset($feedback)) {
      if ($frequency_unit == 'seconds' && time() - $lastfeedback >= $frequency || $frequency_unit == 'items' && $imported >= $frequency) {
        $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, MIGRATE_RESULT_INCOMPLETE);
        $feedback($message);
        $lastfeedback = time();
        $imported = 0;
      }
    }
    timer_start('import hooks');
    $errors = migrate_invoke_all("import_{$contenttype}", $tblinfo, $row);
    timer_stop('import hooks');

    // Ok, we're done. Record any messages the hooks returned; the row counts
    // as imported unless one of them is more severe than informational.
    if (count($errors)) {
      $success = TRUE;
      foreach ($errors as $error) {
        if (!isset($error['level'])) {
          $error['level'] = MIGRATE_MESSAGE_ERROR;
        }
        if ($error['level'] != MIGRATE_MESSAGE_INFORMATIONAL) {
          $success = FALSE;
        }
        db_query("INSERT INTO {" . $msgtablename . "}\n                  (sourceid, level, message)\n                  VALUES('%s', %d, '%s')", $row->{$sourcekey}, $error['level'], $error['message']);
      }
      if ($success) {
        $imported++;
      }
    }
    else {
      $imported++;
    }
    timer_start('db_fetch_object');
  }
  timer_stop('db_fetch_object');
  $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $return);

  // Remember we're done
  $tblinfo->status = MIGRATE_STATUS_IDLE;
  if ($return == MIGRATE_RESULT_COMPLETED) {
    $tblinfo->lastimported = date('Y-m-d H:i:s');
  }
  if (isset($feedback)) {
    $feedback($message);
  }
  watchdog('migrate', $message);
  drupal_write_record('migrate_content_sets', $tblinfo, 'mcsid');
  return $return;
}

/* Revisit
function migrate_content_process_all_action(&$dummy, $action_context, $a1, $a2) {
  migrate_content_process_all(time());
}
 */

/**
 * Process all enabled migration processes in a browser, using the Batch API
 * to break it into manageable chunks.
 *
 * Each call works on at most one content set: pending clears are handled
 * first (highest weight first), then pending imports (lowest weight first).
 * The lists of pending content sets and the accumulated messages are kept in
 * $context['sandbox'] between calls.
 *
 * @param $clearing
 *  Array of content set ids (keyed by content set id) to clear
 * @param $importing
 *  Array of content set ids (keyed by content set id) to import
 * @param $limit
 *  Maximum number of items to process
 * @param $idlist
 *  Comma-separated list of source IDs to process, instead of proceeding through
 *  all unmigrated rows
 * @param $context
 *  Batch API context structure
 */
function migrate_content_process_batch($clearing, $importing, $limit, $idlist, &$context) {

  // A zero max_execution_time means no limit - but let's set a reasonable
  // limit anyway
  $starttime = time();
  $maxexectime = ini_get('max_execution_time');
  if (!$maxexectime) {
    $maxexectime = 240;
  }

  // Initialize the Batch API context
  $context['finished'] = 0;

  // The Batch API progress bar will reflect the number of operations being
  // done (clearing/importing)
  if (!isset($context['sandbox']['numops'])) {
    $context['sandbox']['numops'] = count($clearing) + count($importing);
    $context['sandbox']['numopsdone'] = 0;
    $context['sandbox']['clearing'] = $clearing;
    $context['sandbox']['importing'] = $importing;
    $context['sandbox']['message'] = '';
    $context['sandbox']['times'] = array();
  }

  // For the timelimit, subtract more than enough time to clean up
  $options = array(
    'itemlimit' => $limit,
    'timelimit' => $starttime + ($maxexectime < 5 ? $maxexectime : $maxexectime - 5),
    'idlist' => $idlist,
    'feedback' => array(
      'function' => '_migrate_process_message',
    ),
  );
  global $_migrate_messages;
  if (!isset($_migrate_messages)) {
    $_migrate_messages = array();
  }

  // Result of the operation performed by this call. It stays NULL when no
  // operation ran, so the checks further down never read an undefined variable.
  $status = NULL;

  // Work on the last clearing op (if any)
  if (count($context['sandbox']['clearing'])) {
    $row = db_fetch_object(db_query("SELECT mcsid,description FROM {migrate_content_sets}\n       WHERE mcsid IN (%s)\n       ORDER BY weight DESC\n       LIMIT 1", implode(',', $context['sandbox']['clearing'])));
    if ($row) {
      $status = migrate_content_process_clear($row->mcsid, $options);
      if ($status != MIGRATE_RESULT_INCOMPLETE) {
        unset($context['sandbox']['clearing'][$row->mcsid]);
      }
    }
    else {
      // None of the pending content sets matched a row, so there is nothing
      // left to clear. Empty the list, otherwise it could never shrink.
      $context['sandbox']['clearing'] = array();
    }
  }
  elseif (count($context['sandbox']['importing'])) {
    $row = db_fetch_object(db_query("SELECT mcsid,description FROM {migrate_content_sets}\n       WHERE mcsid IN (%s)\n       ORDER BY weight ASC\n       LIMIT 1", implode(',', $context['sandbox']['importing'])));
    if ($row) {
      if (variable_get('migrate_update', 0)) {
        migrate_content_set_update($row->mcsid);
        variable_set('migrate_update', 0);
      }
      $status = migrate_content_process_import($row->mcsid, $options);
      if ($status != MIGRATE_RESULT_INCOMPLETE) {
        unset($context['sandbox']['importing'][$row->mcsid]);
      }
    }
    else {
      // Same as above: no pending content set matched a row.
      $context['sandbox']['importing'] = array();
    }
  }
  else {
    $context['finished'] = 1;
  }

  // Make sure the entire process stops if requested
  if ($status == MIGRATE_RESULT_STOPPED) {
    $context['finished'] = 1;
  }
  if ($context['finished'] != 1) {
    if ($status != MIGRATE_RESULT_INCOMPLETE) {
      $context['sandbox']['numopsdone']++;
    }
    $context['finished'] = $context['sandbox']['numopsdone'] / $context['sandbox']['numops'];
  }

  // Append the captured feedback to the running message and to the results
  foreach ($_migrate_messages as $message) {
    if (!isset($context['sandbox']['message'])) {
      $context['sandbox']['message'] = $message . '<br />';
    }
    else {
      $context['sandbox']['message'] .= $message . '<br />';
    }
    $context['results'][] = $message;
  }
  $context['message'] = $context['sandbox']['message'];

  // If requested save timers for eventual display
  if (variable_get('migrate_display_timers', 0)) {
    global $timers;
    foreach ($timers as $name => $timerec) {
      if (isset($timerec['time'])) {
        if (isset($context['sandbox']['times'][$name])) {
          $context['sandbox']['times'][$name] += $timerec['time'] / 1000;
        }
        else {
          $context['sandbox']['times'][$name] = $timerec['time'] / 1000;
        }
      }
    }

    // When all done, display the timers
    if ($context['finished'] == 1 && isset($context['sandbox']['times'])) {
      arsort($context['sandbox']['times']);
      foreach ($context['sandbox']['times'] as $name => $total) {
        drupal_set_message("{$name}: " . round($total, 2));
      }
    }
  }
}

/**
 * Feedback callback: remember a message produced while importing or clearing.
 *
 * The message is appended to the global $_migrate_messages list.
 *
 * @param $message
 *  Message text to record
 */
function _migrate_process_message($message) {
  $GLOBALS['_migrate_messages'][] = $message;
}

/**
 * Prepare a content set for updating of existing items.
 *
 * Sets needs_update on every row of the content set's map table, then
 * empties its message table.
 *
 * @param $mcsid
 *  ID of the content set to update
 */
function migrate_content_set_update($mcsid) {
  // Flag every mapped row as needing an update
  $flag_sql = 'UPDATE {' . migrate_map_table_name($mcsid) . '} SET needs_update=1';
  db_query($flag_sql);

  // Throw away the messages recorded for this content set
  $purge_sql = 'TRUNCATE TABLE {' . migrate_message_table_name($mcsid) . '}';
  db_query($purge_sql);
}

/**
 * Generate a progress message
 *
 * @param $starttime
 *  microtime(TRUE) value (float seconds) when this piece of work began
 * @param $numitems
 *  Number of items that were processed
 * @param $description
 *  Name (description) of the content set being processed
 * @param $import
 *  TRUE for an import process, FALSE for a clearing process
 * @param $result
 *  One of the MIGRATE_RESULT_* constants describing how the work ended.
 *  Values without a dedicated wording (e.g. MIGRATE_RESULT_IN_PROGRESS) get a
 *  neutral message that just names the content set.
 * @return
 *  Formatted message
 */
function _migrate_progress_message($starttime, $numitems, $description, $import = TRUE, $result = MIGRATE_RESULT_COMPLETED) {
  $time = microtime(TRUE) - $starttime;
  if ($time > 0) {
    $perminute = round(60 * $numitems / $time);
    $time = round($time, 1);
  }
  else {
    // No measurable elapsed time, so a rate cannot be computed
    $perminute = '?';
  }

  // Wording of the outcome, keyed by result code. Combined with the prefix
  // below this yields exactly the same source strings for t() as before.
  $outcomes = array(
    MIGRATE_RESULT_COMPLETED => " - done with '!description'",
    MIGRATE_RESULT_FAILED => " - failure with '!description'",
    MIGRATE_RESULT_INCOMPLETE => " - continuing with '!description'",
    MIGRATE_RESULT_STOPPED => " - stopped '!description'",
  );
  $prefix = ($import ? 'Imported' : 'Deleted') . ' !numitems in !time sec (!perminute/min)';

  // Fall back to a neutral wording rather than handing t() an undefined string
  $suffix = isset($outcomes[$result]) ? $outcomes[$result] : " - '!description'";
  $basetext = $prefix . $suffix;

  $message = t($basetext, array(
    '!numitems' => $numitems,
    '!time' => $time,
    '!perminute' => $perminute,
    '!description' => $description,
  ));
  return $message;
}

/**
 * Implementation of hook_init().
 *
 * Includes the migration support files and adds the module's stylesheet.
 */
function migrate_init() {
  // TODO: Be more lazy - only load the support files when really needed
  migrate_module_include();

  // Stylesheet shipped in the module's own directory
  $stylesheet = drupal_get_path('module', 'migrate') . '/migrate.css';
  drupal_add_css($stylesheet);
}

/**
 * Implementation of hook_action_info().
 */

/* Revisit
function migrate_action_info() {
  $info['migrate_content_process_clear'] = array(
    'type' => 'migrate',
    'description' => t('Clear a migration content set'),
    'configurable' => FALSE,
    'hooks' => array(
      'cron' => array('run'),
    ),
  );
  $info['migrate_content_process_import'] = array(
    'type' => 'migrate',
    'description' => t('Import a migration content set'),
    'configurable' => FALSE,
    'hooks' => array(
      'cron' => array('run'),
    ),
  );
  $info['migrate_content_process_all_action'] = array(
    'type' => 'migrate',
    'description' => t('Perform all active migration processes'),
    'configurable' => FALSE,
    'hooks' => array(
      'cron' => array('run'),
    ),
  );
  return $info;
}
 */

/**
 * Implementation of hook_perm().
 *
 * @return
 *  The two permission names used by this module: basic and advanced access.
 */
function migrate_perm() {
  $permissions = array();
  $permissions[] = MIGRATE_ACCESS_BASIC;
  $permissions[] = MIGRATE_ACCESS_ADVANCED;
  return $permissions;
}

/**
 * Implementation of hook_help().
 *
 * Only the admin/content/migrate page has help text; nothing is returned for
 * any other path.
 */
function migrate_help($page, $arg) {
  if ($page == 'admin/content/migrate') {
    return t('<p>Defined content sets are listed here. New content sets may be added
        below, and tasks may be executed directly in the browser. A process that is
        actively running will be <span class="migrate-running">highlighted</span>.</p>');
  }
}

/**
 * Implementation of hook_menu().
 *
 * Registers the dashboard and settings pages, the per-content-set edit and
 * export forms, and the migrate/xlat/% callback.
 */
function migrate_menu() {
  // Pieces shared by several of the items below
  $root = 'admin/content/migrate';
  $content_set = $root . '/content_set/%';
  $pages_file = 'migrate_pages.inc';
  $basic_access = array(
    MIGRATE_ACCESS_BASIC,
  );
  $advanced_access = array(
    MIGRATE_ACCESS_ADVANCED,
  );

  $items = array();

  // Dashboard, with its default local task
  $items[$root] = array(
    'title' => 'Migrate',
    'description' => 'Monitor and control the creation of Drupal content from source data',
    'page callback' => 'migrate_dashboard',
    'access arguments' => $basic_access,
    'file' => $pages_file,
  );
  $items[$root . '/dashboard'] = array(
    'title' => 'Dashboard',
    'type' => MENU_DEFAULT_LOCAL_TASK,
    'weight' => -10,
  );

  // Module settings
  $items[$root . '/settings'] = array(
    'title' => 'Settings',
    'description' => 'Migrate module settings',
    'weight' => 5,
    'page callback' => 'migrate_settings',
    'access arguments' => $advanced_access,
    'file' => $pages_file,
    'type' => MENU_LOCAL_TASK,
  );

  // A single content set: mappings form (default tab) and export form
  $items[$content_set] = array(
    'title' => 'Content set',
    'page callback' => 'drupal_get_form',
    'page arguments' => array(
      'migrate_content_set_mappings',
      4,
    ),
    'access arguments' => $advanced_access,
    'file' => $pages_file,
  );
  $items[$content_set . '/edit'] = array(
    'title' => 'Edit',
    'type' => MENU_DEFAULT_LOCAL_TASK,
  );
  $items[$content_set . '/export'] = array(
    'title' => 'Export',
    'page callback' => 'drupal_get_form',
    'page arguments' => array(
      'migrate_export_content_set',
      4,
    ),
    'access arguments' => $advanced_access,
    'type' => MENU_LOCAL_TASK,
    'file' => $pages_file,
  );

  // Translation of old-site URIs
  $items['migrate/xlat/%'] = array(
    'page callback' => 'migrate_xlat',
    'access arguments' => array(
      'access content',
    ),
    'page arguments' => array(
      2,
    ),
    'type' => MENU_CALLBACK,
  );
  return $items;
}

/**
 * Implementation of hook_schema_alter().
 *
 * Adds a map table and a message table definition to $schema for every
 * content set that has a source key. The type of the sourceid column is taken
 * from the source key's definition in the base table of the content set's view.
 *
 * @param $schema
 *  Schema array, altered in place
 */
function migrate_schema_alter(&$schema) {

  // Check for table existence - at install time, hook_schema_alter() may be called
  // before our install hook.
  if (!db_table_exists('migrate_content_sets')) {
    return;
  }
  $result = db_query("SELECT * FROM {migrate_content_sets}");
  while ($content_set = db_fetch_object($result)) {
    $maptablename = migrate_map_table_name($content_set->mcsid);
    $msgtablename = migrate_message_table_name($content_set->mcsid);

    // Get the proper field definition for the sourcekey
    $view = views_get_view($content_set->view_name);
    if (!$view) {
      drupal_set_message(t('View !view does not exist - either (re)create this view, or
          remove the migrate content set using it.', array(
        '!view' => $content_set->view_name,
      )));
      continue;
    }

    // Must do this to load the database
    if (views_api_version() >= '3') {
      $view->init_display('default');
    }
    $view->init_query();

    // TODO: For now, PK must be in base_table
    if (isset($view->base_database)) {
      $tabledb = $view->base_database;
    }
    else {
      $tabledb = 'default';
    }
    $tablename = $view->base_table;
    $sourceschema = _migrate_inspect_schema($tablename, $tabledb);
    $sourcefields = isset($sourceschema['fields']) ? $sourceschema['fields'] : array();

    // If the PK of the content set is defined, make sure we have a mapping table
    $sourcekey = $content_set->sourcekey;
    if (!$sourcekey) {
      continue;
    }
    $sourcefield = isset($sourcefields[$sourcekey]) ? $sourcefields[$sourcekey] : NULL;
    if (!$sourcefield) {

      // strip base table name if views prepended it
      $baselen = drupal_strlen($tablename);
      if (!strncasecmp($sourcekey, $tablename . '_', $baselen + 1)) {
        $sourcekey = drupal_substr($sourcekey, $baselen + 1);
      }
      $sourcefield = isset($sourcefields[$sourcekey]) ? $sourcefields[$sourcekey] : NULL;
    }

    // We don't want serial fields to behave serially, so change to int
    if (isset($sourcefield['type']) && $sourcefield['type'] == 'serial') {
      $sourcefield['type'] = 'int';
    }
    $schema[$maptablename] = _migrate_map_table_schema($sourcefield);
    $schema[$maptablename]['name'] = $maptablename;
    $schema[$msgtablename] = _migrate_message_table_schema($sourcefield);
    $schema[$msgtablename]['name'] = $msgtablename;
  }
}

/**
 * Translate URIs from an old site to the new one
 * Requires adding RewriteRules to .htaccess. For example, if the URLs
 * for news articles had the form
 * http://example.com/issues/news/[OldID].html, use this rule:
 *
 * RewriteRule ^issues/news/([0-9]+).html$ /migrate/xlat/node/$1 [L]
 *
 * On success the visitor is sent to the new URI with a 301 redirect.
 *
 * @param $contenttype
 *  Content type to translate (e.g., 'node', 'user', etc.)
 * @param $oldid
 *  Primary key from input view
 * @return
 *  MENU_NOT_FOUND when an argument is missing, the old ID has no migrated
 *  counterpart, or no xlat hook supplied a URI for it.
 */
function migrate_xlat($contenttype, $oldid) {
  if ($contenttype && $oldid) {
    $newid = _migrate_xlat_get_new_id($contenttype, $oldid);
    if ($newid) {
      $uri = migrate_invoke_all("xlat_{$contenttype}", $newid);

      // Only redirect when a hook actually supplied a destination
      if (!empty($uri[0])) {
        drupal_goto($uri[0], NULL, NULL, 301);
      }
    }
  }

  // Nothing to redirect to: report "not found" instead of a blank page
  return MENU_NOT_FOUND;
}

/**
 * Helper function to translate an ID from a source file to the corresponding
 * Drupal-side ID (nid, uid, etc.)
 *
 * The map table of each content set with the given content type is searched
 * in turn and the first non-empty match is returned. Note that the result may
 * be ambiguous - for example, if you are importing nodes from different
 * content sets, they might have overlapping source IDs.
 *
 * @param $contenttype
 *  Content type to translate (e.g., 'node', 'user', etc.)
 * @param $oldid
 *  Primary key from input view
 * @return
 *  Drupal-side ID of the object, or NULL if no map table holds the source ID
 */
function _migrate_xlat_get_new_id($contenttype, $oldid) {
  // Map table names already looked up, keyed by content set ID
  static $maptables = array();

  $content_sets = db_query("SELECT mcsid\n                      FROM {migrate_content_sets}\n                      WHERE contenttype='%s'", $contenttype);
  while ($content_set = db_fetch_object($content_sets)) {
    $mcsid = $content_set->mcsid;
    if (!isset($maptables[$mcsid])) {
      $maptables[$mcsid] = migrate_map_table_name($mcsid);
    }
    $destid = db_result(db_query("SELECT destid\n            FROM {" . $maptables[$mcsid] . "}\n            WHERE sourceid='%s'", $oldid));
    if ($destid) {
      return $destid;
    }
  }
  return NULL;
}

/**
 * Implementation of hook_theme().
 *
 * Every theme hook declared here takes a single 'form' argument that
 * defaults to NULL.
 */
function migrate_theme() {
  return array(
    'migrate_mapping_table' => array(
      // Arguments are keyed by name with their default as the value
      'arguments' => array(
        'form' => NULL,
      ),
    ),
    '_migrate_dashboard_form' => array(
      'arguments' => array(
        'form' => NULL,
      ),
      'function' => 'theme_migrate_dashboard',
    ),
    '_migrate_settings_form' => array(
      'arguments' => array(
        'form' => NULL,
      ),
      'function' => 'theme_migrate_settings',
    ),
    'migrate_content_set_mappings' => array(
      'arguments' => array(
        'form' => NULL,
      ),
      'function' => 'theme_migrate_content_set_mappings',
    ),
  );
}

/**
 * Determine the name of the map table for this content set
 *
 * The name is "migrate_map_" followed by the content set's machine name,
 * lower-cased.
 *
 * @param $mcsid
 *  Unique identifier of the content set
 * @return
 *  The name of the map table
 */
function migrate_map_table_name($mcsid) {
  $lookup = db_query("SELECT machine_name FROM {migrate_content_sets}\n                            WHERE mcsid=%d", $mcsid);
  $machine_name = db_result($lookup);
  return drupal_strtolower('migrate_map_' . $machine_name);
}

/**
 * Determine the name of the message table for this content set
 *
 * The name is "migrate_msgs_" followed by the content set's machine name,
 * lower-cased.
 *
 * @param $mcsid
 *  Unique identifier of the content set
 * @return
 *  The name of the message table
 */
function migrate_message_table_name($mcsid) {
  $lookup = db_query("SELECT machine_name FROM {migrate_content_sets}\n                            WHERE mcsid=%d", $mcsid);
  $machine_name = db_result($lookup);
  return drupal_strtolower('migrate_msgs_' . $machine_name);
}

/**
 * Generate a map table schema record, given the source PK definition
 *
 * The table pairs each source key (sourceid, the primary key) with the
 * destination key it was migrated to (destid, indexed) plus a needs_update
 * flag that defaults to 0.
 *
 * @param $sourcefield
 *  Schema definition for the content set source's PK
 * @return
 *  Schema structure for a map table
 */
function _migrate_map_table_schema($sourcefield) {
  $fields = array();
  $fields['sourceid'] = $sourcefield;

  // @TODO: Assumes destination key is unsigned int
  $fields['destid'] = array(
    'type' => 'int',
    'unsigned' => TRUE,
    'not null' => TRUE,
  );
  $fields['needs_update'] = array(
    'type' => 'int',
    'size' => 'tiny',
    'unsigned' => TRUE,
    'not null' => TRUE,
    'default' => 0,
  );

  $table = array();
  $table['description'] = t('Mappings from source key to destination key');
  $table['fields'] = $fields;
  $table['primary key'] = array('sourceid');
  $table['indexes'] = array(
    'idkey' => array('destid'),
  );
  return $table;
}

/**
 * Generate a message table schema record, given the source PK definition
 *
 * Each row has its own serial key (mceid) and carries the source key it
 * refers to (sourceid, indexed), a level that defaults to 1, and the message
 * text.
 *
 * @param $sourcefield
 *  Schema definition for the content set source's PK
 * @return
 *  Schema structure for a message table
 */
function _migrate_message_table_schema($sourcefield) {
  $fields = array();
  $fields['mceid'] = array(
    'type' => 'serial',
    'unsigned' => TRUE,
    'not null' => TRUE,
  );
  $fields['sourceid'] = $sourcefield;
  $fields['level'] = array(
    'type' => 'int',
    'unsigned' => TRUE,
    'not null' => TRUE,
    'default' => 1,
  );
  $fields['message'] = array(
    'type' => 'text',
    'size' => 'medium',
    'not null' => TRUE,
  );

  $table = array();
  $table['description'] = t('Import errors');
  $table['fields'] = $fields;
  $table['primary key'] = array('mceid');
  $table['indexes'] = array(
    'sourceid' => array('sourceid'),
  );
  return $table;
}

/**
 * Implementation of hook_views_api().
 *
 * @return
 *  Array declaring Views API version '2.0'
 */
function migrate_views_api() {
  $api = array();
  $api['api'] = '2.0';
  return $api;
}

/**
 * Check to see if the advanced help module is installed, and if not put up
 * a message.
 *
 * Nothing is shown when the migrate_hide_help_message variable is set or the
 * advanced_help module is enabled. Otherwise the message suggests enabling
 * the module (if its file is present) or installing it, and in both cases
 * links to the Migrate settings page for hiding the message.
 *
 * Only call this function if the user is already in a position for this to
 * be useful.
 */
function migrate_check_advanced_help() {
  if (variable_get('migrate_hide_help_message', FALSE)) {
    return;
  }
  if (module_exists('advanced_help')) {
    return;
  }

  // Both variants of the message link to the same page for hiding it
  $hide_url = url('admin/content/migrate/settings');

  $filename = db_result(db_query("SELECT filename FROM {system} WHERE type = 'module' AND name = 'advanced_help'"));
  if ($filename && file_exists($filename)) {
    // The module is present on disk but not enabled
    drupal_set_message(t('If you <a href="@modules">enable the advanced help module</a>,
              Migrate will provide more and better help. <a href="@hide">Hide this message.</a>', array(
      '@modules' => url('admin/build/modules'),
      '@hide' => $hide_url,
    )));
  }
  else {
    drupal_set_message(t('If you install the advanced help module from !href, Migrate will provide more and better help. <a href="@hide">Hide this message.</a>', array(
      '!href' => l('http://drupal.org/project/advanced_help', 'http://drupal.org/project/advanced_help'),
      '@hide' => $hide_url,
    )));
  }
}

/**
 * Check if a date is valid and return the correct
 * timestamp to use. Returns -1 if the date is not
 * considered valid.
 *
 * @param $date
 *  Either a numeric timestamp or a date string understood by strtotime()
 * @return
 *  The numeric input unchanged when it is greater than -1, otherwise the
 *  timestamp parsed from the string. -1 for empty input and for anything
 *  that does not parse to a positive timestamp.
 */
function _migrate_valid_date($date) {
  if (empty($date)) {
    return -1;
  }
  if (is_numeric($date) && $date > -1) {
    return $date;
  }

  // Dashes are changed to slashes before the value is handed to strtotime()
  $date = str_replace('-', '/', $date);
  $time = strtotime($date);
  if ($time < 0 || !$time) {

    // Handles form YYYY-MM-DD HH:MM:SS.garbage
    if (drupal_strlen($date) > 19) {
      $time = strtotime(drupal_substr($date, 0, 19));
    }

    // Still unusable (or never retried): report it as invalid instead of
    // leaking strtotime()'s FALSE to the caller
    if ($time < 0 || !$time) {
      return -1;
    }
  }
  return $time;
}

/**
 * Get the row count for a view, possibly filtered by arguments
 *
 * @param $view
 *  Name of the view to query, or an already-loaded view object
 * @param $args
 *  Optional arguments to the view, separated by '/'
 * @return
 *  Number of rows in the view, or 0 if the view could not be loaded.
 */
function _migrate_get_view_count($view, $args = NULL) {
  if (is_string($view)) {
    $view = views_get_view($view);
  }

  // A view that could not be loaded has no rows to count; bail out rather
  // than dereference a non-object below
  if (!is_object($view)) {
    return 0;
  }

  // Force execution of count query, with minimal unnecessary results returned
  // @TODO: Find way to execute ONLY count query
  $view->pager['items_per_page'] = 1;
  $view->get_total_rows = TRUE;
  if ($args) {
    $view->set_arguments(explode('/', $args));
  }
  $view->execute();
  $rows = $view->total_rows;

  // TODO: Now, that's the total rows in the current source. However, it may be
  // (particularly with a content set that updates existing objects) that
  // previously-migrated content is no longer in the source, so the stats
  // (particularly Unimported) may be misleading. So, we would want to add in
  // anything in the map table not in the source. This is tricky, and may not
  // really be worth the performance impact, so we'll leave it alone for now.
  return $rows;
}

/**
 * Get the PHP memory_limit value in bytes
 *
 * Understands the K, M and G suffixes (case-insensitive).
 *
 * @return
 *  The limit as a number of bytes. A value without a recognized suffix is
 *  returned as-is in integer form (so "-1" yields -1), and an empty setting
 *  yields 0.
 */
function _migrate_memory_limit() {
  $value = trim((string) ini_get('memory_limit'));
  if ($value === '') {
    return 0;
  }

  // Take the leading number explicitly instead of doing arithmetic on a
  // string such as "128M"
  $bytes = (int) $value;
  switch (strtolower(substr($value, -1))) {
    // Each case deliberately falls through to the next, multiplying by 1024
    // once per unit step
    case 'g':
      $bytes *= 1024;
    case 'm':
      $bytes *= 1024;
    case 'k':
      $bytes *= 1024;
  }
  return $bytes;
}

/**
 * Implementation of hook_migrate_api().
 *
 * Declares API version 1 and the integration modules bundled with Migrate
 * itself (comment, node, profile, taxonomy, user), with 'modules' as their
 * path.
 */
function migrate_migrate_api() {
  $integrations = array();
  $integrations['comment'] = array(
    'description' => t('Core migration support for the comment module'),
  );
  $integrations['node'] = array(
    'description' => t('Core migration support for the node module'),
  );
  $integrations['profile'] = array(
    'description' => t('Core migration support for the profile module'),
  );
  $integrations['taxonomy'] = array(
    'description' => t('Core migration support for the taxonomy module'),
  );
  $integrations['user'] = array(
    'description' => t('Core migration support for the user module'),
  );

  return array(
    'api' => 1,
    'path' => 'modules',
    'integration modules' => $integrations,
  );
}

// ------------------------------------------------------------------
// Include file helpers - @merlinofchaos: borrowing heavily from views.module

/**
 * Load migrate support files on behalf of modules.
 *
 * For every integration module reported by migrate_get_module_apis(), the
 * file "<path>/<integration module>.migrate.inc" is included once, provided
 * it exists and the integration module's status is on.
 */
function migrate_module_include() {
  foreach (migrate_get_module_apis() as $info) {
    $directory = $info['path'];
    foreach ($info['integration modules'] as $intmod => $details) {
      $file = "{$directory}/{$intmod}.migrate.inc";
      if (!file_exists($file)) {
        continue;
      }
      if ($details['status'] == TRUE) {
        require_once $file;
      }
    }
  }
}

/**
 * Get a list of modules that support the current migrate API.
 *
 * Invokes hook_migrate_api() on every implementing module. Modules declaring
 * an API version other than MIGRATE_API_VERSION are reported with a message
 * and left out. For the others, 'path' is made relative to the Drupal root,
 * and every integration module gets a default description and a status, which
 * the migrate_integration_settings variable may override.
 *
 * @param $reset
 *  TRUE to discard the statically cached list and rebuild it
 * @return
 *  Array of API information, keyed by module name
 */
function migrate_get_module_apis($reset = FALSE) {
  static $cache = NULL;
  if ($reset) {
    $cache = NULL;
  }
  if (isset($cache)) {
    return $cache;
  }

  $cache = array();

  // The saved per-module settings are the same for every module: read them once
  $settings = variable_get('migrate_integration_settings', NULL);
  foreach (module_implements('migrate_api') as $module) {
    $function = $module . '_migrate_api';
    $info = $function();

    // Compare against the same constant that the message below reports
    if (!isset($info['api']) || $info['api'] != MIGRATE_API_VERSION) {
      drupal_set_message(t('%function supports Migrate API version %modversion,
           Migrate module API version is %version - migration support not loaded.', array(
        '%function' => $function,
        '%modversion' => isset($info['api']) ? $info['api'] : NULL,
        '%version' => MIGRATE_API_VERSION,
      )));
      continue;
    }

    if (isset($info['path'])) {
      $info['path'] = drupal_get_path('module', $module) . '/' . $info['path'];
    }
    else {
      $info['path'] = drupal_get_path('module', $module);
    }
    if (!isset($info['integration modules'])) {
      $info['integration modules'] = array(
        $module => array(),
      );
    }
    foreach ($info['integration modules'] as $intmod_name => $intmod_details) {

      // If the module was just entered as a string without details, we have to fix.
      if (!is_array($intmod_details)) {
        unset($info['integration modules'][$intmod_name]);
        $intmod_name = $intmod_details;
        $intmod_details = array();
      }
      $default_details = array(
        'description' => t('Support for the @intmod module.', array(
          '@intmod' => $intmod_name,
        )),
        'status' => TRUE,
      );

      // Allow override of defaults.
      $info['integration modules'][$intmod_name] = $intmod_details + $default_details;

      // Overwrite default status if set.
      if (isset($settings[$module][$intmod_name])) {
        $info['integration modules'][$intmod_name]['status'] = $settings[$module][$intmod_name];
      }
    }
    $cache[$module] = $info;
  }
  return $cache;
}

/**
 * Wrapper around schema_invoke('inspect'), to handle views tablenames
 * hacked to include "dbname.".
 *
 * The inspected schema of each database is cached statically, so a database
 * is inspected at most once per request.
 *
 * @param $tablename
 *  Table to return the schema for; may be prefixed with "dbname." when the tw
 *  module is enabled. Leave empty to get the schema of the whole database.
 * @param $tabledb
 *  Key of the database connection holding the table
 * @return
 *  Schema array for the table (NULL if the table is not in the inspected
 *  schema), or for the whole database when no table name was given.
 */
function _migrate_inspect_schema($tablename = '', $tabledb = 'default') {
  static $inspect = array();

  // TW can treat external MySQL tables as internal, but we can revert
  // if we find a period in the tablename. Todo: might be better to build
  // this check directly into schema_mysql.inc
  if (module_exists('tw') && strpos($tablename, '.')) {
    list($tabledb, $tablename) = explode('.', $tablename);
    $dbinfo = tw_get_dbinfo();
    foreach ($dbinfo as $dbkey => $db_detail) {
      if ($db_detail['name'] == $tabledb) {
        $tabledb = $dbkey;
        break;
      }
    }
  }
  if (!isset($inspect[$tabledb])) {
    // db_set_active() returns the name of the connection it replaced, so we
    // can hand control back to whichever connection the caller was using
    // instead of assuming it was 'default'
    $previous = db_set_active($tabledb);

    // TODO: schema_mysql_inspect($name = NULL) takes $name [tablename] which
    // limits the query to only get that table, which is probably a good idea.
    $inspect[$tabledb] = schema_invoke('inspect');
    db_set_active($previous ? $previous : 'default');
  }

  // If a tablename was not given, return the whole schema
  if ($tablename == '') {
    return $inspect[$tabledb];
  }

  // Return the schema for just this table.
  return isset($inspect[$tabledb][$tablename]) ? $inspect[$tabledb][$tablename] : NULL;
}

Functions

Namesort descending Description
migrate_add_mapping Add a mapping from source ID to destination ID for the specified content set
migrate_check_advanced_help Check to see if the advanced help module is installed, and if not put up a message.
migrate_content_process_batch Process all enabled migration processes in a browser, using the Batch API to break it into manageable chunks.
migrate_content_process_clear Clear migrated objects from the specified content set
migrate_content_process_import Import objects from the specified content set
migrate_content_set_update Prepare a content set for updating of existing items
migrate_delete_content_mapping Delete the specified content mapping.
migrate_delete_content_set Delete the specified content set, including map and message tables.
migrate_destination_invoke_all Call a destination hook (e.g., hook_migrate_prepare_node). Use this version for hooks with the precise signature below, so that the object can be passed by reference.
migrate_get_module_apis Get a list of modules that support the current migrate API.
migrate_help Implementation of hook_help().
migrate_init
migrate_invoke_all Call a migrate hook. Like module_invoke_all, but gives modules a chance to do one-time initialization before any of their hooks are called, and adds "migrate" to the hook name.
migrate_map_table_name Determine the name of the map table for this content set
migrate_menu Implementation of hook_menu().
migrate_message Convenience function for generating a message array
migrate_message_table_name Determine the name of the message table for this content set
migrate_migrate_api
migrate_module_include Load views files on behalf of modules.
migrate_perm Implementation of hook_perm().
migrate_save_content_mapping Save a new or updated content mapping
migrate_save_content_set Save a new or updated content set
migrate_schema_alter Implementation of hook_schema_alter().
migrate_theme Implementation of hook_theme().
migrate_views_api Implementation of hook_views_api().
migrate_xlat
_migrate_get_view_count Get the row count for a view, possibly filtered by arguments
_migrate_inspect_schema Wrapper around schema_invoke('inspect'), to handle views tablenames hacked to include "dbname.".
_migrate_map_table_schema Generate a map table schema record, given the source PK definition
_migrate_memory_limit Get the PHP memory_limit value in bytes
_migrate_message_table_schema Generate a message table schema record, given the source PK definition
_migrate_process_message Capture messages generated during an import or clear process
_migrate_progress_message Generate a progress message
_migrate_valid_date Check if a date is valid and return the correct timestamp to use. Returns -1 if the date is not considered valid.
_migrate_xlat_get_new_id

Constants