rec_transfer.module in Recommender API 7.5
Same filename and directory in other branches
This is the module file for Recommender Data Transfer
File
rec_transfer/rec_transfer.module (View source)
<?php
/**
* @file
* This is the module file for Recommender Data Transfer
*/
/**
 * Implements hook_menu().
 *
 * Registers a single administration page whose callback renders
 * rec_transfer_settings_form through drupal_get_form.
 *
 * @return array
 *   Menu item definitions keyed by path.
 */
function rec_transfer_menu() {
  $settings_page = array();
  $settings_page['title'] = 'Cloud Service';
  $settings_page['description'] = 'Configure recommender to transfer data to cloud service using HTTP';
  $settings_page['page callback'] = 'drupal_get_form';
  $settings_page['page arguments'] = array('rec_transfer_settings_form');
  $settings_page['access arguments'] = array('administer recommender');

  return array(
    'admin/config/search/recommender/transfer' => $settings_page,
  );
}
/**
 * Form builder for the cloud service settings page.
 *
 * Offers two required text fields, the service endpoint and the API key.
 * Their element names match the variables read elsewhere in this module
 * ('rec_transfer_endpoint' and 'rec_transfer_apikey'). Each field has its own
 * element validator; the API key length is enforced there rather than with a
 * '#maxlength' property.
 *
 * @return array
 *   The form array after being passed through system_settings_form().
 */
function rec_transfer_settings_form() {
  $endpoint_field = array(
    '#type' => 'textfield',
    '#title' => t('Service endpoint'),
    '#description' => t('Please specify the service endpoint of the cloud.'),
    '#default_value' => variable_get('rec_transfer_endpoint', ''),
    '#required' => TRUE,
    '#element_validate' => array('rec_transfer_validate_url'),
  );

  $apikey_field = array(
    '#type' => 'textfield',
    '#title' => t('API Key'),
    '#description' => t('Please specify a valid API key.'),
    '#default_value' => variable_get('rec_transfer_apikey', ''),
    '#required' => TRUE,
    '#element_validate' => array('rec_transfer_validate_apikey'),
  );

  $settings = array(
    'rec_transfer_endpoint' => $endpoint_field,
    'rec_transfer_apikey' => $apikey_field,
  );

  return system_settings_form($settings);
}
/**
 * Element validator: the endpoint must be an absolute URL without a trailing "/".
 *
 * The endpoint is later concatenated with paths such as '/check' and
 * '/upload' and handed to cURL, so a relative URL or one ending in "/" would
 * produce a broken request URL.
 *
 * @param array $element
 *   The form element being validated; '#value' and '#title' are read.
 * @param array $form_state
 *   The form state (unused, part of the validator signature).
 */
function rec_transfer_validate_url($element, &$form_state) {
  $value = $element['#value'];

  // valid_url() defaults to relative-URL syntax; request absolute checking.
  $is_absolute_url = valid_url($value, TRUE);
  // Strict comparison on the last character avoids the FALSE == 0 trap that
  // strrpos() falls into for short values containing no slash.
  $has_trailing_slash = substr($value, -1) === '/';

  if (!$is_absolute_url || $has_trailing_slash) {
    form_error($element, t('%name must be a valid URL without trailing slash "/".', array(
      '%name' => $element['#title'],
    )));
  }
}
/**
 * Element validator: the API key must be exactly 16 bytes long.
 *
 * @param array $element
 *   The form element being validated; '#value' and '#title' are read.
 * @param array $form_state
 *   The form state (unused, part of the validator signature).
 */
function rec_transfer_validate_apikey($element, &$form_state) {
  $key_length = strlen($element['#value']);
  if ($key_length == 16) {
    // Correct length, nothing to report.
    return;
  }

  $replacements = array('%name' => $element['#title']);
  form_error($element, t('%name must be a valid API Key with length 16.', $replacements));
}
/**
 * Implements hook_cron().
 *
 * Runs the command dispatcher in cron mode, in which uploads and downloads
 * are put on queues instead of being executed inline.
 */
function rec_transfer_cron() {
  $invoked_by_cron = TRUE;
  rec_transfer_handle_commands($invoked_by_cron);
}
/**
 * Implements hook_cron_queue_info().
 *
 * Declares the upload and download queues. Each queue is named after its
 * worker callback and gets a 600 second time allowance.
 *
 * @return array
 *   Queue definitions keyed by queue name.
 */
function rec_transfer_cron_queue_info() {
  $queues = array();
  foreach (array('rec_transfer_upload', 'rec_transfer_download') as $worker) {
    $queues[$worker] = array(
      'worker callback' => $worker,
      'time' => 600,
    );
  }
  return $queues;
}
/**
 * Dispatches outstanding 'recommender' commands from {async_command}.
 *
 * Three passes are made, in this order:
 *  - 'PingMe' commands with no status are answered by probing the remote
 *    service and recorded as OKOK or FAIL.
 *  - 'RunRecommender' commands with no status are uploaded.
 *  - 'RunRecommender' commands in PEND or RUNN state are checked and, when
 *    finished, downloaded.
 *
 * Nothing happens (apart from a watchdog entry) when the API key or the
 * endpoint variable is not set.
 *
 * @param bool $cron
 *   TRUE to enqueue upload/download jobs on their queues; FALSE to run them
 *   immediately in this request.
 */
function rec_transfer_handle_commands($cron = FALSE) {
  $configured = !is_null(variable_get('rec_transfer_apikey')) && !is_null(variable_get('rec_transfer_endpoint'));
  if (!$configured) {
    watchdog('rec_transfer', 'You must specify service endpoint and apikey.');
    return;
  }

  // Pass 1: answer ping requests.
  $pings = db_query("SELECT id FROM {async_command} WHERE status IS NULL AND app='recommender' AND command='PingMe'");
  foreach ($pings as $ping) {
    $probe = rec_transfer_check_service('ready');
    if ($probe['success']) {
      $outcome = array(
        'status' => 'OKOK',
        'message' => 'Pong from remote service: ' . $probe['message'],
        'end' => time(),
      );
    }
    else {
      $outcome = array(
        'status' => 'FAIL',
        'message' => 'Remote service is not ready. Additional message: ' . $probe['message'],
        'end' => time(),
      );
    }
    async_command_update_command($ping->id, $outcome);
  }

  // Pass 2: commands that have not been started yet are uploaded.
  $fresh = db_query("SELECT id, id1 FROM {async_command} WHERE status IS NULL AND app='recommender' AND command='RunRecommender'");
  $upload_queue = DrupalQueue::get('rec_transfer_upload');
  foreach ($fresh as $record) {
    $job = array(
      'command_id' => $record->id,
      'recommender_id' => $record->id1,
    );
    if ($cron) {
      $upload_queue->createItem($job);
    }
    else {
      rec_transfer_upload($job);
    }
  }

  // Pass 3: commands already pending or running are polled, and their
  // results fetched once the remote side reports completion.
  $in_flight = db_query("SELECT id, id1 FROM {async_command} WHERE status IN ('PEND', 'RUNN') AND app='recommender' AND command='RunRecommender'");
  $download_queue = DrupalQueue::get('rec_transfer_download');
  foreach ($in_flight as $record) {
    $job = array(
      'command_id' => $record->id,
      'recommender_id' => $record->id1,
    );
    if ($cron) {
      $download_queue->createItem($job);
    }
    else {
      rec_transfer_download($job);
    }
  }
}
/**
 * Queue worker / direct handler: uploads preference data for one command.
 *
 * The command is first marked as PEND under remote control ('REMT'). If the
 * remote service is not ready, the preference file could not be produced, or
 * the upload itself fails, the command is marked FAIL so that it does not
 * stay pending forever. On success the status is left at PEND;
 * rec_transfer_download() later polls the remote side for progress.
 *
 * @param array $command
 *   Array with keys 'command_id' ({async_command}.id) and 'recommender_id'
 *   ({recommender_app}.id).
 */
function rec_transfer_upload($command) {
  $command_id = $command['command_id'];
  $recommender_id = $command['recommender_id'];

  // This may run from a cron queue after the command was removed, so the
  // update result is used as an existence check.
  // NOTE(review): assumes async_command_update_command() returns the number
  // of rows it updated - confirm against the async_command module.
  // attention: perhaps need to check recommender_id too if the
  // recommender_app is disabled while an upload is still in process.
  $num = async_command_update_command($command_id, array(
    'control' => 'REMT',
    'status' => 'PEND',
    'start' => time(),
  ));
  if ($num != 1) {
    return;
  }

  // Check whether the service is available or not.
  $ready_check = rec_transfer_check_service('ready');
  if (!$ready_check['success']) {
    async_command_update_command($command_id, array(
      'status' => 'FAIL',
      'message' => 'Remote service is not ready. Additional message: ' . $ready_check['message'],
      'end' => time(),
    ));
    return;
  }

  $pref_filename = rec_transfer_output_preference($command_id, $recommender_id);
  // The writer may not have been able to create the file; uploading a
  // missing file can only fail, so report it explicitly.
  if (empty($pref_filename) || !is_file($pref_filename)) {
    async_command_update_command($command_id, array(
      'status' => 'FAIL',
      'message' => 'Cannot write preference data to a temporary file.',
      'end' => time(),
    ));
    return;
  }

  $rec_params = db_query('SELECT params FROM {recommender_app} WHERE id = :id', array(
    ':id' => $recommender_id,
  ))->fetchField();

  $upload_success = rec_transfer_upload_preference($pref_filename, array(
    'recommender_params' => $rec_params,
  ));
  if (!$upload_success) {
    // Previously a failed upload was ignored and the command stayed PEND.
    async_command_update_command($command_id, array(
      'status' => 'FAIL',
      'message' => 'Cannot upload preference data to remote service. Consider retry.',
      'end' => time(),
    ));
  }
}
/**
 * Queue worker / direct handler: polls one command and fetches its results.
 *
 * Asks the remote service for the status of the command. A failed check
 * marks the command FAIL. A remote status of 'OKOK' triggers the import of
 * both the similarity and the prediction results; the command becomes OKOK
 * only if both imports succeed. Any other outcome just records the remote
 * message, a checkpoint time and, when given, the remote status.
 *
 * @param array $command
 *   Array with keys 'command_id' and 'recommender_id'.
 */
function rec_transfer_download($command) {
  $command_id = $command['command_id'];
  $recommender_id = $command['recommender_id'];

  // The command may have been deleted since the job was queued.
  // attention: perhaps need to check 'recommender_id' too.
  $matches = db_query('SELECT COUNT(id) FROM {async_command} WHERE id=:id', array(
    ':id' => $command_id,
  ))->fetchField();
  if ($matches != 1) {
    return;
  }

  // Ask the remote service how far along the command is.
  $remote = rec_transfer_check_service('status', array('id' => $command_id));
  if (!$remote['success']) {
    async_command_update_command($command_id, array(
      'status' => 'FAIL',
      'message' => 'Remote service is not successful running this command. Additional message: ' . $remote['message'],
      'end' => time(),
    ));
    return;
  }

  $finished = isset($remote['status']) && $remote['status'] == 'OKOK';
  if (!$finished) {
    // Still in progress: record what the remote side told us.
    $progress = array(
      'message' => 'Message from remote service: ' . $remote['message'],
      'checkpoint' => time(),
    );
    if (!empty($remote['status'])) {
      $progress['status'] = $remote['status'];
    }
    async_command_update_command($command_id, $progress);
    return;
  }

  // Both result sets are always attempted, even if the first one fails.
  $similarity_ok = rec_transfer_import_results('similarity', $command_id, $recommender_id);
  $prediction_ok = rec_transfer_import_results('prediction', $command_id, $recommender_id);

  if ($similarity_ok && $prediction_ok) {
    $final = array(
      'status' => 'OKOK',
      'message' => 'Command running successful. Message from remote service: ' . $remote['message'],
      'end' => time(),
    );
  }
  else {
    // FIXME: retry download. don't need to recompute everything again.
    $final = array(
      'status' => 'FAIL',
      'message' => 'Cannot download or import recommendations from remote service. Consider retry.',
      'end' => time(),
    );
  }
  async_command_update_command($command_id, $final);
}
/**
 * Sends a check request to the remote service and summarizes the answer.
 *
 * POSTs to '<endpoint>/check' with the configured API key, the requested
 * check and any extra fields. Extra fields cannot override 'apikey' or
 * 'what' because the arrays are combined with the + operator.
 *
 * @param string $what
 *   The kind of check; this file uses 'ready' and 'status'.
 * @param array $options
 *   Additional POST fields, e.g. array('id' => $command_id).
 *
 * @return array
 *   Keys: 'success' (TRUE for a non-empty response with HTTP 200),
 *   'http_code', 'response' (raw body or FALSE), 'data' (decoded JSON or
 *   NULL), 'message' (always a set key, '' when the service sent none) and,
 *   only when the service sent one, 'status'.
 */
function rec_transfer_check_service($what, $options = array()) {
  // Since we are using cURL for the file transfers anyway, use it here too
  // instead of drupal_http_request().
  $ch = curl_init();
  curl_setopt($ch, CURLOPT_USERAGENT, "Drupal (Recommender)");
  curl_setopt($ch, CURLOPT_POST, TRUE);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE);
  curl_setopt($ch, CURLOPT_URL, variable_get('rec_transfer_endpoint') . '/check');
  $post = array(
    'apikey' => variable_get('rec_transfer_apikey'),
    'what' => $what,
  ) + $options;
  curl_setopt($ch, CURLOPT_POSTFIELDS, $post);

  $response = curl_exec($ch);
  $http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
  curl_close($ch);

  // Only attempt to decode an actual body; curl_exec() returns FALSE on
  // transport failure.
  $data = is_string($response) ? drupal_json_decode($response) : NULL;

  $results = array(
    'success' => $response && $http_code == 200,
    'http_code' => $http_code,
    'response' => $response,
    'data' => $data,
    'message' => '',
  );

  if ($response == FALSE) {
    $results['message'] = 'Cannot connect to the service.';
  }
  elseif (is_array($data)) {
    if (isset($data['status'])) {
      $results['status'] = $data['status'];
    }
    // The service is not guaranteed to send a message; keep '' otherwise so
    // callers can concatenate it safely.
    if (isset($data['message'])) {
      $results['message'] = $data['message'];
    }
  }
  else {
    // Body was not a JSON object/array, so there is nothing to read from it.
    $results['message'] = 'Unexpected response from the service.';
  }

  return $results;
}
/**
 * Writes the preference data of a recommender to a gzip-compressed CSV file.
 *
 * The source of the data is chosen by the 'table' entry of the recommender's
 * JSON params:
 *  - '<BUILTIN>': rows of {recommender_preference} for this recommender.
 *  - a string starting with 'SELECT ': used verbatim as the query.
 *  - anything else: treated as a table name, selecting the 'fields' entries.
 * Each output line holds up to four comma separated values taken from the
 * columns named in 'fields' (positions 2 and 3 may be NULL and are then
 * written empty). The distinct counts of the first and second column are
 * stored on the command as 'number1' and 'number2'.
 *
 * @param int $command_id
 *   The {async_command} id; also part of the file name.
 * @param int $recommender_id
 *   The {recommender_app} id.
 *
 * @return string|false
 *   Path of the written file (an existing file is overwritten), or FALSE if
 *   the recommender params are unusable or the file cannot be opened.
 */
function rec_transfer_output_preference($command_id, $recommender_id) {
  $apikey = variable_get('rec_transfer_apikey');
  // drupal_create_filename() is not necessary because we want to overwrite
  // the file if it exists.
  $filename = variable_get('file_temporary_path', file_directory_temp()) . "/{$apikey}-{$command_id}";

  $raw_params = db_query('SELECT params FROM {recommender_app} WHERE id=:id', array(
    ':id' => $recommender_id,
  ))->fetchField();
  $rec_params = is_string($raw_params) ? json_decode($raw_params, TRUE) : NULL;
  if (!is_array($rec_params) || !isset($rec_params['table'])) {
    watchdog('rec_transfer', 'Cannot read the parameters of recommender @id.', array('@id' => $recommender_id), WATCHDOG_ERROR);
    return FALSE;
  }

  // FIXME: what if $rec_params['table'] == <FILE>?
  $args = array();
  if ($rec_params['table'] == '<BUILTIN>') {
    // Bind the id as a placeholder instead of concatenating it into the SQL.
    $sql = 'SELECT source_id, target_id, score, updated FROM {recommender_preference} WHERE app_id = :app_id';
    $args[':app_id'] = $recommender_id;
    $rec_params['fields'] = array(
      'source_id',
      'target_id',
      'score',
      'updated',
    );
  }
  else {
    // Custom sources must name at least the first two columns.
    if (empty($rec_params['fields']) || !is_array($rec_params['fields']) || !isset($rec_params['fields'][0], $rec_params['fields'][1])) {
      watchdog('rec_transfer', 'Recommender @id does not define the preference fields.', array('@id' => $recommender_id), WATCHDOG_ERROR);
      return FALSE;
    }
    if (strtoupper(substr($rec_params['table'], 0, strlen('SELECT '))) === 'SELECT ') {
      $sql = $rec_params['table'];
    }
    else {
      $fields = $rec_params['fields'];
      foreach ($fields as $pos => $field) {
        $fields[$pos] = is_null($field) ? 'NULL' : $field;
      }
      $sql = 'SELECT ' . implode(',', $fields) . ' FROM ' . $rec_params['table'];
    }
  }

  // Resolve the column names once instead of on every row.
  $source_col = $rec_params['fields'][0];
  $target_col = $rec_params['fields'][1];
  $score_col = isset($rec_params['fields'][2]) ? $rec_params['fields'][2] : NULL;
  $updated_col = isset($rec_params['fields'][3]) ? $rec_params['fields'][3] : NULL;

  $rows = db_query($sql, $args, array(
    'fetch' => PDO::FETCH_ASSOC,
  ));

  // Open the output only after the query succeeded, and never write to an
  // invalid handle.
  $fp = gzopen($filename, 'wb9');
  if ($fp === FALSE) {
    watchdog('rec_transfer', 'Cannot open @file for writing.', array('@file' => $filename), WATCHDOG_ERROR);
    return FALSE;
  }

  $users_dict = array();
  $items_dict = array();
  foreach ($rows as $row) {
    $line = array(
      $row[$source_col],
      $row[$target_col],
      is_null($score_col) ? NULL : $row[$score_col],
      is_null($updated_col) ? NULL : $row[$updated_col],
    );
    $users_dict[$line[0]] = 1;
    $items_dict[$line[1]] = 1;
    // TODO: test failure; also make sure the CSV is well formed (e.g. double
    // quotes, embedded commas).
    gzwrite($fp, implode(',', $line) . "\n");
  }
  gzclose($fp);

  async_command_update_command($command_id, array(
    'number1' => count($users_dict),
    'number2' => count($items_dict),
  ));
  return $filename;
}
/**
 * Uploads a preference file to '<endpoint>/upload' as a multipart POST.
 *
 * The file is sent in the 'preference' field, like
 * <input type="file" name="preference">, together with any extra fields.
 *
 * @param string $filename
 *   Path of the file to upload.
 * @param array $options
 *   Additional POST fields; they cannot override 'preference'.
 *
 * @return bool
 *   TRUE if the request completed with HTTP status 200, FALSE otherwise
 *   (including when cURL is unavailable or the file cannot be read).
 */
function rec_transfer_upload_preference($filename, $options = array()) {
  if (!function_exists('curl_init')) {
    return FALSE;
  }
  if (empty($filename) || !is_readable($filename)) {
    return FALSE;
  }

  // TODO: could consider using stream_copy_to_stream() instead.
  // drupal_http_request() is not good here because it would need to read a
  // big file into memory first.

  // The "@path" upload syntax is ignored by default since PHP 5.6 and was
  // removed in PHP 7.0, where the literal string would be posted instead of
  // the file. Use CURLFile when available and keep "@" for older PHP.
  if (class_exists('CURLFile')) {
    $attachment = new CURLFile($filename);
  }
  else {
    $attachment = "@{$filename}";
  }
  $post = array(
    'preference' => $attachment,
  ) + $options;

  $ch = curl_init();
  curl_setopt($ch, CURLOPT_USERAGENT, "Drupal (Recommender)");
  curl_setopt($ch, CURLOPT_URL, variable_get('rec_transfer_endpoint') . '/upload');
  // Advertise gzip so the response may be compressed.
  curl_setopt($ch, CURLOPT_ENCODING, "gzip");
  // Capture the response instead of letting cURL print it to the output.
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE);
  curl_setopt($ch, CURLOPT_POST, TRUE);
  curl_setopt($ch, CURLOPT_POSTFIELDS, $post);

  $response = curl_exec($ch);
  // With RETURNTRANSFER an empty body is a valid answer, so test for FALSE
  // explicitly rather than for truthiness.
  $upload_success = $response !== FALSE && curl_getinfo($ch, CURLINFO_HTTP_CODE) == 200;
  curl_close($ch);

  return $upload_success;
}
/**
 * Downloads '<endpoint>/download/<file>' into a local file.
 *
 * The response body is streamed straight into $save_file. Note that the
 * body is written even when the HTTP status is not 200, so the file content
 * must only be trusted when this function returns TRUE.
 *
 * @param string $download_file
 *   Name of the remote file, appended to the download URL.
 * @param string $save_file
 *   Local path to write to; an existing file is truncated.
 *
 * @return bool
 *   TRUE if the transfer completed with HTTP status 200, FALSE otherwise
 *   (including when the local file cannot be opened).
 */
function rec_transfer_download_results($download_file, $save_file) {
  $fp = fopen($save_file, 'w');
  if ($fp === FALSE) {
    // Without a valid handle cURL cannot store the body; passing FALSE on
    // to curl_setopt()/fclose() would only raise further errors.
    watchdog('rec_transfer', 'Cannot open @file for writing.', array('@file' => $save_file), WATCHDOG_ERROR);
    return FALSE;
  }

  $ch = curl_init();
  curl_setopt($ch, CURLOPT_USERAGENT, "Drupal (Recommender)");
  curl_setopt($ch, CURLOPT_ENCODING, "gzip");
  curl_setopt($ch, CURLOPT_URL, variable_get('rec_transfer_endpoint') . "/download/{$download_file}");
  curl_setopt($ch, CURLOPT_FILE, $fp);

  $response = curl_exec($ch);
  $download_success = $response && curl_getinfo($ch, CURLINFO_HTTP_CODE) == 200;

  curl_close($ch);
  fclose($fp);
  return $download_success;
}
/**
 * Downloads one result set and replaces the recommender's stored rows with it.
 *
 * The remote file is named '<apikey>-<command_id>.<suffix>' and is saved
 * under the temporary directory. Its CSV rows are read as
 * source_id, target_id, score, updated. Existing rows of the recommender are
 * deleted and the new rows inserted inside one database transaction, so a
 * failing import leaves the old data in place (on storage engines that
 * support transactions). The number of imported rows is stored on the
 * command ('number3' for similarity, 'number4' for prediction).
 *
 * @param string $what
 *   'similarity' or 'prediction'.
 * @param int $command_id
 *   The {async_command} id.
 * @param int $recommender_id
 *   The recommender the rows belong to (stored as app_id).
 *
 * @return bool
 *   TRUE when the results were downloaded and imported, FALSE otherwise.
 */
function rec_transfer_import_results($what, $command_id, $recommender_id) {
  // Table, remote file suffix and counter field per result type.
  $targets = array(
    'similarity' => array('recommender_similarity', 'simi', 'number3'),
    'prediction' => array('recommender_prediction', 'pred', 'number4'),
  );
  if (!isset($targets[$what])) {
    // Previously this fell through an assert() with undefined variables.
    watchdog('rec_transfer', 'Unknown result type %what requested for import.', array('%what' => $what), WATCHDOG_ERROR);
    return FALSE;
  }
  list($table, $suffix, $field) = $targets[$what];

  $apikey = variable_get('rec_transfer_apikey');
  $download_file = "{$apikey}-{$command_id}.{$suffix}";
  $save_file = variable_get('file_temporary_path', file_directory_temp()) . '/' . $download_file;

  if (!rec_transfer_download_results($download_file, $save_file)) {
    return FALSE;
  }

  $fp = fopen($save_file, 'r');
  if ($fp === FALSE) {
    watchdog('rec_transfer', 'Cannot open downloaded file @file.', array('@file' => $save_file), WATCHDOG_ERROR);
    return FALSE;
  }

  // TODO: think about incremental updates, where old data must be kept.
  $count = 0;
  $transaction = db_transaction();
  try {
    db_delete($table)
      ->condition('app_id', $recommender_id)
      ->execute();

    $insert = db_insert($table)
      ->fields(array(
        'app_id',
        'source_id',
        'target_id',
        'score',
        'updated',
      ));

    while (($row = fgetcsv($fp)) !== FALSE) {
      // fgetcsv() reports a blank line as array(NULL); there is nothing to
      // import from it.
      if ($row === array(NULL)) {
        continue;
      }
      // Missing trailing columns become NULL instead of raising notices.
      $row = array_pad($row, 4, NULL);

      // TODO: summarize data stats.
      $insert->values(array(
        $recommender_id,
        $row[0],
        $row[1],
        $row[2],
        $row[3],
      ));
      $count++;

      // Periodically flush the queued rows to keep the statement small.
      if ($count % 1000 == 0) {
        $insert->execute();
      }
    }
    // Flush the remainder.
    // NOTE(review): relies on execute() being a no-op when nothing is
    // queued, as the original code did - confirm for the database driver.
    $insert->execute();
  }
  catch (Exception $e) {
    // Restore the previous rows and report the failure to the caller.
    $transaction->rollback();
    fclose($fp);
    watchdog('rec_transfer', 'Importing %what results failed and was rolled back: @message', array(
      '%what' => $what,
      '@message' => $e->getMessage(),
    ), WATCHDOG_ERROR);
    return FALSE;
  }
  fclose($fp);

  // Releasing the transaction object commits the import.
  unset($transaction);

  async_command_update_command($command_id, array(
    $field => $count,
  ));
  return TRUE;
}