View source
<?php
// Maximum number of queued rows fetched and processed per step by
// messaging_store_queue_process().
define('MESSAGING_STEP_ROWS', 1000);
// Seconds subtracted from the computed execution deadline when queue
// processing works out its time limit.
define('MESSAGING_TIME_MARGIN', 5);
/**
 * Process the message queue in steps until it is drained or time runs out.
 *
 * The deadline is the earliest of:
 * - cron start time + max_execution_time - MESSAGING_TIME_MARGIN,
 * - now + the configured 'time' limit (seconds),
 * - now + the configured 'percent' of max_execution_time.
 *
 * Limits derived from max_execution_time are skipped when that setting is 0
 * (PHP's "no limit" value); otherwise the computed deadline would already be
 * in the past and only a single message would be handled per run. For the
 * same reason the cron start time falls back to the current time when the
 * 'cron_semaphore' variable is not set.
 */
function messaging_store_queue_process() {
  $limit = variable_get('messaging_process_limit', array(
    'message' => 0,
    'time' => 0,
    'percent' => 0,
  ));
  $now = time();
  $max_execution = (int) ini_get('max_execution_time');
  $timelimit = array();

  if ($max_execution > 0) {
    $timelimit[] = variable_get('cron_semaphore', $now) + $max_execution - MESSAGING_TIME_MARGIN;
  }
  if (!empty($limit['time'])) {
    $timelimit[] = $now + $limit['time'];
  }
  if (!empty($limit['percent'])) {
    // A percentage of an unlimited execution time is meaningless; skip it.
    if ($max_execution > 0) {
      $timelimit[] = $now + $max_execution * $limit['percent'] / 100;
    }
    unset($limit['percent']);
  }

  // A deadline of 0 means "no time limit", which is also how
  // messaging_store_queue_process_step() interprets its $timeout argument.
  $limit['time'] = $timelimit ? min($timelimit) : 0;

  do {
    $number = messaging_store_queue_process_step(MESSAGING_STEP_ROWS, $limit['time']);
    // A full step means there may be more rows waiting; a short step means
    // the queue is empty or the step itself hit the deadline.
  } while ($number == MESSAGING_STEP_ROWS && (!$limit['time'] || time() <= $limit['time']));
}
/**
 * Send out one batch of queued messages.
 *
 * Fetches up to $limit rows flagged queue = 1 and cron = 1, ordered by mqid,
 * attempts to send each one and then marks the successes and failures in
 * bulk through messaging_store_sent().
 *
 * @param $limit
 *   Maximum number of rows to fetch for this step.
 * @param $timeout
 *   Unix timestamp after which the step stops early; 0 disables the check.
 *
 * @return
 *   Number of messages processed in this step, whether sent or not.
 */
function messaging_store_queue_process_step($limit, $timeout = 0) {
  $processed = 0;
  $delivered = array();
  $failed = array();

  $rows = db_query_range("SELECT * FROM {messaging_store} WHERE queue = 1 AND cron = 1 ORDER BY mqid", 0, $limit);
  while ($row = db_fetch_array($rows)) {
    messaging_store_unpack($row, TRUE);
    $ok = messaging_message_send_out($row['destination'], $row, $row['method']);
    if ($ok) {
      $delivered[] = $row['mqid'];
    }
    else {
      $failed[] = $row['mqid'];
    }
    $processed++;
    // Stop as soon as the deadline has passed; the rows handled so far are
    // still recorded below.
    if ($timeout && time() > $timeout) {
      break;
    }
  }

  if (!empty($delivered)) {
    messaging_store_sent($delivered);
  }
  if (!empty($failed)) {
    // Failed messages are passed with the log flag set.
    messaging_store_sent($failed, TRUE);
  }
  return $processed;
}
/**
 * Delete logged messages older than the configured expiration time.
 *
 * Does nothing when the 'messaging_log_expire' variable is unset or 0.
 * Only rows with log = 1 and queue = 0 are removed.
 */
function messaging_store_queue_cleanup() {
  $lifetime = variable_get('messaging_log_expire', 0);
  if (!$lifetime) {
    return;
  }
  $cutoff = time() - $lifetime;
  db_query('DELETE FROM {messaging_store} WHERE log = 1 AND queue = 0 AND sent < %d', $cutoff);
}
/**
 * Fetch stored messages for a sending method and a user or set of users.
 *
 * @param $method
 *   Sending method name to match.
 * @param $users
 *   A single uid or an array of uids to match.
 * @param $limit
 *   Maximum number of messages to return; 0 means no limit. When a limit is
 *   given, rows are taken in mqid order so the selection is deterministic.
 * @param $delete
 *   When TRUE, the returned messages are passed to messaging_store_sent().
 *
 * @return
 *   Array of messages keyed by mqid.
 */
function messaging_store_pull_pending($method, $users, $limit = 0, $delete = TRUE) {
  // NOTE(review): no "queue" condition is applied here, so rows are matched
  // on method and uid only - confirm this is what callers expect.
  $order = $limit ? array('mqid') : NULL;
  $messages = messaging_store_get(array(
    'method' => $method,
    'uid' => $users,
  ), $order, $limit);
  if ($messages && $delete) {
    messaging_store_sent(array_keys($messages));
  }
  return $messages;
}
/**
 * Load stored messages matching a set of field conditions.
 *
 * @param $params
 *   Array of field => value conditions, as taken by messaging_store_query().
 * @param $order
 *   Optional array of ORDER BY expressions.
 * @param $limit
 *   Optional maximum number of rows (page size when a pager is used).
 * @param $pager
 *   Optional pager element; any non-NULL value runs the query through
 *   pager_query().
 * @param $unpack
 *   Passed to messaging_store_unpack() as its $full argument.
 *
 * @return
 *   Array of message arrays keyed by mqid.
 */
function messaging_store_get($params, $order = NULL, $limit = NULL, $pager = NULL, $unpack = FALSE) {
  list($conditions, $values) = messaging_store_query($params);

  $query = 'SELECT * FROM {messaging_store}';
  if ($conditions) {
    $query .= ' WHERE ' . implode(' AND ', $conditions);
  }
  if ($order) {
    $query .= ' ORDER BY ' . implode(', ', $order);
  }

  if (isset($pager)) {
    $result = pager_query($query, $limit, $pager, NULL, $values);
  }
  elseif ($limit) {
    $result = db_query_range($query, $values, 0, $limit);
  }
  else {
    $result = db_query($query, $values);
  }

  $found = array();
  while ($row = db_fetch_array($result)) {
    messaging_store_unpack($row, $unpack);
    $found[$row['mqid']] = $row;
  }
  return $found;
}
/**
 * Build WHERE conditions and query arguments from field => value pairs.
 *
 * Scalar values produce "field = '%s'". Array values produce
 * "field IN (...)" with one placeholder per element (%d for mqid, '%s'
 * otherwise). An empty array matches nothing, so it yields the always-false
 * condition "1 = 0" instead of the invalid SQL "field IN ()".
 *
 * Field names are placed into the SQL verbatim, so they must be trusted
 * column names and never user input.
 *
 * @param $fields
 *   Array of field => value, where value is a scalar or an array of scalars.
 *
 * @return
 *   Array with two elements: the list of condition strings and the flat list
 *   of arguments, in placeholder order.
 */
function messaging_store_query($fields) {
  $where = $args = array();
  foreach ($fields as $key => $value) {
    if (is_array($value)) {
      if (!$value) {
        // IN () is a syntax error; an empty set can never match.
        $where[] = '1 = 0';
        continue;
      }
      $placeholder = $key === 'mqid' ? '%d' : "'%s'";
      $where[] = "{$key} IN (" . implode(',', array_fill(0, count($value), $placeholder)) . ')';
      // array_values() keeps string-keyed input from overwriting earlier
      // arguments during the merge.
      $args = array_merge($args, array_values($value));
    }
    else {
      $where[] = $key . " = '%s'";
      $args[] = $value;
    }
  }
  return array(
    $where,
    $args,
  );
}
/**
 * Expand a stored message row in place.
 *
 * When the row has serialized params, they are unserialized; 'destination'
 * and 'sender_name' are promoted to top-level message keys and the rest is
 * stored under $message['params'][<method group>].
 *
 * @param $message
 *   Message row array, modified by reference.
 * @param $full
 *   When TRUE, also load the user objects for 'uid' and 'sender' into
 *   'account' and 'sender_account'.
 */
function messaging_store_unpack(&$message, $full = FALSE) {
  if ($message['params']) {
    $stored = unserialize($message['params']);
    $message['params'] = array();

    $promoted = array('destination', 'sender_name');
    foreach ($promoted as $name) {
      if (empty($stored[$name])) {
        continue;
      }
      $message[$name] = $stored[$name];
      unset($stored[$name]);
    }

    // Whatever is left belongs to the sending method's group.
    $group = messaging_method_info($message['method'], 'group');
    $message['params'][$group] = $stored;
  }

  // Map of uid-holding key => key that receives the loaded user.
  $accounts = array(
    'uid' => 'account',
    'sender' => 'sender_account',
  );
  foreach ($accounts as $source => $target) {
    if ($message[$source] && $full) {
      $message[$target] = messaging_load_user($message[$source]);
    }
  }
}
/**
 * Mark one or more stored messages as sent.
 *
 * Unless $log is TRUE, rows that are not flagged for logging (log = 0) are
 * deleted first. Every remaining matching row is then taken out of the queue,
 * flagged as logged and stamped with the current time.
 *
 * @param $mqid
 *   A single message id or an array of message ids.
 * @param $log
 *   When TRUE, skip the delete so all rows are kept and flagged as logged.
 */
function messaging_store_sent($mqid, $log = FALSE) {
  if (!is_array($mqid)) {
    $mqid = array(
      $mqid,
    );
  }
  list($where, $args) = messaging_store_query(array(
    'mqid' => $mqid,
  ));
  $condition = implode(' AND ', $where);

  if (!$log) {
    db_query("DELETE FROM {messaging_store} WHERE log = 0 AND " . $condition, $mqid);
  }

  // The timestamp fills the leading %d, followed by one argument per id.
  $update_args = array_merge(array(
    time(),
  ), $mqid);
  db_query("UPDATE {messaging_store} SET queue = 0, log = 1, sent = %d WHERE " . $condition, $update_args);
}
/**
 * Delete stored messages matching a set of field conditions.
 *
 * @param $params
 *   Array of field => value conditions, as taken by messaging_store_query().
 *   When no conditions result, nothing is deleted: the query would otherwise
 *   end in a dangling "WHERE", and widening it to an unconditional delete
 *   would be unsafe.
 */
function messaging_store_del($params) {
  list($where, $args) = messaging_store_query($params);
  if (!$where) {
    return;
  }
  db_query("DELETE FROM {messaging_store} WHERE " . implode(' AND ', $where), $args);
}
/**
 * Store a message, one row per destination.
 *
 * @param $method
 *   Sending method name.
 * @param $destinations
 *   Array of destinations. Object or array destinations are serialized into
 *   the row's params; anything else is stored in the destination column.
 * @param $message
 *   Message array. Reads 'subject', 'body', and optionally 'account',
 *   'sender', 'sender_account', 'sender_name' and 'params'.
 * @param $sent
 *   Value for the 'sent' column.
 * @param $queue
 *   Value for the 'queue' column.
 * @param $log
 *   Value for the 'log' column.
 * @param $cron
 *   Value for the 'cron' column.
 */
function messaging_store_save($method, $destinations, $message, $sent = 0, $queue = 0, $log = 0, $cron = 1) {
  $message += array(
    'account' => NULL,
    'sender' => 0,
  );
  if (!empty($message['sender_account'])) {
    $message['sender'] = $message['sender_account']->uid;
  }

  // Only the params for this method's group are stored. This mirrors
  // messaging_store_unpack(), which restores them under
  // $message['params'][$group].
  $group = messaging_method_info($method, 'group');
  $base_params = !empty($message['params'][$group]) ? $message['params'][$group] : array();
  if (!empty($message['sender_name'])) {
    $base_params['sender_name'] = $message['sender_name'];
  }

  foreach ($destinations as $destination) {
    // Start from a fresh copy so a destination serialized for one row does
    // not leak into the params of the following rows.
    $params = $base_params;

    if ($message['account']) {
      $uid = $message['account']->uid;
    }
    elseif (is_object($destination) && isset($destination->uid)) {
      $uid = $destination->uid;
    }
    else {
      $uid = 0;
    }

    // Complex destinations cannot go into the text column; keep them in the
    // serialized params instead.
    if (is_object($destination) || is_array($destination)) {
      $params['destination'] = $destination;
      $destination = '';
    }

    db_query("INSERT INTO {messaging_store} (method, uid, sender, destination, created, sent, queue, log, cron, subject, body, params)" . " VALUES('%s', %d, %d, '%s', %d, %d, %d, %d, %d, '%s', '%s', '%s')", $method, $uid, $message['sender'], $destination, time(), $sent, $queue, $log, $cron, $message['subject'], $message['body'], $params ? serialize($params) : '');
  }
}