View source
<?php
// Number of rows requested per queue processing step; read from the
// 'messaging_step_rows' variable, defaulting to 1000.
define('MESSAGING_STEP_ROWS', variable_get('messaging_step_rows', 1000));
// Margin subtracted from the timeout computed in
// Messaging_Store::cron_process(); read from the 'messaging_time_margin'
// variable, defaulting to 5.
define('MESSAGING_TIME_MARGIN', variable_get('messaging_time_margin', 5));
class Messaging_Store {
// Rows requested per step by queue_process(); mirrors MESSAGING_STEP_ROWS.
const STEP_ROWS = MESSAGING_STEP_ROWS;
// Margin subtracted from the cron timeout in cron_process(); mirrors
// MESSAGING_TIME_MARGIN.
const TIME_MARGIN = MESSAGING_TIME_MARGIN;
// Database table this store reads from and writes to.
const DB_TABLE = 'messaging_store';
// Key column used to load, order and delete single messages.
const DB_KEY = 'mqid';
/**
 * Reports the expiration flag for this store.
 *
 * @return
 *   Always TRUE.
 */
public static function can_expire() {
  $expirable = TRUE;
  return $expirable;
}
/**
 * Runs the store's cron work: process the queue, then expire old rows.
 *
 * The processing timeout is the cron start time plus PHP's
 * max_execution_time, minus self::TIME_MARGIN. No timeout is passed when PHP
 * has no execution time limit.
 */
public static function cron_process() {
  $limit = array();
  $max_time = (int) ini_get('max_execution_time');
  // max_execution_time = 0 means "unlimited". Deriving a deadline from it
  // would produce a timestamp that is already in the past, so a timeout is
  // only set when PHP actually enforces a limit.
  if ($max_time > 0) {
    // NOTE(review): 'cron_semaphore' is presumably the cron start timestamp
    // set by Drupal core; fall back to the current time when it is not set
    // so the deadline is still a meaningful timestamp.
    $start = variable_get('cron_semaphore', 0);
    if (!$start) {
      $start = time();
    }
    $limit['timeout'] = $start + $max_time - self::TIME_MARGIN;
  }
  self::queue_process($limit);
  self::queue_expire_messages();
  self::queue_expire_logs();
}
/**
 * Processes queued messages in steps until a limit is reached.
 *
 * Steps of at most self::STEP_ROWS rows are processed until a step comes back
 * short, the timeout has passed, or the message limit has been reached.
 *
 * @param $limits
 *   Optional limits, completed by process_limits(). The 'message' (maximum
 *   number of messages) and 'timeout' (timestamp) entries are used here.
 *
 * @return
 *   Array with one processing result per message.
 *   NOTE(review): array_merge() renumbers integer keys, so the mqid keys
 *   produced by queue_process_step() are not preserved in this list.
 */
public static function queue_process($limits = array()) {
  $limit = self::process_limits($limits);
  $max = empty($limit['message']) ? 0 : $limit['message'];
  $results = array();
  $count = 0;
  while (TRUE) {
    // Never request more rows than are left under the message limit.
    $step = $max ? min(self::STEP_ROWS, $max - $count) : self::STEP_ROWS;
    $batch = self::queue_process_step($step, $limit['timeout']);
    $number = count($batch);
    $count += $number;
    $results = array_merge($results, $batch);
    // A short step means the queue is exhausted or the step timed out.
    if ($number != $step) {
      break;
    }
    if ($limit['timeout'] && time() > $limit['timeout']) {
      break;
    }
    if ($max && $max <= $count) {
      break;
    }
  }
  return $results;
}
/**
 * Completes processing limits and computes the effective timeout.
 *
 * Missing entries are filled from the 'messaging_process_limit' variable.
 * The resulting 'timeout' is the earliest of: the timeout passed in, now plus
 * 'time' seconds, and now plus 'percent' of PHP's max_execution_time.
 *
 * @param $limits
 *   Array of limits; any of 'message', 'time', 'percent' and 'timeout'.
 *
 * @return
 *   The limits array with 'timeout' set to a timestamp, or 0 for no timeout.
 */
public static function process_limits($limits = array()) {
  $limits += variable_get('messaging_process_limit', array(
    'message' => 0,
    'time' => 0,
    'percent' => MESSAGING_DEFAULT_CRON_PERCENT,
  ));
  // All candidate deadlines are read from $limits. Reading them from any
  // other variable leaves this list empty and silently disables every time
  // limit.
  $times = array();
  // A non-positive timestamp cannot be a real deadline; treat it as "none".
  if (!empty($limits['timeout']) && $limits['timeout'] > 0) {
    $times[] = $limits['timeout'];
  }
  if (!empty($limits['time'])) {
    $times[] = time() + $limits['time'];
  }
  // max_execution_time = 0 means unlimited; a percentage of it would give a
  // deadline of "now", so the percent limit only applies to a real limit.
  $max_time = (int) ini_get('max_execution_time');
  if (!empty($limits['percent']) && $max_time > 0) {
    $times[] = time() + $max_time * $limits['percent'] / 100;
  }
  $limits['timeout'] = $times ? min($times) : 0;
  return $limits;
}
/**
 * Processes a single step of queued messages.
 *
 * Selects rows with queue = 1 and cron = 1 ordered by the key column, sends
 * each one, and afterwards marks the whole step as sent or failed through
 * message_sent().
 *
 * @param $limit
 *   Maximum number of rows to select for this step.
 * @param $timeout
 *   Optional timestamp; the step stops after the first message processed
 *   past it. 0 means no timeout.
 *
 * @return
 *   Array of send results keyed by mqid.
 */
protected static function queue_process_step($limit, $timeout = 0) {
  $conditions = array(
    'queue' => 1,
    'cron' => 1,
  );
  $options = array(
    'order' => array(
      self::DB_KEY,
    ),
    'limit' => $limit,
  );
  $rows = self::select_query('*', $conditions, $options);

  $processed = array();
  $sent = array();
  $unsent = array();
  while ($row = db_fetch_object($rows)) {
    $message = self::message_unpack($row, TRUE);
    $success = self::queue_process_message($message);
    $processed[$message->mqid] = $success;
    if (!$success) {
      $unsent[] = $message->mqid;
      watchdog('messaging', 'Failed queue processing for @message', array(
        '@message' => (string) $message,
      ), WATCHDOG_WARNING);
    }
    else {
      $sent[] = $message->mqid;
      messaging_debug('Processed message from queue', array(
        'message' => $message,
        'success' => $success,
      ));
    }
    // The timeout is checked after each message, so at least one message is
    // always processed per step.
    if ($timeout && time() > $timeout) {
      break;
    }
  }

  // Update the store once per outcome instead of once per message.
  if ($sent) {
    self::message_sent($sent);
  }
  if ($unsent) {
    self::message_sent($unsent, TRUE);
  }
  return $processed;
}
/**
 * Sends one message taken from the queue.
 *
 * The message's queue flag is cleared before sending. Messages with a
 * non-empty 'destinations' property go through send_multiple(), all others
 * through send().
 *
 * @param $message
 *   Message object providing send() and send_multiple().
 *
 * @return
 *   Whatever the send method returns.
 */
protected static function queue_process_message($message) {
  $message->queue = 0;
  if (empty($message->destinations)) {
    return $message->send();
  }
  return $message->send_multiple();
}
/**
 * Expires old messages and forces expiration of logs.
 *
 * @return
 *   Sum of the row counts reported by both expiration calls.
 */
public static function queue_clean() {
  $expired_messages = self::queue_expire_messages();
  $expired_logs = self::queue_expire_logs(TRUE);
  return $expired_messages + $expired_logs;
}
/**
 * Counts the messages waiting to be processed on cron.
 *
 * @return
 *   The COUNT(*) of rows with queue = 1 and cron = 1, as read by db_result().
 */
public static function queue_count() {
  $conditions = array(
    'queue' => 1,
    'cron' => 1,
  );
  $result = self::select_query('COUNT(*)', $conditions);
  return db_result($result);
}
/**
 * Deletes expired log rows and old error rows.
 *
 * The retention period is read from the 'messaging_log' variable. Nothing is
 * deleted when it is 0, unless $force is TRUE; in that case the cutoff is the
 * current time.
 *
 * @param $force
 *   Run the deletion even when no retention period is configured.
 *
 * @return
 *   Number of rows deleted, or 0 when nothing was run.
 */
public static function queue_expire_logs($force = FALSE) {
  $expire_logs = variable_get('messaging_log', 0);
  if (!$expire_logs && !$force) {
    return 0;
  }
  $cutoff = time() - $expire_logs;
  // AND binds tighter than OR: this removes (sent logs that are not queued
  // and older than the cutoff) plus (error rows created before the cutoff).
  db_query('DELETE FROM {' . self::DB_TABLE . '} WHERE log = 1 AND queue = 0 AND sent < %d OR error = 1 AND created < %d', $cutoff, $cutoff);
  return db_affected_rows();
}
/**
 * Deletes rows created before the configured expiration period.
 *
 * The period is read from the 'messaging_queue_expire' variable; 0 disables
 * expiration.
 *
 * @return
 *   Number of rows deleted, or 0 when expiration is disabled.
 */
public static function queue_expire_messages() {
  $expire_messages = variable_get('messaging_queue_expire', 0);
  if (!$expire_messages) {
    return 0;
  }
  $cutoff = time() - $expire_messages;
  db_query('DELETE FROM {' . self::DB_TABLE . '} WHERE created < %d', $cutoff);
  return db_affected_rows();
}
/**
 * Loads the messages matching a set of conditions.
 *
 * @param $params
 *   Conditions passed on to select_query().
 * @param $order
 *   Optional 'order' query parameter.
 * @param $limit
 *   Optional 'limit' query parameter.
 * @param $pager
 *   Optional 'pager' query parameter.
 * @param $full
 *   Passed through to message_unpack().
 *
 * @return
 *   Array of unpacked messages keyed by mqid.
 */
public static function get_messages($params, $order = NULL, $limit = NULL, $pager = NULL, $full = TRUE) {
  $options = array(
    'limit' => $limit,
    'order' => $order,
    'pager' => $pager,
  );
  $rows = self::select_query('*', $params, $options);
  $messages = array();
  while ($row = db_fetch_object($rows)) {
    $messages[$row->mqid] = self::message_unpack($row, $full);
  }
  return $messages;
}
/**
 * Builds the query array for a SELECT on the store table.
 *
 * @param $fields
 *   Field name or array of field names to select.
 * @param $conditions
 *   Optional conditions, converted by query_fields().
 * @param $query_params
 *   Extra query elements. They never override elements already present,
 *   because they are added with the array union operator.
 *
 * @return
 *   Query array with 'select' and 'from', plus any condition and extra
 *   elements.
 */
protected static function query_select($fields = '*', $conditions = NULL, $query_params = array()) {
  $query = array(
    'select' => is_array($fields) ? $fields : array($fields),
    'from' => array('{' . self::DB_TABLE . '}'),
  );
  if ($conditions) {
    $query += self::query_fields($conditions);
  }
  return $query + $query_params;
}
/**
 * Builds a SELECT on the store table and hands it to messaging_query_sql().
 *
 * @param $fields
 *   Field name or array of field names to select.
 * @param $conditions
 *   Optional conditions.
 * @param $query_params
 *   Extra query elements such as 'order', 'limit', 'group' or 'pager'.
 *
 * @return
 *   Whatever messaging_query_sql() returns when called with TRUE as its
 *   second argument; callers in this class use it as a query result.
 */
public static function select_query($fields = '*', $conditions = NULL, $query_params = array()) {
  return messaging_query_sql(self::query_select($fields, $conditions, $query_params), TRUE);
}
/**
 * Returns message totals grouped by the given fields.
 *
 * @param $fields
 *   Field name or array of field names to select and group by. A single
 *   string is accepted for consistency with query_select().
 * @param $conditions
 *   Optional conditions restricting the rows counted.
 *
 * @return
 *   List of rows as arrays, each holding the grouped fields plus 'total'.
 */
public static function get_status($fields, $conditions = array()) {
  // Normalise first: appending with [] to a string is an error in PHP.
  $group = is_array($fields) ? $fields : array($fields);
  $select = $group;
  $select[] = 'count(*) AS total';
  $result = self::select_query($select, $conditions, array(
    'group' => $group,
  ));
  $status = array();
  while ($data = db_fetch_array($result)) {
    $status[] = $data;
  }
  return $status;
}
/**
 * Loads a message by id, using the static cache when allowed.
 *
 * Misses are cached as FALSE, so a repeated lookup of a missing id returns
 * FALSE from the cache without querying again.
 *
 * @param $mqid
 *   Message id.
 * @param $refresh
 *   TRUE to skip the cache and reload from the database.
 *
 * @return
 *   The cached value when one is set, otherwise the result of
 *   _message_load().
 */
public static function message_load($mqid, $refresh = FALSE) {
  $cached = $refresh ? NULL : self::cache_get($mqid);
  if (isset($cached)) {
    return $cached;
  }
  $message = self::_message_load($mqid);
  self::cache_set($mqid, $message ? $message : FALSE);
  return $message;
}
/**
 * Loads a single message from the database by key, bypassing the cache.
 *
 * @param $key
 *   Value of the key column (self::DB_KEY).
 *
 * @return
 *   The unpacked message, or NULL when no row matches.
 */
protected static function _message_load($key) {
  $conditions = array(
    self::DB_KEY => $key,
  );
  $result = self::select_query('*', $conditions, array(
    'limit' => 1,
  ));
  $row = db_fetch_object($result);
  if (!$row) {
    return;
  }
  return self::message_unpack($row, TRUE);
}
/**
 * Converts field conditions into query 'where' and 'args' elements.
 *
 * Conditions are built by _messaging_query_where(). In addition, a
 * 'max_mqid' entry adds an upper bound on mqid.
 *
 * @param $fields
 *   Array of conditions keyed by field name.
 *
 * @return
 *   Query array fragment as returned by _messaging_query_where(), possibly
 *   extended with the mqid bound.
 */
protected static function query_fields($fields) {
  $clauses = _messaging_query_where(self::DB_TABLE, $fields);
  if (!isset($fields['max_mqid'])) {
    return $clauses;
  }
  $clauses['where'][] = 'mqid <= %d';
  $clauses['args'][] = $fields['max_mqid'];
  return $clauses;
}
/**
 * Turns a raw database row into a built message object.
 *
 * Unserializes 'params' when present, runs drupal_unpack() on the row, marks
 * the message as prepared and rendered, and copies the queue and log flags
 * into 'queued' and 'logged'.
 *
 * @param $message
 *   Row object fetched from the store table.
 * @param $full
 *   Not used by this implementation.
 *
 * @return
 *   The result of messaging_message_build() for the row.
 */
protected static function message_unpack($message, $full = FALSE) {
  if ($message->params) {
    $message->params = unserialize($message->params);
  }
  drupal_unpack($message);
  // Stored messages have already been through prepare() and render().
  $message->rendered = TRUE;
  $message->prepared = TRUE;
  $message->queued = $message->queue;
  $message->logged = $message->log;
  $built = messaging_message_build($message);
  return $built;
}
/**
 * Marks one or more messages as processed.
 *
 * On success, rows not flagged for logging are deleted and the rest are kept
 * as logs with the current time as 'sent'. On error, all rows are kept as
 * logs with 'sent' set to 0. In both cases the queue and cron flags are
 * cleared.
 *
 * @param $mqid
 *   Message id or array of message ids.
 * @param $error
 *   TRUE when the messages failed to send.
 */
public static function message_sent($mqid, $error = FALSE) {
  $mqids = is_array($mqid) ? $mqid : array($mqid);
  // With no ids there is no condition to build; running the statements
  // would produce malformed SQL.
  if (!$mqids) {
    return;
  }
  $where = self::query_fields(array(
    'mqid' => $mqids,
  ));
  // Use the table constant like every other query in this class.
  $table = '{' . self::DB_TABLE . '}';
  $condition = implode(' AND ', $where['where']);
  if ($error) {
    $sent = 0;
  }
  else {
    // Successful messages that are not kept as logs are removed first.
    db_query('DELETE FROM ' . $table . ' WHERE log = 0 AND ' . $condition, $where['args']);
    $sent = time();
  }
  // The 'sent' value fills the first placeholder, ahead of the condition
  // arguments.
  $args = array_merge(array(
    $sent,
  ), $where['args']);
  db_query('UPDATE ' . $table . ' SET queue = 0, cron = 0, log = 1, sent = %d WHERE ' . $condition, $args);
}
/**
 * Deletes all messages matching the given conditions.
 *
 * @param $params
 *   Array of conditions keyed by field name, as accepted by query_fields().
 *
 * @return
 *   Number of rows deleted; 0 when the conditions produce no WHERE clause.
 */
public static function delete_multiple($params) {
  $where = self::query_fields($params);
  // Without any condition the statement would end in a bare WHERE, which is
  // malformed SQL. Do nothing; delete_all() exists for clearing the table.
  if (empty($where['where'])) {
    return 0;
  }
  db_query('DELETE FROM {' . self::DB_TABLE . '} WHERE ' . implode(' AND ', $where['where']), $where['args']);
  return db_affected_rows();
}
/**
 * Deletes every row in the store table.
 *
 * @return
 *   Number of rows deleted.
 */
public static function delete_all() {
  $table = '{' . self::DB_TABLE . '}';
  db_query('DELETE FROM ' . $table);
  return db_affected_rows();
}
/**
 * Deletes a single message from the cache and the database.
 *
 * @param $message
 *   Message object or message id.
 */
public static function message_delete($message) {
  $mqid = self::message_key($message);
  // Cache the id as FALSE so later cached loads report it as missing.
  self::cache_set($mqid, FALSE);
  $sql = 'DELETE FROM {' . self::DB_TABLE . '} WHERE ' . self::DB_KEY . ' = %d';
  db_query($sql, $mqid);
}
/**
 * Resolves a message object or raw id to the message id.
 *
 * @param $message
 *   Message object, or a value that is already the id.
 *
 * @return
 *   The object's mqid property, or the value itself when it is not an object.
 */
protected static function message_key($message) {
  if (is_object($message)) {
    return $message->mqid;
  }
  return $message;
}
/**
 * Saves a message to the store table.
 *
 * The message is built and prepared first. drupal_write_record() is called
 * with 'mqid' as the update key when the message already has an mqid, and
 * with no update key otherwise. On success the saved message is cached.
 *
 * @param $message
 *   Message to save.
 *
 * @return
 *   The result of drupal_write_record().
 */
public static function message_save($message) {
  $message = messaging_message_build($message);
  self::message_prepare($message);
  $update = array();
  if (!empty($message->mqid)) {
    $update = 'mqid';
  }
  $result = drupal_write_record(self::DB_TABLE, $message, $update);
  if ($result) {
    $message->updated = FALSE;
    $message->queued = $message->queue;
    $message->logged = $message->log;
    $key = self::message_key($message);
    self::cache_set($key, $message);
  }
  messaging_debug('Saved message to store', array(
    'message' => $message,
  ));
  return $result;
}
/**
 * Saves a message as a log entry.
 *
 * Sets the log flag and clears the cron and queue flags before saving.
 *
 * @param $message
 *   Message to log.
 *
 * @return
 *   The result of message_save().
 */
public static function message_log($message) {
  $message->log = 1;
  $message->cron = 0;
  $message->queue = 0;
  return self::message_save($message);
}
/**
 * Saves a message flagged for the queue.
 *
 * @param $message
 *   Message to queue.
 *
 * @return
 *   The result of message_save().
 */
public static function message_queue($message) {
  $message->queue = 1;
  $saved = self::message_save($message);
  return $saved;
}
/**
 * Gets a message ready to be written to the store table.
 *
 * Runs prepare() and render() on the message, sets 'created' when empty,
 * normalises the queue, log and cron flags to 0 or 1, and collects into
 * 'data' every data field that is not a column in the table schema.
 *
 * @param $message
 *   Message object; modified in place.
 */
protected static function message_prepare($message) {
  $message->prepare();
  $message->render();
  if (empty($message->created)) {
    $message->created = time();
  }
  foreach (array('queue', 'log', 'cron') as $flag) {
    $message->{$flag} = empty($message->{$flag}) ? 0 : 1;
  }
  // Fields without a column of their own are stored together in 'data'.
  $schema = drupal_get_schema(self::DB_TABLE);
  $candidates = $message->data_fields();
  $columns = array_keys($schema['fields']);
  $data = array();
  foreach (array_diff($candidates, $columns) as $field) {
    if (isset($message->{$field})) {
      $data[$field] = $message->{$field};
    }
  }
  $message->data = $data ? $data : NULL;
}
/**
 * Stores a value in the static cache under the 'messaging_store' name.
 *
 * @param $key
 *   Cache key; this class uses the message id.
 * @param $value
 *   Value to cache; this class stores a message object, or FALSE for a
 *   missing message.
 */
protected static function cache_set($key, $value) {
  $cache_name = 'messaging_store';
  messaging_static_cache_set($cache_name, $key, $value);
}
/**
 * Reads a value from the static cache under the 'messaging_store' name.
 *
 * @param $key
 *   Cache key; this class uses the message id.
 *
 * @return
 *   Whatever messaging_static_cache_get() returns for the key.
 */
protected static function cache_get($key) {
  $cache_name = 'messaging_store';
  $entry = messaging_static_cache_get($cache_name, $key);
  return $entry;
}
/**
 * Provides the translated help texts describing this store.
 *
 * @return
 *   Array with the translated 'name' and 'queue' descriptions.
 */
public static function admin_help() {
  $help = array();
  $help['name'] = t('Messaging Store (built-in)');
  $help['queue'] = t('Queued messages will be processed on cron.');
  return $help;
}
/**
 * Settings hook for this store; the built-in store defines none.
 *
 * @return
 *   NULL.
 */
public static function admin_settings() {
  return NULL;
}
}