class JobScheduler in Job Scheduler 8.3
Same name and namespace in other branches
- 8.2 src/JobScheduler.php \Drupal\job_scheduler\JobScheduler
Manage scheduled jobs.
Hierarchy
- class \Drupal\job_scheduler\JobScheduler implements JobSchedulerInterface
Expanded class hierarchy of JobScheduler
1 string reference to 'JobScheduler'
1 service uses JobScheduler
File
- src/
JobScheduler.php, line 11
Namespace
Drupal\job_scheduler
View source
class JobScheduler implements JobSchedulerInterface {

  /**
   * The job scheduler crontab decorator.
   *
   * @var \Drupal\job_scheduler\JobSchedulerCronTabDecoratorInterface
   */
  protected $crontabDecorator;

  /**
   * The job schedule storage.
   *
   * @var \Drupal\Core\Entity\EntityStorageInterface
   */
  protected $jobScheduleStorage;

  /**
   * Constructs a JobScheduler object.
   *
   * @param \Drupal\job_scheduler\JobSchedulerCronTabDecoratorInterface $crontab_decorator
   *   The job scheduler crontab decorator.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entityTypeManager
   *   The entity type manager service, used to obtain the 'job_schedule'
   *   entity storage.
   */
  public function __construct(JobSchedulerCronTabDecoratorInterface $crontab_decorator, EntityTypeManagerInterface $entityTypeManager) {
    $this->crontabDecorator = $crontab_decorator;
    $this->jobScheduleStorage = $entityTypeManager->getStorage('job_schedule');
  }

  /**
   * {@inheritdoc}
   */
  public function info($name) {
    if ($info = job_scheduler_info($name)) {
      return $info;
    }
    throw new JobSchedulerException(t('Could not find Job Scheduler cron information for @name.', [
      '@name' => $name,
    ]));
  }

  /**
   * {@inheritdoc}
   */
  public function set(array $job) {
    $timestamp = time();
    $job['last'] = $timestamp;
    if (!empty($job['crontab'])) {
      $crontab = $this->crontabDecorator->decorate($job['crontab']);
      $job['next'] = $crontab->nextTime($timestamp);
    }
    else {
      // A job with neither a crontab nor a period is due immediately.
      $job['next'] = $timestamp + (isset($job['period']) ? $job['period'] : 0);
    }
    // Replace any job already stored for the same name, type and id so that
    // setting the same job twice does not leave duplicates behind. Skipped
    // when the identifying keys are absent, since remove() requires them.
    if (isset($job['name'], $job['type'])) {
      $this->remove($job);
    }
    $this->jobScheduleStorage->create($job)->save();
  }

  /**
   * {@inheritdoc}
   */
  public function remove(array $job) {
    $this->deleteMatching([
      'name' => $job['name'],
      'type' => $job['type'],
      // Jobs without an explicit id are matched against id 0.
      'id' => isset($job['id']) ? $job['id'] : 0,
    ]);
  }

  /**
   * {@inheritdoc}
   */
  public function removeAll($name, $type) {
    $this->deleteMatching([
      'name' => $name,
      'type' => $type,
    ]);
  }

  /**
   * {@inheritdoc}
   */
  public function dispatch(JobSchedule $job) {
    $info = $this->info($job->getName());
    if (!empty($info['queue name'])) {
      // Queue the job id for later processing; the job is only reserved when
      // the queue item was actually created.
      $queue_name = 'job_scheduler_queue:' . $info['queue name'];
      if (\Drupal::queue($queue_name)->createItem($job->id())) {
        $this->reserve($job);
      }
    }
    else {
      // No queue configured for this scheduler: run the job right away.
      $this->execute($job);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function execute(JobSchedule $job) {
    $info = $this->info($job->getName());
    // If the job is periodic, re-schedule it before calling the worker.
    // Non-periodic jobs are deleted before the worker runs.
    if ($job->getPeriodic()) {
      $this->reschedule($job);
    }
    else {
      $job->delete();
    }
    // Include the file named in the scheduler info, if it exists, before
    // looking up the worker callback.
    if (!empty($info['file']) && file_exists($info['file'])) {
      include_once $info['file'];
    }
    if (function_exists($info['worker callback'])) {
      call_user_func($info['worker callback'], $job);
    }
    else {
      throw new JobSchedulerException(t('Could not find worker callback function: @function', [
        '@function' => $info['worker callback'],
      ]));
    }
  }

  /**
   * {@inheritdoc}
   */
  public function reschedule(JobSchedule $job) {
    $timestamp = time();
    // Clear the reservation marker and record this run.
    $job->setScheduled(0);
    $job->setLast($timestamp);
    $crontab = $job->getCrontab();
    if (!empty($crontab)) {
      $next = $this->crontabDecorator->decorate($crontab)->nextTime($timestamp);
    }
    else {
      $next = $timestamp + $job->getPeriod();
    }
    if ($next) {
      $job->setNext($next);
      $job->save();
    }
    else {
      // If no next time, it may mean it won't run again the next year
      // (crontab).
      $job->delete();
    }
  }

  /**
   * {@inheritdoc}
   */
  public function check(array $job) {
    $storage = $this->jobScheduleStorage;
    $job += [
      'id' => 0,
      'period' => 0,
      'crontab' => '',
    ];
    $entity_ids = $this->jobQuery()
      ->condition('name', $job['name'])
      ->condition('type', $job['type'])
      ->condition('id', $job['id'])
      ->execute();
    // If existing, and changed period or crontab, reschedule the job.
    if ($entity_ids) {
      /** @var JobSchedule $existing */
      $existing = $storage->load(reset($entity_ids));
      // Loose comparison is kept on purpose so that values differing only in
      // type (e.g. '3600' and 3600) are not treated as a change.
      if ($job['period'] != $existing->getPeriod() || $job['crontab'] != $existing->getCrontab()) {
        $existing->setPeriod($job['period']);
        $existing->setCrontab($job['crontab']);
        $this->reschedule($existing);
      }
      return TRUE;
    }
    return FALSE;
  }

  /**
   * {@inheritdoc}
   */
  public function perform($name = NULL, $limit = 200, $time = 30) {
    $storage = $this->jobScheduleStorage;
    $timestamp = time();

    // Reschedule stuck periodic jobs after one hour. Jobs whose 'scheduled'
    // value is already 0 are excluded: resetting them would change nothing
    // but would still load and save every idle periodic job on each run.
    $query = $this->jobQuery()
      ->condition('scheduled', 0, '<>')
      ->condition('scheduled', $timestamp - 3600, '<')
      ->condition('periodic', 1);
    if (!empty($name)) {
      $query->condition('name', $name);
    }
    $entity_ids = $query->execute();
    if (!empty($entity_ids)) {
      foreach ($storage->loadMultiple($entity_ids) as $job) {
        $job->setScheduled(0);
        $job->save();
      }
    }

    // Query and dispatch scheduled jobs.
    // Process a maximum of $limit jobs in a maximum of $time seconds.
    $start = time();
    $total = 0;
    $failed = 0;
    $query = $this->jobQuery()
      ->condition('scheduled', 0)
      ->condition('next', $timestamp, '<=');
    if (!empty($name)) {
      $query->condition('name', $name);
    }
    // Oldest due jobs first.
    $query->sort('next', 'ASC');
    $query->range(0, $limit);
    $entity_ids = $query->execute();
    if (!empty($entity_ids)) {
      foreach ($storage->loadMultiple($entity_ids) as $job) {
        try {
          $this->dispatch($job);
        }
        catch (\Exception $e) {
          watchdog_exception('job_scheduler', $e);
          $failed++;
          // Drop jobs that have caused exceptions.
          $job->delete();
        }
        // Counts every attempted job, including the failed ones.
        $total++;
        // The time budget is checked after each job, so the job in progress
        // is never interrupted.
        if (time() > $start + $time) {
          break;
        }
      }
    }
    return [
      'start' => $start,
      'total' => $total,
      'failed' => $failed,
    ];
  }

  /**
   * {@inheritdoc}
   */
  public function rebuild($name, array $info = NULL) {
    // Fall back to the registered scheduler info when none (or an empty
    // definition) is passed in.
    $info = $info ?: $this->info($name);
    if (!empty($info['jobs'])) {
      foreach ($info['jobs'] as $job) {
        $job['name'] = $name;
        // check() also updates an existing job whose period or crontab
        // changed; only jobs that are not stored yet are created.
        if (!$this->check($job)) {
          $this->set($job);
        }
      }
    }
  }

  /**
   * {@inheritdoc}
   */
  public function rebuildAll() {
    foreach (job_scheduler_info() as $name => $info) {
      $this->rebuild($name, $info);
    }
  }

  /**
   * Reserves a job.
   *
   * Marks the job as scheduled until the current time plus its period, and
   * moves its next run time to that same moment.
   *
   * @param JobSchedule $job
   *   The job to reserve.
   *
   * @see \Drupal\job_scheduler\JobScheduler::dispatch()
   *
   * @throws \Drupal\Core\Entity\EntityStorageException
   *   In case of failures at the configuration storage level.
   */
  protected function reserve(JobSchedule $job) {
    $timestamp = time();
    $scheduled = $job->getPeriod() + $timestamp;
    $job->setScheduled($scheduled);
    $job->setLast($timestamp);
    $job->setNext($scheduled);
    $job->save();
  }

  /**
   * Builds an entity query on the job schedule storage.
   *
   * Access checking is disabled explicitly: these are system-level lookups
   * that must not depend on the user the current request runs as.
   *
   * @return \Drupal\Core\Entity\Query\QueryInterface
   *   A new query object for job schedule entities.
   */
  protected function jobQuery() {
    return $this->jobScheduleStorage->getQuery()->accessCheck(FALSE);
  }

  /**
   * Deletes all stored jobs whose fields equal the given values.
   *
   * @param array $conditions
   *   Field values to match, keyed by field name. All conditions must match.
   */
  protected function deleteMatching(array $conditions) {
    $query = $this->jobQuery();
    foreach ($conditions as $field => $value) {
      $query->condition($field, $value);
    }
    $entity_ids = $query->execute();
    if (!empty($entity_ids)) {
      $storage = $this->jobScheduleStorage;
      $storage->delete($storage->loadMultiple($entity_ids));
    }
  }

}
Members
Name | Modifiers | Type | Description | Overrides |
---|---|---|---|---|
JobScheduler:: |
protected | property | The job scheduler crontab decorator. | |
JobScheduler:: |
protected | property | The job schedule storage. | |
JobScheduler:: |
public | function |
Checks whether a job exists in the queue and updates its parameters if so. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Dispatches a job. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Executes a job. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Returns scheduler info. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Perform periodic jobs. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Rebuilds a single scheduler. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Rebuilds all schedulers. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Removes a job from the schedule, replace any existing job. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Removes all jobs for a given type. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Re-schedules a job if intended to run again. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
protected | function | Reserves a job. | |
JobScheduler:: |
public | function |
Adds a job to the schedule, replacing any existing job. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function | Constructs a JobScheduler object. |