You are here

JobScheduler.php in Job Scheduler 8.2

Same filename and directory in other branches
  1. 8.3 src/JobScheduler.php

File

src/JobScheduler.php
View source
<?php

namespace Drupal\job_scheduler;

use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Database\Connection;

/**
 * Manage scheduled jobs.
 *
 * Jobs are stored as rows of the 'job_schedule' table. A job array carries at
 * least 'name' and 'type', optionally 'id', 'period', 'crontab', 'periodic'
 * and 'data'; the 'data' payload is serialized before it is written and
 * unserialized when jobs are loaded for dispatching.
 */
class JobScheduler implements JobSchedulerInterface {

  /**
   * The time service.
   *
   * NOTE(review): declared public, unlike the other injected services. Left
   * unchanged so that any external code reading it keeps working.
   *
   * @var \Drupal\Component\Datetime\TimeInterface
   */
  public $time;

  /**
   * The job scheduler crontab decorator.
   *
   * @var \Drupal\job_scheduler\JobSchedulerCronTabDecoratorInterface
   */
  protected $crontabDecorator;

  /**
   * The database connection.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $database;

  /**
   * Constructs a JobScheduler object.
   *
   * @param \Drupal\job_scheduler\JobSchedulerCronTabDecoratorInterface $crontab_decorator
   *   The job scheduler crontab decorator.
   * @param \Drupal\Core\Database\Connection $database
   *   The database connection.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   The time service.
   */
  public function __construct(JobSchedulerCronTabDecoratorInterface $crontab_decorator, Connection $database, TimeInterface $time) {
    $this->database = $database;
    $this->time = $time;
    $this->crontabDecorator = $crontab_decorator;
  }

  /**
   * {@inheritdoc}
   */
  public function info($name) {
    if ($info = job_scheduler_info($name)) {
      return $info;
    }
    throw new JobSchedulerException(t('Could not find Job Scheduler cron information for @name.', [
      '@name' => $name,
    ]));
  }

  /**
   * {@inheritdoc}
   */
  public function set(array $job) {
    $timestamp = $this->time->getRequestTime();
    $job['periodic'] = isset($job['periodic']) ? (int) $job['periodic'] : 0;
    $job['data'] = isset($job['data']) ? serialize($job['data']) : FALSE;
    $job['last'] = $timestamp;
    $job['next'] = $this->calculateNext($job, $timestamp);
    $job['scheduled'] = 0;

    // Drop any schedule already stored for the same name/type/id so the
    // insert below replaces it instead of adding a second row.
    $this->remove($job);
    $this->database
      ->insert('job_schedule')
      ->fields($job)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function remove(array $job) {
    $this->database
      ->delete('job_schedule')
      ->condition('name', $job['name'])
      ->condition('type', $job['type'])
      // Jobs without an explicit id are matched against id 0.
      ->condition('id', isset($job['id']) ? $job['id'] : 0)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function removeAll($name, $type) {
    $this->database
      ->delete('job_schedule')
      ->condition('name', $name)
      ->condition('type', $type)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function dispatch(array $job) {
    $info = $this->info($job['name']);

    // One-off jobs are deleted from the schedule before they are handed over.
    if (empty($job['periodic'])) {
      $this->remove($job);
    }

    if (!empty($info['queue name'])) {
      // Queue the job; only mark it as reserved when the queue accepted it.
      $queue_name = 'job_scheduler_queue:' . $info['queue name'];
      if (\Drupal::queue($queue_name)->createItem($job)) {
        $this->reserve($job);
      }
    }
    else {
      // No queue configured: run the worker right away.
      $this->execute($job);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function execute(array $job) {
    $info = $this->info($job['name']);

    // If the job is periodic, re-schedule it before calling the worker.
    if (!empty($job['periodic'])) {
      $this->reschedule($job);
    }

    // Load the file declaring the worker callback, when one is configured.
    if (!empty($info['file']) && file_exists($info['file'])) {
      include_once $info['file'];
    }

    if (function_exists($info['worker callback'])) {
      call_user_func($info['worker callback'], $job);
    }
    else {
      // A job whose worker cannot be found is removed from the schedule
      // before the failure is reported.
      $this->remove($job);
      throw new JobSchedulerException(t('Could not find worker callback function: @function', [
        '@function' => $info['worker callback'],
      ]));
    }
  }

  /**
   * {@inheritdoc}
   */
  public function reschedule(array $job) {
    $timestamp = $this->time->getRequestTime();
    $job['periodic'] = isset($job['periodic']) ? (int) $job['periodic'] : 0;
    $job['data'] = isset($job['data']) ? serialize($job['data']) : FALSE;
    $job['last'] = $timestamp;
    $job['scheduled'] = 0;
    $job['next'] = $this->calculateNext($job, $timestamp);

    if ($job['next']) {
      $this->doUpdate($job, ['item_id']);
    }
    else {
      // If no next time, it may mean it wont run again the next year (crontab).
      $this->remove($job);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function check(array $job) {
    $job += [
      'id' => 0,
      'period' => 0,
      'crontab' => '',
    ];
    $existing = $this->database
      ->select('job_schedule')
      ->fields('job_schedule')
      ->condition('name', $job['name'])
      ->condition('type', $job['type'])
      ->condition('id', $job['id'])
      ->execute()
      ->fetchAssoc();

    if (!$existing) {
      return FALSE;
    }

    // If existing, and changed period or crontab, reschedule the job.
    // The comparison is deliberately loose: stored values come back from the
    // database as strings while the incoming ones may be integers.
    if ($job['period'] != $existing['period'] || $job['crontab'] != $existing['crontab']) {
      $existing['period'] = $job['period'];
      $existing['crontab'] = $job['crontab'];

      // The stored payload is already serialized and reschedule() serializes
      // whatever it is given. Decode it first so the payload is not
      // serialized twice, which would hand workers a string instead of the
      // original data.
      if (isset($existing['data'])) {
        $existing['data'] = unserialize($existing['data']);
      }
      $this->reschedule($existing);
    }
    return TRUE;
  }

  /**
   * {@inheritdoc}
   */
  public function perform() {
    $timestamp = $this->time->getRequestTime();

    // Reschedule stuck periodic jobs after one hour.
    $this->database
      ->update('job_schedule')
      ->fields([
        'scheduled' => 0,
      ])
      ->condition('scheduled', $timestamp - 3600, '<')
      ->condition('periodic', 1)
      ->execute();

    // Query and dispatch scheduled jobs.
    // Process a maximum of 200 jobs in a maximum of 30 seconds.
    // The 30 second budget is measured with time() rather than the request
    // time, which does not advance while this loop runs.
    $start = time();
    $total = 0;
    $failed = 0;
    $jobs = $this->database
      ->select('job_schedule', NULL, [
        'fetch' => \PDO::FETCH_ASSOC,
      ])
      ->fields('job_schedule')
      ->condition('scheduled', 0)
      ->condition('next', $timestamp, '<=')
      ->orderBy('next', 'ASC')
      ->range(0, 200)
      ->execute();

    foreach ($jobs as $job) {
      // NOTE(review): unserialize() restores whatever the stored payload
      // describes; this presumes only trusted code writes to the table.
      $job['data'] = unserialize($job['data']);
      try {
        $this->dispatch($job);
      }
      catch (\Exception $e) {
        watchdog_exception('job_scheduler', $e);
        $failed++;

        // Drop jobs that have caused exceptions.
        $this->remove($job);
      }

      // Failed jobs are included in the total.
      $total++;
      if (time() > $start + 30) {
        break;
      }
    }

    return [
      'start' => $start,
      'total' => $total,
      'failed' => $failed,
    ];
  }

  /**
   * {@inheritdoc}
   */
  public function rebuild($name, array $info = NULL) {
    $info = $info ?: $this->info($name);
    if (empty($info['jobs'])) {
      return;
    }
    foreach ($info['jobs'] as $job) {
      $job['name'] = $name;
      // check() reschedules a job that already exists; only jobs that are
      // not stored yet need to be created.
      if (!$this->check($job)) {
        $this->set($job);
      }
    }
  }

  /**
   * {@inheritdoc}
   */
  public function rebuildAll() {
    foreach (job_scheduler_info() as $name => $info) {
      $this->rebuild($name, $info);
    }
  }

  /**
   * Calculates when a job should run next.
   *
   * A non-empty 'crontab' takes precedence over 'period'. A missing 'period'
   * is treated as 0.
   *
   * @param array $job
   *   The job; its 'crontab' and 'period' entries are read.
   * @param int $timestamp
   *   The timestamp the calculation starts from.
   *
   * @return mixed
   *   Whatever the decorated crontab's nextTime() returns when a crontab is
   *   set, otherwise $timestamp plus the job period. Callers treat a falsy
   *   value as "no next run".
   *
   * @see \Drupal\job_scheduler\JobScheduler::set()
   * @see \Drupal\job_scheduler\JobScheduler::reschedule()
   */
  protected function calculateNext(array $job, $timestamp) {
    if (!empty($job['crontab'])) {
      return $this->crontabDecorator
        ->decorate($job['crontab'])
        ->nextTime($timestamp);
    }
    return $timestamp + (isset($job['period']) ? $job['period'] : 0);
  }

  /**
   * Reserves a job.
   *
   * Marks the stored job as scheduled one period from the request time and
   * moves its next run to that same moment.
   *
   * @param array $job
   *   The job to reserve.
   *
   * @see \Drupal\job_scheduler\JobScheduler::dispatch()
   *
   * @throws \Drupal\job_scheduler\JobSchedulerException
   *   Thrown if the job parameters are incorrect.
   */
  protected function reserve(array $job) {
    $timestamp = $this->time->getRequestTime();
    $job['periodic'] = isset($job['periodic']) ? (int) $job['periodic'] : 0;
    $job['data'] = isset($job['data']) ? serialize($job['data']) : FALSE;
    // A missing period counts as 0, matching how it is handled elsewhere.
    $job['scheduled'] = (isset($job['period']) ? $job['period'] : 0) + $timestamp;
    $job['last'] = $timestamp;
    $job['next'] = $job['scheduled'];
    $this->doUpdate($job, [
      'name',
      'type',
      'id',
    ]);
  }

  /**
   * Updates a record to the database.
   *
   * Every entry of $job that is not a primary key becomes an updated field;
   * the primary keys become the conditions selecting the row(s) to update.
   *
   * @param array $job
   *   The job to update.
   * @param string[] $primary_keys
   *   An array of the primary keys field names.
   *
   * @see \Drupal\job_scheduler\JobScheduler::reschedule()
   * @see \Drupal\job_scheduler\JobScheduler::reserve()
   *
   * @throws \Drupal\job_scheduler\JobSchedulerException
   *   Thrown if the job parameters are incorrect.
   */
  protected function doUpdate(array $job, $primary_keys) {
    $fields = [];
    foreach ($job as $key => $value) {
      // Strict comparison, so that an integer key can never loosely match a
      // primary key name.
      if (!in_array($key, $primary_keys, TRUE)) {
        $fields[$key] = $value;
      }
    }

    $query = $this->database
      ->update('job_schedule')
      ->fields($fields);
    foreach ($primary_keys as $key) {
      // Every primary key must be present; the query is never executed
      // with a missing condition.
      if (!isset($job[$key])) {
        throw new JobSchedulerException(t('Could not find job parameter: @parameter', [
          '@parameter' => $key,
        ]));
      }
      $query->condition($key, $job[$key]);
    }
    $query->execute();
  }

}

Classes

Namesort descending Description
JobScheduler Manage scheduled jobs.