Global reformat

This commit is contained in:
Daniel Mason 2018-05-25 21:03:48 +12:00
parent 14c0e26559
commit 51fda513f4
11 changed files with 532 additions and 533 deletions

View File

@ -1,6 +1,6 @@
<?php <?php
if(empty($argv[1])) { if (empty($argv[1])) {
die('Specify the ID of a job to monitor the status of.'); die('Specify the ID of a job to monitor the status of.');
} }
require __DIR__ . '/init.php'; require __DIR__ . '/init.php';
@ -12,12 +12,12 @@ Resque::setBackend('127.0.0.1:6379');
//Resque::setBackend('redis://user:pass@a.host.name:3432/2'); //Resque::setBackend('redis://user:pass@a.host.name:3432/2');
$status = new Resque_Job_Status($argv[1]); $status = new Resque_Job_Status($argv[1]);
if(!$status->isTracking()) { if (!$status->isTracking()) {
die("Resque is not tracking the status of this job.\n"); die("Resque is not tracking the status of this job.\n");
} }
echo "Tracking status of ".$argv[1].". Press [break] to stop.\n\n"; echo "Tracking status of " . $argv[1] . ". Press [break] to stop.\n\n";
while(true) { while (true) {
fwrite(STDOUT, "Status of ".$argv[1]." is: ".$status->get()."\n"); fwrite(STDOUT, "Status of " . $argv[1] . " is: " . $status->get() . "\n");
sleep(1); sleep(1);
} }

View File

@ -3,23 +3,23 @@
// NOTE: You should NOT use this when developing against php-resque. // NOTE: You should NOT use this when developing against php-resque.
// The autoload code below is specifically for this demo. // The autoload code below is specifically for this demo.
$files = array( $files = array(
__DIR__ . '/../../vendor/autoload.php', __DIR__ . '/../../vendor/autoload.php',
__DIR__ . '/../../../../autoload.php', __DIR__ . '/../../../../autoload.php',
__DIR__ . '/../vendor/autoload.php', __DIR__ . '/../vendor/autoload.php',
); );
$found = false; $found = false;
foreach ($files as $file) { foreach ($files as $file) {
if (file_exists($file)) { if (file_exists($file)) {
require_once $file; require_once $file;
break; break;
} }
} }
if (!class_exists('Composer\Autoload\ClassLoader', false)) { if (!class_exists('Composer\Autoload\ClassLoader', false)) {
die( die(
'You need to set up the project dependencies using the following commands:' . PHP_EOL . 'You need to set up the project dependencies using the following commands:' . PHP_EOL .
'curl -s http://getcomposer.org/installer | php' . PHP_EOL . 'curl -s http://getcomposer.org/installer | php' . PHP_EOL .
'php composer.phar install' . PHP_EOL 'php composer.phar install' . PHP_EOL
); );
} }

View File

@ -1,10 +1,11 @@
<?php <?php
class PHP_Job class PHP_Job
{ {
public function perform() public function perform()
{ {
fwrite(STDOUT, 'Start job! -> '); fwrite(STDOUT, 'Start job! -> ');
sleep(1); sleep(1);
fwrite(STDOUT, 'Job ended!' . PHP_EOL); fwrite(STDOUT, 'Job ended!' . PHP_EOL);
} }
} }

View File

@ -1,8 +1,9 @@
<?php <?php
class Long_PHP_Job class Long_PHP_Job
{ {
public function perform() public function perform()
{ {
sleep(600); sleep(600);
} }
} }

View File

@ -1,8 +1,9 @@
<?php <?php
class PHP_Error_Job class PHP_Error_Job
{ {
public function perform() public function perform()
{ {
callToUndefinedFunction(); callToUndefinedFunction();
} }
} }

View File

@ -1,6 +1,6 @@
<?php <?php
if(empty($argv[1])) { if (empty($argv[1])) {
die('Specify the name of a job to add. e.g, php queue.php PHP_Job'); die('Specify the name of a job to add. e.g, php queue.php PHP_Job');
} }
require __DIR__ . '/init.php'; require __DIR__ . '/init.php';
@ -12,15 +12,15 @@ Resque::setBackend('127.0.0.1:6379');
//Resque::setBackend('redis://user:pass@a.host.name:3432/2'); //Resque::setBackend('redis://user:pass@a.host.name:3432/2');
$args = array( $args = array(
'time' => time(), 'time' => time(),
'array' => array( 'array' => array(
'test' => 'test', 'test' => 'test',
), ),
); );
if (empty($argv[2])) { if (empty($argv[2])) {
$jobId = Resque::enqueue('default', $argv[1], $args, true); $jobId = Resque::enqueue('default', $argv[1], $args, true);
} else { } else {
$jobId = Resque::enqueue($argv[1], $argv[2], $args, true); $jobId = Resque::enqueue($argv[1], $argv[2], $args, true);
} }
echo "Queued job ".$jobId."\n\n"; echo "Queued job " . $jobId . "\n\n";

View File

@ -8,42 +8,42 @@ Resque_Event::listen('beforePerform', array('My_Resque_Plugin', 'beforePerform')
Resque_Event::listen('afterPerform', array('My_Resque_Plugin', 'afterPerform')); Resque_Event::listen('afterPerform', array('My_Resque_Plugin', 'afterPerform'));
Resque_Event::listen('onFailure', array('My_Resque_Plugin', 'onFailure')); Resque_Event::listen('onFailure', array('My_Resque_Plugin', 'onFailure'));
class My_Resque_Plugin class Sample_Resque_Plugin
{ {
public static function afterEnqueue($class, $arguments) public static function afterEnqueue($class, $arguments)
{ {
echo "Job was queued for " . $class . ". Arguments:"; echo "Job was queued for " . $class . ". Arguments:";
print_r($arguments); print_r($arguments);
} }
public static function beforeFirstFork($worker) public static function beforeFirstFork($worker)
{ {
echo "Worker started. Listening on queues: " . implode(', ', $worker->queues(false)) . "\n"; echo "Worker started. Listening on queues: " . implode(', ', $worker->queues(false)) . "\n";
} }
public static function beforeFork($job) public static function beforeFork($job)
{ {
echo "Just about to fork to run " . $job; echo "Just about to fork to run " . $job;
} }
public static function afterFork($job) public static function afterFork($job)
{ {
echo "Forked to run " . $job . ". This is the child process.\n"; echo "Forked to run " . $job . ". This is the child process.\n";
} }
public static function beforePerform($job) public static function beforePerform($job)
{ {
echo "Cancelling " . $job . "\n"; echo "Cancelling " . $job . "\n";
// throw new Resque_Job_DontPerform; // throw new Resque_Job_DontPerform;
} }
public static function afterPerform($job) public static function afterPerform($job)
{ {
echo "Just performed " . $job . "\n"; echo "Just performed " . $job . "\n";
} }
public static function onFailure($exception, $job) public static function onFailure($exception, $job)
{ {
echo $job . " threw an exception:\n" . $exception; echo $job . " threw an exception:\n" . $exception;
} }
} }

View File

@ -3,279 +3,278 @@
/** /**
* Resque job. * Resque job.
* *
* @package Resque/Job * @package Resque/Job
* @author Chris Boulton <chris@bigcommerce.com> * @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php * @license http://www.opensource.org/licenses/mit-license.php
*/ */
class Resque_Job class Resque_Job
{ {
/** /**
* @var string The name of the queue that this job belongs to. * @var string The name of the queue that this job belongs to.
*/ */
public $queue; public $queue;
/** /**
* @var Resque_Worker Instance of the Resque worker running this job. * @var Resque_Worker Instance of the Resque worker running this job.
*/ */
public $worker; public $worker;
/** /**
* @var array Array containing details of the job. * @var array Array containing details of the job.
*/ */
public $payload; public $payload;
/** /**
* @var object|Resque_JobInterface Instance of the class performing work for this job. * @var object|Resque_JobInterface Instance of the class performing work for this job.
*/ */
private $instance; private $instance;
/** /**
* @var Resque_Job_FactoryInterface * @var Resque_Job_FactoryInterface
*/ */
private $jobFactory; private $jobFactory;
/** /**
* Instantiate a new instance of a job. * Instantiate a new instance of a job.
* *
* @param string $queue The queue that the job belongs to. * @param string $queue The queue that the job belongs to.
* @param array $payload array containing details of the job. * @param array $payload array containing details of the job.
*/ */
public function __construct($queue, $payload) public function __construct($queue, $payload)
{ {
$this->queue = $queue; $this->queue = $queue;
$this->payload = $payload; $this->payload = $payload;
} }
/** /**
* Create a new job and save it to the specified queue. * Create a new job and save it to the specified queue.
* *
* @param string $queue The name of the queue to place the job in. * @param string $queue The name of the queue to place the job in.
* @param string $class The name of the class that contains the code to execute the job. * @param string $class The name of the class that contains the code to execute the job.
* @param array $args Any optional arguments that should be passed when the job is executed. * @param array $args Any optional arguments that should be passed when the job is executed.
* @param boolean $monitor Set to true to be able to monitor the status of a job. * @param boolean $monitor Set to true to be able to monitor the status of a job.
* @param string $id Unique identifier for tracking the job. Generated if not supplied. * @param string $id Unique identifier for tracking the job. Generated if not supplied.
* *
* @return string * @return string
* @throws \InvalidArgumentException * @throws \InvalidArgumentException
*/ */
public static function create($queue, $class, $args = null, $monitor = false, $id = null) public static function create($queue, $class, $args = null, $monitor = false, $id = null)
{ {
if (is_null($id)) { if (is_null($id)) {
$id = Resque::generateJobId(); $id = Resque::generateJobId();
} }
if($args !== null && !is_array($args)) { if ($args !== null && !is_array($args)) {
throw new InvalidArgumentException( throw new InvalidArgumentException(
'Supplied $args must be an array.' 'Supplied $args must be an array.'
); );
} }
Resque::push($queue, array( Resque::push($queue, array(
'class' => $class, 'class' => $class,
'args' => array($args), 'args' => array($args),
'id' => $id, 'id' => $id,
'queue_time' => microtime(true), 'queue_time' => microtime(true),
)); ));
if($monitor) { if ($monitor) {
Resque_Job_Status::create($id); Resque_Job_Status::create($id);
} }
return $id; return $id;
} }
/** /**
* Find the next available job from the specified queue and return an * Find the next available job from the specified queue and return an
* instance of Resque_Job for it. * instance of Resque_Job for it.
* *
* @param string $queue The name of the queue to check for a job in. * @param string $queue The name of the queue to check for a job in.
* @return false|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. * @return false|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
*/ */
public static function reserve($queue) public static function reserve($queue)
{ {
$payload = Resque::pop($queue); $payload = Resque::pop($queue);
if(!is_array($payload)) { if (!is_array($payload)) {
return false; return false;
} }
return new Resque_Job($queue, $payload); return new Resque_Job($queue, $payload);
} }
/** /**
* Find the next available job from the specified queues using blocking list pop * Find the next available job from the specified queues using blocking list pop
* and return an instance of Resque_Job for it. * and return an instance of Resque_Job for it.
* *
* @param array $queues * @param array $queues
* @param int $timeout * @param int $timeout
* @return false|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. * @return false|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
*/ */
public static function reserveBlocking(array $queues, $timeout = null) public static function reserveBlocking(array $queues, $timeout = null)
{ {
$item = Resque::blpop($queues, $timeout); $item = Resque::blpop($queues, $timeout);
if(!is_array($item)) { if (!is_array($item)) {
return false; return false;
} }
return new Resque_Job($item['queue'], $item['payload']); return new Resque_Job($item['queue'], $item['payload']);
} }
/** /**
* Update the status of the current job. * Update the status of the current job.
* *
* @param int $status Status constant from Resque_Job_Status indicating the current status of a job. * @param int $status Status constant from Resque_Job_Status indicating the current status of a job.
*/ */
public function updateStatus($status) public function updateStatus($status)
{ {
if(empty($this->payload['id'])) { if (empty($this->payload['id'])) {
return; return;
} }
$statusInstance = new Resque_Job_Status($this->payload['id']); $statusInstance = new Resque_Job_Status($this->payload['id']);
$statusInstance->update($status); $statusInstance->update($status);
} }
/** /**
* Return the status of the current job. * Return the status of the current job.
* *
* @return int The status of the job as one of the Resque_Job_Status constants. * @return int The status of the job as one of the Resque_Job_Status constants.
*/ */
public function getStatus() public function getStatus()
{ {
$status = new Resque_Job_Status($this->payload['id']); $status = new Resque_Job_Status($this->payload['id']);
return $status->get(); return $status->get();
} }
/** /**
* Get the arguments supplied to this job. * Get the arguments supplied to this job.
* *
* @return array Array of arguments. * @return array Array of arguments.
*/ */
public function getArguments() public function getArguments()
{ {
if (!isset($this->payload['args'])) { if (!isset($this->payload['args'])) {
return array(); return array();
} }
return $this->payload['args'][0]; return $this->payload['args'][0];
} }
/** /**
* Get the instantiated object for this job that will be performing work. * Get the instantiated object for this job that will be performing work.
* @return Resque_JobInterface Instance of the object that this job belongs to. * @return Resque_JobInterface Instance of the object that this job belongs to.
* @throws Resque_Exception * @throws Resque_Exception
*/ */
public function getInstance() public function getInstance()
{ {
if (!is_null($this->instance)) { if (!is_null($this->instance)) {
return $this->instance; return $this->instance;
} }
$this->instance = $this->getJobFactory()->create($this->payload['class'], $this->getArguments(), $this->queue); $this->instance = $this->getJobFactory()->create($this->payload['class'], $this->getArguments(), $this->queue);
$this->instance->job = $this; $this->instance->job = $this;
return $this->instance; return $this->instance;
} }
/** /**
* Actually execute a job by calling the perform method on the class * Actually execute a job by calling the perform method on the class
* associated with the job with the supplied arguments. * associated with the job with the supplied arguments.
* *
* @return bool * @return bool
* @throws Resque_Exception When the job's class could not be found or it does not contain a perform method. * @throws Resque_Exception When the job's class could not be found or it does not contain a perform method.
*/ */
public function perform() public function perform()
{ {
try { try {
Resque_Event::trigger('beforePerform', $this); Resque_Event::trigger('beforePerform', $this);
$instance = $this->getInstance(); $instance = $this->getInstance();
if(method_exists($instance, 'setUp')) { if (method_exists($instance, 'setUp')) {
$instance->setUp(); $instance->setUp();
} }
$instance->perform(); $instance->perform();
if(method_exists($instance, 'tearDown')) { if (method_exists($instance, 'tearDown')) {
$instance->tearDown(); $instance->tearDown();
} }
Resque_Event::trigger('afterPerform', $this); Resque_Event::trigger('afterPerform', $this);
} } // beforePerform/setUp have said don't perform this job. Return.
// beforePerform/setUp have said don't perform this job. Return. catch (Resque_Job_DontPerform $e) {
catch(Resque_Job_DontPerform $e) { return false;
return false; }
}
return true; return true;
} }
/** /**
* Mark the current job as having failed. * Mark the current job as having failed.
* *
* @param $exception * @param $exception
*/ */
public function fail($exception) public function fail($exception)
{ {
Resque_Event::trigger('onFailure', array( Resque_Event::trigger('onFailure', array(
'exception' => $exception, 'exception' => $exception,
'job' => $this, 'job' => $this,
)); ));
$this->updateStatus(Resque_Job_Status::STATUS_FAILED); $this->updateStatus(Resque_Job_Status::STATUS_FAILED);
Resque_Failure::create( Resque_Failure::create(
$this->payload, $this->payload,
$exception, $exception,
$this->worker, $this->worker,
$this->queue $this->queue
); );
Resque_Stat::incr('failed'); Resque_Stat::incr('failed');
Resque_Stat::incr('failed:' . $this->worker); Resque_Stat::incr('failed:' . $this->worker);
} }
/** /**
* Re-queue the current job. * Re-queue the current job.
* @return string * @return string
*/ */
public function recreate() public function recreate()
{ {
$status = new Resque_Job_Status($this->payload['id']); $status = new Resque_Job_Status($this->payload['id']);
$monitor = false; $monitor = false;
if($status->isTracking()) { if ($status->isTracking()) {
$monitor = true; $monitor = true;
} }
return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor); return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor);
} }
/** /**
* Generate a string representation used to describe the current job. * Generate a string representation used to describe the current job.
* *
* @return string The string representation of the job. * @return string The string representation of the job.
*/ */
public function __toString() public function __toString()
{ {
$name = array( $name = array(
'Job{' . $this->queue .'}' 'Job{' . $this->queue . '}'
); );
if(!empty($this->payload['id'])) { if (!empty($this->payload['id'])) {
$name[] = 'ID: ' . $this->payload['id']; $name[] = 'ID: ' . $this->payload['id'];
} }
$name[] = $this->payload['class']; $name[] = $this->payload['class'];
if(!empty($this->payload['args'])) { if (!empty($this->payload['args'])) {
$name[] = json_encode($this->payload['args']); $name[] = json_encode($this->payload['args']);
} }
return '(' . implode(' | ', $name) . ')'; return '(' . implode(' | ', $name) . ')';
} }
/** /**
* @param Resque_Job_FactoryInterface $jobFactory * @param Resque_Job_FactoryInterface $jobFactory
* @return Resque_Job * @return Resque_Job
*/ */
public function setJobFactory(Resque_Job_FactoryInterface $jobFactory) public function setJobFactory(Resque_Job_FactoryInterface $jobFactory)
{ {
$this->jobFactory = $jobFactory; $this->jobFactory = $jobFactory;
return $this; return $this;
} }
/** /**
* @return Resque_Job_FactoryInterface * @return Resque_Job_FactoryInterface

View File

@ -239,8 +239,7 @@ class Resque_Redis
foreach ($args[0] AS $i => $v) { foreach ($args[0] AS $i => $v) {
$args[0][$i] = self::$defaultNamespace . $v; $args[0][$i] = self::$defaultNamespace . $v;
} }
} } else {
else {
$args[0] = self::$defaultNamespace . $args[0]; $args[0] = self::$defaultNamespace . $args[0];
} }
} }

View File

@ -11,7 +11,6 @@ use Psr\Log\LoggerInterface;
* @author Chris Boulton <chris@bigcommerce.com> * @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php * @license http://www.opensource.org/licenses/mit-license.php
*/ */
class Resque_Worker class Resque_Worker
{ {
/** /**

View File

@ -3,271 +3,271 @@
/** /**
* Resque_Worker tests. * Resque_Worker tests.
* *
* @package Resque/Tests * @package Resque/Tests
* @author Chris Boulton <chris@bigcommerce.com> * @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php * @license http://www.opensource.org/licenses/mit-license.php
*/ */
class Resque_Tests_WorkerTest extends Resque_Tests_TestCase class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
{ {
public function testWorkerRegistersInList() public function testWorkerRegistersInList()
{ {
$worker = new Resque_Worker('*'); $worker = new Resque_Worker('*');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->registerWorker(); $worker->registerWorker();
// Make sure the worker is in the list // Make sure the worker is in the list
$this->assertTrue((bool)$this->redis->sismember('resque:workers', (string)$worker)); $this->assertTrue((bool)$this->redis->sismember('resque:workers', (string)$worker));
} }
public function testGetAllWorkers() public function testGetAllWorkers()
{ {
$num = 3; $num = 3;
// Register a few workers // Register a few workers
for($i = 0; $i < $num; ++$i) { for ($i = 0; $i < $num; ++$i) {
$worker = new Resque_Worker('queue_' . $i); $worker = new Resque_Worker('queue_' . $i);
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->registerWorker(); $worker->registerWorker();
} }
// Now try to get them // Now try to get them
$this->assertEquals($num, count(Resque_Worker::all())); $this->assertEquals($num, count(Resque_Worker::all()));
} }
public function testGetWorkerById() public function testGetWorkerById()
{ {
$worker = new Resque_Worker('*'); $worker = new Resque_Worker('*');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->registerWorker(); $worker->registerWorker();
$newWorker = Resque_Worker::find((string)$worker); $newWorker = Resque_Worker::find((string)$worker);
$this->assertEquals((string)$worker, (string)$newWorker); $this->assertEquals((string)$worker, (string)$newWorker);
} }
public function testInvalidWorkerDoesNotExist() public function testInvalidWorkerDoesNotExist()
{ {
$this->assertFalse(Resque_Worker::exists('blah')); $this->assertFalse(Resque_Worker::exists('blah'));
} }
public function testWorkerCanUnregister() public function testWorkerCanUnregister()
{ {
$worker = new Resque_Worker('*'); $worker = new Resque_Worker('*');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->registerWorker(); $worker->registerWorker();
$worker->unregisterWorker(); $worker->unregisterWorker();
$this->assertFalse(Resque_Worker::exists((string)$worker)); $this->assertFalse(Resque_Worker::exists((string)$worker));
$this->assertEquals(array(), Resque_Worker::all()); $this->assertEquals(array(), Resque_Worker::all());
$this->assertEquals(array(), $this->redis->smembers('resque:workers')); $this->assertEquals(array(), $this->redis->smembers('resque:workers'));
} }
public function testPausedWorkerDoesNotPickUpJobs() public function testPausedWorkerDoesNotPickUpJobs()
{ {
$worker = new Resque_Worker('*'); $worker = new Resque_Worker('*');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->pauseProcessing(); $worker->pauseProcessing();
Resque::enqueue('jobs', 'Test_Job'); Resque::enqueue('jobs', 'Test_Job');
$worker->work(0); $worker->work(0);
$worker->work(0); $worker->work(0);
$this->assertEquals(0, Resque_Stat::get('processed')); $this->assertEquals(0, Resque_Stat::get('processed'));
} }
public function testResumedWorkerPicksUpJobs() public function testResumedWorkerPicksUpJobs()
{ {
$worker = new Resque_Worker('*'); $worker = new Resque_Worker('*');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->pauseProcessing(); $worker->pauseProcessing();
Resque::enqueue('jobs', 'Test_Job'); Resque::enqueue('jobs', 'Test_Job');
$worker->work(0); $worker->work(0);
$this->assertEquals(0, Resque_Stat::get('processed')); $this->assertEquals(0, Resque_Stat::get('processed'));
$worker->unPauseProcessing(); $worker->unPauseProcessing();
$worker->work(0); $worker->work(0);
$this->assertEquals(1, Resque_Stat::get('processed')); $this->assertEquals(1, Resque_Stat::get('processed'));
} }
public function testWorkerCanWorkOverMultipleQueues() public function testWorkerCanWorkOverMultipleQueues()
{ {
$worker = new Resque_Worker(array( $worker = new Resque_Worker(array(
'queue1', 'queue1',
'queue2' 'queue2'
)); ));
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->registerWorker(); $worker->registerWorker();
Resque::enqueue('queue1', 'Test_Job_1'); Resque::enqueue('queue1', 'Test_Job_1');
Resque::enqueue('queue2', 'Test_Job_2'); Resque::enqueue('queue2', 'Test_Job_2');
$job = $worker->reserve(); $job = $worker->reserve();
$this->assertEquals('queue1', $job->queue); $this->assertEquals('queue1', $job->queue);
$job = $worker->reserve(); $job = $worker->reserve();
$this->assertEquals('queue2', $job->queue); $this->assertEquals('queue2', $job->queue);
} }
public function testWorkerWorksQueuesInSpecifiedOrder() public function testWorkerWorksQueuesInSpecifiedOrder()
{ {
$worker = new Resque_Worker(array( $worker = new Resque_Worker(array(
'high', 'high',
'medium', 'medium',
'low' 'low'
)); ));
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->registerWorker(); $worker->registerWorker();
// Queue the jobs in a different order // Queue the jobs in a different order
Resque::enqueue('low', 'Test_Job_1'); Resque::enqueue('low', 'Test_Job_1');
Resque::enqueue('high', 'Test_Job_2'); Resque::enqueue('high', 'Test_Job_2');
Resque::enqueue('medium', 'Test_Job_3'); Resque::enqueue('medium', 'Test_Job_3');
// Now check we get the jobs back in the right order // Now check we get the jobs back in the right order
$job = $worker->reserve(); $job = $worker->reserve();
$this->assertEquals('high', $job->queue); $this->assertEquals('high', $job->queue);
$job = $worker->reserve(); $job = $worker->reserve();
$this->assertEquals('medium', $job->queue); $this->assertEquals('medium', $job->queue);
$job = $worker->reserve(); $job = $worker->reserve();
$this->assertEquals('low', $job->queue); $this->assertEquals('low', $job->queue);
} }
public function testWildcardQueueWorkerWorksAllQueues() public function testWildcardQueueWorkerWorksAllQueues()
{ {
$worker = new Resque_Worker('*'); $worker = new Resque_Worker('*');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->registerWorker(); $worker->registerWorker();
Resque::enqueue('queue1', 'Test_Job_1'); Resque::enqueue('queue1', 'Test_Job_1');
Resque::enqueue('queue2', 'Test_Job_2'); Resque::enqueue('queue2', 'Test_Job_2');
$job = $worker->reserve(); $job = $worker->reserve();
$this->assertEquals('queue1', $job->queue); $this->assertEquals('queue1', $job->queue);
$job = $worker->reserve(); $job = $worker->reserve();
$this->assertEquals('queue2', $job->queue); $this->assertEquals('queue2', $job->queue);
} }
public function testWorkerDoesNotWorkOnUnknownQueues() public function testWorkerDoesNotWorkOnUnknownQueues()
{ {
$worker = new Resque_Worker('queue1'); $worker = new Resque_Worker('queue1');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->registerWorker(); $worker->registerWorker();
Resque::enqueue('queue2', 'Test_Job'); Resque::enqueue('queue2', 'Test_Job');
$this->assertFalse($worker->reserve()); $this->assertFalse($worker->reserve());
} }
public function testWorkerClearsItsStatusWhenNotWorking() public function testWorkerClearsItsStatusWhenNotWorking()
{ {
Resque::enqueue('jobs', 'Test_Job'); Resque::enqueue('jobs', 'Test_Job');
$worker = new Resque_Worker('jobs'); $worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$job = $worker->reserve(); $job = $worker->reserve();
$worker->workingOn($job); $worker->workingOn($job);
$worker->doneWorking(); $worker->doneWorking();
$this->assertEquals(array(), $worker->job()); $this->assertEquals(array(), $worker->job());
} }
public function testWorkerRecordsWhatItIsWorkingOn() public function testWorkerRecordsWhatItIsWorkingOn()
{ {
$worker = new Resque_Worker('jobs'); $worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->registerWorker(); $worker->registerWorker();
$payload = array( $payload = array(
'class' => 'Test_Job' 'class' => 'Test_Job'
); );
$job = new Resque_Job('jobs', $payload); $job = new Resque_Job('jobs', $payload);
$worker->workingOn($job); $worker->workingOn($job);
$job = $worker->job(); $job = $worker->job();
$this->assertEquals('jobs', $job['queue']); $this->assertEquals('jobs', $job['queue']);
if(!isset($job['run_at'])) { if (!isset($job['run_at'])) {
$this->fail('Job does not have run_at time'); $this->fail('Job does not have run_at time');
} }
$this->assertEquals($payload, $job['payload']); $this->assertEquals($payload, $job['payload']);
} }
public function testWorkerErasesItsStatsWhenShutdown() public function testWorkerErasesItsStatsWhenShutdown()
{ {
Resque::enqueue('jobs', 'Test_Job'); Resque::enqueue('jobs', 'Test_Job');
Resque::enqueue('jobs', 'Invalid_Job'); Resque::enqueue('jobs', 'Invalid_Job');
$worker = new Resque_Worker('jobs'); $worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->work(0); $worker->work(0);
$worker->work(0); $worker->work(0);
$this->assertEquals(0, $worker->getStat('processed')); $this->assertEquals(0, $worker->getStat('processed'));
$this->assertEquals(0, $worker->getStat('failed')); $this->assertEquals(0, $worker->getStat('failed'));
} }
public function testWorkerCleansUpDeadWorkersOnStartup() public function testWorkerCleansUpDeadWorkersOnStartup()
{ {
// Register a good worker // Register a good worker
$goodWorker = new Resque_Worker('jobs'); $goodWorker = new Resque_Worker('jobs');
$goodWorker->setLogger(new Resque_Log()); $goodWorker->setLogger(new Resque_Log());
$goodWorker->registerWorker(); $goodWorker->registerWorker();
$workerId = explode(':', $goodWorker); $workerId = explode(':', $goodWorker);
// Register some bad workers // Register some bad workers
$worker = new Resque_Worker('jobs'); $worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->setId($workerId[0].':1:jobs'); $worker->setId($workerId[0] . ':1:jobs');
$worker->registerWorker(); $worker->registerWorker();
$worker = new Resque_Worker(array('high', 'low')); $worker = new Resque_Worker(array('high', 'low'));
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->setId($workerId[0].':2:high,low'); $worker->setId($workerId[0] . ':2:high,low');
$worker->registerWorker(); $worker->registerWorker();
$this->assertEquals(3, count(Resque_Worker::all())); $this->assertEquals(3, count(Resque_Worker::all()));
$goodWorker->pruneDeadWorkers(); $goodWorker->pruneDeadWorkers();
// There should only be $goodWorker left now // There should only be $goodWorker left now
$this->assertEquals(1, count(Resque_Worker::all())); $this->assertEquals(1, count(Resque_Worker::all()));
} }
public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers() public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers()
{ {
// Register a bad worker on this machine // Register a bad worker on this machine
$worker = new Resque_Worker('jobs'); $worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$workerId = explode(':', $worker); $workerId = explode(':', $worker);
$worker->setId($workerId[0].':1:jobs'); $worker->setId($workerId[0] . ':1:jobs');
$worker->registerWorker(); $worker->registerWorker();
// Register some other false workers // Register some other false workers
$worker = new Resque_Worker('jobs'); $worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->setId('my.other.host:1:jobs'); $worker->setId('my.other.host:1:jobs');
$worker->registerWorker(); $worker->registerWorker();
$this->assertEquals(2, count(Resque_Worker::all())); $this->assertEquals(2, count(Resque_Worker::all()));
$worker->pruneDeadWorkers(); $worker->pruneDeadWorkers();
// my.other.host should be left // my.other.host should be left
$workers = Resque_Worker::all(); $workers = Resque_Worker::all();
$this->assertEquals(1, count($workers)); $this->assertEquals(1, count($workers));
$this->assertEquals((string)$worker, (string)$workers[0]); $this->assertEquals((string)$worker, (string)$workers[0]);
} }
public function testWorkerFailsUncompletedJobsOnExit() public function testWorkerFailsUncompletedJobsOnExit()
{ {
$worker = new Resque_Worker('jobs'); $worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->registerWorker(); $worker->registerWorker();
$payload = array( $payload = array(
'class' => 'Test_Job' 'class' => 'Test_Job'
); );
$job = new Resque_Job('jobs', $payload); $job = new Resque_Job('jobs', $payload);
$worker->workingOn($job); $worker->workingOn($job);
$worker->unregisterWorker(); $worker->unregisterWorker();
$this->assertEquals(1, Resque_Stat::get('failed')); $this->assertEquals(1, Resque_Stat::get('failed'));
} }
public function testBlockingListPop() public function testBlockingListPop()
{ {
@ -275,18 +275,17 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
return; return;
$worker = new Resque_Worker('jobs'); $worker = new Resque_Worker('jobs');
$worker->setLogger(new Resque_Log()); $worker->setLogger(new Resque_Log());
$worker->registerWorker(); $worker->registerWorker();
Resque::enqueue('jobs', 'Test_Job_1'); Resque::enqueue('jobs', 'Test_Job_1');
Resque::enqueue('jobs', 'Test_Job_2'); Resque::enqueue('jobs', 'Test_Job_2');
$i = 1; $i = 1;
while($job = $worker->reserve(true, 2)) while ($job = $worker->reserve(true, 2)) {
{
$this->assertEquals('Test_Job_' . $i, $job->payload['class']); $this->assertEquals('Test_Job_' . $i, $job->payload['class']);
if($i == 2) { if ($i == 2) {
break; break;
} }