View source
<?php
// Node.js server version this module is written against. It is the reference
// value Nodejs::checkServerVersion() compares the running server's version to.
define('NODEJS_SERVER_VERSION', '1.0.9');
// npm registry URL fetched by nodejs_server_has_update() to read the latest
// published version of the server package.
define('NODEJS_SERVER_NPM_URL', 'https://registry.npmjs.org/drupal-node.js');
/**
 * Generates a random token for a content channel.
 *
 * @return string
 *   Base-64 HMAC of 512 random bytes, keyed with the site's private key
 *   concatenated with its hash salt.
 */
function nodejs_generate_content_token() {
  $random_data = drupal_random_bytes(512);
  $hmac_key = drupal_get_private_key() . drupal_get_hash_salt();
  return drupal_hmac_base64($random_data, $hmac_key);
}
/**
 * Sends a message to a content channel via the Node.js server.
 *
 * Thin wrapper around Nodejs::sendContentTokenMessage(); the server response
 * is not returned to the caller.
 *
 * @param object $message
 *   The message object to send. Nodejs::sendContentTokenMessage() adds a
 *   clientSocketId property to it before JSON-encoding it.
 */
function nodejs_send_content_channel_message($message) {
Nodejs::sendContentTokenMessage($message);
}
/**
 * Implements hook_permission().
 *
 * Declares the permission that guards the "nodejs/user/channel/add" form.
 */
function nodejs_permission() {
  $permissions = array();
  $permissions['add users to nodejs channels'] = array(
    'title' => t('Add users to the Node.js server channels'),
    'description' => t('Allows to add users manually to any channel in the Node.js server.'),
  );
  return $permissions;
}
/**
 * Registers a fresh content token for a channel with the Node.js server.
 *
 * On success the token is also exposed to the page as the JavaScript setting
 * nodejs.contentTokens[$channel].
 *
 * @param string $channel
 *   The content channel name.
 * @param bool $notify_on_disconnect
 *   Sent to the server as the "notifyOnDisconnect" property of the request.
 *
 * @return object|false
 *   The server response with the generated token added as ->token, or FALSE
 *   when the request returned nothing or the server did not report "success"
 *   (the latter case is logged).
 */
function nodejs_send_content_channel_token($channel, $notify_on_disconnect = FALSE) {
  $token_message = new stdClass();
  $token_message->token = nodejs_generate_content_token();
  $token_message->channel = $channel;
  $token_message->notifyOnDisconnect = $notify_on_disconnect;

  $node_response = Nodejs::sendContentToken($token_message);
  if (!$node_response) {
    return FALSE;
  }

  if ($node_response->status != 'success') {
    $log_args = array(
      '%channel' => $channel,
      '%error' => $node_response->error,
    );
    watchdog('nodejs', 'Error sending content channel token for channel "%channel". Node.js server response: %error', $log_args);
    return FALSE;
  }

  // Hand the token to the browser so client-side code can use the channel.
  $js_settings = array(
    'nodejs' => array(
      'contentTokens' => array(
        $channel => $token_message->token,
      ),
    ),
  );
  drupal_add_js($js_settings, array('type' => 'setting'));

  $node_response->token = $token_message->token;
  return $node_response;
}
/**
 * Fetches the users connected to a content channel from the Node.js server.
 *
 * @param string $channel
 *   The content channel name.
 *
 * @return array|false
 *   Array with keys 'uids' and 'authTokens' (each an empty array when the
 *   response carries no such value), or FALSE when the request returned
 *   nothing or the response contains an error (which is logged).
 */
function nodejs_get_content_channel_users($channel) {
  $request = new stdClass();
  $request->channel = $channel;

  $node_response = Nodejs::getContentTokenUsers($request);
  if (!$node_response) {
    return FALSE;
  }

  if (isset($node_response->error)) {
    $log_args = array(
      '%channel' => $channel,
      '%error' => $node_response->error,
    );
    watchdog('nodejs', 'Error getting content channel users for channel "%channel" on the Node.js server. Server response: %error', $log_args);
    return FALSE;
  }

  $channel_users = array(
    'uids' => array(),
    'authTokens' => array(),
  );
  if (!empty($node_response->users->uids)) {
    $channel_users['uids'] = $node_response->users->uids;
  }
  if (!empty($node_response->users->authTokens)) {
    $channel_users['authTokens'] = $node_response->users->authTokens;
  }
  return $channel_users;
}
/**
 * Asks the Node.js server to kick a user.
 *
 * @param mixed $uid
 *   The uid passed on to Nodejs::kickUser().
 *
 * @return bool
 *   TRUE when the server reports status "success"; FALSE when the request
 *   returned nothing or the server reported another status (which is logged).
 */
function nodejs_kick_user($uid) {
  $node_response = Nodejs::kickUser($uid);
  if (!$node_response) {
    return FALSE;
  }
  if ($node_response->status == 'success') {
    return TRUE;
  }

  $log_args = array(
    '%uid' => $uid,
    '%error' => $node_response->error,
  );
  watchdog('nodejs', 'Error kicking uid "%uid" from the Node.js server. Server response: %error', $log_args);
  return FALSE;
}
/**
 * Asks the Node.js server to log out the client holding an auth token.
 *
 * @param string $token
 *   The auth token passed on to Nodejs::logoutUser().
 *
 * @return bool
 *   TRUE when the server reports status "success"; FALSE when the request
 *   returned nothing or the server reported another status (which is logged).
 */
function nodejs_logout_user($token) {
  $node_response = Nodejs::logoutUser($token);
  if (!$node_response) {
    return FALSE;
  }
  if ($node_response->status == 'success') {
    return TRUE;
  }

  $log_args = array(
    '%token' => $token,
    '%error' => $node_response->error,
  );
  watchdog('nodejs', 'Error logging out token "%token" from the Node.js server. Server response: %error', $log_args);
  return FALSE;
}
/**
 * Sends a user's presence list to the Node.js server.
 *
 * @param mixed $uid
 *   The uid whose presence list is being set.
 * @param array $uids
 *   The uids passed on to Nodejs::setUserPresenceList().
 *
 * @return bool
 *   TRUE when the server reports status "success"; FALSE when the request
 *   returned nothing or the server reported another status (which is logged).
 */
function nodejs_set_user_presence_list($uid, array $uids) {
  $node_response = Nodejs::setUserPresenceList($uid, $uids);
  if (!$node_response) {
    return FALSE;
  }
  if ($node_response->status == 'success') {
    return TRUE;
  }

  $log_args = array(
    '%uid' => $uid,
    '%error' => $node_response->error,
  );
  watchdog('nodejs', 'Error setting user presence list for uid "%uid" on the Node.js server. Server response: %error', $log_args);
  return FALSE;
}
/**
 * Queues a broadcast message on the "nodejs_notify" channel.
 *
 * @param mixed $subject
 *   Stored as the "subject" property of the message data.
 * @param mixed $body
 *   Stored as the "body" property of the message data.
 */
function nodejs_broadcast_message($subject, $body) {
  $payload = new stdClass();
  $payload->subject = $subject;
  $payload->body = $body;

  $message = new stdClass();
  $message->broadcast = TRUE;
  $message->data = $payload;
  $message->channel = 'nodejs_notify';

  nodejs_enqueue_message($message);
}
/**
 * Queues a message for a given channel.
 *
 * @param string $channel
 *   The channel the message is addressed to.
 * @param mixed $subject
 *   Stored as the "subject" property of the message data.
 * @param mixed $body
 *   Stored as the "body" property of the message data.
 */
function nodejs_send_channel_message($channel, $subject, $body) {
  $payload = new stdClass();
  $payload->subject = $subject;
  $payload->body = $body;

  $message = new stdClass();
  $message->data = $payload;
  $message->channel = $channel;

  nodejs_enqueue_message($message);
}
/**
 * Queues a message for a single user's channel.
 *
 * The message is addressed to the channel "nodejs_user_<uid>" and carries the
 * client-side callback name "nodejsNotify".
 *
 * @param mixed $uid
 *   The uid appended to the "nodejs_user_" channel prefix.
 * @param mixed $subject
 *   Stored as the "subject" property of the message data.
 * @param mixed $body
 *   Stored as the "body" property of the message data.
 */
function nodejs_send_user_message($uid, $subject, $body) {
  $payload = new stdClass();
  $payload->subject = $subject;
  $payload->body = $body;

  $message = new stdClass();
  $message->data = $payload;
  $message->channel = 'nodejs_user_' . $uid;
  $message->callback = 'nodejsNotify';

  nodejs_enqueue_message($message);
}
/**
 * Queues the same message for several users.
 *
 * @param array|string $uids
 *   Either an array of uids, or a comma-separated string of uids. For the
 *   string form, whitespace around each entry is trimmed and empty entries
 *   are skipped, so "" sends nothing and "1, 2" addresses uids 1 and 2.
 * @param mixed $subject
 *   Message subject, passed to nodejs_send_user_message().
 * @param mixed $body
 *   Message body, passed to nodejs_send_user_message().
 */
function nodejs_send_user_message_multiple($uids, $subject, $body) {
  if (!is_array($uids)) {
    // A bare explode() would turn "" into one empty uid and keep the spaces
    // of "1, 2", producing messages for the channels "nodejs_user_" and
    // "nodejs_user_ 2". 'strlen' (rather than a truthiness filter) keeps "0".
    $uids = array_filter(array_map('trim', explode(',', (string) $uids)), 'strlen');
  }
  foreach ($uids as $uid) {
    nodejs_send_user_message($uid, $subject, $body);
  }
}
/**
 * Queues a message for every user that has a given role.
 *
 * Recipients are the uids found by joining {users} to {users_roles} and
 * {role} on the role name.
 *
 * @param string $role_name
 *   The role name matched against {role}.name.
 * @param mixed $subject
 *   Message subject.
 * @param mixed $body
 *   Message body.
 */
function nodejs_send_role_message($role_name, $subject, $body) {
  $select = db_select('users', 'u');
  $select->join('users_roles', 'ur', 'ur.uid = u.uid');
  $select->join('role', 'r', 'ur.rid = r.rid');
  $select->fields('u', array('uid'));
  $select->condition('r.name', $role_name);

  $recipient_uids = $select->execute()->fetchCol();
  nodejs_send_user_message_multiple($recipient_uids, $subject, $body);
}
/**
 * Implements hook_init().
 *
 * Registers Nodejs::sendMessages() as a shutdown function so queued messages
 * are sent at the end of the request, and, when
 * nodejs_add_js_to_page_check() allows it, adds the socket.io script, the
 * module's nodejs.js, the client settings and all handler scripts to the
 * page.
 */
function nodejs_init() {
  drupal_register_shutdown_function(array('Nodejs', 'sendMessages'));

  if (!nodejs_add_js_to_page_check()) {
    return;
  }

  $nodejs_config = nodejs_get_config();
  // The session keeps the full configuration, service key included.
  $_SESSION['nodejs_config'] = $nodejs_config;
  // The copy sent to the browser must not contain the service key.
  if (isset($nodejs_config['serviceKey'])) {
    unset($nodejs_config['serviceKey']);
  }

  $socket_io = nodejs_get_socketio_js_config($nodejs_config);
  drupal_add_js($socket_io['path'], array('type' => $socket_io['type']));

  $module_script = drupal_get_path('module', 'nodejs') . '/nodejs.js';
  drupal_add_js($module_script, array('type' => 'file'));

  drupal_add_js(array('nodejs' => $nodejs_config), array('type' => 'setting'));

  foreach (nodejs_get_js_handlers() as $handler_script) {
    drupal_add_js($handler_script, array('type' => 'file'));
  }
}
/**
 * Decides whether the Node.js client JavaScript belongs on the current page.
 *
 * The current path alias is matched against the 'nodejs_pages' patterns.
 * When the 'nodejs_disabled_pages' variable is truthy the page must match;
 * otherwise it must not match. If 'nodejs_authenticated_users_only' is set,
 * the current user must additionally have a uid greater than 0.
 *
 * @return bool
 *   TRUE when the JavaScript should be added.
 */
function nodejs_add_js_to_page_check() {
  global $user;

  $must_match = variable_get('nodejs_disabled_pages', 1);
  $page_matches = drupal_match_path(drupal_get_path_alias(), variable_get('nodejs_pages', '*'));
  $valid_user = !variable_get('nodejs_authenticated_users_only', FALSE) || $user->uid > 0;

  if ($must_match) {
    return $page_matches && $valid_user;
  }
  return !$page_matches && $valid_user;
}
/**
 * Works out where the socket.io client script is loaded from.
 *
 * @param array $nodejs_config
 *   Configuration array; the 'client' (scheme, host, port) and 'resource'
 *   entries are read when the script URL has to be built.
 *
 * @return array
 *   Array with keys 'path' and 'type'. When the configured type is
 *   'internal', or no path is configured, 'path' is the socket.io.js URL on
 *   the client-facing Node.js host.
 */
function nodejs_get_socketio_js_config($nodejs_config) {
  $script_path = variable_get('nodejs_socket_io_path', FALSE);
  $script_type = variable_get('nodejs_socket_io_type', 'internal');

  if ($script_type === 'internal' || empty($script_path)) {
    $client = $nodejs_config['client'];
    $script_path = $client['scheme'] . '://' . $client['host'] . ':' . $client['port'] . $nodejs_config['resource'] . '/socket.io.js';
  }

  return array(
    'path' => $script_path,
    'type' => $script_type,
  );
}
/**
 * Collects the JavaScript handler files contributed by modules.
 *
 * @return array
 *   The merged results of hook_nodejs_handlers_info(), after modules had a
 *   chance to change them through hook_nodejs_js_handlers_alter().
 */
function nodejs_get_js_handlers() {
  $handler_files = module_invoke_all('nodejs_handlers_info');
  // drupal_alter() takes the list by reference, so it needs a variable.
  drupal_alter('nodejs_js_handlers', $handler_files);
  return $handler_files;
}
/**
 * Implements hook_menu().
 *
 * Defines the admin section and settings form, the "nodejs/message" callback
 * the Node.js server posts to, and the form for adding a user to a channel.
 */
function nodejs_menu() {
  $items = array();

  $items['admin/config/nodejs'] = array(
    'title' => 'Nodejs',
    'description' => 'Configure nodejs module.',
    'position' => 'left',
    'weight' => -20,
    'page callback' => 'system_admin_menu_block_page',
    'access arguments' => array('access administration pages'),
    'file' => 'system.admin.inc',
    'file path' => drupal_get_path('module', 'system'),
  );

  $items['admin/config/nodejs/config'] = array(
    'title' => 'Configuration',
    'description' => 'Adjust node.js settings.',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('nodejs_settings'),
    'access arguments' => array('administer site configuration'),
    'file' => 'nodejs.admin.inc',
  );

  // Open to everyone at the menu level; nodejs_message_handler() checks the
  // posted service key itself.
  $items['nodejs/message'] = array(
    'title' => 'Message from Node.js server',
    'page callback' => 'nodejs_message_handler',
    'access callback' => TRUE,
    'type' => MENU_CALLBACK,
  );

  $items['nodejs/user/channel/add'] = array(
    'title' => 'Add a channel to the Node.js server',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('nodejs_add_user_to_channel_form'),
    'access arguments' => array('add users to nodejs channels'),
    'type' => MENU_CALLBACK,
  );

  return $items;
}
/**
 * Adds a channel on the Node.js server.
 *
 * @param string $channel
 *   The channel name passed on to Nodejs::addChannel().
 *
 * @return bool
 *   TRUE when the server reports status "success"; FALSE when the request
 *   returned nothing or the server reported another status (which is logged).
 */
function nodejs_get_add_channel($channel) {
  $node_response = Nodejs::addChannel($channel);
  if (!$node_response) {
    return FALSE;
  }
  if ($node_response->status == 'success') {
    return TRUE;
  }

  $log_args = array('%error' => $node_response->error);
  watchdog('nodejs', 'Error adding channel to the Node.js server. Server response: %error', $log_args);
  return FALSE;
}
/**
 * Checks a channel on the Node.js server.
 *
 * @param string $channel
 *   The channel name passed on to Nodejs::checkChannel().
 *
 * @return mixed
 *   The "result" property of the server response when it reports status
 *   "success"; FALSE when the request returned nothing or the server
 *   reported another status (which is logged).
 */
function nodejs_get_check_channel($channel) {
  $node_response = Nodejs::checkChannel($channel);
  if (!$node_response) {
    return FALSE;
  }
  if ($node_response->status == 'success') {
    return $node_response->result;
  }

  $log_args = array('%error' => $node_response->error);
  watchdog('nodejs', 'Error checking channel on the Node.js server. Server response: %error', $log_args);
  return FALSE;
}
/**
 * Removes a channel from the Node.js server.
 *
 * @param string $channel
 *   The channel name passed on to Nodejs::removeChannel().
 *
 * @return bool
 *   TRUE when the server reports status "success"; FALSE when the request
 *   returned nothing or the server reported another status (which is logged).
 */
function nodejs_get_remove_channel($channel) {
  $node_response = Nodejs::removeChannel($channel);
  if (!$node_response) {
    return FALSE;
  }
  if ($node_response->status == 'success') {
    return TRUE;
  }

  $log_args = array('%error' => $node_response->error);
  watchdog('nodejs', 'Error removing channel from the Node.js server. Server response: %error', $log_args);
  return FALSE;
}
/**
 * Form builder: add a user to a Node.js channel.
 *
 * The incoming $form is discarded; the form is built from scratch with a uid
 * field, a channel field and a submit button.
 *
 * @see nodejs_add_user_to_channel_form_validate()
 * @see nodejs_add_user_to_channel_form_submit()
 */
function nodejs_add_user_to_channel_form($form, $form_state) {
  return array(
    'nodejs_uid' => array(
      '#type' => 'textfield',
      '#description' => t('The user uid to add to a channel.'),
      '#title' => t('User uid to add'),
    ),
    'nodejs_channel' => array(
      '#type' => 'textfield',
      '#description' => t('The name of the channel to give a user access to.'),
      '#title' => t('Channel to add'),
    ),
    'nodejs_submit' => array(
      '#type' => 'submit',
      '#value' => t('Add user'),
    ),
  );
}
/**
 * Form submit handler: adds the given uid to the given channel.
 *
 * Shows a status message on success and an error message on failure.
 *
 * @see nodejs_add_user_to_channel_form()
 */
function nodejs_add_user_to_channel_form_submit($form, &$form_state) {
  $values = (object) $form_state['values'];
  $message_args = array(
    '%uid' => $values->nodejs_uid,
    '%channel' => $values->nodejs_channel,
  );
  if (nodejs_add_user_to_channel($values->nodejs_uid, $values->nodejs_channel)) {
    drupal_set_message(t("Added uid %uid to %channel.", $message_args));
  }
  else {
    // The failure branch previously read the uid through a variable-variable
    // (${$values}), which tries to use the object as a variable name and
    // fails instead of reporting the error to the user.
    drupal_set_message(t("Failed to add uid %uid to %channel.", $message_args), 'error');
  }
}
/**
 * Form validation handler: checks the uid and channel name.
 *
 * The uid must consist of digits only; the channel name of ASCII letters,
 * digits and underscores only.
 *
 * @see nodejs_add_user_to_channel_form()
 */
function nodejs_add_user_to_channel_form_validate($form, &$form_state) {
  $values = (object) $form_state['values'];
  // Both patterns end in \z rather than $: "$" also matches just before a
  // trailing newline, which would let "12\n" or "chat\n" through and into the
  // URL built for the Node.js server request.
  if (!preg_match('/^\\d+\\z/', $values->nodejs_uid)) {
    form_set_error('nodejs_uid', t('Invalid uid - please enter a numeric uid.'));
  }
  if (!preg_match('/^([a-z0-9_]+)\\z/i', $values->nodejs_channel)) {
    form_set_error('nodejs_channel', t('Invalid channel name - only numbers, letters and underscores are allowed.'));
  }
}
/**
 * Queues a message for delivery to the Node.js server.
 *
 * Queued messages are sent by Nodejs::sendMessages().
 *
 * @param StdClass $message
 *   The message; its "broadcast" property is set to FALSE when not already
 *   set.
 */
function nodejs_enqueue_message(StdClass $message) {
  if (!isset($message->broadcast)) {
    $message->broadcast = FALSE;
  }
  Nodejs::enqueueMessage($message);
}
/**
 * Sends a message to the Node.js server immediately.
 *
 * @param StdClass $message
 *   The message; its "broadcast" property is set to FALSE when not already
 *   set.
 *
 * @return mixed
 *   Whatever Nodejs::sendMessage() returns: the decoded server response, or
 *   FALSE on failure.
 */
function nodejs_send_message(StdClass $message) {
  if (!isset($message->broadcast)) {
    $message->broadcast = FALSE;
  }
  return Nodejs::sendMessage($message);
}
/**
 * Implements hook_nodejs_user_channels().
 *
 * Gives each account with a non-zero uid its own "nodejs_user_<uid>"
 * channel, unless the 'nodejs_enable_userchannel' variable is disabled.
 *
 * @param object $account
 *   The account whose channels are requested.
 *
 * @return array
 *   The channel names for the account; possibly empty.
 */
function nodejs_nodejs_user_channels($account) {
  $channels = array();
  if (variable_get('nodejs_enable_userchannel', TRUE) && $account->uid) {
    $channels[] = 'nodejs_user_' . $account->uid;
  }
  return $channels;
}
/**
 * Implements hook_user_logout().
 *
 * Logs the auth token stored in the session's Node.js configuration out of
 * the Node.js server. Does nothing when the session holds no such
 * configuration.
 */
function nodejs_user_logout($account) {
  if (!isset($_SESSION['nodejs_config'])) {
    return;
  }
  $session_config = $_SESSION['nodejs_config'];
  nodejs_logout_user($session_config['authToken']);
}
/**
 * Checks a service key against the configured 'nodejs_service_key'.
 *
 * The comparison is a strict string comparison. A loose "==" treats
 * numeric-looking strings as numbers, so for example "1e3" would be accepted
 * for a configured key of "1000".
 *
 * @param mixed $service_key
 *   The key to check, typically taken from the request.
 *
 * @return bool
 *   TRUE when the key equals the configured key; FALSE otherwise, including
 *   when either value is not a scalar (e.g. an array from the request).
 */
function nodejs_is_valid_service_key($service_key) {
  $expected_key = variable_get('nodejs_service_key', '');
  if (!is_scalar($service_key) || !is_scalar($expected_key)) {
    return FALSE;
  }
  $expected_key = (string) $expected_key;
  $service_key = (string) $service_key;
  // Prefer the constant-time comparison where the PHP version provides it.
  if (function_exists('hash_equals')) {
    return hash_equals($expected_key, $service_key);
  }
  return $service_key === $expected_key;
}
/**
 * Page callback for "nodejs/message": handles a message from the Node.js server.
 *
 * Expects the POST fields "serviceKey" and "messageJson". The decoded
 * message is dispatched on its "messageType":
 * - "authenticate": answered by nodejs_auth_check().
 * - "userOffline": marks the given uid offline.
 * - anything else: handed to the callbacks returned by implementations of
 *   hook_nodejs_message_callback().
 *
 * The response can be changed through hook_nodejs_message_response_alter()
 * and is printed as JSON; an empty response becomes a "Not implemented"
 * error. The request always ends here via drupal_exit().
 */
function nodejs_message_handler() {
  if (!isset($_POST['serviceKey']) || !nodejs_is_valid_service_key($_POST['serviceKey'])) {
    drupal_json_output(array(
      'error' => 'Invalid service key.',
    ));
    drupal_exit();
  }
  if (!isset($_POST['messageJson'])) {
    drupal_json_output(array(
      'error' => 'No message.',
    ));
    drupal_exit();
  }
  $message = drupal_json_decode($_POST['messageJson']);
  // The payload may fail to decode or decode to a non-array; read the type
  // defensively so such input falls through to the default branch instead of
  // raising notices or offset errors.
  $message_type = (is_array($message) && isset($message['messageType'])) ? $message['messageType'] : NULL;
  $response = array();
  switch ($message_type) {
    case 'authenticate':
      $response = nodejs_auth_check($message);
      break;

    case 'userOffline':
      if (empty($message['uid'])) {
        $response['error'] = 'Missing uid for userOffline message.';
      }
      elseif (!preg_match('/^\\d+$/', $message['uid'])) {
        $response['error'] = 'Invalid (!/^\\d+$/) uid for userOffline message.';
      }
      else {
        nodejs_user_set_offline($message['uid']);
        $response['message'] = "User {$message['uid']} set offline.";
      }
      break;

    default:
      $handlers = array();
      foreach (module_implements('nodejs_message_callback') as $module) {
        $function = $module . '_nodejs_message_callback';
        // Invoke each implementation exactly once; calling it a second time
        // to fetch the same value would repeat any side effects it has.
        $module_handlers = $function($message_type);
        if (is_array($module_handlers)) {
          $handlers += $module_handlers;
        }
      }
      foreach ($handlers as $callback) {
        $callback($message, $response);
      }
  }
  drupal_alter('nodejs_message_response', $response, $message);
  drupal_json_output($response ? $response : array(
    'error' => 'Not implemented',
  ));
  drupal_exit();
}
/**
 * Answers an "authenticate" message from the Node.js server.
 *
 * The auth token is resolved to a uid by the function named in the
 * 'nodejs_auth_check_callback' variable. The result is the matching account
 * (or the anonymous user when the uid is not greater than 0) decorated with
 * authToken, nodejsValidAuthToken and clientId. For a valid token it also
 * receives channels, presenceUids, serviceKey and contentTokens, the
 * "NodejsServiceKey" HTTP header is set, and accounts with a uid are marked
 * online.
 *
 * @param array $message
 *   The decoded message; 'authToken' and 'clientId' are read, and
 *   'contentTokens' when present.
 *
 * @return object
 *   The decorated user object.
 *
 * @throws Exception
 *   When the configured auth check callback does not exist.
 */
function nodejs_auth_check($message) {
  $check_callback = variable_get('nodejs_auth_check_callback', 'nodejs_auth_check_callback');
  if (!function_exists($check_callback)) {
    throw new Exception("No nodejs_auth_check callback found - looked for '{$check_callback}'.");
  }

  $uid = $check_callback($message['authToken']);
  if ($uid > 0) {
    $auth_user = user_load($uid);
  }
  else {
    $auth_user = drupal_anonymous_user();
  }
  $auth_user->authToken = $message['authToken'];
  // Only a strict FALSE from the callback marks the token as invalid.
  $auth_user->nodejsValidAuthToken = $uid !== FALSE;
  $auth_user->clientId = $message['clientId'];

  if (!$auth_user->nodejsValidAuthToken) {
    return $auth_user;
  }

  $auth_user->channels = array();
  foreach (module_implements('nodejs_user_channels') as $module) {
    $channels_hook = $module . '_nodejs_user_channels';
    foreach ($channels_hook($auth_user) as $channel_name) {
      $auth_user->channels[] = $channel_name;
    }
  }

  $presence_uids = module_invoke_all('nodejs_user_presence_list', $auth_user);
  $auth_user->presenceUids = array_unique($presence_uids);

  $nodejs_config = nodejs_get_config();
  $auth_user->serviceKey = $nodejs_config['serviceKey'];
  drupal_add_http_header('NodejsServiceKey', $nodejs_config['serviceKey']);

  drupal_alter('nodejs_auth_user', $auth_user);

  if ($auth_user->uid) {
    nodejs_user_set_online($auth_user->uid);
  }

  // Set after the alter hook has run, so the tokens always come from the
  // incoming message.
  if (isset($message['contentTokens'])) {
    $auth_user->contentTokens = $message['contentTokens'];
  }
  else {
    $auth_user->contentTokens = array();
  }
  return $auth_user;
}
/**
 * Default auth check callback: looks an auth token up in {sessions}.
 *
 * The token is compared with the MD5 of the sid and ssid columns.
 *
 * @param string $auth_token
 *   The auth token sent by the client.
 *
 * @return mixed
 *   The uid of the first matching session row, or FALSE when no row matches.
 */
function nodejs_auth_check_callback($auth_token) {
  $query_args = array(':auth_key' => $auth_token);
  $result = db_query("SELECT uid FROM {sessions} WHERE MD5(sid) = :auth_key OR MD5(ssid) = :auth_key", $query_args);
  return $result->fetchField();
}
/**
 * Returns the Node.js auth token for an account.
 *
 * Delegates to the function named in the 'nodejs_auth_get_token_callback'
 * variable, which defaults to nodejs_auth_get_token_callback().
 *
 * @param object $account
 *   The account, passed on to the callback.
 *
 * @return mixed
 *   Whatever the callback returns.
 *
 * @throws Exception
 *   When the configured callback does not exist.
 */
function nodejs_auth_get_token($account) {
  $token_callback = variable_get('nodejs_auth_get_token_callback', 'nodejs_auth_get_token_callback');
  if (function_exists($token_callback)) {
    return $token_callback($account);
  }
  throw new Exception("Cannot proceed without a valid nodejs_auth_get_token callback - looked for '{$token_callback}'.");
}
/**
 * Default token callback: derives the auth token from the PHP session id.
 *
 * Starts the Drupal session before reading the session id.
 *
 * @param object $account
 *   Not used by this implementation.
 *
 * @return string
 *   The MD5 hash of the current session id.
 */
function nodejs_auth_get_token_callback($account) {
  drupal_session_start();
  $current_session_id = session_id();
  return md5($current_session_id);
}
/**
 * Records a user as online in {nodejs_presence}.
 *
 * Inserts a row with the uid and the current time. Any exception raised by
 * the insert is swallowed.
 *
 * @param mixed $uid
 *   The uid to mark online.
 */
function nodejs_user_set_online($uid) {
  $row_values = array(
    ':uid' => $uid,
    ':login_time' => time(),
  );
  try {
    db_query('INSERT INTO {nodejs_presence} (uid, login_time) VALUES (:uid, :login_time)', $row_values);
  }
  catch (Exception $ignored) {
    // Failures are ignored on purpose of the original code: nothing is
    // logged and nothing is re-thrown.
  }
}
/**
 * Records a user as offline by deleting their {nodejs_presence} rows.
 *
 * Any exception raised by the delete is swallowed.
 *
 * @param mixed $uid
 *   The uid to mark offline.
 */
function nodejs_user_set_offline($uid) {
  $query_args = array(':uid' => $uid);
  try {
    db_query('DELETE FROM {nodejs_presence} WHERE uid = :uid', $query_args);
  }
  catch (Exception $ignored) {
    // Failures are ignored: nothing is logged and nothing is re-thrown.
  }
}
/**
 * Builds the Node.js configuration for the current user.
 *
 * Defaults are assembled from individual variables; the client-facing
 * scheme, host and port fall back to the server-side values. Top-level keys
 * present in the 'nodejs_config' variable take precedence over the defaults.
 *
 * @return array
 *   Configuration with the keys 'nodejs', 'client', 'resource', 'authToken',
 *   'serviceKey', 'websocketSwfLocation' and 'log_http_errors', plus any
 *   extra keys stored in the 'nodejs_config' variable.
 */
function nodejs_get_config() {
  global $user;

  $server_scheme = variable_get('nodejs_server_scheme', 'http');
  $server_host = variable_get('nodejs_server_host', 'localhost');
  $server_port = variable_get('nodejs_server_port', '8080');

  $client_scheme = variable_get('nodejs_client_js_scheme', $server_scheme);
  $client_host = variable_get('nodejs_client_js_host', $server_host);
  $client_port = variable_get('nodejs_client_js_port', $server_port);

  $defaults = array(
    'nodejs' => array(
      'scheme' => $server_scheme,
      'secure' => $server_scheme == 'https' ? 1 : 0,
      'host' => $server_host,
      'port' => $server_port,
    ),
    'client' => array(
      'scheme' => $client_scheme,
      'secure' => $client_scheme == 'https' ? 1 : 0,
      'host' => $client_host,
      'port' => $client_port,
    ),
    'resource' => variable_get('nodejs_config_resource', '/socket.io'),
    'authToken' => nodejs_auth_get_token($user),
    'serviceKey' => variable_get('nodejs_service_key', ''),
    'websocketSwfLocation' => base_path() . drupal_get_path('module', 'nodejs') . '/socket_io/socket.io/support/socket.io-client/lib/vendor/web-socket-js/WebSocketMain.swf',
    'log_http_errors' => variable_get('nodejs_log_http_errors', TRUE),
  );

  // Array union: stored overrides win, defaults fill in the missing keys.
  $overrides = variable_get('nodejs_config', array());
  return $overrides + $defaults;
}
/**
 * Builds a URL on the Node.js server.
 *
 * @param array $config
 *   Configuration whose 'nodejs' entry provides 'scheme', 'host' and 'port'.
 * @param string $callback
 *   Path appended after the trailing slash; empty for the base URL.
 *
 * @return string
 *   A URL of the form "<scheme>://<host>:<port>/<callback>".
 */
function nodejs_get_url($config, $callback = '') {
  $server = $config['nodejs'];
  $base_url = $server['scheme'] . '://' . $server['host'] . ':' . $server['port'] . '/';
  return $base_url . $callback;
}
/**
 * Removes a user from a channel on the Node.js server.
 *
 * @param mixed $uid
 *   The uid to remove.
 * @param string $channel
 *   The channel to remove the user from.
 *
 * @return bool
 *   TRUE when the server reports status "success"; FALSE when the request
 *   returned nothing or the server reported another status (which is logged).
 */
function nodejs_remove_user_from_channel($uid, $channel) {
  $node_response = Nodejs::removeUserFromChannel($uid, $channel);
  if (!$node_response) {
    return FALSE;
  }
  if ($node_response->status == 'success') {
    return TRUE;
  }

  $log_args = array(
    '%uid' => $uid,
    '%channel' => $channel,
    '%error' => $node_response->error,
  );
  watchdog('nodejs', 'Error removing user with uid: %uid from channel %channel on the Node.js server. Server response: %error', $log_args);
  return FALSE;
}
/**
 * Adds a user to a channel on the Node.js server.
 *
 * @param mixed $uid
 *   The uid to add.
 * @param string $channel
 *   The channel to add the user to.
 *
 * @return bool
 *   TRUE when the server reports status "success"; FALSE when the request
 *   returned nothing or the server reported another status (which is logged).
 */
function nodejs_add_user_to_channel($uid, $channel) {
  $node_response = Nodejs::addUserToChannel($uid, $channel);
  if (!$node_response) {
    return FALSE;
  }
  if ($node_response->status == 'success') {
    return TRUE;
  }

  $log_args = array(
    '%uid' => $uid,
    '%channel' => $channel,
    '%error' => $node_response->error,
  );
  watchdog('nodejs', 'Error adding user with uid: %uid to channel %channel on the Node.js server. Server response: %error', $log_args);
  return FALSE;
}
/**
 * Returns the client socket id posted with the current request.
 *
 * @return string
 *   The value of $_POST['nodejs_client_socket_id'] when it is a string made
 *   up solely of ASCII letters, digits, underscores and hyphens; otherwise
 *   an empty string.
 */
function nodejs_get_client_socket_id() {
  if (!isset($_POST['nodejs_client_socket_id']) || !is_string($_POST['nodejs_client_socket_id'])) {
    // Missing, or not a plain string (e.g. an array posted as "name[]").
    return '';
  }
  $client_socket_id = $_POST['nodejs_client_socket_id'];
  // \z anchors at the very end of the string; "$" would also accept a value
  // with a trailing newline.
  return preg_match('/^[0-9a-z_-]+\\z/i', $client_socket_id) ? $client_socket_id : '';
}
/**
 * Checks whether the npm registry lists a different server version.
 *
 * Fetches NODEJS_SERVER_NPM_URL and compares its "latest" dist-tag with the
 * version reported by the running Node.js server.
 *
 * @return bool
 *   TRUE when the running version is known, the registry names a latest
 *   version, and the two differ; FALSE otherwise, including when the
 *   registry request fails.
 */
function nodejs_server_has_update() {
  $registry_response = drupal_http_request(NODEJS_SERVER_NPM_URL);
  if (isset($registry_response->error)) {
    return FALSE;
  }

  $package_info = json_decode($registry_response->data);
  $running_version = Nodejs::getServerVersion();
  if (!$running_version || !isset($package_info->{'dist-tags'}->latest)) {
    return FALSE;
  }
  return $package_info->{'dist-tags'}->latest != $running_version;
}
/**
 * Static client for the Node.js server's HTTP API.
 *
 * Holds the per-request message queue and the lazily loaded configuration,
 * and wraps each server endpoint in a static method. All requests go through
 * Nodejs::httpRequest().
 */
class Nodejs {

  /**
   * Messages queued with enqueueMessage(), sent by sendMessages().
   */
  public static $messages = array();

  /**
   * Configuration from nodejs_get_config(); NULL until initConfig() runs.
   */
  public static $config = NULL;

  /**
   * Base URL of the Node.js server; NULL until initConfig() runs.
   */
  public static $baseUrl = NULL;

  /**
   * Cached server version: NULL = not asked yet, FALSE = unavailable.
   */
  public static $nodeServerVersion = NULL;

  /**
   * Loads the configuration and base URL once per request.
   */
  public static function initConfig() {
    if (!isset(self::$config)) {
      self::$config = nodejs_get_config();
      self::$baseUrl = nodejs_get_url(self::$config);
    }
  }

  /**
   * Returns the queued messages.
   */
  public static function getMessages() {
    return self::$messages;
  }

  /**
   * Appends a message to the queue.
   */
  public static function enqueueMessage(StdClass $message) {
    self::$messages[] = $message;
  }

  /**
   * Sends every queued message; the queue itself is left untouched.
   */
  public static function sendMessages() {
    foreach (self::$messages as $message) {
      self::sendMessage($message);
    }
  }

  /**
   * Returns the version reported by the server's health check.
   *
   * @return string|false
   *   The version, or FALSE when the health check returned none. The result
   *   (including FALSE) is cached for the rest of the request.
   */
  public static function getServerVersion() {
    if (isset(self::$nodeServerVersion)) {
      return self::$nodeServerVersion;
    }
    $data = self::healthCheck();
    if (!isset($data->version)) {
      self::$nodeServerVersion = FALSE;
    }
    else {
      self::$nodeServerVersion = $data->version;
    }
    return self::$nodeServerVersion;
  }

  /**
   * Checks that the running server is compatible with this module.
   *
   * @return bool
   *   TRUE when the server's major version equals that of
   *   NODEJS_SERVER_VERSION and the server version is not lower than it.
   */
  public static function checkServerVersion() {
    $server_version = self::getServerVersion();
    if (!$server_version) {
      return FALSE;
    }
    $current_parts = explode('.', $server_version);
    $required_parts = explode('.', NODEJS_SERVER_VERSION);
    $current_major = reset($current_parts);
    $required_major = reset($required_parts);
    if ($current_major != $required_major || version_compare($server_version, NODEJS_SERVER_VERSION) < 0) {
      return FALSE;
    }
    return TRUE;
  }

  /**
   * Publishes a message to the server.
   *
   * Modules may change the message through hook_nodejs_message_alter(). The
   * request timeout is the configured 'timeout', or 5 seconds.
   *
   * @return mixed
   *   The decoded server response, or FALSE on failure.
   */
  public static function sendMessage(StdClass $message) {
    self::initConfig();
    drupal_alter('nodejs_message', $message);
    $message->clientSocketId = nodejs_get_client_socket_id();
    $options = array(
      'method' => 'POST',
      'data' => drupal_json_encode($message),
      'timeout' => !empty(self::$config['timeout']) ? self::$config['timeout'] : 5,
    );
    return self::httpRequest('nodejs/publish', $options);
  }

  /**
   * Queries the server's health check endpoint, skipping the version check.
   */
  public static function healthCheck() {
    $data = self::httpRequest('nodejs/health/check', array(), TRUE);
    return $data;
  }

  /**
   * Sends a user's presence list (comma-separated uids in the URL).
   */
  public static function setUserPresenceList($uid, array $uids) {
    return self::httpRequest("nodejs/user/presence-list/{$uid}/" . implode(',', $uids));
  }

  /**
   * Logs out the client identified by an auth token.
   */
  public static function logoutUser($token) {
    $options = array(
      'method' => 'POST',
    );
    return self::httpRequest("nodejs/user/logout/{$token}", $options);
  }

  /**
   * Sends a message to a content channel.
   *
   * Modules may change the message through
   * hook_nodejs_content_channel_message_alter().
   */
  public static function sendContentTokenMessage($message) {
    $message->clientSocketId = nodejs_get_client_socket_id();
    drupal_alter('nodejs_content_channel_message', $message);
    $options = array(
      'method' => 'POST',
      'data' => drupal_json_encode($message),
      // drupal_http_request() reads 'timeout' from the top level of its
      // options; nested under an 'options' key it was silently ignored.
      'timeout' => 5.0,
    );
    return self::httpRequest('nodejs/content/token/message', $options);
  }

  /**
   * Registers a content token with the server.
   */
  public static function sendContentToken($message) {
    $options = array(
      'method' => 'POST',
      'data' => drupal_json_encode($message),
    );
    return self::httpRequest('nodejs/content/token', $options);
  }

  /**
   * Requests the users of a content channel.
   */
  public static function getContentTokenUsers($message) {
    $options = array(
      'method' => 'POST',
      'data' => drupal_json_encode($message),
    );
    return self::httpRequest('nodejs/content/token/users', $options);
  }

  /**
   * Kicks a user from the server.
   */
  public static function kickUser($uid) {
    $options = array(
      'method' => 'POST',
    );
    return self::httpRequest("nodejs/user/kick/{$uid}", $options);
  }

  /**
   * Adds a user to a channel.
   */
  public static function addUserToChannel($uid, $channel) {
    $options = array(
      'method' => 'POST',
    );
    return self::httpRequest("nodejs/user/channel/add/{$channel}/{$uid}", $options);
  }

  /**
   * Removes a user from a channel.
   */
  public static function removeUserFromChannel($uid, $channel) {
    $options = array(
      'method' => 'POST',
    );
    return self::httpRequest("nodejs/user/channel/remove/{$channel}/{$uid}", $options);
  }

  /**
   * Adds a channel.
   */
  public static function addChannel($channel) {
    $options = array(
      'method' => 'POST',
    );
    return self::httpRequest("nodejs/channel/add/{$channel}", $options);
  }

  /**
   * Checks a channel.
   */
  public static function checkChannel($channel) {
    return self::httpRequest("nodejs/channel/check/{$channel}");
  }

  /**
   * Removes a channel.
   */
  public static function removeChannel($channel) {
    $options = array(
      'method' => 'POST',
    );
    return self::httpRequest("nodejs/channel/remove/{$channel}", $options);
  }

  /**
   * Performs an HTTP request against the Node.js server.
   *
   * @param string $url
   *   Path relative to the server's base URL.
   * @param array $options
   *   Options for drupal_http_request(). 'method' defaults to GET; the
   *   service key and JSON content type headers are added unless already
   *   given.
   * @param bool $ignore_version
   *   When TRUE the server version check is skipped (the health check itself
   *   needs this).
   *
   * @return mixed
   *   The JSON-decoded response body, or FALSE when the version check fails
   *   or the request reports an error. Errors are logged when the
   *   'log_http_errors' configuration is enabled.
   */
  public static function httpRequest($url, $options = array(), $ignore_version = FALSE) {
    self::initConfig();
    if (!$ignore_version && !self::checkServerVersion()) {
      return FALSE;
    }
    $options += array(
      'method' => 'GET',
      'headers' => array(),
    );
    $options['headers'] += array(
      'NodejsServiceKey' => self::$config['serviceKey'],
      'Content-type' => 'application/json',
    );
    $response = drupal_http_request(self::$baseUrl . $url, $options);
    if (isset($response->error)) {
      if (self::$config['log_http_errors']) {
        $params = array(
          '%code' => $response->code,
          '%error' => $response->error,
          '%url' => $url,
        );
        $log_message = 'Error reaching the Node.js server at "%url": [%code] %error.';
        if (!empty($options['data'])) {
          // The key must carry the "%" prefix to match the %data placeholder
          // in the message; without it the placeholder is never replaced.
          $params['%data'] = $options['data'];
          $log_message = 'Error reaching the Node.js server at "%url" with data "%data": [%code] %error.';
        }
        watchdog('nodejs', $log_message, $params);
      }
      return FALSE;
    }
    return json_decode($response->data);
  }

}