View source
<?php
// Maximum number of queued rows requested from the store in a single
// processing step (see messaging_store_queue_process()).
define('MESSAGING_STEP_ROWS', 1000);
// Safety margin, in seconds, subtracted from the PHP execution time limit when
// computing the deadline for queue processing.
define('MESSAGING_TIME_MARGIN', 5);
/**
 * Process the message queue in steps until a time or message limit is hit.
 *
 * The deadline is the earliest of: the explicit $timeout, the PHP execution
 * time limit (less MESSAGING_TIME_MARGIN), and the 'time' / 'percent' limits
 * from the 'messaging_process_limit' variable. When none of them applies
 * (for instance max_execution_time is 0, meaning unlimited) no deadline is
 * enforced and processing continues until the queue is drained or the
 * message limit is reached.
 *
 * @param $timeout
 *   Optional number of seconds this run may take. When 0, the PHP execution
 *   time limit is used as the hard limit instead.
 *
 * @return
 *   The number of messages processed in this run.
 */
function messaging_store_queue_process($timeout = 0) {
  $limit = variable_get('messaging_process_limit', array(
    'message' => 0,
    'time' => 0,
    'percent' => 0,
  ));
  $now = time();
  // max_execution_time = 0 means "no limit" (the CLI default). Treating it as
  // a number of seconds would produce a deadline that is already in the past.
  $max_execution = (int) ini_get('max_execution_time');
  $timelimit = array();
  if ($timeout) {
    $timelimit[] = $now + $timeout;
  }
  elseif ($max_execution > 0) {
    // The hard limit counts from the moment cron started. When the semaphore
    // is not set (not running under cron) count from now instead of from 0.
    $start = variable_get('cron_semaphore', 0);
    if (!$start) {
      $start = $now;
    }
    $timelimit[] = $start + $max_execution - MESSAGING_TIME_MARGIN;
  }
  if (!empty($limit['time'])) {
    $timelimit[] = $now + $limit['time'];
  }
  if (!empty($limit['percent']) && $max_execution > 0) {
    $timelimit[] = $now + $max_execution * $limit['percent'] / 100;
  }
  // A deadline of 0 means "no time limit", both here and in the step function.
  $deadline = $timelimit ? min($timelimit) : 0;
  $count = 0;
  $max = !empty($limit['message']) ? $limit['message'] : 0;
  do {
    // Never request more rows than the remaining message allowance.
    $step = $max ? min(MESSAGING_STEP_ROWS, $max - $count) : MESSAGING_STEP_ROWS;
    $number = messaging_store_queue_process_step($step, $deadline);
    $count += $number;
    // A short step means the queue is empty or the step ran out of time.
  } while ($number == $step && (!$deadline || time() <= $deadline) && (!$max || $max > $count));
  return $count;
}
/**
 * Retrieve and send one batch of queued messages.
 *
 * Rows with queue = 1 and cron = 1 are fetched in mqid order, unpacked into
 * message objects and sent. Afterwards the rows are marked as sent or as
 * failed through messaging_store_sent().
 *
 * @param $limit
 *   Maximum number of rows to fetch in this step.
 * @param $timeout
 *   Optional Unix timestamp after which no further messages are sent. 0 means
 *   no deadline.
 *
 * @return
 *   Number of messages processed, whether or not sending succeeded.
 */
function messaging_store_queue_process_step($limit, $timeout = 0) {
  $count = 0;
  $sent = $unsent = array();
  $result = db_query_range("SELECT * FROM {messaging_store} WHERE queue = 1 AND cron = 1 ORDER BY mqid", 0, $limit);
  while ($stored = db_fetch_object($result)) {
    // messaging_store_unpack() takes a class name and returns the unpacked
    // object; use the same class as the other loaders in this file.
    // NOTE(review): assumes Messaging_Message provides send() - confirm.
    $message = messaging_store_unpack($stored, 'Messaging_Message');
    $message->process = TRUE;
    $message->send();
    if (!empty($message->success)) {
      $sent[] = $message->mqid;
    }
    else {
      $unsent[] = $message->mqid;
    }
    $count++;
    // The deadline is checked after each message, so at least one message is
    // always attempted per step.
    if ($timeout && time() > $timeout) {
      break;
    }
  }
  if ($sent) {
    messaging_store_sent($sent);
  }
  if ($unsent) {
    messaging_store_sent($unsent, TRUE);
  }
  return $count;
}
/**
 * Remove expired log entries from the message store.
 *
 * Does nothing when the 'messaging_log' variable is empty. Otherwise deletes
 * rows that are logged (log = 1), no longer queued (queue = 0) and whose
 * sent timestamp is older than that many seconds.
 */
function messaging_store_queue_cleanup() {
  $lifetime = variable_get('messaging_log', 0);
  if (!$lifetime) {
    return;
  }
  $cutoff = time() - $lifetime;
  db_query('DELETE FROM {messaging_store} WHERE log = 1 AND queue = 0 AND sent < %d', $cutoff);
}
/**
 * Fetch stored messages matching a set of field conditions.
 *
 * @param $params
 *   Array of field => value conditions, passed to messaging_store_query().
 * @param $order
 *   Optional array of ORDER BY expressions.
 * @param $limit
 *   Optional maximum number of rows, or the page size when $pager is given.
 * @param $pager
 *   Optional pager element; when not NULL the query runs through
 *   pager_query().
 * @param $unpack
 *   Whether to turn each row into a Messaging_Message through
 *   messaging_store_unpack(). When FALSE the raw row objects are returned.
 *
 * @return
 *   Array of messages keyed by mqid.
 */
function messaging_store_get($params, $order = NULL, $limit = NULL, $pager = NULL, $unpack = TRUE) {
  list($conditions, $query_args) = messaging_store_query($params);

  // Assemble the statement piece by piece.
  $sql = 'SELECT * FROM {messaging_store}';
  if ($conditions) {
    $sql .= ' WHERE ' . implode(' AND ', $conditions);
  }
  if ($order) {
    $sql .= ' ORDER BY ' . implode(', ', $order);
  }

  // Choose the query runner: paged, ranged or plain.
  if ($pager !== NULL) {
    $result = pager_query($sql, $limit, $pager, NULL, $query_args);
  }
  elseif ($limit) {
    $result = db_query_range($sql, $query_args, 0, $limit);
  }
  else {
    $result = db_query($sql, $query_args);
  }

  $messages = array();
  while ($row = db_fetch_object($result)) {
    $messages[$row->mqid] = $unpack ? messaging_store_unpack($row, 'Messaging_Message') : $row;
  }
  return $messages;
}
/**
 * Load a single stored message by id.
 *
 * @param $mqid
 *   Message queue id.
 *
 * @return
 *   The result of messaging_store_unpack() for the row using the
 *   Messaging_Message class, or NULL when no row matches.
 */
function messaging_store_load($mqid) {
  $result = db_query('SELECT * FROM {messaging_store} WHERE mqid = %d', $mqid);
  $row = db_fetch_object($result);
  if (!$row) {
    return;
  }
  return messaging_store_unpack($row, 'Messaging_Message');
}
/**
 * Build WHERE fragments and query arguments from field conditions.
 *
 * Array values become "field IN(...)" conditions, scalar values become
 * "field = '%s'" conditions. An empty array value produces a condition that
 * matches no rows instead of the invalid SQL "field IN()".
 *
 * The field names are inserted into the SQL as they are, so they must come
 * from trusted code, never from user input.
 *
 * @param $fields
 *   Array of field name => value (scalar or array of values).
 *
 * @return
 *   Array with two elements: the list of WHERE fragments and the list of
 *   query arguments, in matching order.
 */
function messaging_store_query($fields) {
  $where = $args = array();
  foreach ($fields as $key => $value) {
    if (is_array($value)) {
      if (!$value) {
        // Nothing can be "in" an empty list; keep the statement valid.
        $where[] = '1 = 0';
        continue;
      }
      // Only mqid is treated as an integer column; all others as strings.
      $type = $key === 'mqid' ? 'int' : 'varchar';
      $where[] = $key . ' IN(' . db_placeholders($value, $type) . ')';
      // Drop the keys so string-keyed values cannot overwrite each other.
      $args = array_merge($args, array_values($value));
    }
    else {
      $where[] = $key . " = '%s'";
      $args[] = $value;
    }
  }
  return array(
    $where,
    $args,
  );
}
/**
 * Fill in derived properties on a message object, in place.
 *
 * With $full set, the accounts for the message's uid and sender are loaded
 * through messaging_load_user(). When the message has no destinations yet,
 * they are taken from the user destination for the loaded account, or
 * otherwise from the message's single 'destination' property.
 *
 * @param $message
 *   Message object, modified in place.
 * @param $full
 *   Whether to load the recipient and sender accounts.
 */
function messaging_store_unpack_message(&$message, $full = FALSE) {
  if ($message->uid && $full) {
    $message->account = messaging_load_user($message->uid);
  }
  if ($message->sender && $full) {
    $message->sender_account = messaging_load_user($message->sender);
  }

  // Existing destinations are never overwritten.
  if (!empty($message->destinations)) {
    return;
  }

  if (!empty($message->account) && ($userdest = messaging_user_destination($message->account, $message->method, $message))) {
    $message->destinations = array($userdest);
  }
  elseif (!empty($message->destination)) {
    $message->destinations = array($message->destination);
  }
}
/**
 * Mark stored messages as processed, either sent or failed.
 *
 * On success, rows that are not logged (log = 0) are deleted and the
 * remaining ones get the current time as their sent timestamp. On error
 * nothing is deleted and the sent timestamp is set to 0. In both cases the
 * remaining rows are taken out of the queue and flagged as log entries.
 *
 * @param $mqid
 *   A single message queue id or an array of ids.
 * @param $error
 *   TRUE when sending failed.
 */
function messaging_store_sent($mqid, $error = FALSE) {
  if (!is_array($mqid)) {
    $mqid = array($mqid);
  }
  list($where, $args) = messaging_store_query(array('mqid' => $mqid));
  $condition = implode(' AND ', $where);

  if ($error) {
    $sent = 0;
  }
  else {
    // Successfully sent messages that are not logged can go away entirely.
    db_query("DELETE FROM {messaging_store} WHERE log = 0 AND " . $condition, $args);
    $sent = time();
  }

  // The timestamp is the first placeholder in the UPDATE statement.
  array_unshift($args, $sent);
  db_query("UPDATE {messaging_store} SET queue = 0, cron = 0, log = 1, sent = %d WHERE " . $condition, $args);
}
/**
 * Delete stored messages matching a set of field conditions.
 *
 * @param $params
 *   Array of field => value conditions, passed to messaging_store_query().
 */
function messaging_store_del($params) {
  list($conditions, $query_args) = messaging_store_query($params);
  $sql = "DELETE FROM {messaging_store} WHERE " . implode(' AND ', $conditions);
  db_query($sql, $query_args);
}
/**
 * Save a message to the messaging_store table.
 *
 * Hands the message to messaging_store_save_object() with 'mqid' as the key
 * and the field list reported by the message's data_fields() method.
 *
 * @param $message
 *   Message object providing a data_fields() method.
 */
function messaging_store_save($message) {
  $fields = $message->data_fields();
  messaging_store_save_object($message, 'messaging_store', 'mqid', $fields);
}
/**
 * Delete a single stored message.
 *
 * @param $mqid
 *   Message queue id of the row to delete.
 */
function messaging_store_delete($mqid) {
  $sql = "DELETE FROM {messaging_store} WHERE mqid = %d";
  db_query($sql, $mqid);
}
/**
 * Write an object to a table, inserting or updating depending on its key.
 *
 * The listed fields that are set on the object are first copied into its
 * 'data' array. Objects with an empty key value are inserted and get both
 * 'created' and 'updated' timestamps; others are updated by key and only get
 * a new 'updated' timestamp.
 *
 * @param $object
 *   Object to save.
 * @param $table
 *   Table name as expected by drupal_write_record().
 * @param $key
 *   Name of the key property/column.
 * @param $fields
 *   Optional list of property names to copy into $object->data.
 *
 * @return
 *   The return value of drupal_write_record().
 */
function messaging_store_save_object($object, $table, $key, $fields = NULL) {
  if ($fields) {
    foreach ($fields as $name) {
      if (isset($object->{$name})) {
        $object->data[$name] = $object->{$name};
      }
    }
  }

  $is_new = empty($object->{$key});
  $object->updated = time();
  if ($is_new) {
    $object->created = $object->updated;
  }

  // An empty update list makes drupal_write_record() insert a new row.
  return drupal_write_record($table, $object, $is_new ? array() : $key);
}
/**
 * Load a row from a table and unpack it into an object of the given class.
 *
 * The table and field names are inserted into the SQL as they are, so they
 * must come from trusted code, never from user input.
 *
 * @param $table
 *   Table name, without the surrounding braces.
 * @param $field
 *   Name of the integer column to match.
 * @param $value
 *   Value to look up; it is passed through a %d placeholder.
 * @param $class
 *   Optional class name for the resulting object. Defaults to the 'class'
 *   property of the stored row.
 *
 * @return
 *   The result of messaging_store_unpack(), or NULL when no row matches.
 */
function messaging_store_load_object($table, $field, $value, $class = NULL) {
  // Build the braces by concatenation: inside a double-quoted string
  // "{$table}" is PHP interpolation and would drop the braces that the
  // database layer needs for table prefixing.
  $sql = 'SELECT * FROM {' . $table . '} WHERE ' . $field . ' = %d';
  if ($stored = db_fetch_object(db_query($sql, $value))) {
    $class = $class ? $class : $stored->class;
    // This is a plain function, so there is no $this; call the unpack helper
    // from this file directly.
    return messaging_store_unpack($stored, $class);
  }
}
/**
 * Turn a stored row into an object of the requested class.
 *
 * The row is first passed through drupal_unpack(). If it already is an
 * instance of $class it is returned as is; otherwise a new $class object is
 * constructed with the row as its only constructor argument.
 *
 * @param $stored
 *   Row object fetched from the database.
 * @param $class
 *   Name of the class to return.
 *
 * @return
 *   An object of class $class.
 */
function messaging_store_unpack($stored, $class) {
  drupal_unpack($stored);
  if (!is_a($stored, $class)) {
    return new $class($stored);
  }
  return $stored;
}