View source
<?php
/**
 * Maximum number of queued rows requested per processing step by
 * messaging_store_queue_process().
 */
define('MESSAGING_STEP_ROWS', 1000);
/**
 * Seconds subtracted from PHP's max_execution_time when the queue processing
 * deadline is derived from it.
 */
define('MESSAGING_TIME_MARGIN', 5);
/**
 * Process the message queue in steps until a limit is hit or it is drained.
 *
 * Reads the 'messaging_process_limit' variable ('message', 'time' and
 * 'percent' keys), works out the earliest applicable deadline and then calls
 * messaging_store_queue_process_step() repeatedly. Processing stops when a
 * step handles fewer rows than requested, the deadline has passed, or the
 * configured maximum number of messages has been reached.
 *
 * @param $timeout
 *   Optional number of seconds allowed for this run. When 0, the deadline is
 *   derived from PHP's max_execution_time (minus MESSAGING_TIME_MARGIN),
 *   counted from the 'cron_semaphore' timestamp when one is stored.
 */
function messaging_store_queue_process($timeout = 0) {
  $limit = variable_get('messaging_process_limit', array(
    'message' => 0,
    'time' => 0,
    'percent' => 0,
  ));
  $now = time();
  // max_execution_time is 0 when PHP runs without a time limit. Deriving a
  // deadline from it in that case would yield a moment that is already over,
  // so such limits are skipped instead.
  $max_execution = (int) ini_get('max_execution_time');

  // Collect every candidate deadline; the earliest one wins.
  $timelimit = array();
  if ($timeout) {
    $timelimit[] = $now + $timeout;
  }
  elseif ($max_execution) {
    // Without a stored semaphore timestamp the sum would start from 0 and
    // always lie in the past, so fall back to the current time.
    $started = variable_get('cron_semaphore', 0);
    $timelimit[] = ($started ? $started : $now) + $max_execution - MESSAGING_TIME_MARGIN;
  }
  if (!empty($limit['time'])) {
    $timelimit[] = $now + $limit['time'];
  }
  if (!empty($limit['percent']) && $max_execution) {
    $timelimit[] = $now + $max_execution * $limit['percent'] / 100;
  }
  // 0 means "no deadline", matching how the step function treats $timeout.
  $deadline = $timelimit ? min($timelimit) : 0;

  $count = 0;
  $max = !empty($limit['message']) ? $limit['message'] : 0;
  do {
    // Never request more rows than are left before reaching the maximum.
    $step = $max ? min(MESSAGING_STEP_ROWS, $max - $count) : MESSAGING_STEP_ROWS;
    $number = messaging_store_queue_process_step($step, $deadline);
    $count += $number;
    // A short step means the queue ran dry or the step hit the deadline.
  } while ($number == $step && (!$deadline || time() <= $deadline) && (!$max || $max > $count));
}
/**
 * Process one batch of queued messages.
 *
 * Fetches up to $limit rows flagged queue = 1 and cron = 1 ordered by mqid,
 * runs the sending callbacks on each one and afterwards marks the processed
 * rows through messaging_store_sent(), separately for successes and failures.
 *
 * @param $limit
 *   Maximum number of rows to fetch.
 * @param $timeout
 *   Optional Unix timestamp; once the current time is past it the loop stops
 *   after the message being processed. 0 disables the check.
 *
 * @return
 *   Number of messages processed in this step.
 */
function messaging_store_queue_process_step($limit, $timeout = 0) {
  $processed = 0;
  $delivered = array();
  $failed = array();
  $rows = db_query_range("SELECT * FROM {messaging_store} WHERE queue = 1 AND cron = 1 ORDER BY mqid", 0, $limit);
  while ($message = db_fetch_object($rows)) {
    messaging_store_unpack($message, TRUE);
    $message->process = TRUE;
    $info = messaging_method_info($message->method);
    // NOTE(review): 'alftersend' looks like a misspelling of 'aftersend';
    // confirm against the callback names messaging_message_callbacks() knows
    // before changing it, since that would alter which callbacks run here.
    $callbacks = array(
      'multisend',
      'alftersend',
    );
    $message = messaging_message_callbacks($callbacks, $message, $info);
    if ($message->success) {
      $delivered[] = $message->mqid;
    }
    else {
      $failed[] = $message->mqid;
    }
    $processed++;
    // The deadline is only checked after a message has been handled.
    if ($timeout && time() > $timeout) {
      break;
    }
  }
  // Update the store once per outcome rather than once per message.
  if (!empty($delivered)) {
    messaging_store_sent($delivered);
  }
  if (!empty($failed)) {
    messaging_store_sent($failed, TRUE);
  }
  return $processed;
}
/**
 * Delete expired message logs.
 *
 * Uses the 'messaging_log' variable as the retention period in seconds and
 * removes logged, non-queued rows whose sent timestamp is older than that.
 * Does nothing when the variable is 0 or unset.
 */
function messaging_store_queue_cleanup() {
  $expire = variable_get('messaging_log', 0);
  if (!$expire) {
    return;
  }
  db_query('DELETE FROM {messaging_store} WHERE log = 1 AND queue = 0 AND sent < %d', time() - $expire);
}
/**
 * Retrieve stored messages matching a set of field conditions.
 *
 * @param $params
 *   Array of field => value conditions, as taken by messaging_store_query().
 * @param $order
 *   Optional array of ORDER BY expressions, joined with commas.
 * @param $limit
 *   Optional maximum number of rows (rows per page when $pager is given).
 * @param $pager
 *   Optional pager element; any non-NULL value makes the query paged.
 * @param $unpack
 *   Passed to messaging_store_unpack() as its $full argument.
 *
 * @return
 *   Array of message objects keyed by mqid.
 */
function messaging_store_get($params, $order = NULL, $limit = NULL, $pager = NULL, $unpack = FALSE) {
  list($conditions, $values) = messaging_store_query($params);

  $query = 'SELECT * FROM {messaging_store}';
  if ($conditions) {
    $query .= ' WHERE ' . implode(' AND ', $conditions);
  }
  if ($order) {
    $query .= ' ORDER BY ' . implode(', ', $order);
  }

  // Paging takes precedence over a plain row limit.
  if ($pager !== NULL) {
    $rows = pager_query($query, $limit, $pager, NULL, $values);
  }
  elseif ($limit) {
    $rows = db_query_range($query, $values, 0, $limit);
  }
  else {
    $rows = db_query($query, $values);
  }

  $found = array();
  while ($row = db_fetch_object($rows)) {
    messaging_store_unpack($row, $unpack);
    $found[$row->mqid] = $row;
  }
  return $found;
}
/**
 * Load a single stored message by id, fully unpacked.
 *
 * @param $mqid
 *   Id of the row in {messaging_store}.
 *
 * @return
 *   The message object, or NULL when no row matches.
 */
function messaging_store_load($mqid) {
  $result = db_query('SELECT * FROM {messaging_store} WHERE mqid = %d', $mqid);
  $message = db_fetch_object($result);
  if (!$message) {
    return;
  }
  messaging_store_unpack($message, TRUE);
  return $message;
}
/**
 * Build WHERE fragments and query arguments from field conditions.
 *
 * Scalar values become "field = '%s'" conditions; array values become
 * "field IN(...)" conditions with one placeholder per element (integer
 * placeholders for 'mqid', varchar placeholders for any other field).
 *
 * @param $fields
 *   Array of field name => value, where value is a scalar or an array.
 *
 * @return
 *   Array with two elements: the list of condition strings and the flat list
 *   of arguments in matching order.
 */
function messaging_store_query($fields) {
  $conditions = array();
  $values = array();
  foreach ($fields as $column => $match) {
    if (!is_array($match)) {
      $conditions[] = $column . " = '%s'";
      $values[] = $match;
      continue;
    }
    $placeholder_type = ($column == 'mqid') ? 'int' : 'varchar';
    $conditions[] = $column . ' IN(' . db_placeholders($match, $placeholder_type) . ')';
    $values = array_merge($values, $match);
  }
  return array($conditions, $values);
}
/**
 * Unpack a stored message row in place.
 *
 * Unserializes $message->params, moves the 'destination', 'sender_name' and
 * 'destinations' entries up to properties of the message, and files the rest
 * under the sending method's group key when the method declares one. It also
 * makes sure $message->destinations is filled in when it can be derived.
 *
 * @param $message
 *   Message object as fetched from {messaging_store}; modified by reference.
 * @param $full
 *   When TRUE, also attach the results of messaging_load_user() for the
 *   recipient (uid) and the sender as ->account and ->sender_account.
 */
function messaging_store_unpack(&$message, $full = FALSE) {
  if ($message->params) {
    $stored = unserialize($message->params);
    $message->params = array();
    // These entries live on the message itself, not among its parameters.
    $promoted = array(
      'destination',
      'sender_name',
      'destinations',
    );
    foreach ($promoted as $name) {
      if (empty($stored[$name])) {
        continue;
      }
      $message->{$name} = $stored[$name];
      unset($stored[$name]);
    }
    $group = messaging_method_info($message->method, 'group');
    if (!$group || !empty($message->params[$group])) {
      $message->params = $stored;
    }
    else {
      $message->params[$group] = $stored;
    }
  }

  if ($message->uid && $full) {
    $message->account = messaging_load_user($message->uid);
  }
  if ($message->sender && $full) {
    $message->sender_account = messaging_load_user($message->sender);
  }

  // Nothing more to do when an explicit destination list came with the row.
  if (!empty($message->destinations)) {
    return;
  }
  // Prefer the destination resolved from the loaded account, if any.
  $userdest = empty($message->account) ? NULL : messaging_user_destination($message->account, $message->method, $message);
  if ($userdest) {
    $message->destinations = array(
      $userdest,
    );
  }
  elseif (!empty($message->destination)) {
    $message->destinations = array(
      $message->destination,
    );
  }
}
/**
 * Mark stored messages as processed.
 *
 * On success, rows that are not flagged for logging are deleted and the
 * remaining ones are stamped with the current time. On error nothing is
 * deleted and the sent timestamp is set to 0. In both cases the surviving
 * rows are taken out of the queue and flagged as log entries.
 *
 * @param $mqid
 *   A single message id or an array of ids.
 * @param $error
 *   TRUE when sending failed for these messages.
 */
function messaging_store_sent($mqid, $error = FALSE) {
  $ids = is_array($mqid) ? $mqid : array($mqid);
  list($conditions, $values) = messaging_store_query(array(
    'mqid' => $ids,
  ));
  $filter = implode(' AND ', $conditions);

  $sent = 0;
  if (!$error) {
    db_query("DELETE FROM {messaging_store} WHERE log = 0 AND " . $filter, $values);
    $sent = time();
  }

  // The timestamp fills the first placeholder, ahead of the id arguments.
  array_unshift($values, $sent);
  db_query("UPDATE {messaging_store} SET queue = 0, cron = 0, log = 1, sent = %d WHERE " . $filter, $values);
}
/**
 * Delete stored messages matching a set of field conditions.
 *
 * @param $params
 *   Array of field => value conditions, as taken by messaging_store_query().
 */
function messaging_store_del($params) {
  list($conditions, $values) = messaging_store_query($params);
  $sql = "DELETE FROM {messaging_store} WHERE " . implode(' AND ', $conditions);
  db_query($sql, $values);
}
/**
 * Write a message to {messaging_store}.
 *
 * Normalizes the queue/log/cron flags to 0|1, derives sender, uid and
 * destination from the attached account objects, packs the method group
 * parameters together with sender_name and destinations into ->params, stamps
 * the creation time and writes the record with drupal_write_record() (called
 * without update keys).
 *
 * @param $message
 *   Message object. It is modified in the process (flags, sender, uid,
 *   destination, params, created).
 *
 * @return
 *   The message object after it has been written.
 */
function messaging_store_save($message) {
  // Flags are stored as 0|1 whatever truthy or falsy value the caller set.
  foreach (array('queue', 'log', 'cron') as $field) {
    $message->{$field} = empty($message->{$field}) ? 0 : 1;
  }
  if (!empty($message->sender_account)) {
    $message->sender = $message->sender_account->uid;
  }

  // Start from the parameters filed under this method's group on the message;
  // this is the inverse of what messaging_store_unpack() restores. They must
  // be read from $message->params: reading a not yet defined local variable
  // here would silently discard them.
  $group = messaging_method_info($message->method, 'group');
  $params = array();
  if ($group && !empty($message->params) && is_array($message->params) && !empty($message->params[$group])) {
    $params = $message->params[$group];
  }
  if (!empty($message->sender_name)) {
    $params['sender_name'] = $message->sender_name;
  }
  if (!empty($message->account)) {
    $message->uid = $message->account->uid;
    $message->destination = 'user:' . $message->uid;
  }
  if (!empty($message->destinations)) {
    $params['destinations'] = $message->destinations;
  }

  // Derive the single destination column when it has not been set above.
  if (empty($message->destination) && !empty($message->destinations)) {
    if (count($message->destinations) > 1) {
      $message->destination = 'multiple';
    }
    elseif ($destination = $message->destinations[0]) {
      // Only scalar destinations fit in the column; others stay in params.
      if (is_string($destination) || is_numeric($destination)) {
        $message->destination = $destination;
      }
    }
  }

  $message->params = $params ? $params : NULL;
  $message->created = time();
  drupal_write_record('messaging_store', $message);
  return $message;
}