You are here

computing.module in Drupal Computing 7.2

Same filename and directory in other branches
  1. 7 computing.module

File

computing.module
View source
<?php

/**
 * Drupal Computing is a framework that facilitates distributed computing between Drupal and external programs written in non-PHP languages such as Java and Python.
 * This is the module file that takes care the Drupal site. The Java/Python client code is located at https://github.com/danithaca/drupal-computing.
 */
define('COMPUTING_MODULE_ADMIN_PATH', 'admin/config/system/computing');
require_once 'computing.entity.inc';

/**
 * Implements hook_help().
 */
function computing_help($path, $args) {
  $output = '';
  switch ($path) {
    case 'admin/help#computing':
    case COMPUTING_MODULE_ADMIN_PATH:
      $output = '<p>' . t('Drupal Computing is a framework that facilitates distributed computing between Drupal and external programs written in non-PHP languages such as Java and Python. Go to !link and submit a command with necessary data to pass on to external programs. Or browse and administer !record.', array(
        '!link' => l('Command Console', COMPUTING_MODULE_ADMIN_PATH . '/list'),
        '!record' => l('the list of Computing Records', COMPUTING_MODULE_ADMIN_PATH . '/records'),
      )) . '</p>';
      break;
  }
  return $output;
}

/**
 * Implements hook_menu().
 */
function computing_menu() {
  $items = array();
  $items[COMPUTING_MODULE_ADMIN_PATH] = array(
    'title' => 'Computing',
    'description' => 'Drupal Computing module overview page.',
    'page callback' => 'drupal_get_form',
    'page arguments' => array(
      'computing_overview_form',
    ),
    'access arguments' => array(
      'administer computing module',
    ),
    'file' => 'computing.admin.inc',
  );
  $items[COMPUTING_MODULE_ADMIN_PATH . '/overview'] = array(
    'type' => MENU_DEFAULT_LOCAL_TASK,
    'title' => 'Overview',
    'weight' => -10,
  );
  $items[COMPUTING_MODULE_ADMIN_PATH . '/list'] = array(
    'type' => MENU_LOCAL_TASK,
    'title' => 'Command Console',
    'description' => 'Submit a command in the form of a computing record for remote agent to process.',
    'page callback' => 'computing_list_command_page',
    'access arguments' => array(
      'administer computing records',
    ),
    'file' => 'computing.admin.inc',
    'weight' => -9,
  );
  $items[COMPUTING_MODULE_ADMIN_PATH . "/add/%/%"] = array(
    'type' => MENU_CALLBACK,
    'title' => 'Add Command Form',
    'description' => 'The form for each command to accept user input.',
    'page callback' => 'computing_add_command',
    'page arguments' => array(
      5,
      6,
    ),
    'access arguments' => array(
      'administer computing records',
    ),
    'file' => 'computing.admin.inc',
  );
  return $items;
}

/**
 * Implements hook_permission().
 */
function computing_permission() {
  $permissions = array(
    'administer computing module' => array(
      'title' => t('Administer computing module'),
      'description' => t('Administer computing module on the module level.'),
    ),
    'administer computing records' => array(
      'title' => t('Administer computing records'),
      'description' => t('View, create, edit and delete all computing records as Drupal entities.'),
    ),
  );
  $permissions['access computing services endpoints'] = array(
    'title' => t('Access computing services endpoints'),
    'description' => t('Allow access computing module services endpoints'),
  );

  //Generate permissions per model

  /*foreach (computing_get_applications() as $app) {
      $app_name = check_plain($app->application);
      $permissions += array(
        "create $app_name computing entity" => array(
          'title' => t('%app_name: Create new computing entity', array('%app_name' => $app->label)),
        ),
        "edit any $app_name computing entity" => array(
          'title' => t('%app_name: Edit any computing entity', array('%app_name' => $app->label)),
        ),
        "view any $app_name computing entity" => array(
          'title' => t('%app_name: View any computing entity', array('%app_name' => $app->label)),
        ),
      );
    }*/
  return $permissions;
}

/**
 * Check user permissions
 * @param $perms mixed: either a string or an array of permissions to check.
 * @param null $account
 * @return bool: TRUE if the user has all permissions, FALSE otherwise.
 */
function computing_access($perms, $account = NULL) {
  if (is_string($perms)) {
    $perms = array(
      $perms,
    );
  }
  $return = TRUE;
  foreach ($perms as $perm) {
    $return = $return && user_access($perm, $account);
    if (!$return) {
      break;
    }
  }
  return $return;
}

/**
 * Get a list of Computing Application entities.
 *
 * @param string $application_name: if specified, then return only the application with the given application name. Or NULL to return all applications.
 * @return array|mixed
 */
function computing_get_applications($application_name = NULL) {

  // entity_load will get the Entity controller for our model entity and call the load
  // function of that object - we are loading entities by name here.
  $applications = entity_load_multiple_by_name('computing_application', isset($application_name) ? array(
    $application_name,
  ) : FALSE);
  return isset($application_name) ? reset($applications) : $applications;
}

/**
 * Create a computing record and save it as an entity.
 *
 * @param $app_name string: the name of the application to process the record, which is also the "bundle" type of the entity.
 * @param $command string: the command to execute in the application.
 * @param $label string: human readable explanation
 * @param $input array: this should be an array to pass to the agent for executing. if it's a special type, override it in $options.
 * @param $options array: extra options to save in the computing record. will override the value data.
 * @return int: the id for the newly created computing record, or FALSE if not successful.
 */
function computing_create($app_name, $command, $label, $input, $options = array()) {

  //drupal_debug(func_get_args(), 'computing_create');

  // initial settings.
  $values = array(
    'application' => $app_name,
    'command' => $command,
    'label' => $label,
    'uid' => user_is_logged_in() ? $GLOBALS['user']->uid : 0,
    'status' => 'RDY',
    'weight' => 0,
  );
  if ($input != NULL) {

    // this is to take care of mixed data type to save in db as blob.
    $values['input'] = drupal_json_encode($input);

    //$values['input'] = $input;
  }

  // set additional settings. will override the other values.
  if (!empty($options)) {
    $values = $options + $values;
  }

  // create the entity in memory
  $record = computing_record_create_entity($values);

  // try to persist.
  entity_save('computing_record', $record);
  if (!empty($record->id)) {

    // only when it's successful should we invoke rules.
    // note: here input is a json encoded string.
    rules_invoke_event('computing_event_created', $record);

    // "id" seems to be a "string", which causes problem in json encoding.
    return (int) $record->id;
  }
  else {
    return FALSE;
  }
}

/**
 * Load a computing entity by id; returns the object.
 * Input/Output will be loaded as object, not as json encoded strings.
 *
 * @param $id int computing record id.
 * @return object the computing entity or false if not exist.
 */
function computing_load($id) {
  $record = entity_load_single('computing_record', $id);
  if (!empty($record->input)) {
    $record->input = drupal_json_decode($record->input);
  }
  if (!empty($record->output)) {
    $record->output = drupal_json_decode($record->output);
  }
  return $record;
}

/**
 * Update an existing computing entity and persist any changes.
 * Input/Output should NOT be encoded as json string before calling this: this function will take care of encoding/decoding
 *
 * @param object $record:
 *   should be a PHP object. however, if it's an array, we'll convert to object first.
 * @return bool TRUE if successfully updated, false if not.
 */
function computing_update($record) {

  //drupal_debug($record, 'computing_update');
  if (is_array($record)) {
    $record = (object) $record;
  }
  if (!empty($record->input)) {
    $record->input = drupal_json_encode($record->input);
  }
  if (!empty($record->output)) {
    $record->output = drupal_json_encode($record->output);
  }
  $record->changed = time();

  // copy from EntityAPIController. set original record. $record->original will be unset in $entity->save()
  // note that here input/output are already encoded into json string.
  if (!isset($record->original)) {
    $record->original = entity_load_unchanged('computing_record', $record->id);
  }

  // this is pass by reference. also note that $record->original is unset in entity_save() so we want to keep it here.
  $original = $record->original;
  $return = (bool) entity_save('computing_record', $record);
  if ($return) {

    // NOTE: here input/output are json encoded strings.
    $record->original = $original;
    rules_invoke_event('computing_event_updated', $record);

    //drupal_debug($original, "computing_update_old");

    //drupal_debug($record, "computing_update_new");
  }
  return $return;
}

/**
 * Only update and persist the given field. This is supposed to save bandwidth between Drupal and Agent by only updating one field instead of the entire object.
 * It only handles the database properties of the entity, or single value field. Use your own functions if need to override to handle more advanced data types.
 *
 * @param $id int: the entity of of the computing record to be updated.
 * @param $field_name string: the name of the field.
 * @param $field_value object: the value of the field. if field name is "input" or "output", do json encode (to avoid json encode, use computing_update() directly instead).
 * @return bool: TRUE if successful. FALSE if not successful.
 */
function computing_update_field($id, $field_name, $field_value) {
  $entity_info = entity_get_info('computing_record');
  $record = computing_load($id);
  if ($record && isset($record->application)) {
    $fields_info = field_info_instances('computing_record', $record->application);
    if (in_array($field_name, $entity_info['schema_fields_sql']['base table'])) {

      // this is a basic property.
      $record->{$field_name} = $field_value;
      return computing_update($record);
    }
    else {
      if (array_key_exists($field_name, $fields_info)) {

        // use $wrapper to avoid direct set multi-value field, etc.
        $wrapper = entity_metadata_wrapper('computing_record', $record);
        $wrapper->{$field_name}
          ->set($field_value);
        return computing_update($wrapper
          ->value());
      }
    }
  }

  // all other cases return FALSE.
  return FALSE;
}

// this is the old approach. use new approach with entity_save() instead.

//function computing_update_field($id, $field_name, $field_value) {

//  $entity_info = entity_get_info('computing');
//
//  // test if the field is in the base table, or is an entity field.
//  if (in_array($field_name, $entity_info['schema_fields_sql']['base table'])) {
//    // handle input/output field
//    if ($field_name == 'input' || $field_name == 'output') {
//      $field_value = drupal_json_encode($field_value);
//    }
//    $changes = array(
//      'id' => $id,
//      $field_name => $field_value,
//      'changed' => time(),
//    );
//    return (bool) drupal_write_record('computing', $changes, 'id');
//  }
//
//  // else, test if it's a valid field
//  $record = computing_load($id);
//  if ($record && isset($record->application)) {
//    $fields_info = field_info_instances('computing', $record->application);
//    if (array_key_exists($field_name, $fields_info)) {
//      // field exists here.
//      $wrapper = entity_metadata_wrapper('computing', $record);
//      $wrapper->{$field_name}->set($field_value);
//      $wrapper->changed = time();
//      $result = $wrapper->save();
//      return !empty($result);
//      //return computing_update($record);
//    }
//  }
//
//  // if it's not set previously, return FALSE.
//  return FALSE;

//}

/**
 * Return an object and mark it as being processed. Use lock system to make sure status is not changed during the process.
 * Another approach is to use while(TRUE) following the logic from SystemQueue:claimItem().
 *
 * @param $app_name string: the name of the application to claim an record on.
 * @param $record_id int: an optional record ID to specify whether to claim a specific record or any record of an application.
 * @return object: one computing record from the application in "RDY" status, or FALSE if not found.
 */
function computing_claim($app_name, $record_id = NULL) {
  while (TRUE) {
    if (lock_acquire('computing_claim', 10)) {
      $query = db_select('computing_record')
        ->fields('computing_record', array(
        'id',
      ))
        ->condition('application', $app_name)
        ->condition('status', 'RDY')
        ->orderBy('weight')
        ->orderBy('created')
        ->range(0, 1);
      if ($record_id) {
        $query
          ->condition('id', $record_id);
      }
      $candidate_id = $query
        ->execute()
        ->fetchField();
      if ($candidate_id) {
        computing_update_field($candidate_id, 'status', 'RUN');
        $return = computing_load($candidate_id);
        rules_invoke_event('computing_event_claimed', $return);
      }
      else {
        $return = FALSE;
      }

      // always release lock after operations.
      lock_release('computing_claim');
      return $return;
    }
    else {

      // if lock_acquire fails, wait a few seconds and try again.
      lock_wait('computing_claim', 5);
    }
  }

  // end of while(TRUE)
}

// Below is an obsolete implementation using the idea from DrupalSystemQueue.class.

//function computing_claim($app_name, $record_id=NULL) {

//  //drupal_debug(func_get_args(), "computing_claim");
//  while (TRUE) {
//    // get the id of a qualified record.
//    $candidate_id = db_query_range("SELECT id FROM {computing_record} WHERE application = :application AND status = 'RDY' ORDER BY weight ASC, created ASC", 0, 1, array(':application' => $app_name))->fetchField();
//
//    if ($candidate_id) {
//      // update only when the status is still 'RDY'.
//      $update = db_update('computing')
//        ->fields(array('status' => 'RUN'))
//        ->condition('id', $candidate_id)
//        ->condition('status', 'RDY');
//
//      // this is only successful when the status is still 'RDY', ie, not claimed by another agent.
//      if ($update->execute()) {
//        // update "status" field will not lead to real execution, but this will change "changed" field so it can trigger "save" events.
//        computing_update_field($candidate_id, 'status', 'RUN');
//        $record = computing_load($candidate_id);
//        rules_invoke_event('computing_event_claimed', $record);
//        return $record;
//      }
//      // no "else" here to do the loop again.
//    }
//    else {
//      // if not successful, return FALSE
//      return FALSE;
//    }
//  }

//}

/**
 * Release the claimed record, assuming that the record is claimed by the
 *
 * @param $id The record ID to be released back to 'RDY' status.
 * @return boolean: TRUE if successfully released the record. FALSE otherwise.
 */
function computing_release($id) {
  $record = computing_load($id);
  if ($record && isset($record->status) && $record->status == 'RUN') {
    rules_invoke_event('computing_event_released', $record);
    return computing_update_field($id, 'status', 'RDY');
  }
  else {
    return FALSE;
  }
}

/**
 * When agent finishes a computational task, call this function to persist results back to Drupal.
 *
 * @param $id
 * @param $status
 * @param $message
 * @param mixed $output
 *   Results in object or array, will be encoded in json before persistence.
 * @param array $options
 * @return bool
 */
function computing_finish($id, $status, $message, $output, $options = array()) {

  // check computing record status. If it's not set as Running from computing_claim(), then abort.
  $record = computing_load($id);
  if (!$record || $record->status != 'RUN') {
    return FALSE;
  }
  $record->status = $status;
  $record->message = $message;
  $record->output = $output;
  if (!empty($options) && is_array($options)) {
    foreach ($options as $field_name => $field_value) {
      _computing_set_field($record, $field_name, $field_value);
    }
  }
  $return = computing_update($record);
  if ($return) {

    // needs to reload the record, because input/output would have been encoded inside of computing_update().
    rules_invoke_event('computing_event_finished', computing_load($id));
  }
  return $return;
}

/**
 * Set the value of $record entity.
 * TODO: This is supposed to be done entirely with EntityWrapper. don't know how.
 *
 * @param $record
 * @param $field_name
 * @param $field_value
 */
function _computing_set_field($record, $field_name, $field_value) {
  $entity_info = entity_get_info('computing_record');
  $fields_info = field_info_instances('computing_record', $record->application);
  if (in_array($field_name, $entity_info['schema_fields_sql']['base table'])) {

    // this is a basic property.
    $record->{$field_name} = $field_value;
  }
  else {
    if (array_key_exists($field_name, $fields_info)) {

      // use $wrapper to avoid direct set multi-value field, etc.
      $wrapper = entity_metadata_wrapper('computing_record', $record);
      $wrapper->{$field_name}
        ->set($field_value);
    }
  }
}

/**
 * Return the record that has run successfully most recently.
 */
function computing_last_success($app_name, $command) {
  $query = db_select('computing_record')
    ->fields('computing_record', array(
    'id',
  ))
    ->condition('application', $app_name)
    ->condition('command', $command)
    ->condition('status', 'SCF')
    ->orderBy('changed', 'DESC')
    ->orderBy('id', 'DESC')
    ->range(0, 1);
  $id = $query
    ->execute()
    ->fetchField();
  if ($id) {
    return computing_load($id);
  }
  else {
    return FALSE;
  }
}

/**
 * Expose site info through Services.module.
 */
function computing_site_info() {
  return array(
    'drupal_version' => VERSION,
    'drupal_time' => time(),
  );
}

/**
 * Check the status of a computing record, bypassing Drupal cache.
 */
function computing_get_status($id) {
  return db_query('SELECT status FROM {computing_record} WHERE id = :id', array(
    ':id' => $id,
  ))
    ->fetchField();
}

/**
 * Get the readable name for Computing Record status.
 */
function computing_status_readable($acronym = NULL) {
  $table = array(
    'RDY' => t('Ready'),
    // newly created record to be processed.
    'SCF' => t('Successful'),
    // processed successful
    'FLD' => t('Failed'),
    // for some reason processing is not successful

    //'DON' => t('Done'),         // process done. does not indicate whether it's successful or not.
    'ABD' => t('Aborted'),
    // the application didn't generate error, but computing is aborted for other reasons such as timeout.
    'RUN' => t('Running'),
  );
  if ($acronym === NULL) {
    return $table;
  }
  else {
    return array_key_exists($acronym, $table) ? $table[$acronym] : FALSE;
  }
}

/**
 * Implements hook_ctools_plugin_api().
 * Exposes services.module support.
 */
function computing_ctools_plugin_api($owner, $api) {
  if ($owner == 'services' && $api == 'services') {
    return array(
      'version' => 3,
      'file' => 'computing.services.inc',
    );
  }
}

/**
 * Implements hook_views_api().
 */
function computing_views_api() {
  return array(
    'api' => 3,
    'path' => drupal_get_path('module', 'computing'),
  );
}

/**
 * Implements hook_computing_data().
 * See README for details.
 */
function computing_computing_data() {
  return array(
    'computing' => array(
      'echo' => array(
        'title' => 'Echo message',
        'description' => 'Echos input message as output message.',
        //'form callback' => 'computing_command_form',
        'form elements callback' => 'computing_echo_optional_form_elements',
        'result callback' => 'computing_echo_handle_result',
        'file' => array(
          'type' => 'inc',
          'module' => 'computing',
          'name' => 'computing.admin',
        ),
      ),
    ),
  );
}

/**
 * A helper function to get command definition in hook_computing_data().
 * No need to use cache because it's not going to be accessed a lot.
 */
function computing_fetch_data($app_name = NULL, $command_name = NULL) {
  $computing_data = module_invoke_all("computing_data");
  drupal_alter('computing_data', $computing_data);
  if ($app_name != NULL && $command_name != NULL) {
    if (isset($computing_data[$app_name][$command_name]) && is_array($computing_data[$app_name][$command_name])) {

      // check data and return modified data.
      $command_data = $computing_data[$app_name][$command_name];
      if (empty($command_data['title'])) {
        $command_data['title'] = strtoupper($command_name);
      }
      if (empty($command_data['description'])) {
        $command_data['description'] = t('A command for !command', array(
          '!command' => $command_data['title'],
        ));
      }
      return $command_data;
    }

    // all other cases return FALSE.
    return FALSE;
  }
  else {
    return $computing_data;
  }
}

Functions

Namesort descending Description
computing_access Check user permissions
computing_claim Return an object and mark it as being processed. Use lock system to make sure status is not changed during the process. Another approach is to use while(TRUE) following the logic from SystemQueue:claimItem().
computing_computing_data Implements hook_computing_data(). See README for details.
computing_create Create a computing record and save it as an entity.
computing_ctools_plugin_api Implements hook_ctools_plugin_api(). Exposes services.module support.
computing_fetch_data A helper function to get command definition in hook_computing_data(). No need to use cache because it's not going to be accessed a lot.
computing_finish When agent finishes a computational task, call this function to persist results back to Drupal.
computing_get_applications Get a list of Computing Application entities.
computing_get_status Check the status of a computing record, bypassing Drupal cache.
computing_help Implements hook_help().
computing_last_success Return the record that has run successfully most recently.
computing_load Load a computing entity by id; returns the object. Input/Output will be loaded as object, not as json encoded strings.
computing_menu Implements hook_menu().
computing_permission Implements hook_permission().
computing_release Release the claimed record, assuming that the record is claimed by the
computing_site_info Expose site info through Services.module.
computing_status_readable Get the readable name for Computing Record status.
computing_update Update an existing computing entity and persist any changes. Input/Output should NOT be encoded as json string before calling this: this function will take care of encoding/decoding
computing_update_field Only update and persist the given field. This is supposed to save bandwidth between Drupal and Agent by only updating one field instead of the entire object. It only handles the database properties of the entity, or single value field. Use your own…
computing_views_api Implements hook_views_api().
_computing_set_field Set the value of $record entity. TODO: This is supposed to be done entirely with EntityWrapper. don't know how.

Constants

Namesort descending Description
COMPUTING_MODULE_ADMIN_PATH Drupal Computing is a framework that facilitates distributed computing between Drupal and external programs written in non-PHP languages such as Java and Python. This is the module file that takes care the Drupal site. The Java/Python client code is…