View source
<?php
define('NOTIFICATIONS_STEP_ROWS', 1000);
define('NOTIFICATIONS_STEP_USERS', 1000);
define('NOTIFICATIONS_TIME_MARGIN', 5);
function notifications_process_run($cron = TRUE) {
notifications_log('Starting queue process');
notifications_process('start');
if ($cron) {
notifications_process('cron');
}
$stop = FALSE;
$send_intervals = _notifications_send_intervals();
unset($send_intervals[-1]);
$max_sqid = notifications_process_prepare();
foreach ($send_intervals as $interval => $name) {
notifications_log('Processing send interval ' . $name);
while (notifications_process_queue($interval, $max_sqid)) {
$stop = !notifications_process('check');
}
if ($stop) {
notifications_log('Process stopped');
break;
}
}
}
function notifications_process_pull($method, $users, $limit = 0, $delete = FALSE) {
$messages = array();
$maxsqid = 0;
$sql = "SELECT uid, eid, module, MIN(sid) AS sid, MAX(sqid) AS sqid FROM {notifications_queue} ";
$sql .= "WHERE send_method = '%s' AND uid IN (%s) GROUP BY uid, eid, module ORDER BY sqid";
$str_uids = implode(',', $users);
if ($limit) {
$result = db_query_range($sql, $method, $str_uids, 0, $limit);
}
else {
$result = db_query($sql, $method, $str_uids);
}
while ($queue = db_fetch_object($result)) {
$maxsqid = $queue->squid;
$account = notifications_callback($queue->module, 'load_user', $queue->uid);
$event = notifications_load_event($queue->eid);
if (notifications_callback($queue->module, 'user_allowed', 'event', $account, $event)) {
$subscriptions = array(
$queue->sid,
);
$message = notifications_callback($queue->module, 'process_message', $account, $event, $subscriptions, $method);
$message['uid'] = $queue->uid;
$message['from'] = $event->uid;
$messages[] = $message;
}
}
if ($messages && $delete) {
db_query("DELETE FROM {notifications_queue} WHERE sqid < %d AND send_method = '%s' AND uid IN (%s)", $maxsqid, $method, $str_uids);
}
return $messages;
}
function notifications_process_prepare($module = 'notifications') {
$expiretime = time() - 60;
db_query("DELETE FROM {notifications_event} WHERE created < %d AND eid < (SELECT MIN(eid) FROM {notifications_queue})", $expiretime);
return db_result(db_query("SELECT max(sqid) FROM {notifications_queue}"));
}
function notifications_process($op = 'check', $name = NULL, $value = NULL) {
static $limit = array(), $options = array();
static $current = array(
'message' => 0,
'step' => 0,
);
switch ($op) {
case 'start':
$defaults = variable_get('notifications_process_limit', array(
'time' => 0,
'message' => 0,
'row' => 0,
'percent' => 0,
));
foreach ($defaults as $name => $value) {
if ($value && !isset($limit[$name])) {
$limit[$name] = $value;
}
}
break;
case 'cron':
$timelimit = array();
$cronstart = variable_get('cron_semaphore', time());
if ($maxtime = ini_get('max_execution_time')) {
$timelimit[] = $cronstart + $maxtime - NOTIFICATIONS_TIME_MARGIN;
if ($limit['percent']) {
$timelimit[] = time() + $maxtime * $limit['percent'] / 100;
unset($limit['percent']);
}
}
if ($limit['time']) {
$timelimit[] = time() + $limit['time'];
}
if ($timelimit) {
$limit['time'] = min($timelimit);
}
break;
case 'init':
$current[$name] = 0;
$limit[$name] = $value;
break;
case 'count':
$value = $value ? $value : 1;
isset($current[$name]) ? $current[$name] += $value : ($current[$name] = $value);
break;
case 'option':
if (isset($value)) {
$options[$name] = $value;
}
return $options[$name];
}
$current['time'] = time();
foreach ($limit as $name => $value) {
if ($value && !empty($current[$name]) && $current[$name] >= $value) {
watchdog('notifications', t('Reached processing limit on queue processing: %name = %value', array(
'%name' => $name,
'%value' => $value,
)));
return FALSE;
}
}
return TRUE;
}
function notifications_process_rows($conditions) {
$account = NULL;
$subscriptions = $events = $processed = array();
$send_method = $send_interval = NULL;
$query = notifications_queue_query($conditions);
$sql = "SELECT * FROM {notifications_queue} ";
$sql .= " WHERE " . implode(' AND ', $query['where']);
$sql .= " ORDER BY uid, send_method, send_interval";
$result = db_query($sql, $query['args']);
while (($queue = db_fetch_object($result)) || $processed) {
if (!$account || !$queue || $queue->uid != $account->uid || $queue->send_method != $send_method || $queue->send_interval != $send_interval) {
if ($account && $events && $subscriptions) {
notifications_process_send($account, $events, $subscriptions, $send_method, $send_interval);
notifications_update_sent($account->uid, $send_method, $send_interval, time());
}
if ($processed) {
notifications_queue_delete(array(
'sqids' => $processed,
));
}
$subscriptions = $events = $processed = array();
if ($queue) {
$account = notifications_load_user($queue->uid);
$send_method = $queue->send_method;
$send_interval = $queue->send_interval;
}
}
if ($queue) {
$event = notifications_load_event($queue->eid);
if (notifications_user_allowed('event', $account, $event)) {
$events[$queue->eid] = $event;
$subscriptions[$queue->eid][] = $queue->sid;
}
$processed[] = $queue->sqid;
}
}
}
function notifications_process_queue($send_interval, $max_sqid, $module = 'notifications') {
$count = 0;
$timelimit = time() - $send_interval;
$sql = "SELECT q.uid, q.send_method, count(*) AS count FROM {notifications_queue} q ";
$sql .= " LEFT JOIN {notifications_sent} su ON q.uid = su.uid AND q.send_interval = su.send_interval AND q.send_method = su.send_method ";
$sql .= " WHERE q.cron = 1 AND q.send_interval = '%d' AND q.sqid <= %d AND q.module = '%s'";
$sql .= " AND (su.uid IS NULL OR su.sent < %d) ";
$sql .= " GROUP BY q.uid, q.send_method, su.sent ORDER BY su.sent";
$result = db_query_range($sql, $send_interval, $max_sqid, $module, $timelimit, 0, NOTIFICATIONS_STEP_USERS);
$sqid = 0;
while (($user = db_fetch_object($result)) && notifications_process('check')) {
notifications_log("Processing user {$user->uid}, rows {$user->count}, send_method {$user->send_method}");
$events = $subscriptions = $processed = array();
$send_method = $user->send_method;
$account = notifications_callback($module, 'load_user', $user->uid);
$result_subs = db_query_range("SELECT * FROM {notifications_queue} WHERE cron = 1 AND send_interval = '%d' AND uid = %d AND sqid <= %d ORDER BY send_method, sqid", $send_interval, $account->uid, $max_sqid, 0, NOTIFICATIONS_STEP_ROWS);
while (($queue = db_fetch_object($result_subs)) && notifications_process('count', 'row')) {
$count++;
$processed[] = $sqid = $queue->sqid;
$event = notifications_load_event($queue->eid, TRUE);
if (notifications_callback($module, 'user_allowed', 'event', $account, $event)) {
$events[$queue->eid] = $event;
$subscriptions[$queue->eid][] = $queue->sid;
notifications_log("Processing queued sid={$queue->sid} event={$queue->eid} ({$event->type}, {$event->action}) send_method={$send_method}");
}
else {
notifications_log("Access denied for queued event sid={$queue->sid} event={$queue->eid} ({$event->type}, {$event->action})");
}
}
if ($events) {
notifications_callback($module, 'process_send', $account, $events, $subscriptions, $send_method, $send_interval);
notifications_callback($module, 'update_sent', $user->uid, $send_method, $send_interval, time());
}
if ($processed && !notifications_process('option', 'debug')) {
notifications_queue_delete(array(
'uid' => $user->uid,
'send_interval' => $send_interval,
'send_method' => $send_method,
'max_sqid' => $sqid,
));
}
}
return $count;
}
function notifications_update_sent($uid, $method, $interval, $time) {
db_query("UPDATE {notifications_sent} SET sent = %d WHERE uid = %d AND send_interval = '%d' AND send_method = '%s'", $time, $uid, $interval, $method);
if (!db_affected_rows()) {
db_query("INSERT INTO {notifications_sent}(uid, send_interval, send_method, sent) VALUES(%d, '%d', '%s', %d)", $uid, $interval, $method, $time);
}
}
function notifications_process_send($account, $events, $subscriptions, $send_method, $send_interval) {
notifications_log("Sending out, method={$send_method}, interval={$send_interval}, events=" . count($events));
if ($digest = notifications_digest_method($send_interval)) {
$function = $digest['digest callback'];
$messages = $function($account, $events, $subscriptions, $send_interval, $send_method);
}
else {
$sender_option = variable_get('notifications_sender', 0);
foreach ($events as $event) {
$message = notifications_process_message($account, $event, $subscriptions[$event->eid], $send_method);
$message['notifications'] = array(
'events' => array(
$event,
),
'subscriptions' => $subscriptions,
);
if ($sender_option) {
$sender = notifications_load_user($event->uid);
$message['sender_name'] = $sender->name;
if ($sender_option == 2) {
$message['sender_account'] = $sender;
}
}
$messages[] = $message;
}
}
foreach ($messages as $message) {
notifications_process('count', 'send');
notifications_message_send($account, $message, $send_method);
}
return $messages;
}
function notifications_process_digest_short($account, $events, $subscriptions, $send_interval, $send_method) {
$list = array();
foreach ($events as $event) {
notifications_log($event, 'digesting');
$sid = is_array($subscriptions[$event->eid]) ? array_shift($subscriptions[$event->eid]) : 0;
$subscription = $sid ? notifications_load_subscription($sid) : NULL;
$objects = $event->objects + array(
'user' => $account,
'subscription' => $subscription,
);
$digest = nofitications_digest_event_info($event);
$digest_type = $digest['type'];
$digest_value = $digest['value'];
if (!isset($list[$digest_type][$digest_value]['group'])) {
$group = array(
'title' => notifications_digest_group($digest, 'title', $send_method),
'footer' => notifications_digest_group($digest, 'footer', $send_method),
);
$list[$digest_type][$digest_value]['group'] = notifications_text_replace($group, $objects);
notifications_log("Digesting object (type={$digest_type} value={$digest_value})");
}
if (!isset($list[$digest_type][$digest_value]['line'][$event->eid])) {
$line = notifications_digest_line($event, $send_method, $objects);
$list[$digest_type][$digest_value]['line'][$event->eid] = notifications_text_replace($line, $event->objects);
}
}
$text['subject'] = notifications_message_part('digest', 'subject', $send_method);
$text['header'] = notifications_message_part('digest', 'header', $send_method);
$text['footer'] = notifications_message_part('digest', 'footer', $send_method);
$text = notifications_text_replace($text, array(
'user' => $account,
'subscription' => NULL,
));
$body = theme('notifications_digest_short_body', $text, $list);
$message = array(
'subject' => $text['subject'],
'body' => $body,
'events' => $events,
'subscriptions' => $subscriptions,
'digest' => 'short',
);
return array(
$message,
);
}
function notifications_process_digest_long($account, $events, $subscriptions, $send_interval, $send_method) {
$body = array();
foreach ($events as $event) {
notifications_log($event, 'digesting');
$part = notifications_message_part('event', 'main', $send_method, $event);
$sid = is_array($subscriptions[$event->eid]) ? array_shift($subscriptions[$event->eid]) : 0;
$subscription = $sid ? notifications_load_subscription($sid) : NULL;
$objects = $event->objects + array(
'user' => $account,
'subscription' => $subscription,
);
$body[] = notifications_text_replace($part, $objects);
}
$text['subject'] = notifications_message_part('digest', 'subject', $send_method);
$text['header'] = notifications_message_part('digest', 'header', $send_method);
$text['footer'] = notifications_message_part('digest', 'footer', $send_method);
$text = notifications_text_replace($text, array(
'user' => $account,
'subscription' => NULL,
));
$body = theme('notifications_digest_long_body', $text['header'], $body, $text['footer']);
$message = array(
'subject' => $text['subject'],
'body' => $body,
'events' => $events,
'subscriptions' => $subscriptions,
'digest' => 'long',
);
return array(
$message,
);
}
function nofitications_digest_event_info($event) {
$info = notifications_event_types($event->type, $event->action);
if (!empty($info['digest'])) {
$type = $info['digest'][0];
$field = $info['digest'][1];
$object = !empty($event->objects[$type]) ? $event->objects[$type] : NULL;
$value = $object && isset($object->{$field}) ? $object->{$field} : 0;
}
else {
$type = $event->type;
$field = $event->action;
$value = 0;
$object = NULL;
}
return array(
'type' => $type,
'field' => $field,
'value' => $value,
'object' => $object,
);
}
function notifications_process_message($account, $event, $subscriptions, $send_method) {
$info = notifications_event_text($event);
$text = array(
'subject' => notifications_message_part('event', 'subject', $send_method, $event),
'header' => notifications_message_part('event', 'header', $send_method, $event),
'event' => notifications_message_part('event', 'main', $send_method, $event),
'footer' => notifications_message_part('event', 'footer', $send_method, $event),
);
if ($sid = array_shift($subscriptions)) {
$subscription = notifications_load_subscription($sid);
}
else {
$subscription = NULL;
}
$objects = array(
'user' => $account,
'event' => $event,
'subscription' => $subscription,
);
$objects = array_merge($objects, $event->objects);
$text = notifications_text_replace($text, $objects);
$subject = $text['subject'];
unset($text['subject']);
return array(
'subject' => $subject,
'body' => $text,
);
}
function notifications_message_part($type, $key, $method, $param = NULL) {
if ($type == 'event' && is_object($param)) {
if (isset($param->text[$key])) {
return $param->text[$key];
}
else {
$options = array(
$param->type,
$param->action,
);
}
}
elseif ($method == 'test') {
return "{$type} {$key} [type-name] [title] [site-name]";
}
else {
$options = is_array($param) ? $param : array();
}
$keyparts = array_merge(array(
'notifications-' . $type,
), $options);
$output = "[UNDEFINED type = {$type}, method = {$method}, key = " . implode('-', $keyparts) . ']';
while ($keyparts) {
$groupkey = implode('-', $keyparts);
if ($text = messaging_message_part($groupkey, $key, $method)) {
$output = $text == MESSAGING_EMPTY ? '' : $text;
break;
}
array_pop($keyparts);
}
return $output;
}
function notifications_text_replace($text, $objects) {
$objects['global'] = NULL;
return token_replace_multiple($text, $objects);
}
function notifications_message_send($account, $message, $send_method) {
$message['type'] = 'notifications';
notifications_process('count', 'message');
messaging_message_send_user($account, $message, $send_method);
}
function notifications_event_text($event) {
$info = notifications_event_types($event->type, $event->action);
return $info;
}
function notifications_load_user($uid) {
return messaging_load_user($uid);
}
function notifications_load_event($id) {
static $cache = array();
if (!array_key_exists($id, $cache)) {
$event = db_fetch_object(db_query("SELECT * FROM {notifications_event} WHERE eid = %d", $id));
$event->params = unserialize($event->params);
$event->objects = array();
notifications_module_invoke('event load', $event);
$cache[$id] = $event;
}
return $cache[$id];
}
function notifications_queue_delete($params) {
$query = notifications_queue_query($params);
db_query("DELETE FROM {notifications_queue} WHERE " . implode(' AND ', $query['where']), $query['args']);
}
function notifications_queue_query($params) {
$where = $args = array();
foreach ($params as $field => $value) {
switch ($field) {
case 'max_sqid':
$where[] = "sqid <= %d";
$args[] = $value;
break;
case 'sqids':
$where[] = "sqid IN (%s)";
$args[] = implode(',', array_map('db_escape_string', $value));
break;
default:
$where[] = "{$field} = '%s'";
$args[] = $value;
break;
}
}
return array(
'where' => $where,
'args' => $args,
);
}
function notifications_log($info = NULL, $type = 'info') {
static $logs;
if ($info) {
$message = $type . ': ';
$message .= is_string($info) ? $info : print_r($info, TRUE);
$logs[] = $message;
if ($type == 'watchdog') {
watchdog('notifications', $info);
}
}
else {
return $logs;
}
}
function theme_notifications_digest_short_body($text, $list) {
$body['header'] = $text['header'];
foreach ($list as $type => $objects) {
foreach ($objects as $oid => $data) {
$body['content'][] = $data['group']['title'];
foreach ($data['line'] as $line) {
$body['content'][] = theme('notifications_digest_short_line', $line, $data['group']);
}
$body['content'][] = $data['group']['footer'];
}
}
$body['footer'] = $text['footer'];
return $body;
}
function theme_notifications_digest_short_line($line, $group) {
return '- ' . $line;
}
function theme_notifications_digest_long_body($header, $content, $footer) {
return array(
'header' => $header,
'content' => $content,
'footer' => $footer,
);
}
function notifications_digest_group($digest, $part, $method) {
static $texts = array();
$type = $digest['type'];
$value = $digest['value'];
if (!isset($texts[$type][$value][$part][$method])) {
if ($line = notifications_message_part('digest', $part, $method, array(
$type,
$digest['field'],
))) {
$output = $line;
}
else {
$output = '';
}
$texts[$type][$value][$part][$method] = $output;
}
return $texts[$type][$value][$part][$method];
}
function notifications_digest_line($event, $method) {
static $digest = array();
if (!isset($digest[$event->eid][$method])) {
if (!empty($event->text['digest'])) {
$line = $event->text['digest'];
}
elseif ($part = notifications_message_part('event', 'digest', $method, $event)) {
$line = $part;
}
else {
$info = notifications_event_types($event->type, $event->action);
$line = $info['line'];
}
$digest[$event->eid][$method] = $line;
}
return $digest[$event->eid][$method];
}