You are here

rec_transfer.module in Recommender API 6.3

This is the module file for Recommender Data Transfer

File

rec_transfer/rec_transfer.module
View source
<?php

/**
 * @file
 * This is the module file for Recommender Data Transfer
 */

/**
 * Implements hook_menu().
 *
 * Registers a single administration page that renders the module's settings
 * form (rec_transfer_settings_form()) through drupal_get_form(), restricted
 * to users with the 'administer recommender' permission.
 *
 * @return array
 *   Menu items keyed by path.
 */
function rec_transfer_menu() {
  $settings_page = array(
    'title' => 'Cloud Service',
    'description' => 'Configure recommender to transfer data to cloud service using HTTP',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('rec_transfer_settings_form'),
    'access arguments' => array('administer recommender'),
  );
  return array('admin/settings/recommender/transfer' => $settings_page);
}
/**
 * Form builder for the "Cloud Service" settings page.
 *
 * Offers two required text fields: the remote service endpoint and the API
 * key. The element keys are the same names that the rest of this module reads
 * back with variable_get(). system_settings_form() stores submitted values as
 * variables named after the element keys.
 *
 * @return array
 *   The form definition, wrapped by system_settings_form().
 */
function rec_transfer_settings_form() {
  $endpoint_element = array(
    '#type' => 'textfield',
    '#title' => t('Service endpoint'),
    '#description' => t('Please specify the service endpoint of the cloud.'),
    '#default_value' => variable_get('rec_transfer_endpoint', ''),
    '#required' => TRUE,
  );

  $apikey_element = array(
    '#type' => 'textfield',
    '#title' => t('API Key'),
    '#description' => t('Please specify a valid API key.'),
    '#default_value' => variable_get('rec_transfer_apikey', ''),
    '#required' => TRUE,
  );

  return system_settings_form(array(
    'rec_transfer_endpoint' => $endpoint_element,
    'rec_transfer_apikey' => $apikey_element,
  ));
}

/**
 * Implements hook_cron().
 *
 * Runs one round of command processing, flagged as coming from cron.
 */
function rec_transfer_cron() {
  $invoked_from_cron = TRUE;
  rec_transfer_handle_commands($invoked_from_cron);
}
/**
 * Processes the recommender's queued rows in {async_command}.
 *
 * Three kinds of rows are handled, in this order:
 *  - 'pingMe()' commands with a NULL status are answered with a readiness
 *    check of the remote service (status 1 when ready, 0 otherwise).
 *  - 'runRecommender()' commands with a NULL status have their preference
 *    data exported and uploaded by rec_transfer_upload().
 *  - 'runRecommender()' commands with status 2 (uploaded, waiting for
 *    results) are polled, and imported when finished, by
 *    rec_transfer_download().
 *
 * Nothing is processed unless both the service endpoint and the API key are
 * configured.
 *
 * @param bool $cron
 *   TRUE when called from hook_cron(). Not used by the current
 *   implementation; kept for backward compatibility.
 */
function rec_transfer_handle_commands($cron = FALSE) {
  // An empty (or whitespace-only) value is as unusable as a missing one:
  // requests would be sent to a bare path such as "/check".
  $apikey = trim((string) variable_get('rec_transfer_apikey', ''));
  $endpoint = trim((string) variable_get('rec_transfer_endpoint', ''));
  if ($apikey === '' || $endpoint === '') {
    watchdog('rec_transfer', 'You must specify service endpoint and apikey.', array(), WATCHDOG_WARNING);
    return;
  }

  // Answer pending pings. The readiness of the remote service does not depend
  // on the individual command, so it is checked once (only if there is at
  // least one ping) and the answer is shared by all pending pings.
  $ready_check = NULL;
  $rows = db_query("SELECT id FROM {async_command} WHERE status IS NULL AND app='recommender' AND command='pingMe()'");
  while ($row = db_fetch_object($rows)) {
    if ($ready_check === NULL) {
      $ready_check = rec_transfer_check_service('ready');
    }
    if ($ready_check['success']) {
      async_command_update_command($row->id, 1, 'Pong from remote service: ' . $ready_check['message'], time());
    }
    else {
      async_command_update_command($row->id, 0, 'Remote service is not ready. Additional message: ' . $ready_check['message'], time());
    }
  }

  // Upload the data of commands that have not been started yet.
  $rows = db_query("SELECT id, eid FROM {async_command} WHERE status IS NULL AND app='recommender' AND command='runRecommender()'");
  while ($row = db_fetch_object($rows)) {
    rec_transfer_upload(array(
      'command_id' => $row->id,
      'recommender_id' => $row->eid,
    ));
  }

  // Poll commands whose data was uploaded (status 2) and download their
  // results once the remote computation has finished.
  $rows = db_query("SELECT id, eid FROM {async_command} WHERE status=2 AND app='recommender' AND command='runRecommender()'");
  while ($row = db_fetch_object($rows)) {
    rec_transfer_download(array(
      'command_id' => $row->id,
      'recommender_id' => $row->eid,
    ));
  }
}
/**
 * Exports and uploads the preference data of one 'runRecommender()' command.
 *
 * The command is marked failed (status 0) when the remote service is not
 * ready or the upload fails. On a successful upload it is set to status 2
 * ("waiting to download results").
 *
 * @param array $command
 *   Array with keys 'command_id' (the {async_command} id) and
 *   'recommender_id' (the {recommender_app} id).
 */
function rec_transfer_upload($command) {
  $command_id = $command['command_id'];
  $recommender_id = $command['recommender_id'];

  // Don't bother exporting data if the remote service cannot accept it.
  $ready_check = rec_transfer_check_service('ready');
  if (!$ready_check['success']) {
    async_command_update_command($command_id, 0, 'Remote service is not ready. Additional message: ' . $ready_check['message'], time());
    return;
  }

  $pref_filename = rec_transfer_output_preference($command_id, $recommender_id);

  // D6 stores the recommender parameters with PHP serialize(); they are sent
  // as JSON to accommodate the D7 version of this module.
  $rec_params = db_result(db_query('SELECT params FROM {recommender_app} WHERE id = %d', $recommender_id));
  $rec_params = json_encode(unserialize($rec_params));

  $upload_success = rec_transfer_upload_preference($pref_filename, array(
    'recommender_params' => $rec_params,
  ));

  // The exported dump is only needed for the (synchronous) upload above.
  // Remove it so preference data does not accumulate in the temp directory.
  // A retry re-exports it anyway.
  if (is_string($pref_filename) && file_exists($pref_filename)) {
    unlink($pref_filename);
  }

  if ($upload_success) {
    async_command_update_command($command_id, 2, 'Data uploading or uploaded. Waiting to download results.', time());
  }
  else {
    async_command_update_command($command_id, 0, 'Data upload error.', time());
  }
}
/**
 * Polls the remote service for one uploaded command and imports its results.
 *
 * - If the status request itself fails, the command is marked failed (0).
 * - If the service reports the job as finished (status 'OKOK'), both the
 *   similarity and the prediction results are downloaded and imported. The
 *   command is then marked successful (1) or failed (0).
 * - Any other remote status leaves the command untouched. It keeps status 2,
 *   so rec_transfer_handle_commands() polls it again on its next run.
 *
 * @param array $command
 *   Array with keys 'command_id' (the {async_command} id) and
 *   'recommender_id' (the {recommender_app} id).
 */
function rec_transfer_download($command) {
  $command_id = $command['command_id'];
  $recommender_id = $command['recommender_id'];

  $status_check = rec_transfer_check_service('status', array(
    'id' => $command_id,
  ));
  if (!$status_check['success']) {
    async_command_update_command($command_id, 0, 'Remote service is not successful running this command. Additional message: ' . $status_check['message'], time());
    return;
  }

  // Strict comparison: under PHP < 8, a numeric status such as 0 compares
  // equal to 'OKOK' with ==, which would trigger an import of results that
  // do not exist yet.
  $finished = isset($status_check['status']) && $status_check['status'] === 'OKOK';
  if (!$finished) {
    // Still pending or running remotely; check again next time.
    return;
  }

  // Both imports are attempted even when the first one fails.
  $similarity_ok = rec_transfer_import_results('similarity', $command_id, $recommender_id);
  $prediction_ok = rec_transfer_import_results('prediction', $command_id, $recommender_id);
  if ($similarity_ok && $prediction_ok) {
    async_command_update_command($command_id, 1, 'Command running successful. Message from remote service: ' . $status_check['message'], time());
  }
  else {
    // FIXME: retry download. don't need to recompute everything again.
    async_command_update_command($command_id, 0, 'Cannot download or import recommendations from remote service. Consider retry.', time());
  }
}
/**
 * Queries the remote service via an HTTP POST to "{endpoint}/check".
 *
 * @param string $what
 *   The question sent in the 'what' POST field, e.g. 'ready' or 'status'.
 * @param array $options
 *   Additional POST fields, e.g. array('id' => $command_id) for 'status'.
 *
 * @return array
 *   Keyed array with:
 *   - success: TRUE when a truthy (non-empty) body came back with HTTP 200.
 *   - http_code: the HTTP status code reported by cURL.
 *   - response: the raw response body, or FALSE if the request failed.
 *   - data: the body decoded as a JSON array, or NULL if it is not JSON.
 *   - message: the 'message' field of the decoded body, or a description of
 *     the connection error; '' when neither is available.
 *   - status: the 'status' field of the decoded body, only when present.
 */
function rec_transfer_check_service($what, $options = array()) {
  // Since we are using cURL anyway (for uploads and downloads), use it here
  // too instead of drupal_http_request().
  $ch = curl_init();
  curl_setopt($ch, CURLOPT_USERAGENT, "Drupal (Recommender)");
  curl_setopt($ch, CURLOPT_POST, TRUE);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE);
  curl_setopt($ch, CURLOPT_URL, variable_get('rec_transfer_endpoint', '') . '/check');
  $post = array(
    'apikey' => variable_get('rec_transfer_apikey', ''),
    'what' => $what,
  ) + $options;
  curl_setopt($ch, CURLOPT_POSTFIELDS, $post);
  $response = curl_exec($ch);
  $http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
  // curl_error() must be read before the handle is closed.
  $curl_error = curl_error($ch);
  curl_close($ch);

  $data = is_string($response) ? json_decode($response, TRUE) : NULL;
  $results = array(
    'success' => $response && $http_code == 200,
    'http_code' => $http_code,
    'response' => $response,
    'data' => $data,
    'message' => '',
  );

  if ($response === FALSE) {
    // Strict check: an empty body means we did connect, just got no content.
    $results['message'] = 'Cannot connect to the service.';
    if ($curl_error !== '') {
      $results['message'] .= ' cURL error: ' . $curl_error;
    }
  }
  elseif (is_array($data)) {
    // Non-JSON bodies (e.g. an HTML error page) leave 'message' empty instead
    // of raising undefined-index notices.
    if (isset($data['status'])) {
      $results['status'] = $data['status'];
    }
    if (isset($data['message'])) {
      $results['message'] = $data['message'];
    }
  }

  return $results;
}
/**
 * Exports a recommender's preference data to a gzip-compressed file.
 *
 * Each output line holds four comma-separated values: source id, target id,
 * score and updated value. The last two are empty when the recommender's
 * 'fields' entry for them is NULL. Rows are read from one of three sources:
 * - {recommender_preference}, when the recommender's table is '<BUILTIN>';
 * - the recommender's own 'sql' query, when it starts with "SELECT ";
 * - otherwise, the configured table and fields.
 *
 * @param int $command_id
 *   The {async_command} id; used in the output file name.
 * @param int $recommender_id
 *   The {recommender_app} id whose parameters describe the data source.
 *
 * @return string
 *   Path of the export file, "{temporary dir}/{apikey}-{command_id}".
 */
function rec_transfer_output_preference($command_id, $recommender_id) {
  $apikey = variable_get('rec_transfer_apikey', NULL);

  // drupal_create_filename() is not used on purpose: an existing file with
  // the same name is overwritten.
  $filename = variable_get('file_temporary_path', file_directory_temp()) . "/{$apikey}-{$command_id}";
  $rec_params = unserialize(db_result(db_query('SELECT params FROM {recommender_app} WHERE id=%d', $recommender_id)));

  // FIXME: what if $rec_params['table'] == <FILE>?
  $query_args = array();
  if ($rec_params['table'] == '<BUILTIN>') {
    // Use a placeholder rather than concatenating the id into the SQL.
    $sql = 'SELECT source_eid, target_eid, score, updated FROM {recommender_preference} WHERE app_id = %d';
    $query_args[] = $recommender_id;
    $rec_params['fields'] = array(
      'source_eid',
      'target_eid',
      'score',
      'updated',
    );
  }
  elseif (isset($rec_params['sql']) && strtoupper(substr($rec_params['sql'], 0, strlen('SELECT '))) === 'SELECT ') {
    // A custom query supplied by the recommender definition.
    $sql = $rec_params['sql'];
  }
  else {
    // Build the query from the configured table. NULL fields are selected as
    // SQL NULL so the remaining columns keep their positions.
    $columns = array();
    foreach ($rec_params['fields'] as $pos => $field) {
      $columns[$pos] = is_null($field) ? 'NULL' : $field;
    }
    $sql = 'SELECT ' . implode(',', $columns) . ' FROM ' . $rec_params['table'];
  }

  // Result-column names for the four output values (hoisted out of the row
  // loop). Score and updated are optional.
  $fields = $rec_params['fields'];
  $source_field = $fields[0];
  $target_field = $fields[1];
  $score_field = isset($fields[2]) ? $fields[2] : NULL;
  $updated_field = isset($fields[3]) ? $fields[3] : NULL;

  $fp = gzopen($filename, 'wb9');
  if ($fp === FALSE) {
    watchdog('rec_transfer', 'Cannot open %file for writing.', array('%file' => $filename), WATCHDOG_ERROR);
    // Callers expect a path. The subsequent upload is expected to fail
    // because no fresh data could be written.
    return $filename;
  }

  // With an empty $query_args, db_query() behaves as if no arguments were given.
  $rows = db_query($sql, $query_args);
  while ($row = db_fetch_array($rows)) {
    $line = array(
      $row[$source_field],
      $row[$target_field],
      is_null($score_field) ? NULL : $row[$score_field],
      is_null($updated_field) ? NULL : $row[$updated_field],
    );
    // TODO: test failure. Values are not quoted, so values containing commas
    // or double quotes produce malformed CSV.
    gzwrite($fp, implode(',', $line) . "\n");
  }
  gzclose($fp);

  return $filename;
}
/**
 * Uploads an exported preference file to "{endpoint}/upload".
 *
 * The file is sent as the multipart field 'preference' (like
 * <input type="file" name="preference">), together with any extra fields.
 * cURL streams the file, so it does not have to be read into memory first,
 * unlike with drupal_http_request().
 *
 * @param string $filename
 *   Path of the file to upload.
 * @param array $options
 *   Additional POST fields, e.g. array('recommender_params' => $json).
 *
 * @return bool
 *   TRUE when the request completed with HTTP 200, FALSE otherwise.
 */
function rec_transfer_upload_preference($filename, $options = array()) {
  if (!function_exists('curl_init')) {
    watchdog('rec_transfer', 'The PHP cURL extension is required to upload data.', array(), WATCHDOG_ERROR);
    return FALSE;
  }
  if (!is_string($filename) || !is_readable($filename)) {
    return FALSE;
  }

  $ch = curl_init();
  curl_setopt($ch, CURLOPT_USERAGENT, "Drupal (Recommender)");
  curl_setopt($ch, CURLOPT_URL, variable_get('rec_transfer_endpoint', '') . '/upload');
  // Accept a gzip-encoded response body. This does not compress the upload;
  // the file itself is already gzip-compressed.
  curl_setopt($ch, CURLOPT_ENCODING, "gzip");
  // Capture the response instead of echoing it into the page or cron output.
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE);
  curl_setopt($ch, CURLOPT_POST, TRUE);

  // The "@/path" upload syntax is deprecated since PHP 5.5 and not honored
  // from PHP 5.6/7.0 on (CURLOPT_SAFE_UPLOAD). There, the literal string would
  // be posted instead of the file, so use CURLFile when available.
  if (class_exists('CURLFile')) {
    $file_field = new CURLFile($filename, 'application/octet-stream', basename($filename));
  }
  else {
    $file_field = "@{$filename}";
  }
  $post = array(
    'preference' => $file_field,
  ) + $options;
  curl_setopt($ch, CURLOPT_POSTFIELDS, $post);

  $response = curl_exec($ch);
  // With RETURNTRANSFER an empty body is '', so test for FALSE explicitly.
  $upload_success = $response !== FALSE && curl_getinfo($ch, CURLINFO_HTTP_CODE) == 200;
  curl_close($ch);

  return $upload_success;
}
/**
 * Downloads "{endpoint}/download/{download_file}" into a local file.
 *
 * @param string $download_file
 *   Name of the remote result file.
 * @param string $save_file
 *   Local path to write to (overwritten).
 *
 * @return bool
 *   TRUE when the transfer completed with HTTP 200. On failure the local file
 *   is removed, so a partial download or an error body is never left behind.
 */
function rec_transfer_download_results($download_file, $save_file) {
  $fp = fopen($save_file, 'w');
  if ($fp === FALSE) {
    return FALSE;
  }

  $ch = curl_init();
  curl_setopt($ch, CURLOPT_USERAGENT, "Drupal (Recommender)");
  curl_setopt($ch, CURLOPT_ENCODING, "gzip");
  curl_setopt($ch, CURLOPT_URL, variable_get('rec_transfer_endpoint', '') . "/download/{$download_file}");
  // Stream the body straight into the local file.
  curl_setopt($ch, CURLOPT_FILE, $fp);
  $response = curl_exec($ch);
  $download_success = $response && curl_getinfo($ch, CURLINFO_HTTP_CODE) == 200;
  curl_close($ch);
  fclose($fp);

  if (!$download_success && file_exists($save_file)) {
    unlink($save_file);
  }

  return $download_success;
}
/**
 * Downloads one result file from the remote service and imports it.
 *
 * The existing rows of the recommender in the target table are replaced by
 * the downloaded CSV rows (source id, target id, score, updated).
 *
 * @param string $what
 *   'similarity' (imported into {recommender_similarity}) or 'prediction'
 *   (imported into {recommender_prediction}).
 * @param int $command_id
 *   The {async_command} id; part of the remote file name.
 * @param int $recommender_id
 *   The {recommender_app} id whose rows are replaced.
 *
 * @return bool
 *   TRUE when the file was downloaded and imported, FALSE otherwise.
 */
function rec_transfer_import_results($what, $command_id, $recommender_id) {
  // Target table (with Drupal's {} prefix braces) and remote file suffix.
  $targets = array(
    'similarity' => array('{recommender_similarity}', 'simi'),
    'prediction' => array('{recommender_prediction}', 'pred'),
  );
  if (!isset($targets[$what])) {
    // Previously an assert() that fell through and ran SQL against an
    // undefined table name.
    watchdog('rec_transfer', 'Unknown result type %what requested.', array('%what' => $what), WATCHDOG_ERROR);
    return FALSE;
  }
  list($table, $suffix) = $targets[$what];

  $apikey = variable_get('rec_transfer_apikey', '');
  $download_file = "{$apikey}-{$command_id}.{$suffix}";
  $save_file = variable_get('file_temporary_path', file_directory_temp()) . '/' . $download_file;
  if (!rec_transfer_download_results($download_file, $save_file)) {
    return FALSE;
  }

  // Open the results before deleting anything, so an unreadable file does
  // not leave the recommender without data.
  $fp = fopen($save_file, 'r');
  if ($fp === FALSE) {
    return FALSE;
  }

  // FIXME: if the new data import fails midway, old data is already gone.
  // Also consider incremental updates, where old data should be kept.
  db_query('DELETE FROM ' . $table . ' WHERE app_id=%d', $recommender_id);
  while (($row = fgetcsv($fp)) !== FALSE) {
    // fgetcsv() returns array(NULL) for a blank line; skip it and any other
    // row that lacks the four expected columns instead of inserting zeros.
    if (count($row) < 4) {
      continue;
    }
    db_query('INSERT INTO ' . $table . ' (app_id, source_eid, target_eid, score, updated) VALUES (%d, %d, %d, %f, %d)', $recommender_id, $row[0], $row[1], $row[2], $row[3]);
  }
  fclose($fp);

  // The downloaded file is no longer needed once imported.
  unlink($save_file);

  return TRUE;
}