class JobScheduler in Job Scheduler 8.2
Same name and namespace in other branches
- 8.3 src/JobScheduler.php \Drupal\job_scheduler\JobScheduler
Manage scheduled jobs.
Hierarchy
- class \Drupal\job_scheduler\JobScheduler implements JobSchedulerInterface
Expanded class hierarchy of JobScheduler
1 string reference to 'JobScheduler'
1 service uses JobScheduler
File
- src/
JobScheduler.php, line 11
Namespace
Drupal\job_scheduler
View source
class JobScheduler implements JobSchedulerInterface {

  /**
   * The time service.
   *
   * Declared public (the other injected services are protected); the
   * visibility is left unchanged for backward compatibility.
   *
   * @var \Drupal\Component\Datetime\TimeInterface
   */
  public $time;

  /**
   * The job scheduler crontab decorator.
   *
   * @var \Drupal\job_scheduler\JobSchedulerCronTabDecoratorInterface
   */
  protected $crontabDecorator;

  /**
   * The database connection.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $database;

  /**
   * Constructs a JobScheduler object.
   *
   * @param \Drupal\job_scheduler\JobSchedulerCronTabDecoratorInterface $crontab_decorator
   *   The job scheduler crontab decorator.
   * @param \Drupal\Core\Database\Connection $database
   *   The database connection.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   The time service.
   */
  public function __construct(JobSchedulerCronTabDecoratorInterface $crontab_decorator, Connection $database, TimeInterface $time) {
    $this->database = $database;
    $this->time = $time;
    $this->crontabDecorator = $crontab_decorator;
  }

  /**
   * {@inheritdoc}
   */
  public function info($name) {
    if ($info = job_scheduler_info($name)) {
      return $info;
    }
    // An empty result is treated as "unknown scheduler".
    throw new JobSchedulerException(t('Could not find Job Scheduler cron information for @name.', [
      '@name' => $name,
    ]));
  }

  /**
   * {@inheritdoc}
   */
  public function set(array $job) {
    $timestamp = $this->time->getRequestTime();
    // Normalize the optional keys into the values written to the table.
    $job['periodic'] = isset($job['periodic']) ? (int) $job['periodic'] : 0;
    $job['data'] = isset($job['data']) ? serialize($job['data']) : FALSE;
    $job['last'] = $timestamp;
    // A crontab rule takes precedence over a plain period.
    if (!empty($job['crontab'])) {
      $crontab = $this->crontabDecorator->decorate($job['crontab']);
      $job['next'] = $crontab->nextTime($timestamp);
    }
    else {
      $job['next'] = $timestamp + $job['period'];
    }
    $job['scheduled'] = 0;
    // Delete any row with the same name/type/id before inserting the new one.
    $this->remove($job);
    $this->database
      ->insert('job_schedule')
      ->fields($job)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function remove(array $job) {
    $this->database
      ->delete('job_schedule')
      ->condition('name', $job['name'])
      ->condition('type', $job['type'])
      // Jobs without an explicit id are matched against id 0.
      ->condition('id', isset($job['id']) ? $job['id'] : 0)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function removeAll($name, $type) {
    $this->database
      ->delete('job_schedule')
      ->condition('name', $name)
      ->condition('type', $type)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function dispatch(array $job) {
    $info = $this->info($job['name']);
    // 'periodic' is optional (see set()), so a missing key means "one-off".
    // One-off jobs are removed from the schedule before being handed over.
    if (empty($job['periodic'])) {
      $this->remove($job);
    }
    if (!empty($info['queue name'])) {
      // Hand the job to the scheduler's queue; only mark it as reserved when
      // the queue reports that the item was created.
      $queue_name = 'job_scheduler_queue:' . $info['queue name'];
      if (\Drupal::queue($queue_name)->createItem($job)) {
        $this->reserve($job);
      }
    }
    else {
      // No queue configured: run the worker right away.
      $this->execute($job);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function execute(array $job) {
    $info = $this->info($job['name']);
    // If the job is periodic, re-schedule it before calling the worker.
    // 'periodic' is optional (see set()), so a missing key means "one-off".
    if (!empty($job['periodic'])) {
      $this->reschedule($job);
    }
    if (!empty($info['file']) && file_exists($info['file'])) {
      include_once $info['file'];
    }
    // A scheduler definition without a worker callback is handled like one
    // whose callback function does not exist.
    $callback = isset($info['worker callback']) ? $info['worker callback'] : '';
    if (function_exists($callback)) {
      call_user_func($callback, $job);
    }
    else {
      // The job can never run, so drop it before reporting the problem.
      $this->remove($job);
      throw new JobSchedulerException(t('Could not find worker callback function: @function', [
        '@function' => $callback,
      ]));
    }
  }

  /**
   * {@inheritdoc}
   */
  public function reschedule(array $job) {
    $timestamp = $this->time->getRequestTime();
    // Expects 'data' in its unserialized form; it is serialized here.
    $job['periodic'] = isset($job['periodic']) ? (int) $job['periodic'] : 0;
    $job['data'] = isset($job['data']) ? serialize($job['data']) : FALSE;
    $job['last'] = $timestamp;
    $job['scheduled'] = 0;
    if (!empty($job['crontab'])) {
      $crontab = $this->crontabDecorator->decorate($job['crontab']);
      $job['next'] = $crontab->nextTime($timestamp);
    }
    else {
      $job['next'] = $timestamp + $job['period'];
    }
    if ($job['next']) {
      // The row is located by 'item_id'; doUpdate() throws if it is missing.
      $this->doUpdate($job, [
        'item_id',
      ]);
    }
    else {
      // If no next time, it may mean it won't run again the next year
      // (crontab).
      $this->remove($job);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function check(array $job) {
    $job += [
      'id' => 0,
      'period' => 0,
      'crontab' => '',
    ];
    $existing = $this->database
      ->select('job_schedule')
      ->fields('job_schedule')
      ->condition('name', $job['name'])
      ->condition('type', $job['type'])
      ->condition('id', $job['id'])
      ->execute()
      ->fetchAssoc();
    // If existing, and changed period or crontab, reschedule the job.
    if ($existing) {
      // Loose comparison on purpose: the stored row and the definition may
      // hold the same value with different scalar types.
      if ($job['period'] != $existing['period'] || $job['crontab'] != $existing['crontab']) {
        $existing['period'] = $job['period'];
        $existing['crontab'] = $job['crontab'];
        // The row holds 'data' in serialized form and reschedule() serializes
        // it again. Decode it first, as perform() does, so the payload is not
        // double-serialized.
        if (isset($existing['data'])) {
          $existing['data'] = unserialize($existing['data']);
        }
        $this->reschedule($existing);
      }
      return TRUE;
    }
    return FALSE;
  }

  /**
   * {@inheritdoc}
   */
  public function perform() {
    $timestamp = $this->time->getRequestTime();
    // Reschedule stuck periodic jobs after one hour.
    $this->database
      ->update('job_schedule')
      ->fields([
        'scheduled' => 0,
      ])
      ->condition('scheduled', $timestamp - 3600, '<')
      ->condition('periodic', 1)
      ->execute();
    // Query and dispatch scheduled jobs.
    // Process a maximum of 200 jobs in a maximum of 30 seconds.
    // The 30 second budget is measured with time() (wall clock), while the
    // due-date comparison uses the request time from the time service.
    $start = time();
    $total = 0;
    $failed = 0;
    $jobs = $this->database
      ->select('job_schedule', NULL, [
        'fetch' => \PDO::FETCH_ASSOC,
      ])
      ->fields('job_schedule')
      ->condition('scheduled', 0)
      ->condition('next', $timestamp, '<=')
      ->orderBy('next', 'ASC')
      ->range(0, 200)
      ->execute();
    foreach ($jobs as $job) {
      // Rows store 'data' serialized; workers receive the decoded value.
      $job['data'] = unserialize($job['data']);
      try {
        $this->dispatch($job);
      }
      catch (\Exception $e) {
        watchdog_exception('job_scheduler', $e);
        $failed++;
        // Drop jobs that have caused exceptions.
        $this->remove($job);
      }
      // Failed jobs are included in the total.
      $total++;
      if (time() > $start + 30) {
        break;
      }
    }
    return [
      'start' => $start,
      'total' => $total,
      'failed' => $failed,
    ];
  }

  /**
   * {@inheritdoc}
   */
  public function rebuild($name, array $info = NULL) {
    // Look the definition up only when the caller did not supply one.
    $info = $info ?: $this->info($name);
    if (!empty($info['jobs'])) {
      foreach ($info['jobs'] as $job) {
        $job['name'] = $name;
        // check() reschedules an existing job if needed; only jobs that are
        // not in the table yet are added.
        if (!$this->check($job)) {
          $this->set($job);
        }
      }
    }
  }

  /**
   * {@inheritdoc}
   */
  public function rebuildAll() {
    foreach (job_scheduler_info() as $name => $info) {
      $this->rebuild($name, $info);
    }
  }

  /**
   * Reserves a job.
   *
   * Marks the job's row as scheduled: 'scheduled' and 'next' are both set to
   * the request time plus the job's period, and 'last' to the request time.
   *
   * @param array $job
   *   The job to reserve. Must contain 'name', 'type', 'id' and 'period'.
   *
   * @see \Drupal\job_scheduler\JobScheduler::dispatch()
   *
   * @throws \Drupal\job_scheduler\JobSchedulerException
   *   Thrown if the job parameters are incorrect.
   */
  protected function reserve(array $job) {
    $timestamp = $this->time->getRequestTime();
    $job['periodic'] = isset($job['periodic']) ? (int) $job['periodic'] : 0;
    $job['data'] = isset($job['data']) ? serialize($job['data']) : FALSE;
    $job['scheduled'] = $job['period'] + $timestamp;
    $job['last'] = $timestamp;
    $job['next'] = $job['scheduled'];
    $this->doUpdate($job, [
      'name',
      'type',
      'id',
    ]);
  }

  /**
   * Updates a record to the database.
   *
   * Every entry of $job that is not a primary key is written as a field; the
   * primary keys are used as the conditions that select the row.
   *
   * @param array $job
   *   The job to update.
   * @param array $primary_keys
   *   An array of the primary keys field names.
   *
   * @see \Drupal\job_scheduler\JobScheduler::reschedule()
   * @see \Drupal\job_scheduler\JobScheduler::reserve()
   *
   * @throws \Drupal\job_scheduler\JobSchedulerException
   *   Thrown if the job parameters are incorrect.
   */
  protected function doUpdate(array $job, $primary_keys) {
    // Keep every field except the primary keys. Keys are matched exactly,
    // which avoids the type juggling of a non-strict in_array() check.
    $fields = array_diff_key($job, array_flip($primary_keys));
    $query = $this->database
      ->update('job_schedule')
      ->fields($fields);
    foreach ($primary_keys as $key) {
      // A primary key that is absent (or NULL) cannot identify a row.
      if (!isset($job[$key])) {
        throw new JobSchedulerException(t('Could not find job parameter: @parameter', [
          '@parameter' => $key,
        ]));
      }
      $query->condition($key, $job[$key]);
    }
    $query->execute();
  }

}
Members
Name |
Modifiers | Type | Description | Overrides |
---|---|---|---|---|
JobScheduler:: |
protected | property | The job scheduler crontab decorator. | |
JobScheduler:: |
protected | property | The database connection. | |
JobScheduler:: |
public | property | The time service. | |
JobScheduler:: |
public | function |
Checks whether a job exists in the queue and updates its parameters if so. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Dispatches a job. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
protected | function | Updates a record to the database. | |
JobScheduler:: |
public | function |
Executes a job. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Returns scheduler info. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Perform periodic jobs. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Rebuilds a single scheduler. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Rebuilds all schedulers. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Removes a job from the schedule, replace any existing job. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Removes all jobs for a given type. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function |
Re-schedules a job if intended to run again. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
protected | function | Reserves a job. | |
JobScheduler:: |
public | function |
Adds a job to the schedule, replace any existing job. Overrides JobSchedulerInterface:: |
|
JobScheduler:: |
public | function | Constructs a JobScheduler object. |