View source
<?php
namespace Drupal\job_scheduler;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Database\Connection;
class JobScheduler implements JobSchedulerInterface {
public $time;
protected $crontabDecorator;
protected $database;
public function __construct(JobSchedulerCronTabDecoratorInterface $crontab_decorator, Connection $database, TimeInterface $time) {
$this->database = $database;
$this->time = $time;
$this->crontabDecorator = $crontab_decorator;
}
public function info($name) {
if ($info = job_scheduler_info($name)) {
return $info;
}
throw new JobSchedulerException(t('Could not find Job Scheduler cron information for @name.', [
'@name' => $name,
]));
}
public function set(array $job) {
$timestamp = $this->time
->getRequestTime();
$job['periodic'] = isset($job['periodic']) ? (int) $job['periodic'] : 0;
$job['data'] = isset($job['data']) ? serialize($job['data']) : FALSE;
$job['last'] = $timestamp;
if (!empty($job['crontab'])) {
$crontab = $this->crontabDecorator
->decorate($job['crontab']);
$job['next'] = $crontab
->nextTime($timestamp);
}
else {
$job['next'] = $timestamp + $job['period'];
}
$job['scheduled'] = 0;
$this
->remove($job);
$this->database
->insert('job_schedule')
->fields($job)
->execute();
}
public function remove(array $job) {
$this->database
->delete('job_schedule')
->condition('name', $job['name'])
->condition('type', $job['type'])
->condition('id', isset($job['id']) ? $job['id'] : 0)
->execute();
}
public function removeAll($name, $type) {
$this->database
->delete('job_schedule')
->condition('name', $name)
->condition('type', $type)
->execute();
}
public function dispatch(array $job) {
$info = $this
->info($job['name']);
if (!$job['periodic']) {
$this
->remove($job);
}
if (!empty($info['queue name'])) {
$queue_name = 'job_scheduler_queue:' . $info['queue name'];
if (\Drupal::queue($queue_name)
->createItem($job)) {
$this
->reserve($job);
}
}
else {
$this
->execute($job);
}
}
public function execute(array $job) {
$info = $this
->info($job['name']);
if ($job['periodic']) {
$this
->reschedule($job);
}
if (!empty($info['file']) && file_exists($info['file'])) {
include_once $info['file'];
}
if (function_exists($info['worker callback'])) {
call_user_func($info['worker callback'], $job);
}
else {
$this
->remove($job);
throw new JobSchedulerException(t('Could not find worker callback function: @function', [
'@function' => $info['worker callback'],
]));
}
}
public function reschedule(array $job) {
$timestamp = $this->time
->getRequestTime();
$job['periodic'] = isset($job['periodic']) ? (int) $job['periodic'] : 0;
$job['data'] = isset($job['data']) ? serialize($job['data']) : FALSE;
$job['last'] = $timestamp;
$job['scheduled'] = 0;
if (!empty($job['crontab'])) {
$crontab = $this->crontabDecorator
->decorate($job['crontab']);
$job['next'] = $crontab
->nextTime($timestamp);
}
else {
$job['next'] = $timestamp + $job['period'];
}
if ($job['next']) {
$this
->doUpdate($job, [
'item_id',
]);
}
else {
$this
->remove($job);
}
}
public function check(array $job) {
$job += [
'id' => 0,
'period' => 0,
'crontab' => '',
];
$existing = $this->database
->select('job_schedule')
->fields('job_schedule')
->condition('name', $job['name'])
->condition('type', $job['type'])
->condition('id', $job['id'])
->execute()
->fetchAssoc();
if ($existing) {
if ($job['period'] != $existing['period'] || $job['crontab'] != $existing['crontab']) {
$existing['period'] = $job['period'];
$existing['crontab'] = $job['crontab'];
$this
->reschedule($existing);
}
return TRUE;
}
return FALSE;
}
public function perform() {
$timestamp = $this->time
->getRequestTime();
$this->database
->update('job_schedule')
->fields([
'scheduled' => 0,
])
->condition('scheduled', $timestamp - 3600, '<')
->condition('periodic', 1)
->execute();
$start = time();
$total = 0;
$failed = 0;
$jobs = $this->database
->select('job_schedule', NULL, [
'fetch' => \PDO::FETCH_ASSOC,
])
->fields('job_schedule')
->condition('scheduled', 0)
->condition('next', $timestamp, '<=')
->orderBy('next', 'ASC')
->range(0, 200)
->execute();
foreach ($jobs as $job) {
$job['data'] = unserialize($job['data']);
try {
$this
->dispatch($job);
} catch (\Exception $e) {
watchdog_exception('job_scheduler', $e);
$failed++;
$this
->remove($job);
}
$total++;
if (time() > $start + 30) {
break;
}
}
return [
'start' => $start,
'total' => $total,
'failed' => $failed,
];
}
public function rebuild($name, array $info = NULL) {
$info = $info ?: $this
->info($name);
if (!empty($info['jobs'])) {
foreach ($info['jobs'] as $job) {
$job['name'] = $name;
if (!$this
->check($job)) {
$this
->set($job);
}
}
}
}
public function rebuildAll() {
foreach (job_scheduler_info() as $name => $info) {
$this
->rebuild($name, $info);
}
}
protected function reserve(array $job) {
$timestamp = $this->time
->getRequestTime();
$job['periodic'] = isset($job['periodic']) ? (int) $job['periodic'] : 0;
$job['data'] = isset($job['data']) ? serialize($job['data']) : FALSE;
$job['scheduled'] = $job['period'] + $timestamp;
$job['last'] = $timestamp;
$job['next'] = $job['scheduled'];
$this
->doUpdate($job, [
'name',
'type',
'id',
]);
}
protected function doUpdate(array $job, $primary_keys) {
$fields = [];
foreach ($job as $key => $value) {
if (!in_array($key, $primary_keys)) {
$fields[$key] = $value;
}
}
$query = $this->database
->update('job_schedule')
->fields($fields);
foreach ($primary_keys as $key) {
if (!isset($job[$key])) {
throw new JobSchedulerException(t('Could not find job parameter: @parameter', [
'@parameter' => $key,
]));
}
$query
->condition($key, $job[$key]);
}
$query
->execute();
}
}