function node_import_do_task in Node import 6
Import a number of rows from the specified task. Should only be called from within hook_cron() or from a JS callback as this function may take a long time.
The function ends when $count $units have been finished. For example
node_import_do_task($task, 'all');
node_import_do_task($task, 'rows', 10);
node_import_do_task($task, 'bytes', 4096);
node_import_do_task($task, 'ms', 1000);
Parameters
$task: Array. The task to continue. Note that this is passed by reference! So you can check the status of the task after running this function without having to query the database.
$unit: String. Either 'rows', 'bytes', 'ms' (milliseconds) or 'all'. Defaults to 'all'.
$count: Integer. Number of $units to do. Defaults to 0 (in which case exactly one row will be imported if $unit != 'all').
Return value
The status of each imported row (error or not) is stored in the database. @see node_import_constants.
2 calls to node_import_do_task()
- node_import_do_all_tasks in ./
node_import.inc - Import a number of rows from all available tasks. Should only be called from within hook_cron() or from a JS callback as this function may take a long time.
- node_import_js in ./
node_import.inc - JS callback to continue the specified task and returns the status of it. This function will take at most one second.
File
- ./
node_import.inc, line 858 - Public API of the Node import module.
Code
function node_import_do_task(&$task, $unit = 'all', $count = 0) {
global $node_import_can_continue;
$node_import_can_continue = TRUE;
if ($task['status'] != NODE_IMPORT_STATUS_DONE && node_import_lock_acquire()) {
global $user;
$backup_user = $user;
if ($task['uid'] != $user->uid) {
session_save_session(FALSE);
$user = user_load(array(
'uid' => $task['uid'],
));
}
$taskid = $task['taskid'];
$file_offset = 0;
$byte_count = 0;
$row_count = 0;
timer_start('node_import:do_task:' . $taskid);
$data = array();
switch ($task['status']) {
case NODE_IMPORT_STATUS_PENDING:
if ($task['file_offset'] == 0 && $task['has_headers']) {
list($file_offset, $data) = node_import_read_from_file($task['file']->filepath, $file_offset, $task['file_options']);
}
else {
$file_offset = $task['file_offset'];
}
break;
case NODE_IMPORT_STATUS_ERROR:
$task['status'] = NODE_IMPORT_STATUS_DONE;
$file_offset = $task['file']->filesize;
//TODO
break;
}
module_invoke_all('node_import_task', $task, 'continue');
while ($task['status'] != NODE_IMPORT_STATUS_DONE) {
list($new_offset, $data) = node_import_read_from_file($task['file']->filepath, $file_offset, $task['file_options']);
if (is_array($data)) {
switch ($task['status']) {
case NODE_IMPORT_STATUS_PENDING:
db_query("DELETE FROM {node_import_status} WHERE taskid = %d AND file_offset = %d", $taskid, $file_offset);
db_query("INSERT INTO {node_import_status} (taskid, file_offset, errors) VALUES (%d, %d, '%s')", $taskid, $file_offset, serialize(array()));
break;
case NODE_IMPORT_STATUS_ERROR:
db_query("UPDATE {node_import_status} SET errors = '%s', status = %d WHERE taskid = %d AND file_offset = %d", serialize(array()), NODE_IMPORT_STATUS_PENDING, $taskid, $file_offset);
break;
}
db_query("UPDATE {node_import_tasks} SET file_offset = %d, changed = %d WHERE taskid = %d", $new_offset, time(), $taskid);
$task['file_offset'] = $new_offset;
$errors = node_import_create($task['type'], $data, $task['map'], $task['defaults'], $task['options'], FALSE);
if (is_array($errors)) {
db_query("UPDATE {node_import_status} SET status = %d, errors = '%s' WHERE taskid = %d AND file_offset = %d", NODE_IMPORT_STATUS_ERROR, serialize($errors), $taskid, $file_offset);
db_query("UPDATE {node_import_tasks} SET row_error = row_error + 1 WHERE taskid = %d", $taskid);
$task['row_error']++;
}
else {
db_query("UPDATE {node_import_status} SET status = %d, objid = %d WHERE taskid = %d AND file_offset = %d", NODE_IMPORT_STATUS_DONE, $errors, $taskid, $file_offset);
db_query("UPDATE {node_import_tasks} SET row_done = row_done + 1 WHERE taskid = %d", $taskid);
$task['row_done']++;
}
$byte_count += $new_offset - $file_offset;
$row_count++;
}
else {
db_query("UPDATE {node_import_tasks} SET status = %d, file_offset = %d WHERE taskid = %d", NODE_IMPORT_STATUS_DONE, $task['file']->filesize, $taskid);
$task['status'] = NODE_IMPORT_STATUS_DONE;
$task['file_offset'] = $task['file']->filesize;
}
switch ($task['status']) {
case NODE_IMPORT_STATUS_PENDING:
$file_offset = $new_offset;
break;
case NODE_IMPORT_STATUS_ERROR:
$file_offset = $task['file']->filesize;
//TODO
break;
}
if ($node_import_can_continue && ($unit == 'all' || $unit == 'bytes' && $byte_count < $count || $unit == 'rows' && $row_count < $count || $unit == 'ms' && timer_read('node_import:do_task:' . $taskid) < $count)) {
continue;
}
break;
}
module_invoke_all('node_import_task', $task, 'pause');
// Cleanup before exit.
$user = $backup_user;
session_save_session(TRUE);
timer_stop('node_import:do_task:' . $taskid);
node_import_lock_release();
}
}