You are here

rec_transfer.module in Recommender API 7.4

This is the module file for Recommender Data Transfer

File

rec_transfer/rec_transfer.module
View source
<?php

/**
 * @file
 * This is the module file for Recommender Data Transfer
 */

/**
 * Implements hook_menu().
 *
 * Declares a single administrative route whose page is the
 * rec_transfer_settings_form form, rendered through drupal_get_form() and
 * guarded by the 'administer recommender' permission.
 *
 * @return array
 *   Menu item definitions keyed by path.
 */
function rec_transfer_menu() {
  $settings_page = array(
    'title' => 'Cloud Service',
    'description' => 'Configure recommender to transfer data to cloud service using HTTP',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('rec_transfer_settings_form'),
    'access arguments' => array('administer recommender'),
  );

  return array(
    'admin/config/search/recommender/transfer' => $settings_page,
  );
}
/**
 * Form builder for the cloud service settings page.
 *
 * Defines two required text fields, the service endpoint and the API key.
 * Each is pre-filled from the variable of the same name and has its own
 * element validator. The form is returned through system_settings_form().
 *
 * @return array
 *   The form array as returned by system_settings_form().
 */
function rec_transfer_settings_form() {
  $endpoint_field = array(
    '#type' => 'textfield',
    '#title' => t('Service endpoint'),
    '#description' => t('Please specify the service endpoint of the cloud.'),
    '#default_value' => variable_get('rec_transfer_endpoint', ''),
    '#required' => TRUE,
    '#element_validate' => array('rec_transfer_validate_url'),
  );

  // No '#maxlength' is set here; the key length is enforced by the validator.
  $apikey_field = array(
    '#type' => 'textfield',
    '#title' => t('API Key'),
    '#description' => t('Please specify a valid API key.'),
    '#default_value' => variable_get('rec_transfer_apikey', ''),
    '#required' => TRUE,
    '#element_validate' => array('rec_transfer_validate_apikey'),
  );

  return system_settings_form(array(
    'rec_transfer_endpoint' => $endpoint_field,
    'rec_transfer_apikey' => $apikey_field,
  ));
}
/**
 * Element validator for the service endpoint field.
 *
 * Flags the element unless its value is an absolute URL that does not end in
 * a slash. Other functions in this module append paths such as '/check' and
 * '/upload' to the stored endpoint, hence the no-trailing-slash rule.
 *
 * @param array $element
 *   The form element being validated; '#value' and '#title' are read.
 * @param array $form_state
 *   The form state (unused).
 */
function rec_transfer_validate_url($element, &$form_state) {
  $value = $element['#value'];

  // The endpoint is handed to cURL as-is, so a relative URL is not usable;
  // require an absolute one.
  $is_absolute_url = valid_url($value, TRUE);

  // Inspect the last character directly. Comparing strrpos() to
  // strlen() - 1 with a loose "==" treats FALSE (no slash found) as equal to
  // 0 and misreports one-character values as having a trailing slash.
  $has_trailing_slash = substr($value, -1) === '/';

  if (!$is_absolute_url || $has_trailing_slash) {
    form_error($element, t('%name must be a valid URL without trailing slash "/".', array(
      '%name' => $element['#title'],
    )));
  }
}
/**
 * Element validator for the API key field.
 *
 * Flags the element unless its value is exactly 16 bytes long.
 *
 * @param array $element
 *   The form element being validated; '#value' and '#title' are read.
 * @param array $form_state
 *   The form state (unused).
 */
function rec_transfer_validate_apikey($element, &$form_state) {
  $required_length = 16;
  if (strlen($element['#value']) == $required_length) {
    return;
  }

  $placeholders = array('%name' => $element['#title']);
  form_error($element, t('%name must be a valid API Key with length 16.', $placeholders));
}

/**
 * Implements hook_cron().
 *
 * Runs the command handler in cron mode, in which upload and download work is
 * placed on queues rather than executed inline.
 */
function rec_transfer_cron() {
  $from_cron = TRUE;
  rec_transfer_handle_commands($from_cron);
}

/**
 * Implements hook_cron_queue_info().
 *
 * Declares the upload and download queues. Each queue is named after its
 * worker callback and is given a 600 second time allowance.
 *
 * @return array
 *   Queue definitions keyed by queue name.
 */
function rec_transfer_cron_queue_info() {
  $queues = array();
  foreach (array('rec_transfer_upload', 'rec_transfer_download') as $worker) {
    $queues[$worker] = array(
      'worker callback' => $worker,
      'time' => 600,
    );
  }
  return $queues;
}
/**
 * Processes outstanding recommender commands from {async_command}.
 *
 * Three passes run in order:
 *  - 'PingMe' commands with no status are answered immediately by probing
 *    the remote service.
 *  - 'RunRecommender' commands with no status are uploaded.
 *  - 'RunRecommender' commands in status PEND or RUNN are polled and, when
 *    finished, their results are downloaded.
 *
 * The third query runs after the upload pass, so in inline mode commands just
 * uploaded are polled in the same call.
 *
 * @param bool $cron
 *   TRUE to enqueue upload/download work on the cron queues; FALSE to execute
 *   it inline.
 */
function rec_transfer_handle_commands($cron = FALSE) {
  $apikey_missing = is_null(variable_get('rec_transfer_apikey'));
  $endpoint_missing = is_null(variable_get('rec_transfer_endpoint'));
  if ($apikey_missing || $endpoint_missing) {
    watchdog('rec_transfer', 'You must specify service endpoint and apikey.');
    return;
  }

  // Pass 1: answer ping requests.
  $ping_commands = db_query("SELECT id FROM {async_command} WHERE status IS NULL AND app='recommender' AND command='PingMe'");
  foreach ($ping_commands as $ping) {
    $probe = rec_transfer_check_service('ready');
    if ($probe['success']) {
      $status = 'OKOK';
      $message = 'Pong from remote service: ' . $probe['message'];
    }
    else {
      $status = 'FAIL';
      $message = 'Remote service is not ready. Additional message: ' . $probe['message'];
    }
    async_command_update_command($ping->id, array(
      'status' => $status,
      'message' => $message,
      'end' => time(),
    ));
  }

  // Pass 2: commands that have not been started yet get uploaded.
  $new_commands = db_query("SELECT id, id1 FROM {async_command} WHERE status IS NULL AND app='recommender' AND command='RunRecommender'");
  $upload_queue = DrupalQueue::get('rec_transfer_upload');
  foreach ($new_commands as $record) {
    $task = array(
      'command_id' => $record->id,
      'recommender_id' => $record->id1,
    );
    if ($cron) {
      $upload_queue->createItem($task);
    }
    else {
      rec_transfer_upload($task);
    }
  }

  // Pass 3: pending or running commands are polled and, when done, fetched.
  $active_commands = db_query("SELECT id, id1 FROM {async_command} WHERE status IN ('PEND', 'RUNN') AND app='recommender' AND command='RunRecommender'");
  $download_queue = DrupalQueue::get('rec_transfer_download');
  foreach ($active_commands as $record) {
    $task = array(
      'command_id' => $record->id,
      'recommender_id' => $record->id1,
    );
    if ($cron) {
      $download_queue->createItem($task);
    }
    else {
      rec_transfer_download($task);
    }
  }
}
/**
 * Queue worker / inline handler: exports preference data and uploads it.
 *
 * Marks the command as PEND under remote control, checks that the remote
 * service is ready, writes the preference file and posts it together with
 * the recommender parameters. Any failure along the way marks the command
 * FAIL so that it does not linger in PEND. On success the status stays PEND;
 * rec_transfer_download() later polls the remote side for progress.
 *
 * @param array $command
 *   Array with keys 'command_id' and 'recommender_id'.
 */
function rec_transfer_upload($command) {
  $command_id = $command['command_id'];
  $recommender_id = $command['recommender_id'];

  // This may run from a cron queue after the command row has been removed,
  // in which case the update touches no row and there is nothing to do.
  // NOTE(review): recommender_id is not re-validated here; confirm whether a
  // disabled recommender app needs separate handling.
  $num = async_command_update_command($command_id, array(
    "control" => 'REMT',
    'status' => 'PEND',
    'start' => time(),
  ));
  if ($num != 1) {
    return;
  }

  // Check whether the service is available or not.
  $ready_check = rec_transfer_check_service('ready');
  if (!$ready_check['success']) {
    async_command_update_command($command_id, array(
      'status' => 'FAIL',
      'message' => 'Remote service is not ready. Additional message: ' . $ready_check['message'],
      'end' => time(),
    ));
    return;
  }

  $pref_filename = rec_transfer_output_preference($command_id, $recommender_id);
  if (empty($pref_filename)) {
    // Without an export file there is nothing to upload.
    async_command_update_command($command_id, array(
      'status' => 'FAIL',
      'message' => 'Cannot write preference data for upload.',
      'end' => time(),
    ));
    return;
  }

  $rec_params = db_query('SELECT params FROM {recommender_app} WHERE id = :id', array(
    ':id' => $recommender_id,
  ))
    ->fetchField();
  $upload_success = rec_transfer_upload_preference($pref_filename, array(
    'recommender_params' => $rec_params,
  ));

  if (!$upload_success) {
    // Previously a failed upload was ignored and the command stayed in PEND.
    async_command_update_command($command_id, array(
      'status' => 'FAIL',
      'message' => 'Cannot upload preference data to remote service. Consider retry.',
      'end' => time(),
    ));
  }
}
/**
 * Queue worker / inline handler: polls a command and imports its results.
 *
 * Asks the remote service for the command's status. A failed check marks the
 * command FAIL. A remote status of 'OKOK' triggers the import of both the
 * similarity and prediction results. Any other outcome just records the
 * remote message, a checkpoint time and, when present, the remote status.
 *
 * @param array $command
 *   Array with keys 'command_id' and 'recommender_id'.
 */
function rec_transfer_download($command) {
  $command_id = $command['command_id'];
  $recommender_id = $command['recommender_id'];

  // The command may have been removed since it was queued; bail out if so.
  // NOTE(review): recommender_id is not verified here.
  $matches = db_query('SELECT COUNT(id) FROM {async_command} WHERE id=:id', array(
    ':id' => $command_id,
  ))
    ->fetchField();
  if ($matches != 1) {
    return;
  }

  $status_check = rec_transfer_check_service('status', array(
    'id' => $command_id,
  ));
  if (!$status_check['success']) {
    async_command_update_command($command_id, array(
      'status' => 'FAIL',
      'message' => 'Remote service is not successful running this command. Additional message: ' . $status_check['message'],
      'end' => time(),
    ));
    return;
  }

  $remote_finished = isset($status_check['status']) && $status_check['status'] == 'OKOK';
  if (!$remote_finished) {
    // Still in progress: record what the remote side reported.
    $progress = array(
      'message' => 'Message from remote service: ' . $status_check['message'],
      'checkpoint' => time(),
    );
    if (!empty($status_check['status'])) {
      $progress['status'] = $status_check['status'];
    }
    async_command_update_command($command_id, $progress);
    return;
  }

  // Both imports are always attempted, even when the first one fails.
  $similarity_ok = rec_transfer_import_results('similarity', $command_id, $recommender_id);
  $prediction_ok = rec_transfer_import_results('prediction', $command_id, $recommender_id);

  if ($similarity_ok && $prediction_ok) {
    $outcome = array(
      'status' => 'OKOK',
      'message' => 'Command running successful. Message from remote service: ' . $status_check['message'],
      'end' => time(),
    );
  }
  else {
    // FIXME: retry the download only; the remote computation need not rerun.
    $outcome = array(
      'status' => 'FAIL',
      'message' => 'Cannot download or import recommendations from remote service. Consider retry.',
      'end' => time(),
    );
  }
  async_command_update_command($command_id, $outcome);
}
/**
 * Posts a check request to the remote service and summarises the answer.
 *
 * Sends the API key, $what and any extra $options as POST fields to
 * "<endpoint>/check" and decodes the response body as JSON.
 *
 * @param string $what
 *   The kind of check; callers in this file pass 'ready' or 'status'.
 * @param array $options
 *   Extra POST fields, e.g. array('id' => $command_id).
 *
 * @return array
 *   Array with keys:
 *   - success: TRUE when a non-empty body came back with HTTP code 200.
 *   - http_code: The HTTP response code reported by cURL.
 *   - response: The raw body, or FALSE when the request failed.
 *   - data: The decoded JSON (NULL when not decodable).
 *   - message: The remote 'message' value, or a local explanation.
 *   - status: Only present when the remote response carries a 'status'.
 */
function rec_transfer_check_service($what, $options = array()) {
  // cURL is used elsewhere in this module for file transfer, so it is used
  // here as well instead of drupal_http_request().
  $ch = curl_init();
  curl_setopt($ch, CURLOPT_USERAGENT, "Drupal (Recommender)");
  curl_setopt($ch, CURLOPT_POST, TRUE);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE);
  curl_setopt($ch, CURLOPT_URL, variable_get('rec_transfer_endpoint') . '/check');
  $post = array(
    'apikey' => variable_get('rec_transfer_apikey'),
    'what' => $what,
  ) + $options;
  curl_setopt($ch, CURLOPT_POSTFIELDS, $post);
  $response = curl_exec($ch);
  $http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
  curl_close($ch);

  $data = drupal_json_decode($response);
  $results = array(
    'success' => $response && $http_code == 200,
    'http_code' => $http_code,
    'response' => $response,
    'data' => $data,
    'message' => '',
  );

  if ($response == FALSE) {
    $results['message'] = 'Cannot connect to the service.';
  }
  elseif (is_array($data)) {
    if (isset($data['status'])) {
      $results['status'] = $data['status'];
    }
    // The remote side may omit 'message'; keep the empty default then.
    if (isset($data['message'])) {
      $results['message'] = $data['message'];
    }
  }
  else {
    // The body was not a JSON object (e.g. an HTML error page), so there is
    // no remote message to relay. Give callers something to log instead.
    $results['message'] = 'Unexpected response from the service (HTTP code ' . $http_code . ').';
  }
  return $results;
}
/**
 * Exports a recommender's preference data to a gzip-compressed CSV file.
 *
 * The file is written to the temporary directory as "<apikey>-<command_id>"
 * and overwritten if it already exists. The number of distinct values seen
 * in the first and second columns is stored on the command as 'number1' and
 * 'number2'.
 *
 * The data source is taken from the recommender's 'params' JSON:
 *  - table '<BUILTIN>': rows of {recommender_preference} for this app;
 *  - table starting with 'SELECT ': used verbatim as the query;
 *  - otherwise: 'SELECT <fields> FROM <table>'.
 *
 * @param int $command_id
 *   The {async_command} id; used in the file name and for the stats update.
 * @param int $recommender_id
 *   The {recommender_app} id whose params are read.
 *
 * @return string|false
 *   The path of the written file, or FALSE when the parameters could not be
 *   read or the file could not be opened.
 */
function rec_transfer_output_preference($command_id, $recommender_id) {
  $apikey = variable_get('rec_transfer_apikey');

  // drupal_create_filename() is not used on purpose: an existing file should
  // be overwritten.
  $filename = variable_get('file_temporary_path', file_directory_temp()) . "/{$apikey}-{$command_id}";

  $rec_params = json_decode(db_query('SELECT params FROM {recommender_app} WHERE id=:id', array(
    ':id' => $recommender_id,
  ))
    ->fetchField(), TRUE);
  if (!is_array($rec_params) || !isset($rec_params['table'])) {
    watchdog('rec_transfer', 'Cannot read parameters of recommender %id.', array('%id' => $recommender_id), WATCHDOG_ERROR);
    return FALSE;
  }

  // FIXME: what if $rec_params['table'] == <FILE>?
  $sql_args = array();
  if ($rec_params['table'] == '<BUILTIN>') {
    // Bind the id instead of concatenating it into the SQL string.
    $sql = 'SELECT source_eid, target_eid, score, updated FROM {recommender_preference} WHERE app_id = :app_id';
    $sql_args[':app_id'] = $recommender_id;
    $rec_params['fields'] = array(
      'source_eid',
      'target_eid',
      'score',
      'updated',
    );
  }
  elseif (strtoupper(substr($rec_params['table'], 0, strlen('SELECT '))) === 'SELECT ') {
    $sql = $rec_params['table'];
  }
  else {
    // Unset columns are selected as SQL NULL so the column count is kept.
    $fields = $rec_params['fields'];
    foreach ($fields as $pos => $field) {
      $fields[$pos] = is_null($field) ? 'NULL' : $field;
    }
    $sql = 'SELECT ' . implode(',', $fields) . ' FROM ' . $rec_params['table'];
  }

  $fp = gzopen($filename, 'wb9');
  if ($fp === FALSE) {
    watchdog('rec_transfer', 'Cannot open %file for writing preference data.', array('%file' => $filename), WATCHDOG_ERROR);
    return FALSE;
  }

  $users_dict = array();
  $items_dict = array();
  $rows = db_query($sql, $sql_args, array(
    'fetch' => PDO::FETCH_ASSOC,
  ));
  foreach ($rows as $row) {
    $line = array(
      $row[$rec_params['fields'][0]],
      $row[$rec_params['fields'][1]],
      is_null($rec_params['fields'][2]) ? NULL : $row[$rec_params['fields'][2]],
      is_null($rec_params['fields'][3]) ? NULL : $row[$rec_params['fields'][3]],
    );
    // Array keys act as sets of the distinct first/second column values.
    $users_dict[$line[0]] = 1;
    $items_dict[$line[1]] = 1;

    // TODO: detect write failures; values are not CSV-escaped (quotes,
    // embedded commas).
    gzwrite($fp, implode(',', $line) . "\n");
  }
  gzclose($fp);

  async_command_update_command($command_id, array(
    'number1' => count($users_dict),
    'number2' => count($items_dict),
  ));
  return $filename;
}
/**
 * Uploads a preference file to "<endpoint>/upload" as a multipart POST.
 *
 * The file is sent in the 'preference' field; $options are sent as
 * additional POST fields.
 *
 * @param string $filename
 *   Path of the local file to upload.
 * @param array $options
 *   Extra POST fields, e.g. array('recommender_params' => $json).
 *
 * @return bool
 *   TRUE when the request completed with HTTP code 200, FALSE otherwise.
 */
function rec_transfer_upload_preference($filename, $options = array()) {
  // TODO: verify that the cURL extension is available before using it.
  // TODO: consider streaming instead; drupal_http_request() would need the
  // whole file in memory.
  $ch = curl_init();
  curl_setopt($ch, CURLOPT_USERAGENT, "Drupal (Recommender)");
  curl_setopt($ch, CURLOPT_URL, variable_get('rec_transfer_endpoint') . '/upload');
  curl_setopt($ch, CURLOPT_ENCODING, "gzip");
  curl_setopt($ch, CURLOPT_POST, TRUE);

  // Capture the response body instead of letting cURL print it into the
  // page or cron output.
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE);

  // The "@path" upload syntax is disabled by default since PHP 5.6 and gone
  // in PHP 7, where the literal string would be posted instead of the file.
  // Use CURLFile when available. The base name is passed explicitly so the
  // remote side sees the same file name the "@path" form produced.
  if (class_exists('CURLFile')) {
    $upload = new CURLFile($filename, 'application/octet-stream', basename($filename));
  }
  else {
    $upload = "@{$filename}";
  }

  // Same as <input type="file" name="preference">.
  $post = array(
    'preference' => $upload,
  ) + $options;
  curl_setopt($ch, CURLOPT_POSTFIELDS, $post);
  $response = curl_exec($ch);

  // With CURLOPT_RETURNTRANSFER a successful request may return an empty
  // string, so only FALSE counts as a transport failure.
  $upload_success = $response !== FALSE && curl_getinfo($ch, CURLINFO_HTTP_CODE) == 200;
  curl_close($ch);
  return $upload_success;
}
/**
 * Downloads "<endpoint>/download/<download_file>" into a local file.
 *
 * @param string $download_file
 *   Name of the remote file, appended to the download URL.
 * @param string $save_file
 *   Local path the response body is written to (truncated first).
 *
 * @return bool
 *   TRUE when the transfer completed with HTTP code 200; FALSE otherwise,
 *   including when the local file cannot be opened for writing.
 */
function rec_transfer_download_results($download_file, $save_file) {
  $fp = fopen($save_file, 'w');
  if ($fp === FALSE) {
    // Without a valid handle CURLOPT_FILE cannot be set; report the failure
    // rather than running the transfer.
    watchdog('rec_transfer', 'Cannot open %file for writing downloaded results.', array('%file' => $save_file), WATCHDOG_ERROR);
    return FALSE;
  }

  $ch = curl_init();
  curl_setopt($ch, CURLOPT_USERAGENT, "Drupal (Recommender)");
  curl_setopt($ch, CURLOPT_ENCODING, "gzip");
  curl_setopt($ch, CURLOPT_URL, variable_get('rec_transfer_endpoint') . "/download/{$download_file}");
  // The body is streamed straight into the file.
  curl_setopt($ch, CURLOPT_FILE, $fp);
  $response = curl_exec($ch);
  $download_success = $response && curl_getinfo($ch, CURLINFO_HTTP_CODE) == 200;
  curl_close($ch);
  fclose($fp);
  return $download_success;
}
/**
 * Downloads one result file and loads it into its recommender table.
 *
 * The remote file "<apikey>-<command_id>.<suffix>" is saved to the temporary
 * directory and read as CSV with columns source_eid, target_eid, score and
 * updated. Existing rows of the recommender are replaced. The delete and the
 * inserts run in one database transaction, so a failed import rolls back and
 * keeps the previous data where the database supports transactions.
 *
 * @param string $what
 *   'similarity' or 'prediction'.
 * @param int $command_id
 *   The {async_command} id; the imported row count is stored on it
 *   ('number3' for similarity, 'number4' for prediction).
 * @param int $recommender_id
 *   The app id whose rows are replaced.
 *
 * @return bool
 *   TRUE when the file was downloaded and imported, FALSE otherwise.
 */
function rec_transfer_import_results($what, $command_id, $recommender_id) {
  // Per result type: target table, remote file suffix, command stats field.
  $targets = array(
    'similarity' => array('recommender_similarity', 'simi', 'number3'),
    'prediction' => array('recommender_prediction', 'pred', 'number4'),
  );
  if (!isset($targets[$what])) {
    // An unknown type previously only tripped assert() and then carried on
    // with undefined variables.
    watchdog('rec_transfer', 'Unknown result type %what.', array('%what' => $what), WATCHDOG_ERROR);
    return FALSE;
  }
  list($table, $suffix, $field) = $targets[$what];

  $apikey = variable_get('rec_transfer_apikey');
  $download_file = "{$apikey}-{$command_id}.{$suffix}";
  $save_file = variable_get('file_temporary_path', file_directory_temp()) . '/' . $download_file;
  if (!rec_transfer_download_results($download_file, $save_file)) {
    return FALSE;
  }

  // Open the file before touching the database so that an unreadable file
  // does not wipe the existing data.
  $fp = fopen($save_file, 'r');
  if ($fp === FALSE) {
    watchdog('rec_transfer', 'Cannot read downloaded file %file.', array('%file' => $save_file), WATCHDOG_ERROR);
    return FALSE;
  }

  // TODO: consider incremental updates instead of replacing all rows.
  $count = 0;
  $transaction = db_transaction();
  try {
    db_delete($table)
      ->condition('app_id', $recommender_id)
      ->execute();
    $insert = db_insert($table)
      ->fields(array(
      'app_id',
      'source_eid',
      'target_eid',
      'score',
      'updated',
    ));
    while (($row = fgetcsv($fp)) !== FALSE) {
      // Blank lines come back as a one-element array; skip anything that
      // does not carry all four columns.
      if (count($row) < 4) {
        continue;
      }

      // TODO: summarize data stats
      $count++;
      $insert
        ->values(array(
        $recommender_id,
        $row[0],
        $row[1],
        $row[2],
        $row[3],
      ));

      // Periodically flush the accumulated rows.
      if ($count % 1000 == 0) {
        $insert
          ->execute();
      }
    }
    // Flush the remainder.
    $insert
      ->execute();
  }
  catch (Exception $e) {
    $transaction->rollback();
    fclose($fp);
    watchdog_exception('rec_transfer', $e);
    return FALSE;
  }
  fclose($fp);

  // Releasing the transaction object commits it.
  unset($transaction);

  async_command_update_command($command_id, array(
    $field => $count,
  ));
  return TRUE;
}