notifications.cron.inc in Notifications 6.3
Same filename and directory in other branches
File
notifications.cron.incView source
<?php
/**
* Notifications module. Queue processing.
*
* Callbacks for queue processing. They may be implemented by other modules
* _load_user()
* _user_allowed()
* _process_send()
*
* @ TO DO: Support different languages for message localization
*/
// Number of users to process for each step
define('NOTIFICATIONS_STEP_ROWS', 1000);
define('NOTIFICATIONS_STEP_USERS', 1000);
// Minimum amount of seconds the process will need for clean-up tasks
// Just to make sure that after exhausting cron assigned time we'll have a few spare seconds for some cleanup
define('NOTIFICATIONS_TIME_MARGIN', 5);
/**
* Function to be called on cron by the main notifications_cron
*
* It will call each subscription_process for each interval a number of times
*
* This should send out messages starting with immediate delivery. We send first immediate delivery
* because the other ones can be added up for each period.
* Assumption: The bigger the interval, the longer delay it may admit (?) I.e. sending hourly email
* after 1 hour 15 mins may be ok if the system is overloaded.
*/
function notifications_process_run($cron = TRUE) {
// If we are running on language split mode we'll get a language here, which switches automatically for cron
// When we run the sending manually, the language is current language
$language = notifications_process_language();
notifications_log('Starting notifications process', array(
'language' => $language ? $language->name : 'All',
));
notifications_process('start');
// There may be special time adjustments for cron
if ($cron) {
notifications_process('cron');
}
$stop = FALSE;
$send_intervals = _notifications_send_intervals();
unset($send_intervals[-1]);
if ($max_sqid = notifications_process_prepare()) {
foreach ($send_intervals as $interval => $interval_name) {
notifications_log('Processing queue', array(
'send interval' => $interval_name,
));
while (notifications_process_queue($interval, $max_sqid, $language)) {
$stop = !notifications_process('check');
}
if ($stop) {
notifications_log('Process stopped, reached processing limits');
break;
}
else {
notifications_log('Process finished', array(
'send interval' => $interval_name,
));
}
}
}
else {
notifications_log('No rows in queue');
}
// Advance the language, we don't mind whether we've reached limits, need to move on
notifications_process_language(TRUE);
}
/**
* If language split mode enabled set global language or mark current language as done
*
* We go through the language list in order and when done start again. Ideally this should
* run just once per cron run because there are multiple static variables out there that
* can be set just once.
*
*
*
* Note that this function may switch global language.
*
* @return $language
* Current language in case we've changed it
*/
function notifications_process_language() {
global $language;
if (variable_get('notifications_language_split_mode', 0) && module_exists('i18ncron') && variable_get('language_count', 1) > 1) {
return $language;
}
}
/**
* Remove a language form the database when a language has been disabled
*/
function notifications_disable_language($langcode) {
$default = language_default('language');
foreach (array(
'notifications',
'notifications_queue',
) as $table) {
db_query("UPDATE {" . $table . "} SET language = '%s' WHERE language = '%s'", $default, $langcode);
}
}
/**
* Prepare subscriptions queue
*
* This is intended to avoid race conditions where new rows are added while the process is running
*
* @return
* Max $sqid that will be processed this cron
*/
function notifications_process_prepare() {
// Clean up expired logs from queue if logging enabled
if ($keep = variable_get('notifications_log', 0)) {
db_query("DELETE FROM {notifications_queue} WHERE cron = 0 AND sent < %d", time() - $keep);
}
// Clean up event table
notifications_event_clean();
// This will get the latest notification in queue so we don't mess with new ones being created during cron run
// It will also prevent clashes with the immediate sending feature
return db_result(db_query("SELECT max(sqid) FROM {notifications_queue}"));
}
/**
* Clean up event table
*
* @param $update
* Update event counter
*/
function notifications_event_clean($update = FALSE) {
// This expiretime will prevent some race condition that occurs when the event is saved but the subs queue not yet populated
$expiretime = time() - 60;
if ($update) {
// Update event counter, which keeps the number of notifications pending for each event
db_query("UPDATE {notifications_event} e SET counter = (SELECT COUNT(*) FROM {notifications_queue} q WHERE q.eid = e.eid ) WHERE e.created < %d", $expiretime);
}
db_query("DELETE FROM {notifications_event} WHERE counter = 0 AND created < %d", $expiretime);
// Delete events with no pending notifications. As events are created sequentially, we use this fact to speed up the query
db_query("DELETE FROM {notifications_event} WHERE created < %d AND eid < (SELECT MIN(eid) FROM {notifications_queue})", $expiretime);
}
/**
* Controls and checks limits for queue processing
* It can be used by other modules to add their own limits here, like number of sms sent, etc...
* @param $op
* 'start' => Start the counters
* 'cron' => Special time adjustment for cron operations
* 'init' => Start a new counter with $value limit
* 'option' => Sets /gets options
* - debug
* - output Enables output for admin page
* @return
* TRUE if we are yet under the processing limits
*/
function notifications_process($op = 'check', $name = NULL, $value = NULL) {
static $limit = array(), $options = array();
static $current = array(
'message' => 0,
'step' => 0,
);
switch ($op) {
case 'start':
$defaults = variable_get('notifications_process_limit', array(
'time' => 0,
'message' => 0,
'row' => 0,
'percent' => 0,
));
foreach ($defaults as $name => $value) {
if ($value && !isset($limit[$name])) {
$limit[$name] = $value;
}
}
break;
case 'cron':
// Calculate time limit. We get the smaller of all these times in seconds
// There's an issue with poormanscron not setting the cron semaphore so it will default to current time
$timelimit = array();
$cronstart = variable_get('cron_semaphore', time());
// Max execution time may be zero meaning no limit, then no limits based on this
if ($maxtime = ini_get('max_execution_time')) {
$timelimit[] = $cronstart + $maxtime - NOTIFICATIONS_TIME_MARGIN;
if (!empty($limit['percent'])) {
$timelimit[] = time() + $maxtime * $limit['percent'] / 100;
unset($limit['percent']);
}
}
// This is an absolute limit, applies always if set
if (!empty($limit['time'])) {
$timelimit[] = time() + $limit['time'];
}
if ($timelimit) {
$limit['time'] = min($timelimit);
}
break;
break;
case 'init':
$current[$name] = 0;
$limit[$name] = $value;
break;
case 'count':
$value = $value ? $value : 1;
isset($current[$name]) ? $current[$name] += $value : ($current[$name] = $value);
break;
case 'option':
if (isset($value)) {
$options[$name] = $value;
}
return isset($options[$name]) ? $options[$name] : FALSE;
}
$current['time'] = time();
// Check all limits till we find a false one
foreach ($limit as $name => $value) {
if ($value && !empty($current[$name]) && $current[$name] >= $value) {
watchdog('notifications', 'Reached processing limit on queue processing: %name = %value', array(
'%name' => $name,
'%value' => $value,
));
return FALSE;
}
}
return TRUE;
}
/**
* Process rows given query conditions
*
* This is used by the immediate sending feature
* @see notifications_queue_query()
*
* @param $conditions
* Array of query conditions
* @param $limit
* Optional, limit the number of rows to process
* @param $update
* Optional, update queue rows and event counter after processing
*/
function notifications_process_rows($conditions, $limit = 0, $update = TRUE) {
notifications_log('Processing queue rows', $conditions);
$account = $destination = NULL;
$subscriptions = $events = $processed = array();
$send_method = $send_interval = $module = NULL;
$test = notifications_process('option', 'test');
$count = 0;
// Build query and fetch rows from queue
$query = notifications_queue_query($conditions);
$sql = "SELECT * FROM {notifications_queue} ";
$sql .= " WHERE " . implode(' AND ', $query['where']);
$sql .= " ORDER BY module, uid, destination, send_method, send_interval";
if ($limit) {
$result = db_query_range($sql, $query['args'], 0, $limit);
}
else {
$result = db_query($sql, $query['args']);
}
// Group rows by module, user, send_method, send_interval before composing and sending
// This loop has to run a final time after all rows have been fetched
while (($queue = db_fetch_object($result)) || $processed) {
if (!$account || !$queue || $queue->module != $module || $queue->uid != $account->uid || $queue->destination != $destination || $queue->send_method != $send_method || $queue->send_interval != $send_interval) {
// New user or sending method or destination, send if not the first row and reset
if ($account && $events && $subscriptions) {
// Build parameters into object so they are manageable and easily extendable
$parameters = (object) array(
'module' => $module,
'account' => $account,
'events' => $events,
'subscriptions' => $subscriptions,
'send_method' => $send_method,
'send_interval' => $send_interval,
'test' => $test,
'language' => NULL,
'time' => time(),
);
// Defaults to notifications_process_compose()
$messages = notifications_callback($module, 'process_compose', $parameters);
notifications_log('Composed messages', array(
'number' => count($messages),
'send_method' => $send_method,
));
// Note that we pass the testing parameter to notifications_process_send
// Defaults to notifications_process_send()
notifications_callback($module, 'process_send', $messages, $parameters);
if (!$test) {
notifications_update_sent($account, $send_method, $send_interval, time());
}
}
// If rows processed and $update enabled, we mark them as done
if ($processed && $update) {
notifications_queue_done(array(
'sqid' => $processed,
));
}
$subscriptions = $events = $processed = array();
// Keep track of parameters that will trigger a sending when changing
if ($queue) {
$send_method = $queue->send_method;
$send_interval = $queue->send_interval;
$destination = $queue->destination;
$module = $queue->module;
// Users may be handled by a different module implementing the _load_user callback.
// I.e. for anonymous users it may load the name from somewhere
$account = notifications_callback($module, 'load_user', $queue->uid, $destination, $send_method);
}
}
// For every row in queue, compile everything that will be available for sending
if ($queue) {
$count++;
$processed[] = $queue->sqid;
// Load event, check it exists and check the user has access to the event objects
if ($event = notifications_load_event($queue->eid)) {
notifications_event_tracker('count', $event);
notifications_log('Processing queued', array(
'queue sqid' => $queue->sqid,
'event' => $queue->eid,
'type' => $event->type,
'action' => $event->action,
'send method' => $send_method,
));
if (notifications_user_allowed('event', $account, $event)) {
// This will take care of duplicated events
$events[$queue->eid] = $event;
// We keep track also of subscriptions originating this event
$subscriptions[$queue->eid][] = $queue->sid;
}
else {
notifications_log('Access denied for event', array(
'account' => $user->uid,
'event' => $queue->eid,
));
}
}
else {
notifications_log('Cannot load event', array(
'eid' => $queue->eid,
'queue sid' => $queue->sid,
));
}
}
}
if ($update) {
notifications_event_tracker('update');
}
// Return number of rows processed
return $count;
}
/**
* Process subscriptions queue
*
* The subscriptions queue has the following fields
* sqid, uid, eid, sid, digest
*
* This function should be able of splitting the whole processing in several steps.
* It will be called multiple times for each send interval
*
* Messages will be processed for each send interval, send_method, user
*
* @param $send_interval
* Send interval to process
* @param $max_sqid
* Max queue id to process
* @param $language
* Optional language to process only rows in this language
* @return Number of rows processed
*
* @ TODO Review time conditions
* @ TODO Per module queue processing
*/
function notifications_process_queue($send_interval, $max_sqid, $language = NULL) {
notifications_log('Starting queue processing', array(
'send interval' => $send_interval,
'max sqid' => $max_sqid,
));
// Option for test running, marking messages as test, nor updating not sending
$test = notifications_process('option', 'test');
// Option for normal running but without updating the queue records
$keep = notifications_process('option', 'keep');
// Count processed rows
$count = 0;
// This is the time from which stored rows will be sent
$timelimit = time() - $send_interval;
// Get users to process messages for, with this time interval and ordered by squid
// Order by last sent for this send interval
// Note: If we get the users with more messages pending first this may save some time
$sql = "SELECT q.uid, q.destination, q.module, q.send_method, count(*) AS count_rows FROM {notifications_queue} q ";
$sql .= " LEFT JOIN {notifications_sent} su ON q.uid = su.uid AND q.send_interval = su.send_interval AND q.send_method = su.send_method ";
$sql .= " WHERE q.cron = 1 AND q.send_interval = '%d' AND q.sqid <= %d";
if ($language) {
$sql .= " AND q.language = '%s'";
}
$sql .= " AND (su.uid IS NULL OR su.sent < %d) ";
// Note: the group by su.sent seems to be needed by pgsql
$sql .= " GROUP BY q.uid, q.destination, q.module, q.send_method, su.sent ORDER BY su.sent";
if ($language) {
$result = db_query_range($sql, $send_interval, $max_sqid, $language->language, $timelimit, 0, NOTIFICATIONS_STEP_USERS);
}
else {
$result = db_query_range($sql, $send_interval, $max_sqid, $timelimit, 0, NOTIFICATIONS_STEP_USERS);
}
// We create a bach for each user, destination, method and hand it over to notifications_process_rows()
while (($queue = db_fetch_object($result)) && notifications_process('check')) {
notifications_log('Queue processing', array(
'user' => $queue->uid,
'rows' => $queue->count_rows,
'send method' => $queue->send_method,
));
$module = $queue->module;
$events = $subscriptions = $processed = array();
// Process all rows for this user. With some hard limit to prevent process lock ups.
// In case we have too many rows, we go updating step by step
if ($queue->count_rows > NOTIFICATIONS_STEP_ROWS) {
$limit = NOTIFICATIONS_STEP_ROWS;
$update = TRUE;
}
else {
$limit = $queue->count_rows;
$update = FALSE;
}
$update = $update && !$keep;
$batch = array(
'uid' => $queue->uid,
'destination' => $queue->destination,
'module' => $queue->module,
'send_method' => $queue->send_method,
'send_interval' => $send_interval,
'cron' => 1,
'max_sqid' => $max_sqid,
);
if ($language) {
$batch['language'] = $language->language;
}
// These rows may be processed by a different module. Defaults to notifications_process_rows()
$processed = notifications_callback($queue->module, 'process_rows', $batch, $limit, $update);
$count += $processed;
// If we didn't update row by row ($update), we update all at once
if ($processed && !$test && !$update && !$keep) {
notifications_queue_done($batch);
}
}
// If not doing a test run, update event counter and return count
// If doing a test run, return 0 so we don't go through this again
if (!$test && !$keep) {
notifications_event_tracker('update');
return $count;
}
else {
return 0;
}
}
/**
* Keep track of events and update event counter with processed rows eids
*
* @param $op
* count, reset, update
* @param $event
* event object to track
*/
function notifications_event_tracker($op, $event = NULL) {
$events =& messaging_static(__FUNCTION__);
switch ($op) {
case 'count':
$events[$event->eid] = isset($events[$event->eid]) ? $events[$event->eid] + 1 : 1;
break;
case 'delete':
// Delete event and all related rows. For events no longer available, deleted nodes, comments, etc..
foreach (array(
'notifications_queue',
'notifications_event',
) as $table) {
db_query('DELETE FROM {' . $table . '} WHERE eid = %d', $event->eid);
}
if (isset($events[$event->eid])) {
unset($events[$event->eid]);
}
break;
case 'update':
if (!empty($events)) {
foreach ($events as $eid => $count) {
db_query('UPDATE {notifications_event} SET counter = counter - %d WHERE eid = %d', $count, $eid);
}
}
// Intentional no break (update will also reset)
case 'reset':
$events = array();
}
}
/**
* Update user last time sent for each sending method / interval
*/
function notifications_update_sent($account, $method, $interval, $time) {
db_query("UPDATE {notifications_sent} SET sent = %d WHERE uid = %d AND send_interval = '%d' AND send_method = '%s'", $time, $account->uid, $interval, $method);
if (!db_affected_rows()) {
db_query("INSERT INTO {notifications_sent}(uid, send_interval, send_method, sent) VALUES(%d, '%d', '%s', %d)", $account->uid, $interval, $method, $time);
}
}
/**
* Message composition.
*
* Processes everything, included templating and digestion and sends message/s.
*
* Adds some more information into $message['notifications'] that may be used by other modules
*
* @param $account
* User account to send the notification to
* @param $events
* Array of loaded event objects to be processed
* @param $subscriptions
* Array of arrays of subscription ids (sids) for each event(eid)
*
* @return array()
* Array of messages ready for sending out
*/
function notifications_process_compose($params) {
notifications_log('Process compose', array(
'params' => $params,
));
// Digest if send_interval > 0 (not immediate sending)
$build_method = notifications_build_method($params->send_interval);
$build_function = $build_method['build callback'];
// Add sender option parameter if not set
if (!isset($params->sender_option)) {
$params->sender_option = variable_get('notifications_sender', 0);
}
// If it is a digest build method, build all at once
if (!empty($build_method['digest'])) {
$params->digest = $build_method['type'];
// It can be digested in more than one message by some other digest plug-in
$messages = $build_function($params);
}
else {
// Build messages array, the function will advance the event
$messages = array();
while ($event = current($params->events)) {
$messages[] = $build_function($event, $params);
next($params->events);
}
}
// Reset the parameters and return messages
reset($params->events);
return $messages;
}
/**
* Send array of messages through messaging module
*
* @param $messages
* Array of messages prepared for sending
* @param $params
* Parameters object
*/
function notifications_process_send($messages, $params) {
foreach ($messages as $message) {
notifications_process('count', 'send');
notifications_debug('Sending out notification', array(
'method' => $params->send_method,
'message' => (string) $message,
));
notifications_message_send($message->account, $message);
}
return $messages;
}
/**
* Creates a single message for a single event
*
* Replaces notifications_process_message();
*
* @param $account
* Destination user account
* @param $event
* Event object which caused this notification
* @param $subscriptions
* Array of subscription ids
*
* @return
* Message array
*/
function notifications_process_build_simple($event, $params) {
// Get base template with all parameters
$template = Notifications_Template::create_event_template($event, $params->send_method, $params->language, $params->module);
//notifications_event_template($params->event, $params);
$template
->set_params($params);
// Pass only the first subscription for this event
$subscriptions = !empty($params->subscriptions[$event->eid]) ? $params->subscriptions[$event->eid] : array();
$event_subscription = ($sid = current($subscriptions)) ? notifications_load_subscription($sid) : NULL;
$template->subscriptions = array(
$event->eid => $subscriptions,
);
$template
->set_object('subscriptions', $template->subscriptions);
$template
->set_object('events', array(
$event,
));
// Build event parts
$template
->add_part('subject');
$template
->add_part('header');
$template
->add_part('main');
$template
->add_part('footer');
// We pass only the first subscription, which is at least something
// @ TODO Handle nicely the case where there are more than one subscription
$template
->set_object('user', $params->account);
$template
->set_object('subscription', $event_subscription);
notifications_log('Built event template', array(
'template' => $template,
));
// Build message from template and parameters
return $template
->build();
}
/**
* Message sending, pass the message to Messaging back end
*
* @param $account
* User account to send the message to
* @param $message
* Message object
*
* @return boolean
* TRUE if sending was successfull
*/
function notifications_message_send($account, $message) {
$destination = !empty($account->name) ? $account->name : $message->destination;
notifications_debug('Preparing user notification for messaging', array(
'message' => $message,
'destination' => $destination,
));
$message
->set_user($account);
return $message
->send();
}
/**
* Get texts for event
*
* @ TODO Support for configurable texts
*/
function notifications_event_text($event) {
$info = notifications_event_types($event->type, $event->action);
return $info;
}
/**
* Get template object for event
*/
function notifications_event_template($event, $params, $module = 'notifications', $send_method = NULL, $language = NULL) {
$engine = notifications_template_engine();
if ($template = $engine
->get_event_template($event, $module, $send_method, $language)) {
// Set some other template properties
$template->event = $event;
return $template;
}
}
/**
* Get users with static caching for existing users
*
* If not uid passed it will return an anonymous fake user (with destination, send_method)
* We need to pass the send method to produce the right tokens later
*
* This provides some API support for user-less subscriptions, i.e. when we've got just
* an email address but no user associated. The idea is that these fake users will be properly
* handled by messaging module
*
* @todo Possibly all this should be handled by messaging layer
*
* @param $uid
* Uid of the user account to load, none to use anonymous user
* @param $destination
* Messaging destination (mail, sms number, etc..), just for anonymous users
* @param $send_method
* Messaging send method key (mail, sms, xmpp, etc..), just for anonymous users
*/
function notifications_load_user($uid, $destination = NULL, $send_method = NULL) {
if ($uid) {
return messaging_load_user($uid);
}
else {
$account = drupal_anonymous_user();
$account->destination = $destination;
$account->send_method = $send_method;
return $account;
}
}
/**
* Get events with static caching. Handle event deletion if not available anymore
*/
function notifications_load_event($id) {
$cache =& messaging_static(__FUNCTION__);
if (!$cache || !array_key_exists($id, $cache)) {
$event = db_fetch_object(db_query("SELECT * FROM {notifications_event} WHERE eid = %d", $id));
$event->params = unserialize($event->params);
// Load aditional objects for the event
$event->objects = array();
notifications_module_invoke('event load', $event);
// Check event status, it may need deletion if objects are not available
if (!empty($event->delete)) {
notifications_event_tracker('delete', $event);
$event = NULL;
}
$cache[$id] = $event;
}
return $cache[$id];
}
/**
* Mark queue rows as done
*
* Either log, if logging enabled, or delete
*/
function notifications_queue_done($params) {
if (variable_get('notifications_log', 0)) {
notifications_queue_update($params, array(
'cron' => 0,
'sent' => time(),
));
}
else {
notifications_queue_delete($params);
}
}
/**
* Update queue rows with defined values
*
* @arg $params
* Parameters to select the queue rows for updating. Array of field => value pairs
* @arg $update
* Fields values to update. Array of field => value pairs
*/
function notifications_queue_update($params, $updates) {
$values = _messaging_query_conditions('notifications_queue', $updates);
$where = notifications_queue_query($params);
$args = array_merge($values['args'], $where['args']);
return db_query('UPDATE {notifications_queue} SET ' . implode(', ', $values['conditions']) . ' WHERE ' . implode(' AND ', $where['where']), $args);
}
/**
* Delete rows from subscriptions queue
*
* @see notifications_queue_query()
*
* Note: Handle with care if no params may delete all rows
*/
function notifications_queue_delete($params) {
$query = notifications_queue_query($params);
db_query("DELETE FROM {notifications_queue} WHERE " . implode(' AND ', $query['where']), $query['args']);
}
/**
* Build query conditions for queue queries
*
* @param $params
* Array of parameters, field => value form
* Special parameters
* 'max_squid' => max squid to delete
* 'rows' => array of squid values to delte
* @return
* Array with 'where' and 'args' elements. Each of them is an array
*/
function notifications_queue_query($params) {
$where = $args = array();
// Special condition max_sqid
if (isset($params['max_sqid'])) {
$where[] = "sqid <= %d";
$args[] = $params['max_sqid'];
unset($params['max_sqid']);
}
// User generic query builder for the rest of fields
$values = _messaging_query_conditions('notifications_queue', $params);
$where = array_merge($where, $values['conditions']);
$args = array_merge($args, $values['args']);
return array(
'where' => $where,
'args' => $args,
);
}
Functions
Name | Description |
---|---|
notifications_disable_language | Remove a language form the database when a language has been disabled |
notifications_event_clean | Clean up event table |
notifications_event_template | Get template object for event |
notifications_event_text | Get texts for event |
notifications_event_tracker | Keep track of events and update event counter with processed rows eids |
notifications_load_event | Get events with static caching. Handle event deletion if not available anymore |
notifications_load_user | Get users with static caching for existing users |
notifications_message_send | Message sending, pass the message to Messaging back end |
notifications_process | Controls and checks limits for queue processing It can be used by other modules to add their own limits here, like number of sms sent, etc... |
notifications_process_build_simple | Creates a single message for a single event |
notifications_process_compose | Message composition. |
notifications_process_language | If language split mode enabled set global language or mark current language as done |
notifications_process_prepare | Prepare subscriptions queue |
notifications_process_queue | Process subscriptions queue |
notifications_process_rows | Process rows given query conditions |
notifications_process_run | Function to be called on cron by the main notifications_cron |
notifications_process_send | Send array of messages through messaging module |
notifications_queue_delete | Delete rows from subscriptions queue |
notifications_queue_done | Mark queue rows as done |
notifications_queue_query | Build query conditions for queue queries |
notifications_queue_update | Update queue rows with defined values |
notifications_update_sent | Update user last time sent for each sending method / interval |