<?php
namespace Drupal\Tests\system\Kernel\System;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\Database\Database;
use Drupal\Core\Queue\DatabaseQueue;
use Drupal\Core\Queue\DelayableQueueInterface;
use Drupal\Core\Queue\Memory;
use Drupal\KernelTests\KernelTestBase;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestDatabaseDelayException;
use Prophecy\Argument;
/**
 * Kernel tests for queue processing during cron runs.
 *
 * The time service is replaced by a mock that always reports a fixed
 * timestamp, and the queue factory is wrapped in a prophecy so that two named
 * queues always resolve to known backends: one database queue and one
 * in-memory queue.
 */
class CronQueueTest extends KernelTestBase {

  /**
   * {@inheritdoc}
   */
  protected static $modules = [
    'system',
    'cron_queue_test',
  ];

  /**
   * The database connection used to inspect and alter queue rows.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $connection;

  /**
   * The cron service whose run() method is exercised by the tests.
   *
   * @var \Drupal\Core\CronInterface
   */
  protected $cron;

  /**
   * The fixed timestamp reported by the mocked time service.
   *
   * @var int
   */
  protected $currentTime = 1000;

  /**
   * {@inheritdoc}
   */
  protected function setUp(): void {
    parent::setUp();

    $this->connection = Database::getConnection();
    $this->cron = \Drupal::service('cron');

    // Freeze time: the current time and the request time both report the same
    // fixed value, so expiry timestamps can be asserted exactly.
    $time = $this->prophesize(TimeInterface::class);
    $time->getCurrentTime()->willReturn($this->currentTime);
    $time->getRequestTime()->willReturn($this->currentTime);
    \Drupal::getContainer()->set('datetime.time', $time->reveal());

    // Make sure the mock is really what \Drupal::time() hands out.
    $this->assertEquals($this->currentTime, \Drupal::time()->getCurrentTime());
    $this->assertEquals($this->currentTime, \Drupal::time()->getRequestTime());

    // Replace the queue factory with a prophecy that returns hand-built
    // queues for the two delay-exception queue names.
    $realQueueFactory = $this->container->get('queue');
    $queue_factory = $this->prophesize(get_class($realQueueFactory));

    $database = new DatabaseQueue('cron_queue_test_database_delay_exception', $this->connection);
    $memory = new Memory('cron_queue_test_memory_delay_exception');
    $queue_factory
      ->get('cron_queue_test_database_delay_exception', Argument::cetera())
      ->willReturn($database);
    $queue_factory
      ->get('cron_queue_test_memory_delay_exception', Argument::cetera())
      ->willReturn($memory);

    // Every other queue name is delegated to the real factory.
    $queue_factory
      ->get(Argument::any(), Argument::cetera())
      ->will(function ($args) use ($realQueueFactory) {
        return $realQueueFactory->get($args[0], $args[1] ?? FALSE);
      });

    $this->container->set('queue', $queue_factory->reveal());
  }

  /**
   * Tests item expiry after a cron run for delayable and other queues.
   *
   * The database queue item must expire exactly DELAY_INTERVAL seconds after
   * the frozen time, while the in-memory queue item must expire after its
   * regular cron lease time.
   */
  public function testDelayException(): void {
    $queue_factory = $this->container->get('queue');
    $database = $queue_factory->get('cron_queue_test_database_delay_exception');
    $memory = $queue_factory->get('cron_queue_test_memory_delay_exception');

    // Only the database queue is delayable.
    $this->assertInstanceOf(DelayableQueueInterface::class, $database);
    $this->assertNotInstanceOf(DelayableQueueInterface::class, $memory);

    // Both worker definitions must declare a non-empty cron lease time.
    $definitions = $this->container
      ->get('plugin.manager.queue_worker')
      ->getDefinitions();
    $database_lease_time = $definitions['cron_queue_test_database_delay_exception']['cron']['time'];
    $this->assertNotEmpty($database_lease_time);
    $memory_lease_time = $definitions['cron_queue_test_memory_delay_exception']['cron']['time'];
    $this->assertNotEmpty($memory_lease_time);

    // Put one item in each queue and let cron process them.
    $database->createItem('test');
    $memory->createItem('test');
    $this->cron->run();

    // Read the expiry of the database queue item straight from the table.
    $query = $this->connection->select('queue');
    $query->condition('name', 'cron_queue_test_database_delay_exception');
    $query->addField('queue', 'expire');
    $query->range(0, 1);
    $expire = $query->execute()->fetchField();

    // The delay interval has to exceed the lease time; otherwise the checks
    // below could not tell a delay apart from an ordinary lease.
    $this->assertGreaterThan($database_lease_time, CronQueueTestDatabaseDelayException::DELAY_INTERVAL);
    $this->assertGreaterThan($this->currentTime + $database_lease_time, $expire);
    $this->assertEquals(CronQueueTestDatabaseDelayException::DELAY_INTERVAL, $expire - $this->currentTime);

    // Read the in-memory queue's internal item list through reflection.
    $property = (new \ReflectionClass($memory))->getProperty('queue');
    // Has no effect as of PHP 8.1; kept for older runtimes.
    $property->setAccessible(TRUE);
    $memory_queue_internal = $property->getValue($memory);

    // The in-memory item expires after its regular lease time.
    $this->assertEquals($this->currentTime + $memory_lease_time, reset($memory_queue_internal)->expire);
  }

  /**
   * Tests how cron leaves queues behind for the three exception test queues.
   *
   * Covers the 'cron_queue_test_exception', 'cron_queue_test_broken_queue'
   * and 'cron_queue_test_requeue_exception' queues.
   */
  public function testExceptions(): void {
    $queue_factory = $this->container->get('queue');

    // Queue: cron_queue_test_exception.
    $queue = $queue_factory->get('cron_queue_test_exception');
    $queue->createItem([$this->randomMachineName() => $this->randomMachineName()]);

    // After the first run the state counter is 1 and the item is still
    // queued.
    $this->cron->run();
    $this->assertEquals(1, \Drupal::state()->get('cron_queue_test_exception'));
    $this->assertEquals(1, $queue->numberOfItems(), 'Failing item still in the queue after throwing an exception.');

    // Backdate the item's lease expiry to one second before the request time
    // reported by the time service, so that the second cron run processes the
    // item again. The deprecated REQUEST_TIME constant is avoided on purpose:
    // it also ignores the time mock installed in setUp().
    $this->connection->update('queue')
      ->condition('name', 'cron_queue_test_exception')
      ->fields(['expire' => \Drupal::time()->getRequestTime() - 1])
      ->execute();

    // After the second run the counter is 2 and the queue is empty.
    $this->cron->run();
    $this->assertEquals(2, \Drupal::state()->get('cron_queue_test_exception'));
    $this->assertEquals(0, $queue->numberOfItems(), 'Item was processed and removed from the queue.');

    // Queue: cron_queue_test_broken_queue. Of three items, only the first is
    // expected to be removed by the cron run.
    $queue = $queue_factory->get('cron_queue_test_broken_queue');
    $queue->createItem('process');
    $queue->createItem('crash');
    $queue->createItem('ignored');
    $this->cron->run();
    $this->assertEquals(2, $queue->numberOfItems(), 'Failing queue stopped processing at the failing item.');

    // The two remaining items can be claimed in their original order.
    $item = $queue->claimItem();
    $this->assertEquals('crash', $item->data, 'Failing item remains in the queue.');
    $item = $queue->claimItem();
    $this->assertEquals('ignored', $item->data, 'Item beyond the failing item remains in the queue.');

    // Queue: cron_queue_test_requeue_exception. A single cron run must leave
    // the state counter at 2 and the queue empty.
    $queue = $queue_factory->get('cron_queue_test_requeue_exception');
    $queue->createItem([]);
    $this->cron->run();
    $this->assertEquals(2, \Drupal::state()->get('cron_queue_test_requeue_exception'));
    $this->assertEquals(0, $queue->numberOfItems());
  }

  /**
   * Tests the return values of DatabaseQueue::delayItem() and releaseItem().
   *
   * Both methods must return TRUE for an item that exists in the queue and
   * FALSE once that item has been deleted.
   */
  public function testDatabaseQueueReturnTypes(): void {
    $queue = $this->container
      ->get('queue')
      ->get('cron_queue_test_database_delay_exception');
    static::assertInstanceOf(DatabaseQueue::class, $queue);

    $queue->createItem(12);
    $item = $queue->claimItem();

    // The item exists: delaying and releasing both succeed.
    static::assertTrue($queue->delayItem($item, 1));
    static::assertTrue($queue->releaseItem($item));

    // The item is gone: delaying and releasing both report failure.
    $queue->deleteItem($item);
    static::assertFalse($queue->delayItem($item, 1));
    static::assertFalse($queue->releaseItem($item));
  }

}