<?php
/**
 * Generates a token for authorizing access to a content channel.
 *
 * The token is the URL-safe base64 HMAC of 512 random bytes, keyed with the
 * site's private key concatenated with the hash salt.
 *
 * @return
 *   The generated token string.
 */
function nodejs_generate_content_token() {
  $random_data = _drupal_random_bytes(512);
  $hmac_key = drupal_get_private_key() . _drupal_get_hash_salt();
  return _drupal_hmac_base64($random_data, $hmac_key);
}
/**
 * Sends a message to a content (token) channel.
 *
 * Thin wrapper around Nodejs::sendContentTokenMessage(); the response of the
 * underlying request is discarded.
 *
 * @param $message
 *   The message object, passed through unchanged.
 */
function nodejs_send_content_channel_message($message) {
  Nodejs::sendContentTokenMessage($message);
}
/**
 * Creates a content token for a channel and registers it with Node.js.
 *
 * On success the token is also added to the page's JavaScript settings under
 * nodejs.contentTokens, keyed by channel name.
 *
 * @param $channel
 *   The name of the content channel.
 * @param $anonymous_only
 *   Value sent to the server as the message's 'anonymousOnly' flag.
 *
 * @return
 *   The token string on success, or FALSE if the Node.js server reported an
 *   error (which is logged).
 */
function nodejs_send_content_channel_token($channel, $anonymous_only = FALSE) {
  $message = (object) array(
    'token' => nodejs_generate_content_token(),
    'anonymousOnly' => $anonymous_only,
    'channel' => $channel,
  );

  $response = Nodejs::sendContentToken($message);
  if (isset($response->error)) {
    $args = array(
      '%token' => $message->token,
      '%code' => $response->code,
      '%error' => $response->error,
    );
    // watchdog() takes the untranslated message plus its placeholders; the
    // log entry is translated when it is displayed, so t() must not be used.
    watchdog('nodejs', 'Error sending content token "%token" to the Node.js server: [%code] %error', $args, WATCHDOG_ERROR);
    return FALSE;
  }

  // Use the string form of the type argument, matching the other
  // drupal_add_js() calls in this file.
  $settings = array(
    'nodejs' => array(
      'contentTokens' => array(
        $channel => $message->token,
      ),
    ),
  );
  drupal_add_js($settings, 'setting');

  return $message->token;
}
/**
 * Asks the Node.js server to kick a user.
 *
 * @param $uid
 *   The uid of the user to kick.
 *
 * @return
 *   The response object from the Node.js server, or FALSE if the server
 *   reported an error (which is logged).
 */
function nodejs_kick_user($uid) {
  $response = Nodejs::kickUser($uid);
  if (isset($response->error)) {
    // Pass the raw message and placeholders; watchdog() entries are
    // translated at display time, so the message must not go through t().
    watchdog('nodejs', 'Error kicking uid "%uid" from the Node.js server: [%code] %error', array(
      '%uid' => $uid,
      '%code' => $response->code,
      '%error' => $response->error,
    ), WATCHDOG_ERROR);
    return FALSE;
  }
  return $response;
}
/**
 * Asks the Node.js server to log out the client identified by a token.
 *
 * @param $token
 *   The auth token to log out.
 *
 * @return
 *   The response object from the Node.js server, or FALSE if the server
 *   reported an error (which is logged).
 */
function nodejs_logout_user($token) {
  $response = Nodejs::logoutUser($token);
  if (isset($response->error)) {
    // Pass the raw message and placeholders; watchdog() entries are
    // translated at display time, so the message must not go through t().
    watchdog('nodejs', 'Error logging out token "%token" from the Node.js server: [%code] %error', array(
      '%token' => $token,
      '%code' => $response->code,
      '%error' => $response->error,
    ), WATCHDOG_ERROR);
    return FALSE;
  }
  return $response;
}
/**
 * Sends a user's presence list to the Node.js server.
 *
 * @param $uid
 *   The uid whose presence list is being set.
 * @param $uids
 *   Array of uids making up the presence list.
 *
 * @return
 *   The response object from the Node.js server, or FALSE if the server
 *   reported an error (which is logged).
 */
function nodejs_set_user_presence_list($uid, array $uids) {
  $response = Nodejs::setUserPresenceList($uid, $uids);
  if (isset($response->error)) {
    // Pass the raw message and placeholders; watchdog() entries are
    // translated at display time, so the message must not go through t().
    watchdog('nodejs', 'Error setting user presence list for uid "%uid", error from the Node.js server: [%code] %error', array(
      '%uid' => $uid,
      '%code' => $response->code,
      '%error' => $response->error,
    ), WATCHDOG_ERROR);
    return FALSE;
  }
  return $response;
}
/**
 * Queues a broadcast message on the 'nodejs_notify' channel.
 *
 * @param $subject
 *   The message subject.
 * @param $body
 *   The message body.
 */
function nodejs_broadcast_message($subject, $body) {
  $payload = new stdClass();
  $payload->subject = $subject;
  $payload->body = $body;

  // Properties are assigned in the same order as the message is serialized.
  $message = new stdClass();
  $message->broadcast = TRUE;
  $message->data = $payload;
  $message->channel = 'nodejs_notify';

  nodejs_enqueue_message($message);
}
/**
 * Queues a non-broadcast message for a specific channel.
 *
 * @param $channel
 *   The name of the channel to send to.
 * @param $subject
 *   The message subject.
 * @param $body
 *   The message body.
 */
function nodejs_send_channel_message($channel, $subject, $body) {
  $payload = new stdClass();
  $payload->subject = $subject;
  $payload->body = $body;

  // Properties are assigned in the same order as the message is serialized.
  $message = new stdClass();
  $message->broadcast = FALSE;
  $message->data = $payload;
  $message->channel = $channel;

  nodejs_enqueue_message($message);
}
/**
 * Queues a message for a single user's private channel.
 *
 * The message goes to the channel 'nodejs_user_' followed by the uid and
 * names 'nodejsNotify' as its client-side callback.
 *
 * @param $uid
 *   The uid of the recipient.
 * @param $subject
 *   The message subject.
 * @param $body
 *   The message body.
 */
function nodejs_send_user_message($uid, $subject, $body) {
  $payload = new stdClass();
  $payload->subject = $subject;
  $payload->body = $body;

  // Properties are assigned in the same order as the message is serialized.
  $message = new stdClass();
  $message->broadcast = FALSE;
  $message->data = $payload;
  $message->channel = 'nodejs_user_' . $uid;
  $message->callback = 'nodejsNotify';

  nodejs_enqueue_message($message);
}
/**
 * Queues the same message for several users.
 *
 * @param $uids
 *   Either an array of uids or a comma-separated string of uids.
 * @param $subject
 *   The message subject.
 * @param $body
 *   The message body.
 */
function nodejs_send_user_message_multiple($uids, $subject, $body) {
  // Accept a comma-separated string as a convenience.
  $recipients = is_array($uids) ? $uids : explode(',', $uids);
  foreach ($recipients as $recipient) {
    nodejs_send_user_message($recipient, $subject, $body);
  }
}
/**
 * Queues a message for every user that has been assigned the given role.
 *
 * @param $role_name
 *   The name of the role, as stored in the {role} table.
 * @param $subject
 *   The message subject.
 * @param $body
 *   The message body.
 */
function nodejs_send_role_message($role_name, $subject, $body) {
  // Start from an empty list so that a role without members sends nothing
  // instead of passing an undefined variable on.
  $uids = array();

  // Restrict the result to the requested role. Inner joins drop users without
  // a matching role row, and DISTINCT guards against duplicate recipients.
  $result = db_query("SELECT DISTINCT u.uid FROM {users} u INNER JOIN {users_roles} ur ON ur.uid = u.uid INNER JOIN {role} r ON ur.rid = r.rid WHERE r.name = '%s'", $role_name);
  while ($row = db_fetch_object($result)) {
    $uids[] = $row->uid;
  }

  nodejs_send_user_message_multiple($uids, $subject, $body);
}
/**
 * Implements hook_init().
 *
 * Registers the shutdown handler that flushes queued messages, stores the
 * Node.js configuration in the session, and adds the socket.io script, the
 * module's own script, the client-side settings and all handler scripts to
 * the page.
 */
function nodejs_init() {
  register_shutdown_function(array(
    'nodejs',
    'sendMessages',
  ));
  $_SESSION['nodejs_config'] = $nodejs_config = nodejs_get_config();
  // The service key is kept out of the copy that is sent to the browser.
  if (isset($nodejs_config['serviceKey'])) {
    unset($nodejs_config['serviceKey']);
  }
  $socket_io_config = nodejs_get_socketio_js_config($nodejs_config);
  // Inline JS is used to smuggle in an external <script> tag: the snippet
  // closes the surrounding inline script element, emits the socket.io script
  // tag, and reopens an inline script element.
  $hackity_hack = <<<JS
//--><!]]>
</script>
<script type="text/javascript" src="{$socket_io_config['path']}"></script>
<script type="text/javascript">
<!--//--><![CDATA[//><!--]]
JS;
  drupal_add_js($hackity_hack, 'inline', 'header');
  drupal_add_js(drupal_get_path('module', 'nodejs') . '/nodejs.js', 'module', 'footer');
  foreach ($nodejs_config as $key => $value) {
    drupal_add_js(array(
      'nodejs' => array(
        $key => $value,
      ),
    ), 'setting');
  }
  foreach (nodejs_get_js_handlers() as $handler_file) {
    drupal_add_js($handler_file, 'file', 'footer', FALSE, TRUE);
  }
}
/**
 * Returns the location and type of the socket.io client script.
 *
 * @param $nodejs_config
 *   The Node.js configuration array; its 'scheme', 'host', 'port' and
 *   'resource' entries are used to build a default path when none is set.
 *
 * @return
 *   An array with the keys 'path' and 'type'.
 */
function nodejs_get_socketio_js_config($nodejs_config) {
  $path = variable_get('nodejs_socket_io_path', FALSE);
  $type = variable_get('nodejs_socket_io_type', 'external');

  if (!$path) {
    // No explicit path configured: point at the script on the Node.js server.
    $path = "{$nodejs_config['scheme']}://{$nodejs_config['host']}:{$nodejs_config['port']}{$nodejs_config['resource']}/socket.io.js";
  }

  return array(
    'path' => $path,
    'type' => $type,
  );
}
/**
 * Collects the JavaScript handler files declared by modules.
 *
 * Invokes the 'nodejs_handlers_info' hook and then lets modules alter the
 * result through the 'nodejs_js_handlers' alter hook.
 *
 * @return
 *   The array of handler files returned by the hooks.
 */
function nodejs_get_js_handlers() {
  $handler_files = module_invoke_all('nodejs_handlers_info');
  drupal_alter('nodejs_js_handlers', $handler_files);
  return $handler_files;
}
/**
 * Implements hook_menu().
 *
 * Titles and descriptions are given as plain strings: the menu system
 * translates them itself, so wrapping them in t() would translate twice.
 *
 * @return
 *   The menu item definitions for the settings pages, the Node.js message
 *   callback and the "add user to channel" form.
 */
function nodejs_menu() {
  return array(
    'admin/settings/nodejs' => array(
      'title' => 'Nodejs',
      'description' => 'Configure nodejs module.',
      'position' => 'left',
      'page callback' => 'system_admin_menu_block_page',
      'access arguments' => array(
        'access administration pages',
      ),
      'file' => 'system.admin.inc',
      'file path' => drupal_get_path('module', 'system'),
    ),
    'admin/settings/nodejs/config' => array(
      'title' => 'Configuration',
      'description' => 'Adjust node.js settings.',
      'page callback' => 'drupal_get_form',
      'page arguments' => array(
        'nodejs_settings',
      ),
      'access arguments' => array(
        'administer site configuration',
      ),
      'file' => 'nodejs.admin.inc',
    ),
    // Open to everyone on purpose: the callback authenticates the caller
    // itself by checking the posted service key.
    'nodejs/message' => array(
      'title' => 'Message from Node.js server',
      'page callback' => 'nodejs_message_handler',
      'access callback' => TRUE,
      'type' => MENU_CALLBACK,
    ),
    // This form grants a user access to an arbitrary channel, so it must not
    // be reachable by anonymous or unprivileged visitors.
    'nodejs/user/channel/add' => array(
      'title' => 'Add a channel to the Node.js server',
      'page callback' => 'drupal_get_form',
      'page arguments' => array(
        'nodejs_add_user_to_channel_form',
      ),
      'access arguments' => array(
        'administer site configuration',
      ),
      'type' => MENU_CALLBACK,
    ),
  );
}
/**
 * Fetches the list of channels from the Node.js server.
 *
 * @return
 *   The JSON-decoded response data cast to an array, or an empty array if the
 *   server reported an error (which is logged).
 */
function nodejs_get_channels() {
  $response = Nodejs::getChannels();
  if (isset($response->error)) {
    // Pass the raw message and placeholders; watchdog() entries are
    // translated at display time, so the message must not go through t().
    watchdog('nodejs', 'Error getting channel list from Node.js server: [%code] %error', array(
      '%code' => $response->code,
      '%error' => $response->error,
    ), WATCHDOG_ERROR);
    return array();
  }
  // A response that cannot be decoded yields NULL, which casts to array().
  return (array) json_decode($response->data);
}
/**
 * Form builder for adding a user to a Node.js channel.
 *
 * @return
 *   A form array with a uid textfield, a channel textfield and a submit
 *   button.
 */
function nodejs_add_user_to_channel_form() {
  return array(
    'nodejs_uid' => array(
      '#type' => 'textfield',
      '#description' => t('The user uid to add to a channel.'),
      '#title' => t('User uid to add'),
    ),
    'nodejs_channel' => array(
      '#type' => 'textfield',
      '#description' => t('The name of the channel to give a user access to.'),
      '#title' => t('Channel to add'),
    ),
    'nodejs_submit' => array(
      '#type' => 'submit',
      '#value' => t('Add user'),
    ),
  );
}
/**
 * Submit handler for nodejs_add_user_to_channel_form().
 *
 * Adds the submitted uid to the submitted channel and reports the outcome to
 * the user.
 *
 * @param $form
 *   The form array.
 * @param $form_state
 *   The form state; 'nodejs_uid' and 'nodejs_channel' are read from its
 *   'values'.
 */
function nodejs_add_user_to_channel_form_submit($form, &$form_state) {
  $values = (object) $form_state['values'];
  $args = array(
    '%uid' => $values->nodejs_uid,
    '%channel' => $values->nodejs_channel,
  );

  // Compare against TRUE explicitly: a plain truthiness test would also
  // accept a non-boolean failure value such as an (always truthy) object.
  if (nodejs_add_user_to_channel($values->nodejs_uid, $values->nodejs_channel) === TRUE) {
    drupal_set_message(t("Added uid %uid to %channel.", $args));
  }
  else {
    drupal_set_message(t("Failed to add uid %uid to %channel.", $args), 'error');
  }
}
/**
 * Validation handler for nodejs_add_user_to_channel_form().
 *
 * Requires a numeric uid and a channel name made of letters, numbers and
 * underscores.
 *
 * @param $form
 *   The form array.
 * @param $form_state
 *   The form state; 'nodejs_uid' and 'nodejs_channel' are read from its
 *   'values'.
 */
function nodejs_add_user_to_channel_form_validate($form, &$form_state) {
  $submitted = (object) $form_state['values'];

  $uid_is_numeric = preg_match('/^\\d+$/', $submitted->nodejs_uid);
  if (!$uid_is_numeric) {
    form_set_error('nodejs_uid', t('Invalid uid - please enter a numeric uid.'));
  }

  $channel_is_valid = preg_match('/^([a-z0-9_]+)$/i', $submitted->nodejs_channel);
  if (!$channel_is_valid) {
    form_set_error('nodejs_channel', t('Invalid channel name - only numbers, letters and underscores are allowed.'));
  }
}
/**
 * Adds a message to the queue held by the Nodejs class.
 *
 * Thin wrapper around Nodejs::enqueueMessage().
 *
 * @param $message
 *   The message object to queue.
 */
function nodejs_enqueue_message(StdClass $message) {
  Nodejs::enqueueMessage($message);
}
/**
 * Sends a message to the Node.js server immediately, bypassing the queue.
 *
 * @param $message
 *   The message object to send.
 *
 * @return
 *   Whatever Nodejs::sendMessage() returns for the request.
 */
function nodejs_send_message(StdClass $message) {
  $response = Nodejs::sendMessage($message);
  return $response;
}
/**
 * Implements hook_nodejs_user_channels().
 *
 * Gives each logged-in user a private channel, unless the
 * 'nodejs_enable_userchannel' variable disables it.
 *
 * @param $account
 *   The user object whose channels are being collected.
 *
 * @return
 *   An array holding the user's channel name, or an empty array.
 */
function nodejs_nodejs_user_channels($account) {
  // Bail out when user channels are disabled or the account has no uid.
  if (!variable_get('nodejs_enable_userchannel', TRUE) || !$account->uid) {
    return array();
  }
  $user_channel = 'nodejs_user_' . $account->uid;
  return array($user_channel);
}
/**
 * Logs the current session's auth token out of the Node.js server.
 *
 * @param $account
 *   The user account being logged out (not used by this implementation).
 */
function nodejs_user_logout($account) {
  // The token is only in the session once nodejs_init() has stored the
  // config there; without it there is nothing to log out, and sending an
  // empty token would produce a malformed request.
  if (!empty($_SESSION['nodejs_config']['authToken'])) {
    nodejs_logout_user($_SESSION['nodejs_config']['authToken']);
  }
}
/**
 * Checks a service key against the configured one.
 *
 * The comparison is a strict string comparison: a loose == would treat
 * numeric-looking strings such as '0e123' and '0e456' as equal, and would
 * accept TRUE for any non-empty key.
 *
 * @param $service_key
 *   The key supplied by the caller (typically from $_POST).
 *
 * @return
 *   TRUE if the key matches the 'nodejs_config_serviceKey' variable, FALSE
 *   otherwise.
 */
function nodejs_is_valid_service_key($service_key) {
  // Arrays and objects (e.g. from serviceKey[]=x) can never be a valid key.
  if (!is_scalar($service_key) || is_bool($service_key)) {
    return FALSE;
  }
  $expected = (string) variable_get('nodejs_config_serviceKey', '');
  $supplied = (string) $service_key;

  // Prefer a timing-safe comparison where the PHP version provides one.
  if (function_exists('hash_equals')) {
    return hash_equals($expected, $supplied);
  }
  return $expected === $supplied;
}
/**
 * Menu callback: handles a message posted by the Node.js server.
 *
 * Expects the POST fields 'serviceKey' and 'messageJson'. The decoded message
 * is dispatched on its 'messageType': 'authenticate' and 'userOffline' are
 * handled here, anything else is offered to the callbacks returned by
 * hook_nodejs_message_callback(). The response is printed as JSON and the
 * request ends.
 */
function nodejs_message_handler() {
  if (!isset($_POST['serviceKey']) || !nodejs_is_valid_service_key($_POST['serviceKey'])) {
    drupal_json(array(
      'error' => t('Invalid service key.'),
    ));
    exit;
  }
  if (!isset($_POST['messageJson'])) {
    drupal_json(array(
      'error' => t('No message.'),
    ));
    exit;
  }
  $message = json_decode($_POST['messageJson'], TRUE);
  // json_decode() yields NULL for malformed input and may yield a scalar for
  // valid-but-unexpected input; neither can be dispatched.
  if (!is_array($message) || !isset($message['messageType'])) {
    drupal_json(array(
      'error' => t('Invalid message.'),
    ));
    exit;
  }
  $response = array();
  switch ($message['messageType']) {
    case 'authenticate':
      $response = nodejs_auth_check($message);
      break;
    case 'userOffline':
      nodejs_user_set_offline($message['uid']);
      break;
    default:
      // Collect the callbacks every implementing module registers for this
      // message type, then run them all against the message.
      $handlers = array();
      foreach (module_implements('nodejs_message_callback') as $module) {
        $function = $module . '_nodejs_message_callback';
        $handlers += $function($message['messageType']);
      }
      foreach ($handlers as $callback) {
        $callback($message, $response);
      }
  }
  drupal_alter('nodejs_message_response', $response, $message);
  // An empty response means nothing produced a result for this message.
  drupal_json($response ? $response : array(
    'error' => t('Not implemented'),
  ));
  exit;
}
/**
 * Checks an auth token sent by the Node.js server and builds the user data.
 *
 * @param $message
 *   The decoded message array; 'authToken' and 'clientId' are read, and
 *   'contentTokens' is used when present.
 *
 * @return
 *   The user object for the session matching the token (or the anonymous
 *   user), with authToken, nodejsValidAuthToken and clientId set. When the
 *   token is valid it additionally carries channels, presenceUids, serviceKey
 *   and contentTokens.
 */
function nodejs_auth_check($message) {
  // The auth token is the MD5 of the session id, so look the session up by
  // the hash of its sid.
  $uid = db_result(db_query("SELECT uid FROM {sessions} WHERE MD5(sid) = '%s'", $message['authToken']));
  $auth_user = $uid > 0 ? user_load($uid) : drupal_anonymous_user();
  $auth_user->authToken = $message['authToken'];
  // The token counts as valid whenever the lookup returned anything other
  // than FALSE, which includes a uid of 0.
  $auth_user->nodejsValidAuthToken = $uid !== FALSE;
  $auth_user->clientId = $message['clientId'];
  if ($auth_user->nodejsValidAuthToken) {
    // Gather the channels each module grants this user.
    $auth_user->channels = array();
    foreach (module_implements('nodejs_user_channels') as $module) {
      $function = $module . '_nodejs_user_channels';
      foreach ($function($auth_user) as $channel) {
        $auth_user->channels[] = $channel;
      }
    }
    // Gather the de-duplicated presence list from all modules.
    $auth_user->presenceUids = array_unique(module_invoke_all('nodejs_user_presence_list', $auth_user));
    $nodejs_config = nodejs_get_config();
    // The service key is returned both in the response body and as a header.
    $auth_user->serviceKey = $nodejs_config['serviceKey'];
    drupal_set_header('NodejsServiceKey: ' . $nodejs_config['serviceKey']);
    drupal_alter('nodejs_auth_user', $auth_user);
    // Only accounts with a non-zero uid are recorded as online.
    if ($auth_user->uid) {
      nodejs_user_set_online($auth_user->uid);
    }
    // Set after the alter hook has run, so the hook does not see this value.
    $auth_user->contentTokens = isset($message['contentTokens']) ? $message['contentTokens'] : array();
  }
  return $auth_user;
}
/**
 * Records a user as online in the {nodejs_presence} table.
 *
 * @param $uid
 *   The uid of the user; stored together with the current timestamp.
 */
function nodejs_user_set_online($uid) {
  $login_time = time();
  $sql = 'INSERT INTO {nodejs_presence} (uid, login_time) VALUES (%d, %d)';
  db_query($sql, $uid, $login_time);
}
/**
 * Removes a user's rows from the {nodejs_presence} table.
 *
 * @param $uid
 *   The uid of the user to mark as offline.
 */
function nodejs_user_set_offline($uid) {
  $sql = 'DELETE FROM {nodejs_presence} WHERE uid = %d';
  db_query($sql, $uid);
}
/**
 * Builds the Node.js configuration array.
 *
 * Values stored in the 'nodejs_config' variable take precedence; any key it
 * lacks is filled in from the individual variables and their defaults.
 *
 * @return
 *   An array with at least the keys scheme, secure, host, port, resource,
 *   authToken and serviceKey.
 */
function nodejs_get_config() {
  $scheme = variable_get('nodejs_server_scheme', 'http');
  if ($scheme == 'https') {
    $secure = 1;
  }
  else {
    $secure = 0;
  }

  $fallbacks = array(
    'scheme' => $scheme,
    'secure' => $secure,
    'host' => variable_get('nodejs_config_host', 'localhost'),
    'port' => variable_get('nodejs_config_port', '8080'),
    'resource' => variable_get('nodejs_config_resource', '/socket.io'),
    // The auth token is the MD5 of the current session id.
    'authToken' => md5(session_id()),
    'serviceKey' => variable_get('nodejs_config_serviceKey', ''),
  );

  // Array union keeps the stored values and only adds missing keys.
  $stored = variable_get('nodejs_config', array());
  return $stored + $fallbacks;
}
/**
 * Builds a URL on the Node.js server.
 *
 * @param $config
 *   Configuration array providing 'scheme', 'host' and 'port'.
 * @param $callback
 *   Optional path appended after the trailing slash.
 *
 * @return
 *   The URL in the form scheme://host:port/callback.
 */
function nodejs_get_url($config, $callback = '') {
  $base = "{$config['scheme']}://{$config['host']}:{$config['port']}/";
  return $base . $callback;
}
/**
 * Removes a user from a channel on the Node.js server.
 *
 * @param $uid
 *   The uid of the user to remove.
 * @param $channel
 *   The name of the channel.
 *
 * @return
 *   TRUE on success, FALSE if the server reported an error (which is
 *   logged).
 */
function nodejs_remove_user_from_channel($uid, $channel) {
  $result = Nodejs::removeUserFromChannel($uid, $channel);
  if (isset($result->error)) {
    $params = array(
      '%uid' => $uid,
      '%channel' => $channel,
      '%code' => $result->code,
      '%error' => $result->error,
    );
    // Pass the raw message and placeholders; watchdog() entries are
    // translated at display time, so the message must not go through t().
    watchdog('nodejs', 'Error removing user %uid from channel %channel on Node.js server: [%code] %error', $params, WATCHDOG_ERROR);
    // Return a real boolean: an empty object is truthy, which made failures
    // indistinguishable from success in a plain if () test.
    return FALSE;
  }
  return TRUE;
}
/**
 * Adds a user to a channel on the Node.js server.
 *
 * @param $uid
 *   The uid of the user to add.
 * @param $channel
 *   The name of the channel.
 *
 * @return
 *   TRUE on success, FALSE if the server reported an error (which is
 *   logged).
 */
function nodejs_add_user_to_channel($uid, $channel) {
  $result = Nodejs::addUserToChannel($uid, $channel);
  if (isset($result->error)) {
    $params = array(
      '%uid' => $uid,
      '%channel' => $channel,
      '%code' => $result->code,
      '%error' => $result->error,
    );
    // Pass the raw message and placeholders; watchdog() entries are
    // translated at display time, so the message must not go through t().
    watchdog('nodejs', 'Error adding user %uid to channel %channel on Node.js server: [%code] %error', $params, WATCHDOG_ERROR);
    // Return a real boolean: an empty object is truthy, which made failures
    // indistinguishable from success in a plain if () test.
    return FALSE;
  }
  return TRUE;
}
/**
 * Returns the client socket id posted with the current request.
 *
 * @return
 *   The value of $_POST['nodejs_client_socket_id'] if it is a string made up
 *   only of digits, otherwise an empty string.
 */
function nodejs_get_client_socket_id() {
  // Anything that is not a string (e.g. an array from id[]=1) is rejected
  // before it can reach preg_match().
  if (!isset($_POST['nodejs_client_socket_id']) || !is_string($_POST['nodejs_client_socket_id'])) {
    return '';
  }
  $client_socket_id = $_POST['nodejs_client_socket_id'];
  // The D modifier stops $ from matching before a trailing newline.
  return preg_match('/^\\d+$/D', $client_socket_id) ? $client_socket_id : '';
}
/**
 * Static client for the Node.js server's HTTP interface.
 *
 * Holds the lazily initialized connection settings and a queue of messages
 * that sendMessages() delivers one by one.
 */
class Nodejs {
  // Messages queued with enqueueMessage() and not yet sent.
  public static $messages = array();
  // Configuration array from nodejs_get_config(); NULL until initConfig().
  public static $config = NULL;
  // Base URL of the Node.js server, ending in a slash.
  public static $baseUrl = NULL;
  // Request headers carrying the service key.
  public static $headers = NULL;

  /**
   * Loads the configuration, headers and base URL on first use.
   */
  public static function initConfig() {
    if (!isset(self::$config)) {
      self::$config = nodejs_get_config();
      self::$headers = array(
        'NodejsServiceKey' => self::$config['serviceKey'],
      );
      self::$baseUrl = nodejs_get_url(self::$config);
    }
  }

  /**
   * Returns the messages currently in the queue.
   */
  public static function getMessages() {
    return self::$messages;
  }

  /**
   * Appends a message to the queue.
   */
  public static function enqueueMessage(StdClass $message) {
    self::$messages[] = $message;
  }

  /**
   * Sends every queued message; the queue itself is left untouched.
   */
  public static function sendMessages() {
    foreach (self::$messages as $message) {
      self::sendMessage($message);
    }
  }

  /**
   * Publishes a message after letting modules alter it.
   *
   * @return
   *   The result of drupal_http_request().
   */
  public static function sendMessage(StdClass $message) {
    self::initConfig();
    drupal_alter('nodejs_message', $message);
    // nodejs_get_client_socket_id() only ever returns digits or '', so no
    // further stripping of characters is needed here.
    $message->clientSocketId = nodejs_get_client_socket_id();
    return self::httpPost('nodejs/publish', $message);
  }

  /**
   * Sends the presence list (comma-separated uids) for a user.
   */
  public static function setUserPresenceList($uid, array $uids) {
    // Encode each uid separately so the separating commas stay literal.
    $list = implode(',', array_map('rawurlencode', $uids));
    return self::httpGet('nodejs/user/presence-list/' . rawurlencode($uid) . '/' . $list);
  }

  /**
   * Logs out the client identified by the given auth token.
   */
  public static function logoutUser($token) {
    return self::httpGet('nodejs/user/logout/' . rawurlencode($token));
  }

  /**
   * Sends a message to a content token channel.
   */
  public static function sendContentTokenMessage($message) {
    $message->clientSocketId = nodejs_get_client_socket_id();
    return self::httpPost('nodejs/content/token/message', $message);
  }

  /**
   * Registers a content token with the server.
   */
  public static function sendContentToken($message) {
    return self::httpPost('nodejs/content/token', $message);
  }

  /**
   * Kicks the given user.
   */
  public static function kickUser($uid) {
    return self::httpGet('nodejs/user/kick/' . rawurlencode($uid));
  }

  /**
   * Adds a user to a channel.
   */
  public static function addUserToChannel($uid, $channel) {
    return self::httpGet('nodejs/user/channel/add/' . rawurlencode($channel) . '/' . rawurlencode($uid));
  }

  /**
   * Removes a user from a channel.
   */
  public static function removeUserFromChannel($uid, $channel) {
    return self::httpGet('nodejs/user/channel/remove/' . rawurlencode($channel) . '/' . rawurlencode($uid));
  }

  /**
   * Issues a GET request for a path below the base URL.
   */
  private static function httpGet($path) {
    self::initConfig();
    return drupal_http_request(self::$baseUrl . $path, self::$headers);
  }

  /**
   * POSTs a JSON-encoded message to a path below the base URL, with 3 retries.
   */
  private static function httpPost($path, $message) {
    self::initConfig();
    return drupal_http_request(self::$baseUrl . $path, self::$headers, 'POST', json_encode($message), 3);
  }
}
/**
 * Calculates a URL-safe, base64-encoded SHA-256 HMAC.
 *
 * @param $data
 *   The string to be authenticated.
 * @param $key
 *   The secret key.
 *
 * @return
 *   The base64 HMAC with '+' replaced by '-', '/' replaced by '_' and all
 *   '=' padding removed.
 */
function _drupal_hmac_base64($data, $key) {
  $raw_digest = hash_hmac('sha256', $data, $key, TRUE);
  $encoded = base64_encode($raw_digest);

  // Swap the characters that are not URL-safe and drop the padding.
  return str_replace(array('+', '/', '='), array('-', '_', ''), $encoded);
}
/**
 * Returns a string of random bytes.
 *
 * Bytes are drawn from /dev/urandom when it can be opened; any shortfall is
 * filled with SHA-256 output derived from microtime(), mt_rand() and a
 * per-request state string. Unused bytes are kept in a static buffer for
 * later calls.
 *
 * @param $count
 *   The number of bytes to return.
 *
 * @return
 *   A string of $count bytes.
 */
function _drupal_random_bytes($count) {
  // Both the fallback state and the byte buffer persist across calls.
  static $random_state, $bytes;
  // Seed the fallback state once per request from the server environment
  // and, where available, the process id.
  if (!isset($random_state)) {
    $random_state = print_r($_SERVER, TRUE);
    if (function_exists('getmypid')) {
      // Further mix in the process id.
      $random_state .= getmypid();
    }
    $bytes = '';
  }
  if (strlen($bytes) < $count) {
    // The buffer is too short: try the operating system's source first.
    if ($fh = @fopen('/dev/urandom', 'rb')) {
      // Ask for at least 4096 bytes so later calls can be served from the
      // buffer.
      $bytes .= fread($fh, max(4096, $count));
      fclose($fh);
    }
    // Fallback: if /dev/urandom was unavailable or returned too little, keep
    // appending hashed output until the buffer is long enough.
    while (strlen($bytes) < $count) {
      $random_state = hash('sha256', microtime() . mt_rand() . $random_state);
      $bytes .= hash('sha256', mt_rand() . $random_state, TRUE);
    }
  }
  // Hand out the first $count bytes and keep the remainder buffered.
  $output = substr($bytes, 0, $count);
  $bytes = substr($bytes, $count);
  return $output;
}
/**
 * Returns the salt used for hashing.
 *
 * @return
 *   The global $drupal_hash_salt if it is non-empty, otherwise the SHA-256
 *   hash of the serialized global $db_url.
 */
function _drupal_get_hash_salt() {
  global $drupal_hash_salt, $db_url;

  if (!empty($drupal_hash_salt)) {
    return $drupal_hash_salt;
  }
  // No explicit salt configured: derive one from the database settings.
  return hash('sha256', serialize($db_url));
}