rec_transfer.module in Recommender API 6.3
Same filename and directory in other branches
This is the module file for Recommender Data Transfer
File
rec_transfer/rec_transfer.module (View source)
<?php
/**
* @file
* This is the module file for Recommender Data Transfer
*/
/**
 * Implements hook_menu().
 *
 * Declares a single path, 'admin/settings/recommender/transfer', that is
 * rendered through drupal_get_form() with the rec_transfer_settings_form
 * builder and uses 'administer recommender' as its access argument.
 *
 * @return
 *   The menu item definitions, keyed by path.
 */
function rec_transfer_menu() {
  $settings_page = array(
    'title' => 'Cloud Service',
    'description' => 'Configure recommender to transfer data to cloud service using HTTP',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('rec_transfer_settings_form'),
    'access arguments' => array('administer recommender'),
  );

  return array('admin/settings/recommender/transfer' => $settings_page);
}
/**
 * Form builder for the cloud service settings page.
 *
 * Defines two required text fields, 'rec_transfer_endpoint' and
 * 'rec_transfer_apikey'. Each field takes its default value from the
 * persistent variable of the same name, and the form is passed through
 * system_settings_form() before being returned.
 *
 * @return
 *   The form array produced by system_settings_form().
 */
function rec_transfer_settings_form() {
  $endpoint_field = array(
    '#type' => 'textfield',
    '#title' => t('Service endpoint'),
    '#description' => t('Please specify the service endpoint of the cloud.'),
    '#default_value' => variable_get('rec_transfer_endpoint', ''),
    '#required' => TRUE,
  );

  // No '#maxlength' is set, so the key length is not restricted here.
  $apikey_field = array(
    '#type' => 'textfield',
    '#title' => t('API Key'),
    '#description' => t('Please specify a valid API key.'),
    '#default_value' => variable_get('rec_transfer_apikey', ''),
    '#required' => TRUE,
  );

  return system_settings_form(array(
    'rec_transfer_endpoint' => $endpoint_field,
    'rec_transfer_apikey' => $apikey_field,
  ));
}
/**
 * Implements hook_cron().
 *
 * Hands over to rec_transfer_handle_commands(), flagging the call as
 * cron-triggered.
 */
function rec_transfer_cron() {
  $triggered_by_cron = TRUE;
  rec_transfer_handle_commands($triggered_by_cron);
}
/**
 * Processes the 'recommender' commands stored in the {async_command} table.
 *
 * Three passes are made, in this order:
 * - 'pingMe()' commands with a NULL status: the remote service is asked
 *   whether it is ready and the answer is recorded on the command.
 * - 'runRecommender()' commands with a NULL status: the preference data is
 *   exported and uploaded, see rec_transfer_upload().
 * - 'runRecommender()' commands with status 2: the remote status is checked
 *   and results are imported when available, see rec_transfer_download().
 *   This query runs after the upload pass, so commands uploaded during the
 *   same call are checked right away.
 *
 * Nothing is processed unless both the service endpoint and the API key are
 * configured.
 *
 * @param $cron
 *   TRUE when called from hook_cron(). The value is currently not used.
 */
function rec_transfer_handle_commands($cron = FALSE) {
  // An empty or blank value is as unusable as a missing variable: requests
  // would otherwise be sent to the bare path "/check" without a key.
  $apikey = trim((string) variable_get('rec_transfer_apikey', ''));
  $endpoint = trim((string) variable_get('rec_transfer_endpoint', ''));
  if ($apikey === '' || $endpoint === '') {
    watchdog('rec_transfer', 'You must specify service endpoint and apikey.', array(), WATCHDOG_WARNING);
    return;
  }

  // Answer the queued ping commands. The service is queried once per command
  // so that every command records the state observed at its own check.
  $rows = db_query("SELECT id FROM {async_command} WHERE status IS NULL AND app='recommender' AND command='pingMe()'");
  while ($row = db_fetch_object($rows)) {
    $ready_check = rec_transfer_check_service('ready');
    if (!$ready_check['success']) {
      async_command_update_command($row->id, 0, 'Remote service is not ready. Additional message: ' . $ready_check['message'], time());
    }
    else {
      async_command_update_command($row->id, 1, 'Pong from remote service: ' . $ready_check['message'], time());
    }
  }

  // Upload the data of the commands that are waiting to be executed.
  $rows = db_query("SELECT id, eid FROM {async_command} WHERE status IS NULL AND app='recommender' AND command='runRecommender()'");
  while ($row = db_fetch_object($rows)) {
    $command = array(
      'command_id' => $row->id,
      'recommender_id' => $row->eid,
    );
    rec_transfer_upload($command);
  }

  // Check the commands that are pending or running remotely and download
  // their results if necessary.
  $rows = db_query("SELECT id, eid FROM {async_command} WHERE status=2 AND app='recommender' AND command='runRecommender()'");
  while ($row = db_fetch_object($rows)) {
    $command = array(
      'command_id' => $row->id,
      'recommender_id' => $row->eid,
    );
    rec_transfer_download($command);
  }
}
/**
 * Exports the preference data of one command and uploads it to the service.
 *
 * The outcome is recorded through async_command_update_command(). The status
 * values passed are 0 together with an error message and 2 once the data has
 * been handed over (presumably "failed" and "pending" in the async_command
 * module; confirm against that module).
 *
 * @param $command
 *   Array with the keys 'command_id' ({async_command}.id) and
 *   'recommender_id' (the id of the {recommender_app} row).
 */
function rec_transfer_upload($command) {
  $command_id = $command['command_id'];
  $recommender_id = $command['recommender_id'];

  // Do not export anything while the remote service is unavailable.
  $ready_check = rec_transfer_check_service('ready');
  if (!$ready_check['success']) {
    async_command_update_command($command_id, 0, 'Remote service is not ready. Additional message: ' . $ready_check['message'], time());
    return;
  }

  // Make sure the export produced a file before attempting the upload, so
  // that a local failure is not reported as a transfer problem.
  $pref_filename = rec_transfer_output_preference($command_id, $recommender_id);
  if (!$pref_filename || !is_readable($pref_filename)) {
    async_command_update_command($command_id, 0, 'Cannot write preference data to a temporary file.', time());
    return;
  }

  // D6 stores the parameters with PHP serialize(); they are sent as JSON to
  // accommodate D7.
  $rec_params = db_result(db_query('SELECT params FROM {recommender_app} WHERE id = %d', $recommender_id));
  $rec_params = json_encode(unserialize($rec_params));

  $upload_success = rec_transfer_upload_preference($pref_filename, array(
    'recommender_params' => $rec_params,
  ));

  if ($upload_success) {
    // The command keeps status 2 until a later status check finds the
    // results.
    async_command_update_command($command_id, 2, 'Data uploading or uploaded. Waiting to download results.', time());
  }
  else {
    async_command_update_command($command_id, 0, 'Data upload error.', time());
  }
}
/**
 * Checks the remote status of a command and imports its results when ready.
 *
 * The command is updated only when the status check fails or when the
 * service reports the status 'OKOK'. For any other status nothing is
 * written.
 *
 * @param $command
 *   Array with the keys 'command_id' and 'recommender_id'.
 */
function rec_transfer_download($command) {
  $command_id = $command['command_id'];
  $recommender_id = $command['recommender_id'];

  // Ask the service how far this command has got.
  $status_check = rec_transfer_check_service('status', array('id' => $command_id));
  if (!$status_check['success']) {
    async_command_update_command($command_id, 0, 'Remote service is not successful running this command. Additional message: ' . $status_check['message'], time());
    return;
  }

  // Any status other than 'OKOK' leaves the command untouched.
  $finished = isset($status_check['status']) && $status_check['status'] == 'OKOK';
  if (!$finished) {
    return;
  }

  // Both result sets are always attempted, even if the first one fails.
  $similarity_imported = rec_transfer_import_results('similarity', $command_id, $recommender_id);
  $prediction_imported = rec_transfer_import_results('prediction', $command_id, $recommender_id);

  if (!($similarity_imported && $prediction_imported)) {
    // FIXME: retry the download only; the computation need not be repeated.
    async_command_update_command($command_id, 0, 'Cannot download or import recommendations from remote service. Consider retry.', time());
    return;
  }

  async_command_update_command($command_id, 1, 'Command running successful. Message from remote service: ' . $status_check['message'], time());
}
/**
 * Sends a check request to the "/check" path of the configured endpoint.
 *
 * The request is a POST carrying the API key, the kind of check and any
 * additional fields. The response body is decoded as JSON.
 *
 * @param $what
 *   The kind of check, sent as the 'what' field (callers in this file use
 *   'ready' and 'status').
 * @param $options
 *   Additional POST fields. The 'apikey' and 'what' fields cannot be
 *   overridden through it.
 *
 * @return
 *   An array with the keys:
 *   - success: TRUE if a non-empty response arrived with HTTP code 200.
 *   - http_code: the HTTP response code, 0 if no request was made.
 *   - response: the raw response body, FALSE on transport failure.
 *   - data: the JSON-decoded body, NULL if it could not be decoded.
 *   - message: the 'message' entry of the decoded body, '' if absent, or a
 *     connection error text.
 *   - status: the 'status' entry of the decoded body; only set if present.
 */
function rec_transfer_check_service($what, $options = array()) {
  $results = array(
    'success' => FALSE,
    'http_code' => 0,
    'response' => FALSE,
    'data' => NULL,
    'message' => 'Cannot connect to the service.',
  );

  // cURL is used for the file transfers anyway, so it is used here as well
  // instead of drupal_http_request().
  $ch = curl_init();
  if ($ch === FALSE) {
    return $results;
  }
  curl_setopt($ch, CURLOPT_USERAGENT, "Drupal (Recommender)");
  curl_setopt($ch, CURLOPT_POST, TRUE);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE);
  curl_setopt($ch, CURLOPT_URL, variable_get('rec_transfer_endpoint', '') . '/check');
  // The array union keeps the left-hand keys, so 'apikey' and 'what' win.
  $post = array(
    'apikey' => variable_get('rec_transfer_apikey', ''),
    'what' => $what,
  ) + $options;
  curl_setopt($ch, CURLOPT_POSTFIELDS, $post);

  $response = curl_exec($ch);
  $http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
  curl_close($ch);

  $results['http_code'] = $http_code;
  $results['response'] = $response;
  $results['success'] = $response && $http_code == 200;

  // The loose comparison is deliberate: an empty body is reported the same
  // way as a failed connection.
  if ($response == FALSE) {
    return $results;
  }

  $data = json_decode($response, TRUE);
  $results['data'] = $data;
  $results['message'] = '';
  // The body may not be JSON at all or may lack the expected entries, so
  // nothing is read from it unchecked.
  if (is_array($data)) {
    if (isset($data['status'])) {
      $results['status'] = $data['status'];
    }
    if (isset($data['message'])) {
      $results['message'] = $data['message'];
    }
  }
  return $results;
}
/**
 * Writes the preference data of a recommender to a gzip-compressed file.
 *
 * The file is named "<apikey>-<command id>" in the temporary directory and
 * holds one comma-separated line per preference row. The four columns are
 * the values of the four configured fields, in order. Which rows are exported
 * depends on the unserialized 'params' of the {recommender_app} row:
 * - 'table' equal to '<BUILTIN>': the rows of {recommender_preference}
 *   belonging to the recommender.
 * - 'sql' starting with 'SELECT ': the rows returned by that statement.
 * - otherwise: the configured 'fields' selected from the configured 'table'.
 *
 * @param $command_id
 *   The id of the command; part of the file name.
 * @param $recommender_id
 *   The id of the {recommender_app} row.
 *
 * @return
 *   The path of the written file, or FALSE if the parameters could not be
 *   read or the file could not be opened for writing.
 */
function rec_transfer_output_preference($command_id, $recommender_id) {
  $apikey = variable_get('rec_transfer_apikey', NULL);
  // drupal_create_filename() is not necessary because the file is meant to
  // be overwritten if it exists.
  $filename = variable_get('file_temporary_path', file_directory_temp()) . "/{$apikey}-{$command_id}";

  $rec_params = unserialize(db_result(db_query('SELECT params FROM {recommender_app} WHERE id=%d', $recommender_id)));
  if (!is_array($rec_params) || !isset($rec_params['table'])) {
    watchdog('rec_transfer', 'Cannot read the parameters of recommender @id.', array('@id' => $recommender_id), WATCHDOG_ERROR);
    return FALSE;
  }

  $args = array();
  // FIXME: what if $rec_params['table'] == <FILE>?
  if ($rec_params['table'] == '<BUILTIN>') {
    // The id is bound as a %d placeholder rather than concatenated into the
    // statement.
    $sql = 'SELECT source_eid, target_eid, score, updated FROM {recommender_preference} WHERE app_id=%d';
    $args[] = $recommender_id;
    $rec_params['fields'] = array(
      'source_eid',
      'target_eid',
      'score',
      'updated',
    );
  }
  elseif (isset($rec_params['sql']) && strtoupper(substr($rec_params['sql'], 0, strlen('SELECT '))) === 'SELECT ') {
    $sql = $rec_params['sql'];
  }
  else {
    // A NULL field is selected as the SQL literal NULL.
    $select_fields = $rec_params['fields'];
    foreach ($select_fields as $pos => $field) {
      $select_fields[$pos] = is_null($field) ? 'NULL' : $field;
    }
    $sql = 'SELECT ' . implode(',', $select_fields) . ' FROM ' . $rec_params['table'];
  }

  $fp = gzopen($filename, 'wb9');
  if ($fp === FALSE) {
    watchdog('rec_transfer', 'Cannot open @file for writing.', array('@file' => $filename), WATCHDOG_ERROR);
    return FALSE;
  }

  $columns = $rec_params['fields'];
  $rows = db_query($sql, $args);
  while ($row = db_fetch_array($rows)) {
    $line = array(
      $row[$columns[0]],
      $row[$columns[1]],
      is_null($columns[2]) ? NULL : $row[$columns[2]],
      is_null($columns[3]) ? NULL : $row[$columns[3]],
    );
    // TODO: test failure; also make sure the CSV is well formed (e.g. double
    // quotes, etc).
    gzwrite($fp, implode(',', $line) . "\n");
  }
  gzclose($fp);
  return $filename;
}
/**
 * Uploads a preference file to the "/upload" path of the configured endpoint.
 *
 * The file is sent as the form field 'preference' of a POST request, the way
 * an <input type="file"> would send it.
 *
 * @param $filename
 *   The path of the file to upload.
 * @param $options
 *   Additional POST fields. The 'preference' field cannot be overridden
 *   through it.
 *
 * @return
 *   TRUE if the request completed with HTTP code 200, FALSE otherwise, which
 *   includes a missing cURL extension or an unreadable file.
 */
function rec_transfer_upload_preference($filename, $options = array()) {
  if (!function_exists('curl_init') || !is_readable($filename)) {
    return FALSE;
  }
  // TODO: could consider stream_copy_to_stream() instead.
  // drupal_http_request() is not suitable because the whole file would have
  // to be read into memory first.
  $ch = curl_init();
  if ($ch === FALSE) {
    return FALSE;
  }
  curl_setopt($ch, CURLOPT_USERAGENT, "Drupal (Recommender)");
  curl_setopt($ch, CURLOPT_URL, variable_get('rec_transfer_endpoint', '') . '/upload');
  curl_setopt($ch, CURLOPT_ENCODING, "gzip");
  curl_setopt($ch, CURLOPT_POST, TRUE);
  // Capture the response body instead of printing it into the page output.
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE);

  // The "@path" notation is disabled by default from PHP 5.6 and was removed
  // in PHP 7, which would send the literal string instead of the file.
  // CURLFile is used wherever it is available.
  $file_field = class_exists('CURLFile') ? new CURLFile($filename) : "@{$filename}";
  $post = array(
    'preference' => $file_field,
  ) + $options;
  curl_setopt($ch, CURLOPT_POSTFIELDS, $post);

  $response = curl_exec($ch);
  // With the body captured, curl_exec() returns a string that may be empty,
  // so only FALSE counts as a transport failure.
  $upload_success = $response !== FALSE && curl_getinfo($ch, CURLINFO_HTTP_CODE) == 200;
  curl_close($ch);
  return $upload_success;
}
/**
 * Downloads a result file from the "/download" path of the endpoint.
 *
 * @param $download_file
 *   The name of the remote file; appended to "<endpoint>/download/".
 * @param $save_file
 *   The local path the response body is written to. An existing file is
 *   overwritten.
 *
 * @return
 *   TRUE if the transfer completed with HTTP code 200, FALSE otherwise, which
 *   includes the case where the local file cannot be opened for writing.
 */
function rec_transfer_download_results($download_file, $save_file) {
  $fp = fopen($save_file, 'w');
  // Without a valid handle cURL would print the body to the output and the
  // download could still be reported as successful.
  if ($fp === FALSE) {
    return FALSE;
  }

  $ch = curl_init();
  if ($ch === FALSE) {
    fclose($fp);
    return FALSE;
  }
  curl_setopt($ch, CURLOPT_USERAGENT, "Drupal (Recommender)");
  curl_setopt($ch, CURLOPT_ENCODING, "gzip");
  curl_setopt($ch, CURLOPT_URL, variable_get('rec_transfer_endpoint', '') . "/download/{$download_file}");
  // The response body is written straight to the file.
  curl_setopt($ch, CURLOPT_FILE, $fp);

  $response = curl_exec($ch);
  $download_success = $response && curl_getinfo($ch, CURLINFO_HTTP_CODE) == 200;
  curl_close($ch);
  fclose($fp);
  return $download_success;
}
/**
 * Downloads one result set of a command and loads it into its table.
 *
 * The remote file "<apikey>-<command id>.<suffix>" is saved under the same
 * name in the temporary directory and read as CSV with the columns
 * source_eid, target_eid, score and updated. The existing rows of the
 * recommender are deleted before the new ones are inserted.
 *
 * @param $what
 *   'similarity' (table {recommender_similarity}, suffix 'simi') or
 *   'prediction' (table {recommender_prediction}, suffix 'pred').
 * @param $command_id
 *   The id of the command; part of the file name.
 * @param $recommender_id
 *   The id stored in the app_id column of the imported rows.
 *
 * @return
 *   TRUE if the file was downloaded and imported. FALSE if $what is unknown
 *   or the file could not be downloaded or opened; the existing rows are
 *   kept in that case.
 */
function rec_transfer_import_results($what, $command_id, $recommender_id) {
  $targets = array(
    'similarity' => array('{recommender_similarity}', 'simi'),
    'prediction' => array('{recommender_prediction}', 'pred'),
  );
  // An unknown kind has neither a table nor a file suffix; stop here rather
  // than run statements against an empty table name.
  if (!is_string($what) || !isset($targets[$what])) {
    return FALSE;
  }
  list($table, $suffix) = $targets[$what];

  $apikey = variable_get('rec_transfer_apikey', '');
  $download_file = "{$apikey}-{$command_id}.{$suffix}";
  $save_file = variable_get('file_temporary_path', file_directory_temp()) . '/' . $download_file;
  if (!rec_transfer_download_results($download_file, $save_file)) {
    return FALSE;
  }

  // Open the file before deleting anything, so that the old data survives if
  // the download cannot be read.
  $fp = fopen($save_file, 'r');
  if ($fp === FALSE) {
    return FALSE;
  }

  // FIXME: if the import fails half way, the old data would have to be
  // re-imported. Incremental updates would need the old data kept as well.
  db_query('DELETE FROM ' . $table . ' WHERE app_id=%d', $recommender_id);
  // is_array() ends the loop on FALSE (end of file) as well as on NULL,
  // which fgetcsv() can return for an unusable handle.
  while (is_array($row = fgetcsv($fp))) {
    // A blank line is returned as array(NULL); it holds no ids to import.
    if (count($row) < 2) {
      continue;
    }
    $score = isset($row[2]) ? $row[2] : 0;
    $updated = isset($row[3]) ? $row[3] : 0;
    db_query('INSERT INTO ' . $table . '(app_id, source_eid, target_eid, score, updated) VALUES(%d, %d, %d, %f, %d)', $recommender_id, $row[0], $row[1], $score, $updated);
  }
  fclose($fp);
  return TRUE;
}