function node_import_do_task in Node import 6

Import a number of rows from the specified task. Should only be called from within hook_cron() or from a JS callback as this function may take a long time.

The function returns once $count $units have been processed. For example:

node_import_do_task($task, 'all');
node_import_do_task($task, 'rows', 10);
node_import_do_task($task, 'bytes', 4096);
node_import_do_task($task, 'ms', 1000);

Parameters

$task: Array. The task to continue. Note that this is passed by reference, so you can check the status of the task after running this function without having to query the database (see the sketch after the parameter list).

$unit: String. Either 'rows', 'bytes', 'ms' (milliseconds) or 'all'. Defaults to 'all'.

$count: Integer. Number of $units to do. Defaults to 0 (in which case exactly one row will be imported if $unit != 'all').
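
Because $task is passed by reference, the caller can inspect progress directly after the call without reloading the record. A minimal sketch, assuming $task has already been loaded as a task array by the module's own code:

// $task is assumed to be a valid task array loaded elsewhere.
node_import_do_task($task, 'rows', 10);

if ($task['status'] == NODE_IMPORT_STATUS_DONE) {
  drupal_set_message(t('Import finished: @done rows imported, @errors rows with errors.', array(
    '@done' => $task['row_done'],
    '@errors' => $task['row_error'],
  )));
}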

Return value

This function does not return a value. The status of each imported row (error or not) is stored in the database instead. @see node_import_constants.
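
The per-row results are written to the {node_import_status} table by the code below. A hedged sketch of how a caller could list the rows that failed for a given task (column names taken from the queries in the code):

// List the file offsets of rows that failed, with their stored errors.
$result = db_query("SELECT file_offset, errors FROM {node_import_status} WHERE taskid = %d AND status = %d", $task['taskid'], NODE_IMPORT_STATUS_ERROR);
while ($row = db_fetch_object($result)) {
  $row_errors = unserialize($row->errors);
  // $row_errors is the error array returned by node_import_create() for that row.
}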

2 calls to node_import_do_task()
node_import_do_all_tasks in ./node_import.inc
Import a number of rows from all available tasks. Should only be called from within hook_cron() or from a JS callback as this function may take a long time.
node_import_js in ./node_import.inc
JS callback that continues the specified task and returns its status. This function will take at most one second.

File

./node_import.inc, line 858
Public API of the Node import module.

Code

function node_import_do_task(&$task, $unit = 'all', $count = 0) {
  global $node_import_can_continue;
  $node_import_can_continue = TRUE;
  if ($task['status'] != NODE_IMPORT_STATUS_DONE && node_import_lock_acquire()) {
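    // Run the import as the user who created the task; session saving is
    // disabled so the temporary account switch is not persisted.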
    global $user;
    $backup_user = $user;
    if ($task['uid'] != $user->uid) {
      session_save_session(FALSE);
      $user = user_load(array(
        'uid' => $task['uid'],
      ));
    }
    $taskid = $task['taskid'];
    $file_offset = 0;
    $byte_count = 0;
    $row_count = 0;
    timer_start('node_import:do_task:' . $taskid);
    $data = array();
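    // Decide where to start reading. A pending task skips past the header
    // row on its first run, or resumes from the stored offset.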
    switch ($task['status']) {
      case NODE_IMPORT_STATUS_PENDING:
        if ($task['file_offset'] == 0 && $task['has_headers']) {
          list($file_offset, $data) = node_import_read_from_file($task['file']->filepath, $file_offset, $task['file_options']);
        }
        else {
          $file_offset = $task['file_offset'];
        }
        break;
      case NODE_IMPORT_STATUS_ERROR:
        $task['status'] = NODE_IMPORT_STATUS_DONE;
        $file_offset = $task['file']->filesize;

        //TODO
        break;
    }
    module_invoke_all('node_import_task', $task, 'continue');
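    // Main loop: read and import one row per iteration until the file is
    // exhausted or the requested amount of work ($unit / $count) is done.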
    while ($task['status'] != NODE_IMPORT_STATUS_DONE) {
      list($new_offset, $data) = node_import_read_from_file($task['file']->filepath, $file_offset, $task['file_options']);
      if (is_array($data)) {
        switch ($task['status']) {
          case NODE_IMPORT_STATUS_PENDING:
            db_query("DELETE FROM {node_import_status} WHERE taskid = %d AND file_offset = %d", $taskid, $file_offset);
            db_query("INSERT INTO {node_import_status} (taskid, file_offset, errors) VALUES (%d, %d, '%s')", $taskid, $file_offset, serialize(array()));
            break;
          case NODE_IMPORT_STATUS_ERROR:
            db_query("UPDATE {node_import_status} SET errors = '%s', status = %d WHERE taskid = %d AND file_offset = %d", serialize(array()), NODE_IMPORT_STATUS_PENDING, $taskid, $file_offset);
            break;
        }
        db_query("UPDATE {node_import_tasks} SET file_offset = %d, changed = %d WHERE taskid = %d", $new_offset, time(), $taskid);
        $task['file_offset'] = $new_offset;
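        // Create the object from this row; node_import_create() returns an
        // array of errors on failure or the new object id on success.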
        $errors = node_import_create($task['type'], $data, $task['map'], $task['defaults'], $task['options'], FALSE);
        if (is_array($errors)) {
          db_query("UPDATE {node_import_status} SET status = %d, errors = '%s' WHERE taskid = %d AND file_offset = %d", NODE_IMPORT_STATUS_ERROR, serialize($errors), $taskid, $file_offset);
          db_query("UPDATE {node_import_tasks} SET row_error = row_error + 1 WHERE taskid = %d", $taskid);
          $task['row_error']++;
        }
        else {
          db_query("UPDATE {node_import_status} SET status = %d, objid = %d WHERE taskid = %d AND file_offset = %d", NODE_IMPORT_STATUS_DONE, $errors, $taskid, $file_offset);
          db_query("UPDATE {node_import_tasks} SET row_done = row_done + 1 WHERE taskid = %d", $taskid);
          $task['row_done']++;
        }
        $byte_count += $new_offset - $file_offset;
        $row_count++;
      }
      else {
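        // No more rows could be read: the end of the file has been reached,
        // so mark the task as finished.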
        db_query("UPDATE {node_import_tasks} SET status = %d, file_offset = %d WHERE taskid = %d", NODE_IMPORT_STATUS_DONE, $task['file']->filesize, $taskid);
        $task['status'] = NODE_IMPORT_STATUS_DONE;
        $task['file_offset'] = $task['file']->filesize;
      }
      switch ($task['status']) {
        case NODE_IMPORT_STATUS_PENDING:
          $file_offset = $new_offset;
          break;
        case NODE_IMPORT_STATUS_ERROR:
          $file_offset = $task['file']->filesize;

          //TODO
          break;
      }
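      // Continue only while the global $node_import_can_continue flag is
      // still TRUE and the requested number of rows, bytes or milliseconds
      // has not yet been reached ('all' runs to the end of the file).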
      if ($node_import_can_continue && ($unit == 'all' || $unit == 'bytes' && $byte_count < $count || $unit == 'rows' && $row_count < $count || $unit == 'ms' && timer_read('node_import:do_task:' . $taskid) < $count)) {
        continue;
      }
      break;
    }
    module_invoke_all('node_import_task', $task, 'pause');

    // Cleanup before exit.
    $user = $backup_user;
    session_save_session(TRUE);
    timer_stop('node_import:do_task:' . $taskid);
    node_import_lock_release();
  }
}