php-resque/test/Resque/Tests/WorkerTest.php

297 lines
8.7 KiB
PHP
Raw Normal View History

2010-04-18 13:58:43 +00:00
<?php
2010-04-18 13:58:43 +00:00
/**
* Resque_Worker tests.
*
2018-05-25 09:03:48 +00:00
* @package Resque/Tests
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
2010-04-18 13:58:43 +00:00
*/
2010-04-18 13:58:43 +00:00
class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
{
2018-05-25 09:03:48 +00:00
public function testWorkerRegistersInList()
{
$worker = new Resque_Worker('*');
$worker->setLogger(new Resque_Log());
$worker->registerWorker();
// Make sure the worker is in the list
$this->assertTrue((bool)$this->redis->sismember('resque:workers', (string)$worker));
}
public function testGetAllWorkers()
{
$num = 3;
// Register a few workers
for ($i = 0; $i < $num; ++$i) {
$worker = new Resque_Worker('queue_' . $i);
$worker->setLogger(new Resque_Log());
$worker->registerWorker();
}
// Now try to get them
$this->assertEquals($num, count(Resque_Worker::all()));
}
public function testGetWorkerById()
{
$worker = new Resque_Worker('*');
$worker->setLogger(new Resque_Log());
$worker->registerWorker();
$newWorker = Resque_Worker::find((string)$worker);
$this->assertEquals((string)$worker, (string)$newWorker);
}
public function testInvalidWorkerDoesNotExist()
{
$this->assertFalse(Resque_Worker::exists('blah'));
}
public function testWorkerCanUnregister()
{
$worker = new Resque_Worker('*');
$worker->setLogger(new Resque_Log());
$worker->registerWorker();
$worker->unregisterWorker();
$this->assertFalse(Resque_Worker::exists((string)$worker));
$this->assertEquals([], Resque_Worker::all());
$this->assertEquals([], $this->redis->smembers('resque:workers'));
2018-05-25 09:03:48 +00:00
}
public function testPausedWorkerDoesNotPickUpJobs()
{
$worker = new Resque_Worker('*');
$worker->setLogger(new Resque_Log());
$worker->pauseProcessing();
Resque::enqueue('jobs', 'Test_Job');
$worker->work(0);
$worker->work(0);
$this->assertEquals(0, Resque_Stat::get('processed'));
}
public function testResumedWorkerPicksUpJobs()
{
$worker = new Resque_Worker('*');
$worker->setLogger(new Resque_Log());
$worker->pauseProcessing();
Resque::enqueue('jobs', 'Test_Job');
$worker->work(0);
$this->assertEquals(0, Resque_Stat::get('processed'));
$worker->unPauseProcessing();
$worker->work(0);
$this->assertEquals(1, Resque_Stat::get('processed'));
}
public function testWorkerCanWorkOverMultipleQueues()
{
$worker = new Resque_Worker([
2018-05-25 09:03:48 +00:00
'queue1',
'queue2'
]);
2018-05-25 09:03:48 +00:00
$worker->setLogger(new Resque_Log());
$worker->registerWorker();
Resque::enqueue('queue1', 'Test_Job_1');
Resque::enqueue('queue2', 'Test_Job_2');
$job = $worker->reserve();
$this->assertEquals('queue1', $job->queue);
$job = $worker->reserve();
$this->assertEquals('queue2', $job->queue);
}
public function testWorkerWorksQueuesInSpecifiedOrder()
{
$worker = new Resque_Worker([
2018-05-25 09:03:48 +00:00
'high',
'medium',
'low'
]);
2018-05-25 09:03:48 +00:00
$worker->setLogger(new Resque_Log());
$worker->registerWorker();
// Queue the jobs in a different order
Resque::enqueue('low', 'Test_Job_1');
Resque::enqueue('high', 'Test_Job_2');
Resque::enqueue('medium', 'Test_Job_3');
// Now check we get the jobs back in the right order
$job = $worker->reserve();
$this->assertEquals('high', $job->queue);
$job = $worker->reserve();
$this->assertEquals('medium', $job->queue);
$job = $worker->reserve();
$this->assertEquals('low', $job->queue);
}
public function testWildcardQueueWorkerWorksAllQueues()
{
$worker = new Resque_Worker('*');
$worker->setLogger(new Resque_Log());
$worker->registerWorker();
Resque::enqueue('queue1', 'Test_Job_1');
Resque::enqueue('queue2', 'Test_Job_2');
$job = $worker->reserve();
$this->assertEquals('queue1', $job->queue);
$job = $worker->reserve();
$this->assertEquals('queue2', $job->queue);
}
public function testWorkerDoesNotWorkOnUnknownQueues()
{
$worker = new Resque_Worker('queue1');
$worker->setLogger(new Resque_Log());
$worker->registerWorker();
Resque::enqueue('queue2', 'Test_Job');
$this->assertFalse($worker->reserve());
}
public function testWorkerClearsItsStatusWhenNotWorking()
{
Resque::enqueue('jobs', 'Test_Job');
$worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log());
$job = $worker->reserve();
$worker->workingOn($job);
$worker->doneWorking();
$this->assertEquals([], $worker->job());
2018-05-25 09:03:48 +00:00
}
public function testWorkerRecordsWhatItIsWorkingOn()
{
$worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log());
$worker->registerWorker();
$payload = [
2018-05-25 09:03:48 +00:00
'class' => 'Test_Job'
];
2018-05-25 09:03:48 +00:00
$job = new Resque_Job('jobs', $payload);
$worker->workingOn($job);
$job = $worker->job();
$this->assertEquals('jobs', $job['queue']);
if (!isset($job['run_at'])) {
$this->fail('Job does not have run_at time');
}
$this->assertEquals($payload, $job['payload']);
}
public function testWorkerErasesItsStatsWhenShutdown()
{
Resque::enqueue('jobs', 'Test_Job');
Resque::enqueue('jobs', 'Invalid_Job');
$worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log());
$worker->work(0);
$worker->work(0);
$this->assertEquals(0, $worker->getStat('processed'));
$this->assertEquals(0, $worker->getStat('failed'));
}
public function testWorkerCleansUpDeadWorkersOnStartup()
{
// Register a good worker
$goodWorker = new Resque_Worker('jobs');
$goodWorker->setLogger(new Resque_Log());
$goodWorker->registerWorker();
$workerId = explode(':', $goodWorker);
// Register some bad workers
$worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log());
$worker->setId($workerId[0] . ':1:jobs');
$worker->registerWorker();
$worker = new Resque_Worker(['high', 'low']);
2018-05-25 09:03:48 +00:00
$worker->setLogger(new Resque_Log());
$worker->setId($workerId[0] . ':2:high,low');
$worker->registerWorker();
$this->assertEquals(3, count(Resque_Worker::all()));
$goodWorker->pruneDeadWorkers();
// There should only be $goodWorker left now
$this->assertEquals(1, count(Resque_Worker::all()));
}
public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers()
{
// Register a bad worker on this machine
$worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log());
$workerId = explode(':', $worker);
$worker->setId($workerId[0] . ':1:jobs');
$worker->registerWorker();
// Register some other false workers
$worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log());
$worker->setId('my.other.host:1:jobs');
$worker->registerWorker();
$this->assertEquals(2, count(Resque_Worker::all()));
$worker->pruneDeadWorkers();
// my.other.host should be left
$workers = Resque_Worker::all();
$this->assertEquals(1, count($workers));
$this->assertEquals((string)$worker, (string)$workers[0]);
}
public function testWorkerFailsUncompletedJobsOnExit()
{
$worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log());
$worker->registerWorker();
$payload = [
2018-05-25 09:03:48 +00:00
'class' => 'Test_Job'
];
2018-05-25 09:03:48 +00:00
$job = new Resque_Job('jobs', $payload);
$worker->workingOn($job);
$worker->unregisterWorker();
$this->assertEquals(1, Resque_Stat::get('failed'));
}
2013-03-12 08:55:03 +00:00
public function testBlockingListPop()
{
2018-05-25 09:00:53 +00:00
$this->markTestSkipped("Skip temporarily");
return;
2013-03-12 08:55:03 +00:00
$worker = new Resque_Worker('jobs');
2018-05-25 09:03:48 +00:00
$worker->setLogger(new Resque_Log());
2013-03-12 08:55:03 +00:00
$worker->registerWorker();
Resque::enqueue('jobs', 'Test_Job_1');
Resque::enqueue('jobs', 'Test_Job_2');
$i = 1;
2018-05-25 09:03:48 +00:00
while ($job = $worker->reserve(true, 2)) {
2013-03-12 08:55:03 +00:00
$this->assertEquals('Test_Job_' . $i, $job->payload['class']);
2018-05-25 09:03:48 +00:00
if ($i == 2) {
2013-03-12 08:55:03 +00:00
break;
}
$i++;
}
2018-05-25 09:00:53 +00:00
$this->assertEquals(2, $i);
2013-03-12 08:55:03 +00:00
}
2010-04-18 13:58:43 +00:00
}