View source
<?php
// Maximum number of queue rows handed to a single 'process_rows' batch.
define('NOTIFICATIONS_STEP_ROWS', 1000);
// Maximum number of grouped user/destination rows fetched per queue query.
define('NOTIFICATIONS_STEP_USERS', 1000);
// Seconds subtracted from the cron-based time limit as a safety margin.
define('NOTIFICATIONS_TIME_MARGIN', 5);
/**
 * Run the notifications queue processing, one send interval at a time.
 *
 * Initializes the processing limits, prepares the queue and then processes
 * every send interval until its queue yields no more rows or a processing
 * limit is reached.
 *
 * @param $cron
 *   TRUE (default) to also compute the cron-related time limits.
 */
function notifications_process_run($cron = TRUE) {
  notifications_log('Starting notifications process');
  notifications_process('start');
  if ($cron) {
    notifications_process('cron');
  }
  $stop = FALSE;
  $send_intervals = _notifications_send_intervals();
  // Interval -1 is never processed from the queue here (presumably the
  // "never send" interval -- confirm against _notifications_send_intervals()).
  unset($send_intervals[-1]);
  if ($max_sqid = notifications_process_prepare()) {
    foreach ($send_intervals as $interval => $name) {
      notifications_log('Processing queue', array(
        'send interval' => $name,
      ));
      // Leave the loop as soon as a limit is reached. Calling the queue
      // processor again would only run one more query and log the same
      // "limit reached" watchdog entry a second time before returning 0.
      while (!$stop && notifications_process_queue($interval, $max_sqid)) {
        $stop = !notifications_process('check');
      }
      if ($stop) {
        notifications_log('Process stopped, reached processing limits');
        break;
      }
      else {
        notifications_log('Process finished', array(
          'send interval' => $name,
        ));
      }
    }
  }
  else {
    notifications_log('No rows in queue');
  }
}
/**
 * Prepare the queue for processing.
 *
 * Purges logged (cron = 0) queue rows older than the 'notifications_log'
 * retention value, cleans up the event table and reports the highest queue
 * id currently stored.
 *
 * @return
 *   The maximum sqid in {notifications_queue}, as returned by db_result().
 */
function notifications_process_prepare() {
  $keep = variable_get('notifications_log', 0);
  if ($keep) {
    $cutoff = time() - $keep;
    db_query("DELETE FROM {notifications_queue} WHERE cron = 0 AND sent < %d", $cutoff);
  }
  notifications_event_clean();
  $result = db_query("SELECT max(sqid) FROM {notifications_queue}");
  return db_result($result);
}
/**
 * Remove events that are no longer needed by any queue row.
 *
 * Only events created more than 60 seconds ago are touched.
 *
 * @param $update
 *   When TRUE, recompute each old event's counter from the queue table
 *   before deleting.
 */
function notifications_event_clean($update = FALSE) {
  $cutoff = time() - 60;
  if ($update) {
    $recount = "UPDATE {notifications_event} e SET counter = (SELECT COUNT(*) FROM {notifications_queue} q WHERE q.eid = e.eid ) WHERE e.created < %d";
    db_query($recount, $cutoff);
  }
  $deletions = array(
    // Events whose counter has dropped to zero.
    "DELETE FROM {notifications_event} WHERE counter = 0 AND created < %d",
    // Events older than anything still referenced in the queue.
    "DELETE FROM {notifications_event} WHERE created < %d AND eid < (SELECT MIN(eid) FROM {notifications_queue})",
  );
  foreach ($deletions as $sql) {
    db_query($sql, $cutoff);
  }
}
/**
 * Keep track of processing limits, counters and options for a queue run.
 *
 * State is held in static variables, so it lasts for the whole request.
 *
 * @param $op
 *   Operation to perform:
 *   - 'start': load limits from the 'notifications_process_limit' variable,
 *     without overriding limits that are already set.
 *   - 'cron': turn the time/percent limits into an absolute time limit.
 *   - 'init': reset the counter $name and set its limit to $value.
 *   - 'option': optionally set, then return, the option $name.
 *   - 'limit': return the limit for $name (0 when not set).
 *   - 'current': return the current counter for $name (0 when not set).
 *   - 'count': add $value (default 1) to the counter $name.
 *   - 'check': return FALSE when any limit has been reached, TRUE otherwise.
 * @param $name
 *   Name of the limit, counter or option.
 * @param $value
 *   Value for the operation, when applicable.
 *
 * @return
 *   Depends on the operation; nothing for 'start', 'cron', 'init', 'count'.
 */
function notifications_process($op = 'check', $name = NULL, $value = NULL) {
  static $limit = array(), $options = array();
  static $current = array(
    'message' => 0,
    'step' => 0,
  );

  switch ($op) {
    case 'start':
      $configured = variable_get('notifications_process_limit', array(
        'time' => 0,
        'message' => 0,
        'row' => 0,
        'percent' => 0,
      ));
      foreach ($configured as $key => $amount) {
        if ($amount && !isset($limit[$key])) {
          $limit[$key] = $amount;
        }
      }
      return;

    case 'cron':
      // Collect every candidate deadline and keep the earliest one.
      $deadlines = array();
      $cron_start = variable_get('cron_semaphore', time());
      $max_time = ini_get('max_execution_time');
      if ($max_time) {
        $deadlines[] = $cron_start + $max_time - NOTIFICATIONS_TIME_MARGIN;
        if (!empty($limit['percent'])) {
          $deadlines[] = time() + $max_time * $limit['percent'] / 100;
          // The percentage is now expressed as a time limit.
          unset($limit['percent']);
        }
      }
      if (!empty($limit['time'])) {
        $deadlines[] = time() + $limit['time'];
      }
      if ($deadlines) {
        $limit['time'] = min($deadlines);
      }
      return;

    case 'init':
      $current[$name] = 0;
      $limit[$name] = $value;
      return;

    case 'option':
      if (isset($value)) {
        $options[$name] = $value;
      }
      if (isset($options[$name])) {
        return $options[$name];
      }
      return FALSE;

    case 'limit':
      if (isset($limit[$name])) {
        return $limit[$name];
      }
      return 0;

    case 'current':
      if (isset($current[$name])) {
        return $current[$name];
      }
      return 0;

    case 'count':
      $increment = $value ? $value : 1;
      if (isset($current[$name])) {
        $current[$name] += $increment;
      }
      else {
        $current[$name] = $increment;
      }
      return;

    case 'check':
      // The time "counter" is simply the current timestamp.
      $current['time'] = time();
      foreach ($limit as $key => $maximum) {
        $reached = $maximum && !empty($current[$key]) && $current[$key] >= $maximum;
        if ($reached) {
          watchdog('notifications', 'Reached processing limit on queue processing: %name = %value', array(
            '%name' => $key,
            '%value' => $maximum,
          ));
          return FALSE;
        }
      }
      return TRUE;
  }
}
/**
 * Process queue rows matching the given conditions and send the messages.
 *
 * Rows are read ordered by module, user, destination, send method and send
 * interval. Every time one of those values changes (and once more after the
 * last row) the events collected so far are composed and sent through the
 * module's 'process_compose' and 'process_send' callbacks.
 *
 * @param $conditions
 *   Array of conditions passed to notifications_queue_query().
 * @param $limit
 *   Maximum number of rows to read, 0 for no limit.
 * @param $update
 *   TRUE to mark each group of processed rows as done, and to update the
 *   event counters at the end.
 *
 * @return
 *   Number of queue rows read.
 */
function notifications_process_rows($conditions, $limit = 0, $update = TRUE) {
  notifications_log('Processing queue rows', $conditions);
  $account = $destination = NULL;
  $subscriptions = $events = $processed = array();
  $send_method = $send_interval = $module = NULL;
  $test = notifications_process('option', 'test');
  $count = 0;
  $query = notifications_queue_query($conditions);
  $sql = "SELECT * FROM {notifications_queue} ";
  $sql .= " WHERE " . implode(' AND ', $query['where']);
  $sql .= " ORDER BY module, uid, destination, send_method, send_interval";
  if ($limit) {
    $result = db_query_range($sql, $query['args'], 0, $limit);
  }
  else {
    $result = db_query($sql, $query['args']);
  }
  // The "|| $processed" part forces one extra iteration after the last row
  // so that the final group gets composed, sent and marked as done.
  while (($queue = db_fetch_object($result)) || $processed) {
    // Counted on every iteration, including the trailing flush iteration.
    notifications_process('count', 'row');
    $group_changed = !$account
      || !$queue
      || $queue->module != $module
      || $queue->uid != $account->uid
      || $queue->destination != $destination
      || $queue->send_method != $send_method
      || $queue->send_interval != $send_interval;
    if ($group_changed) {
      // Flush whatever was collected for the previous group.
      if ($account && $events && $subscriptions) {
        $messages = notifications_callback($module, 'process_compose', $account, $events, $subscriptions, $send_method, $send_interval);
        notifications_log('Composed messages', array(
          'number' => count($messages),
          'send_method' => $send_method,
        ));
        notifications_callback($module, 'process_send', $account, $messages, $send_method, $test);
        if (!$test) {
          notifications_update_sent($account, $send_method, $send_interval, time());
        }
      }
      if ($processed && $update) {
        notifications_queue_done(array(
          'sqid' => $processed,
        ));
      }
      $subscriptions = $events = $processed = array();
      // Start a new group from the current row, if there is one.
      if ($queue) {
        $send_method = $queue->send_method;
        $send_interval = $queue->send_interval;
        $destination = $queue->destination;
        $module = $queue->module;
        $account = notifications_callback($module, 'load_user', $queue->uid, $destination, $send_method);
      }
    }
    if ($queue) {
      $count++;
      $processed[] = $queue->sqid;
      if ($event = notifications_load_event($queue->eid)) {
        notifications_event_tracker('count', $event);
        notifications_log('Processing queued', array(
          'queue sqid' => $queue->sqid,
          'event' => $queue->eid,
          'type' => $event->type,
          'action' => $event->action,
          'send method' => $send_method,
        ));
        if (notifications_user_allowed('event', $account, $event)) {
          $events[$queue->eid] = $event;
          $subscriptions[$queue->eid][] = $queue->sid;
        }
        else {
          // Log the account being processed; the previous code read an
          // undefined $user variable here.
          notifications_log('Access denied for event', array(
            'account' => $account->uid,
            'event' => $queue->eid,
          ));
        }
      }
      else {
        notifications_log('Cannot load event', array(
          'eid' => $queue->eid,
          'queue sid' => $queue->sid,
        ));
      }
    }
  }
  if ($update) {
    notifications_event_tracker('update');
  }
  return $count;
}
/**
 * Process one step of the queue for a given send interval.
 *
 * Selects groups of queued rows (per user, destination, module and send
 * method) that are due for this interval and hands each group to the
 * module's 'process_rows' callback.
 *
 * @param $send_interval
 *   Send interval to process.
 * @param $max_sqid
 *   Highest queue id to consider.
 *
 * @return
 *   Number of rows processed; always 0 when the 'test' option is set.
 */
function notifications_process_queue($send_interval, $max_sqid) {
  notifications_log('Starting queue processing', array(
    'send interval' => $send_interval,
    'max squid' => $max_sqid,
  ));
  $test = notifications_process('option', 'test');
  $total = 0;
  // Users whose last send is older than this are due for this interval.
  $timelimit = time() - $send_interval;

  // Shrink the step sizes so the configured row limit is not overshot.
  $step_users = NOTIFICATIONS_STEP_USERS;
  $step_rows = NOTIFICATIONS_STEP_ROWS;
  $row_limit = notifications_process('limit', 'row');
  if ($row_limit) {
    $remaining_rows = $row_limit - notifications_process('current', 'row');
    if ($remaining_rows > 0) {
      $step_users = min($remaining_rows, $step_users);
      $step_rows = min($remaining_rows, $step_rows);
    }
  }

  $sql = implode('', array(
    "SELECT q.uid, q.destination, q.module, q.send_method, count(*) AS count_rows FROM {notifications_queue} q ",
    " LEFT JOIN {notifications_sent} su ON q.uid = su.uid AND q.send_interval = su.send_interval AND q.send_method = su.send_method ",
    " WHERE q.cron = 1 AND q.send_interval = '%d' AND q.sqid <= %d",
    " AND (su.uid IS NULL OR su.sent < %d) ",
    " GROUP BY q.uid, q.destination, q.module, q.send_method, su.sent ORDER BY su.sent",
  ));
  $result = db_query_range($sql, $send_interval, $max_sqid, $timelimit, 0, $step_users);

  while (($group = db_fetch_object($result)) && notifications_process('check')) {
    notifications_log('Queue processing', array(
      'user' => $group->uid,
      'rows' => $group->count_rows,
      'send method' => $group->send_method,
    ));
    // Groups larger than one step are processed partially; in that case the
    // rows callback marks the rows it handled as done itself.
    $update = $group->count_rows > $step_rows;
    $limit = $update ? $step_rows : $group->count_rows;
    $batch = array(
      'uid' => $group->uid,
      'destination' => $group->destination,
      'module' => $group->module,
      'send_method' => $group->send_method,
      'send_interval' => $send_interval,
      'cron' => 1,
      'max_sqid' => $max_sqid,
    );
    $handled = notifications_callback($group->module, 'process_rows', $batch, $limit, $update);
    $total += $handled;
    if ($handled && !$test && !$update) {
      // The whole group was processed, so mark it done in a single query.
      notifications_queue_done($batch);
    }
  }

  if ($test) {
    return 0;
  }
  notifications_event_tracker('update');
  return $total;
}
/**
 * Track how many queue rows have been processed for each event.
 *
 * @param $op
 *   - 'count': add one to the tally for $event.
 *   - 'delete': delete the event and its queue rows, and forget its tally.
 *   - 'update': subtract each tally from the stored event counter, then
 *     reset all tallies.
 *   - 'reset': forget all tallies.
 * @param $event
 *   Event object with an eid property; used by 'count' and 'delete'.
 */
function notifications_event_tracker($op, $event = NULL) {
  static $events = array();

  if ($op == 'count') {
    $eid = $event->eid;
    if (array_key_exists($eid, $events)) {
      $events[$eid] += 1;
    }
    else {
      $events[$eid] = 1;
    }
  }
  elseif ($op == 'delete') {
    db_query('DELETE FROM {notifications_queue} WHERE eid = %d', $event->eid);
    db_query('DELETE FROM {notifications_event} WHERE eid = %d', $event->eid);
    unset($events[$event->eid]);
  }
  elseif ($op == 'update') {
    foreach ($events as $eid => $processed) {
      db_query('UPDATE {notifications_event} SET counter = counter - %d WHERE eid = %d', $processed, $eid);
    }
    // Once written to the database the tallies start over.
    $events = array();
  }
  elseif ($op == 'reset') {
    $events = array();
  }
}
/**
 * Record the last time notifications were sent to a user.
 *
 * Updates the existing {notifications_sent} row for the user, interval and
 * method, or inserts one when the update affected no rows.
 *
 * @param $account
 *   User account object; only its uid is used.
 * @param $method
 *   Send method.
 * @param $interval
 *   Send interval.
 * @param $time
 *   Timestamp to store.
 */
function notifications_update_sent($account, $method, $interval, $time) {
  $uid = $account->uid;
  db_query("UPDATE {notifications_sent} SET sent = %d WHERE uid = %d AND send_interval = '%d' AND send_method = '%s'", $time, $uid, $interval, $method);
  if (db_affected_rows()) {
    return;
  }
  db_query("INSERT INTO {notifications_sent}(uid, send_interval, send_method, sent) VALUES(%d, '%d', '%s', %d)", $uid, $interval, $method, $time);
}
/**
 * Compose the messages for a set of events for one user.
 *
 * When the send interval has a digest method, its 'digest callback' builds
 * the messages. Otherwise one message is built per event.
 *
 * @param $account
 *   Destination user account.
 * @param $events
 *   Array of event objects.
 * @param $subscriptions
 *   Array of subscription ids, keyed by event id.
 * @param $send_method
 *   Send method.
 * @param $send_interval
 *   Send interval.
 * @param $module
 *   Module name, passed on to the digest callback.
 *
 * @return
 *   Array of messages; empty when there are no events and no digest.
 */
function notifications_process_compose($account, $events, $subscriptions, $send_method, $send_interval, $module = 'notifications') {
  notifications_log('Processing for sending', array(
    'method' => $send_method,
    'interval' => $send_interval,
    'module' => $module,
    'events' => count($events),
  ));
  // Always return an array, even when there is nothing to compose.
  $messages = array();
  if ($digest = notifications_digest_method($send_interval)) {
    $function = $digest['digest callback'];
    $messages = $function($account, $events, $subscriptions, $send_interval, $send_method, $module);
  }
  else {
    $sender_option = variable_get('notifications_sender', 0);
    foreach ($events as $event) {
      $message = notifications_process_message($account, $event, $subscriptions[$event->eid], $send_method);
      $message['notifications'] = array(
        'events' => array(
          $event,
        ),
        'subscriptions' => $subscriptions,
      );
      if ($sender_option) {
        $sender = notifications_load_user($event->uid);
        $message['sender_name'] = $sender->name;
        // Option value 2 additionally attaches the full sender account.
        if ($sender_option == 2) {
          $message['sender_account'] = $sender;
        }
      }
      $messages[] = $message;
    }
  }
  return $messages;
}
/**
 * Send a list of composed messages to a user.
 *
 * @param $account
 *   Destination user account.
 * @param $messages
 *   Array of messages to send.
 * @param $send_method
 *   Send method.
 * @param $test
 *   Test flag passed on to notifications_message_send().
 *
 * @return
 *   The same array of messages that was passed in.
 */
function notifications_process_send($account, $messages, $send_method, $test = FALSE) {
  foreach ($messages as $outgoing) {
    notifications_process('count', 'send');
    $debug_info = array(
      'method' => $send_method,
      'message' => $outgoing,
    );
    notifications_debug('Sending out notification', $debug_info);
    notifications_message_send($account, $outgoing, $send_method, $test);
  }
  return $messages;
}
/**
 * Build the message (subject and body) for a single event.
 *
 * @param $account
 *   Destination user account.
 * @param $event
 *   Event object.
 * @param $subscriptions
 *   Array of subscription ids for this event; only the first one is loaded.
 * @param $send_method
 *   Send method.
 *
 * @return
 *   Array with 'subject' and 'body' elements; the body holds the remaining
 *   text parts after token replacement.
 */
function notifications_process_message($account, $event, $subscriptions, $send_method) {
  // Result is unused; the call is kept so the execution trace is unchanged.
  $info = notifications_event_text($event);

  // Text part name => message part key.
  $parts = array(
    'subject' => 'subject',
    'header' => 'header',
    'event' => 'main',
    'footer' => 'footer',
  );
  $text = array();
  foreach ($parts as $name => $part_key) {
    $text[$name] = notifications_message_part('event', $part_key, $send_method, $event);
  }

  $sid = array_shift($subscriptions);
  $subscription = $sid ? notifications_load_subscription($sid) : NULL;

  $base_objects = array(
    'user' => $account,
    'event' => $event,
    'subscription' => $subscription,
  );
  $objects = array_merge($base_objects, $event->objects);
  $text = messaging_text_replace($text, $objects);

  $message = array('subject' => $text['subject']);
  unset($text['subject']);
  $message['body'] = $text;
  return $message;
}
/**
 * Get the text for a message part.
 *
 * The lookup key is built from the module, the type and further options,
 * and is made less specific one element at a time until
 * messaging_message_part() returns a text. If nothing is found for a module
 * other than 'notifications', the lookup is repeated for 'notifications'.
 *
 * @param $type
 *   Message type, e.g. 'event' or 'digest'.
 * @param $key
 *   Message part key, e.g. 'subject', 'header', 'main', 'footer'.
 * @param $method
 *   Send method. The value 'test' returns a fixed sample text for anything
 *   that is not an event object.
 * @param $param
 *   Event object for type 'event', or an array of extra key parts.
 * @param $module
 *   Module name used as the first key part.
 *
 * @return
 *   The text for the part, or an "[UNDEFINED ...]" marker when none exists.
 */
function notifications_message_part($type, $key, $method, $param = NULL, $module = 'notifications') {
  if ($type == 'event' && is_object($param)) {
    // Text predefined on the event itself wins over everything else.
    if (isset($param->text[$key])) {
      return $param->text[$key];
    }
    $options = array(
      $param->type,
      $param->action,
    );
  }
  elseif ($method == 'test') {
    return "{$type} {$key} [type-name] [title] [site-name]";
  }
  else {
    $options = is_array($param) ? $param : array();
  }

  $search = array_merge(array(
    $module,
    $type,
  ), $options);

  // Try the most specific key first, dropping the last part on each miss.
  for ($keyparts = $search; $keyparts; array_pop($keyparts)) {
    $text = messaging_message_part(implode('-', $keyparts), $key, $method);
    if ($text) {
      return $text == MESSAGING_EMPTY ? '' : $text;
    }
  }

  if ($module != 'notifications') {
    return notifications_message_part($type, $key, $method, $param, 'notifications');
  }
  return "[UNDEFINED module = {$module}, key = {$key}, type = {$type}, method = {$method}, search = " . implode(',', $search) . ']';
}
/**
 * Hand a single notification message over to the messaging layer.
 *
 * @param $account
 *   Destination user account.
 * @param $message
 *   Message (array or object); it is cast to an object before sending.
 * @param $send_method
 *   Send method.
 * @param $test
 *   Test flag stored on the outgoing message.
 */
function notifications_message_send($account, $message, $send_method, $test = FALSE) {
  $debug_info = array(
    'message' => $message,
    'account' => $account,
  );
  notifications_debug('Preparing user notification for messaging', $debug_info);

  $outgoing = (object) $message;
  $outgoing->type = 'notifications';
  $outgoing->test = $test;

  notifications_process('count', 'message');
  messaging_message_send_user($account, $outgoing, $send_method);
}
/**
 * Get the event type information for an event.
 *
 * @param $event
 *   Event object with type and action properties.
 *
 * @return
 *   Whatever notifications_event_types() returns for that type and action.
 */
function notifications_event_text($event) {
  return notifications_event_types($event->type, $event->action);
}
/**
 * Load the account that notifications will be sent to.
 *
 * @param $uid
 *   User id. A false value (e.g. 0) produces an anonymous account.
 * @param $destination
 *   Destination stored on the anonymous account.
 * @param $send_method
 *   Send method stored on the anonymous account.
 *
 * @return
 *   The account from messaging_load_user() for a real uid, otherwise an
 *   anonymous user object carrying the destination and send method.
 */
function notifications_load_user($uid, $destination = NULL, $send_method = NULL) {
  if (!$uid) {
    $anonymous = drupal_anonymous_user();
    $anonymous->destination = $destination;
    $anonymous->send_method = $send_method;
    return $anonymous;
  }
  return messaging_load_user($uid);
}
/**
 * Load an event by id, with a static cache.
 *
 * @param $id
 *   Event id.
 *
 * @return
 *   The event object with unserialized params and an 'objects' array, or
 *   NULL when the event does not exist or was flagged for deletion by an
 *   'event load' implementation.
 */
function notifications_load_event($id) {
  static $cache = array();
  if (!array_key_exists($id, $cache)) {
    $event = db_fetch_object(db_query("SELECT * FROM {notifications_event} WHERE eid = %d", $id));
    if ($event) {
      $event->params = unserialize($event->params);
      $event->objects = array();
      notifications_module_invoke('event load', $event);
      if (!empty($event->delete)) {
        notifications_event_tracker('delete', $event);
        $event = NULL;
      }
    }
    else {
      // No such row. Never assign properties on the FALSE result: that would
      // fabricate an empty event object (or raise a fatal error on newer
      // PHP) instead of letting callers handle the missing event.
      $event = NULL;
    }
    $cache[$id] = $event;
  }
  return $cache[$id];
}
/**
 * Mark queue rows as done.
 *
 * When the 'notifications_log' variable is set the rows are kept and
 * flagged as sent (cron = 0, sent = now); otherwise they are deleted.
 *
 * @param $params
 *   Array of conditions selecting the queue rows.
 */
function notifications_queue_done($params) {
  if (!variable_get('notifications_log', 0)) {
    notifications_queue_delete($params);
    return;
  }
  $sent_flags = array(
    'cron' => 0,
    'sent' => time(),
  );
  notifications_queue_update($params, $sent_flags);
}
/**
 * Update queue rows matching the given conditions.
 *
 * @param $params
 *   Array of conditions selecting the queue rows.
 * @param $updates
 *   Array of field => value pairs to set.
 *
 * @return
 *   The result of db_query().
 */
function notifications_queue_update($params, $updates) {
  $set = _messaging_query_conditions('notifications_queue', $updates);
  $filter = notifications_queue_query($params);

  $sql = 'UPDATE {notifications_queue} SET ' . implode(', ', $set['conditions']);
  $sql .= ' WHERE ' . implode(' AND ', $filter['where']);
  // SET arguments come first, matching their position in the query.
  $args = array_merge($set['args'], $filter['args']);
  return db_query($sql, $args);
}
/**
 * Delete queue rows matching the given conditions.
 *
 * @param $params
 *   Array of conditions selecting the queue rows.
 */
function notifications_queue_delete($params) {
  $filter = notifications_queue_query($params);
  $sql = "DELETE FROM {notifications_queue} WHERE " . implode(' AND ', $filter['where']);
  db_query($sql, $filter['args']);
}
/**
 * Build the WHERE conditions and arguments for a queue query.
 *
 * The special 'max_sqid' parameter becomes a "sqid <= %d" condition; all
 * other parameters are converted by _messaging_query_conditions().
 *
 * @param $params
 *   Array of query parameters.
 *
 * @return
 *   Array with 'where' (list of conditions) and 'args' (query arguments).
 */
function notifications_queue_query($params) {
  $conditions = array();
  $arguments = array();
  if (isset($params['max_sqid'])) {
    $conditions[] = "sqid <= %d";
    $arguments[] = $params['max_sqid'];
    // Not a real field, so keep it away from the generic conversion.
    unset($params['max_sqid']);
  }
  $fields = _messaging_query_conditions('notifications_queue', $params);
  return array(
    'where' => array_merge($conditions, $fields['conditions']),
    'args' => array_merge($arguments, $fields['args']),
  );
}
/**
 * Work out how an event should be grouped in a digest.
 *
 * Uses the 'digest' element (object type, field name) of the event type
 * information when present, otherwise falls back to the event's own type
 * and action.
 *
 * The function name is misspelled in the original API and kept as is.
 *
 * @param $event
 *   Event object.
 * @param $module
 *   Module name, returned unchanged in the result.
 *
 * @return
 *   Array with 'type', 'field', 'value', 'object' and 'module' elements.
 */
function nofitications_digest_event_info($event, $module = 'notifications') {
  $info = notifications_event_types($event->type, $event->action);
  if (empty($info['digest'])) {
    $type = $event->type;
    $field = $event->action;
    $object = NULL;
  }
  else {
    $type = $info['digest'][0];
    $field = $info['digest'][1];
    if ($type == 'event') {
      $object = $event;
    }
    elseif (!empty($event->objects[$type])) {
      $object = $event->objects[$type];
    }
    else {
      $object = NULL;
    }
  }

  $value = 0;
  if ($object && isset($object->{$field})) {
    $value = $object->{$field};
  }

  return array(
    'type' => $type,
    'field' => $field,
    'value' => $value,
    'object' => $object,
    'module' => $module,
  );
}
/**
 * Build a short-format digest message from a set of events.
 *
 * Events are grouped by digest type and value; each group gets a title, a
 * footer and one line per event.
 *
 * @param $account
 *   Destination user account.
 * @param $events
 *   Array of event objects.
 * @param $subscriptions
 *   Array of subscription ids, keyed by event id. The first id of each
 *   event is taken off the list and used for token replacement.
 * @param $send_interval
 *   Send interval (not used in the body).
 * @param $send_method
 *   Send method.
 * @param $module
 *   Module name used for the digest subject, header and footer.
 *
 * @return
 *   Array containing a single message array.
 */
function notifications_process_digest_short($account, $events, $subscriptions, $send_interval, $send_method, $module = 'notifications') {
  $list = array();
  foreach ($events as $event) {
    notifications_log('Digesting short format', array(
      'event' => $event,
    ));
    $eid = $event->eid;

    $sid = 0;
    if (is_array($subscriptions[$eid])) {
      $sid = array_shift($subscriptions[$eid]);
    }
    $subscription = $sid ? notifications_load_subscription($sid) : NULL;
    $objects = $event->objects + array(
      'user' => $account,
      'subscription' => $subscription,
    );

    $digest = nofitications_digest_event_info($event);
    $digest_type = $digest['type'];
    $digest_value = $digest['value'];

    // The group texts are built once, from the first event of the group.
    if (!isset($list[$digest_type][$digest_value]['group'])) {
      $group = array(
        'title' => notifications_digest_group($digest, 'title', $send_method),
        'footer' => notifications_digest_group($digest, 'closing', $send_method),
      );
      $list[$digest_type][$digest_value]['group'] = messaging_text_replace($group, $objects);
      notifications_log('Digesting object', array(
        'type' => $digest_type,
        'value' => $digest_value,
      ));
    }

    // One line per event within its group.
    if (!isset($list[$digest_type][$digest_value]['line'][$eid])) {
      $line = notifications_digest_line($event, $send_method, $objects);
      $objects['event'] = $event;
      $list[$digest_type][$digest_value]['line'][$eid] = messaging_text_replace($line, $objects);
    }
  }

  $text = array();
  foreach (array('subject', 'header', 'footer') as $part) {
    $text[$part] = notifications_message_part('digest', $part, $send_method, NULL, $module);
  }
  $text = messaging_text_replace($text, array(
    'user' => $account,
    'subscription' => NULL,
  ));

  $message = array(
    'subject' => $text['subject'],
    'body' => theme('notifications_digest_short_body', $text, $list),
    'events' => $events,
    'subscriptions' => $subscriptions,
    'digest' => 'short',
  );
  return array($message);
}
/**
 * Build a long-format digest message from a set of events.
 *
 * The body contains the subject and main text of every event, one after
 * another, between the digest header and footer.
 *
 * @param $account
 *   Destination user account.
 * @param $events
 *   Array of event objects.
 * @param $subscriptions
 *   Array of subscription ids, keyed by event id. The first id of each
 *   event is taken off the list and used for token replacement.
 * @param $send_interval
 *   Send interval (not used in the body).
 * @param $send_method
 *   Send method.
 * @param $module
 *   Module name used to look up the message parts.
 *
 * @return
 *   Array containing a single message array.
 */
function notifications_process_digest_long($account, $events, $subscriptions, $send_interval, $send_method, $module = 'notifications') {
  $body = array();
  foreach ($events as $event) {
    notifications_log('Digesting long format', array(
      'event' => $event,
    ));
    $eid = $event->eid;

    $part = array(
      notifications_message_part('event', 'subject', $send_method, $event, $module),
      notifications_message_part('event', 'main', $send_method, $event, $module),
    );

    $sid = 0;
    if (is_array($subscriptions[$eid])) {
      $sid = array_shift($subscriptions[$eid]);
    }
    $subscription = $sid ? notifications_load_subscription($sid) : NULL;
    $objects = $event->objects + array(
      'user' => $account,
      'subscription' => $subscription,
      'event' => $event,
    );
    $body = array_merge($body, messaging_text_replace($part, $objects));
  }

  $text = array();
  foreach (array('subject', 'header', 'footer') as $name) {
    $text[$name] = notifications_message_part('digest', $name, $send_method, NULL, $module);
  }
  $text = messaging_text_replace($text, array(
    'user' => $account,
    'subscription' => NULL,
  ));

  $message = array(
    'subject' => $text['subject'],
    'body' => theme('notifications_digest_long_body', $text['header'], $body, $text['footer']),
    'events' => $events,
    'subscriptions' => $subscriptions,
    'digest' => 'long',
  );
  return array($message);
}
/**
 * Get a text part (e.g. title or closing) for a digest group, with caching.
 *
 * @param $digest
 *   Digest information array with 'type', 'value', 'field' and 'module'.
 * @param $part
 *   Name of the message part to fetch.
 * @param $method
 *   Send method.
 *
 * @return
 *   The text for the part, or an empty string when there is none.
 */
function notifications_digest_group($digest, $part, $method) {
  static $texts = array();
  $type = $digest['type'];
  $value = $digest['value'];

  if (isset($texts[$type][$value][$part][$method])) {
    return $texts[$type][$value][$part][$method];
  }

  $key_parts = array(
    $type,
    $digest['field'],
  );
  $line = notifications_message_part('digest', $part, $method, $key_parts, $digest['module']);
  $texts[$type][$value][$part][$method] = $line ? $line : '';
  return $texts[$type][$value][$part][$method];
}
/**
 * Get the digest line for an event, cached per event and send method.
 *
 * Sources are tried in order: the event's own 'digest' text, the 'digest'
 * message part for the event, and finally the 'line' element of the event
 * type information.
 *
 * @param $event
 *   Event object.
 * @param $method
 *   Send method.
 *
 * @return
 *   The digest line text.
 */
function notifications_digest_line($event, $method) {
  static $digest = array();
  $eid = $event->eid;

  if (isset($digest[$eid][$method])) {
    return $digest[$eid][$method];
  }

  if (!empty($event->text['digest'])) {
    $line = $event->text['digest'];
  }
  else {
    $part = notifications_message_part('event', 'digest', $method, $event);
    if ($part) {
      $line = $part;
    }
    else {
      $info = notifications_event_types($event->type, $event->action);
      $line = $info['line'];
    }
  }

  $digest[$eid][$method] = $line;
  return $digest[$eid][$method];
}
/**
 * Theme the body of a short-format digest.
 *
 * @param $text
 *   Array with at least 'header' and 'footer' texts.
 * @param $list
 *   Nested array: digest type => digest value => array with 'group'
 *   ('title' and 'footer') and 'line' (array of lines).
 *
 * @return
 *   Array with 'header', 'content' (only when the list has groups) and
 *   'footer' elements, in that order.
 */
function theme_notifications_digest_short_body($text, $list) {
  $body = array('header' => $text['header']);
  foreach ($list as $groups) {
    foreach ($groups as $data) {
      $group = $data['group'];
      $body['content'][] = $group['title'];
      foreach ($data['line'] as $line) {
        $body['content'][] = theme('notifications_digest_short_line', $line, $group);
      }
      $body['content'][] = $group['footer'];
    }
  }
  $body['footer'] = $text['footer'];
  return $body;
}
/**
 * Theme a single line of a short-format digest.
 *
 * @param $line
 *   Line text.
 * @param $group
 *   Group information (not used by this default implementation).
 *
 * @return
 *   The line prefixed with "- ".
 */
function theme_notifications_digest_short_line($line, $group) {
  $prefix = '- ';
  return $prefix . $line;
}
/**
 * Theme the body of a long-format digest.
 *
 * @param $header
 *   Header text.
 * @param $content
 *   Body content.
 * @param $footer
 *   Footer text.
 *
 * @return
 *   Array with 'header', 'content' and 'footer' elements, in that order.
 */
function theme_notifications_digest_long_body($header, $content, $footer) {
  $body = array();
  $body['header'] = $header;
  $body['content'] = $content;
  $body['footer'] = $footer;
  return $body;
}