diff --git a/demo/check_status.php b/demo/check_status.php index 871daba..72767f3 100644 --- a/demo/check_status.php +++ b/demo/check_status.php @@ -1,6 +1,6 @@ isTracking()) { - die("Resque is not tracking the status of this job.\n"); +if (!$status->isTracking()) { + die("Resque is not tracking the status of this job.\n"); } -echo "Tracking status of ".$argv[1].". Press [break] to stop.\n\n"; -while(true) { - fwrite(STDOUT, "Status of ".$argv[1]." is: ".$status->get()."\n"); - sleep(1); +echo "Tracking status of " . $argv[1] . ". Press [break] to stop.\n\n"; +while (true) { + fwrite(STDOUT, "Status of " . $argv[1] . " is: " . $status->get() . "\n"); + sleep(1); } \ No newline at end of file diff --git a/demo/init.php b/demo/init.php index 9078bcd..62dff24 100644 --- a/demo/init.php +++ b/demo/init.php @@ -3,23 +3,23 @@ // NOTE: You should NOT use this when developing against php-resque. // The autoload code below is specifically for this demo. $files = array( - __DIR__ . '/../../vendor/autoload.php', - __DIR__ . '/../../../../autoload.php', - __DIR__ . '/../vendor/autoload.php', + __DIR__ . '/../../vendor/autoload.php', + __DIR__ . '/../../../../autoload.php', + __DIR__ . '/../vendor/autoload.php', ); $found = false; foreach ($files as $file) { - if (file_exists($file)) { - require_once $file; - break; - } + if (file_exists($file)) { + require_once $file; + break; + } } if (!class_exists('Composer\Autoload\ClassLoader', false)) { - die( - 'You need to set up the project dependencies using the following commands:' . PHP_EOL . - 'curl -s http://getcomposer.org/installer | php' . PHP_EOL . - 'php composer.phar install' . PHP_EOL - ); + die( + 'You need to set up the project dependencies using the following commands:' . PHP_EOL . + 'curl -s http://getcomposer.org/installer | php' . PHP_EOL . + 'php composer.phar install' . PHP_EOL + ); } \ No newline at end of file diff --git a/demo/job.php b/demo/job.php index ddda893..d861ee9 100644 --- a/demo/job.php +++ b/demo/job.php @@ -1,10 +1,11 @@ '); - sleep(1); - fwrite(STDOUT, 'Job ended!' . PHP_EOL); - } + sleep(1); + fwrite(STDOUT, 'Job ended!' . PHP_EOL); + } } \ No newline at end of file diff --git a/demo/long_job.php b/demo/long_job.php index 1cfe5cb..47d0a2d 100644 --- a/demo/long_job.php +++ b/demo/long_job.php @@ -1,8 +1,9 @@ time(), - 'array' => array( - 'test' => 'test', - ), + 'time' => time(), + 'array' => array( + 'test' => 'test', + ), ); if (empty($argv[2])) { - $jobId = Resque::enqueue('default', $argv[1], $args, true); + $jobId = Resque::enqueue('default', $argv[1], $args, true); } else { - $jobId = Resque::enqueue($argv[1], $argv[2], $args, true); + $jobId = Resque::enqueue($argv[1], $argv[2], $args, true); } -echo "Queued job ".$jobId."\n\n"; +echo "Queued job " . $jobId . "\n\n"; diff --git a/extras/sample-plugin.php b/extras/sample-plugin.php index e70dffd..340bca7 100644 --- a/extras/sample-plugin.php +++ b/extras/sample-plugin.php @@ -8,42 +8,42 @@ Resque_Event::listen('beforePerform', array('My_Resque_Plugin', 'beforePerform') Resque_Event::listen('afterPerform', array('My_Resque_Plugin', 'afterPerform')); Resque_Event::listen('onFailure', array('My_Resque_Plugin', 'onFailure')); -class My_Resque_Plugin +class Sample_Resque_Plugin { - public static function afterEnqueue($class, $arguments) - { - echo "Job was queued for " . $class . ". Arguments:"; - print_r($arguments); - } - - public static function beforeFirstFork($worker) - { - echo "Worker started. Listening on queues: " . implode(', ', $worker->queues(false)) . "\n"; - } - - public static function beforeFork($job) - { - echo "Just about to fork to run " . $job; - } - - public static function afterFork($job) - { - echo "Forked to run " . $job . ". This is the child process.\n"; - } - - public static function beforePerform($job) - { - echo "Cancelling " . $job . "\n"; - // throw new Resque_Job_DontPerform; - } - - public static function afterPerform($job) - { - echo "Just performed " . $job . "\n"; - } - - public static function onFailure($exception, $job) - { - echo $job . " threw an exception:\n" . $exception; - } + public static function afterEnqueue($class, $arguments) + { + echo "Job was queued for " . $class . ". Arguments:"; + print_r($arguments); + } + + public static function beforeFirstFork($worker) + { + echo "Worker started. Listening on queues: " . implode(', ', $worker->queues(false)) . "\n"; + } + + public static function beforeFork($job) + { + echo "Just about to fork to run " . $job; + } + + public static function afterFork($job) + { + echo "Forked to run " . $job . ". This is the child process.\n"; + } + + public static function beforePerform($job) + { + echo "Cancelling " . $job . "\n"; + // throw new Resque_Job_DontPerform; + } + + public static function afterPerform($job) + { + echo "Just performed " . $job . "\n"; + } + + public static function onFailure($exception, $job) + { + echo $job . " threw an exception:\n" . $exception; + } } \ No newline at end of file diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index e1e51c2..b47b515 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -3,279 +3,278 @@ /** * Resque job. * - * @package Resque/Job - * @author Chris Boulton - * @license http://www.opensource.org/licenses/mit-license.php + * @package Resque/Job + * @author Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Job { - /** - * @var string The name of the queue that this job belongs to. - */ - public $queue; + /** + * @var string The name of the queue that this job belongs to. + */ + public $queue; - /** - * @var Resque_Worker Instance of the Resque worker running this job. - */ - public $worker; + /** + * @var Resque_Worker Instance of the Resque worker running this job. + */ + public $worker; - /** - * @var array Array containing details of the job. - */ - public $payload; + /** + * @var array Array containing details of the job. + */ + public $payload; - /** - * @var object|Resque_JobInterface Instance of the class performing work for this job. - */ - private $instance; + /** + * @var object|Resque_JobInterface Instance of the class performing work for this job. + */ + private $instance; - /** - * @var Resque_Job_FactoryInterface - */ - private $jobFactory; + /** + * @var Resque_Job_FactoryInterface + */ + private $jobFactory; - /** - * Instantiate a new instance of a job. - * - * @param string $queue The queue that the job belongs to. - * @param array $payload array containing details of the job. - */ - public function __construct($queue, $payload) - { - $this->queue = $queue; - $this->payload = $payload; - } + /** + * Instantiate a new instance of a job. + * + * @param string $queue The queue that the job belongs to. + * @param array $payload array containing details of the job. + */ + public function __construct($queue, $payload) + { + $this->queue = $queue; + $this->payload = $payload; + } - /** - * Create a new job and save it to the specified queue. - * - * @param string $queue The name of the queue to place the job in. - * @param string $class The name of the class that contains the code to execute the job. - * @param array $args Any optional arguments that should be passed when the job is executed. - * @param boolean $monitor Set to true to be able to monitor the status of a job. - * @param string $id Unique identifier for tracking the job. Generated if not supplied. - * - * @return string - * @throws \InvalidArgumentException - */ - public static function create($queue, $class, $args = null, $monitor = false, $id = null) - { - if (is_null($id)) { - $id = Resque::generateJobId(); - } + /** + * Create a new job and save it to the specified queue. + * + * @param string $queue The name of the queue to place the job in. + * @param string $class The name of the class that contains the code to execute the job. + * @param array $args Any optional arguments that should be passed when the job is executed. + * @param boolean $monitor Set to true to be able to monitor the status of a job. + * @param string $id Unique identifier for tracking the job. Generated if not supplied. + * + * @return string + * @throws \InvalidArgumentException + */ + public static function create($queue, $class, $args = null, $monitor = false, $id = null) + { + if (is_null($id)) { + $id = Resque::generateJobId(); + } - if($args !== null && !is_array($args)) { - throw new InvalidArgumentException( - 'Supplied $args must be an array.' - ); - } - Resque::push($queue, array( - 'class' => $class, - 'args' => array($args), - 'id' => $id, - 'queue_time' => microtime(true), - )); + if ($args !== null && !is_array($args)) { + throw new InvalidArgumentException( + 'Supplied $args must be an array.' + ); + } + Resque::push($queue, array( + 'class' => $class, + 'args' => array($args), + 'id' => $id, + 'queue_time' => microtime(true), + )); - if($monitor) { - Resque_Job_Status::create($id); - } + if ($monitor) { + Resque_Job_Status::create($id); + } - return $id; - } + return $id; + } - /** - * Find the next available job from the specified queue and return an - * instance of Resque_Job for it. - * - * @param string $queue The name of the queue to check for a job in. - * @return false|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. - */ - public static function reserve($queue) - { + /** + * Find the next available job from the specified queue and return an + * instance of Resque_Job for it. + * + * @param string $queue The name of the queue to check for a job in. + * @return false|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. + */ + public static function reserve($queue) + { $payload = Resque::pop($queue); - if(!is_array($payload)) { - return false; - } + if (!is_array($payload)) { + return false; + } - return new Resque_Job($queue, $payload); - } + return new Resque_Job($queue, $payload); + } - /** - * Find the next available job from the specified queues using blocking list pop - * and return an instance of Resque_Job for it. - * - * @param array $queues - * @param int $timeout - * @return false|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. - */ - public static function reserveBlocking(array $queues, $timeout = null) - { - $item = Resque::blpop($queues, $timeout); - if(!is_array($item)) { - return false; - } + /** + * Find the next available job from the specified queues using blocking list pop + * and return an instance of Resque_Job for it. + * + * @param array $queues + * @param int $timeout + * @return false|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. + */ + public static function reserveBlocking(array $queues, $timeout = null) + { + $item = Resque::blpop($queues, $timeout); + if (!is_array($item)) { + return false; + } - return new Resque_Job($item['queue'], $item['payload']); - } + return new Resque_Job($item['queue'], $item['payload']); + } - /** - * Update the status of the current job. - * - * @param int $status Status constant from Resque_Job_Status indicating the current status of a job. - */ - public function updateStatus($status) - { - if(empty($this->payload['id'])) { - return; - } + /** + * Update the status of the current job. + * + * @param int $status Status constant from Resque_Job_Status indicating the current status of a job. + */ + public function updateStatus($status) + { + if (empty($this->payload['id'])) { + return; + } - $statusInstance = new Resque_Job_Status($this->payload['id']); - $statusInstance->update($status); - } + $statusInstance = new Resque_Job_Status($this->payload['id']); + $statusInstance->update($status); + } - /** - * Return the status of the current job. - * - * @return int The status of the job as one of the Resque_Job_Status constants. - */ - public function getStatus() - { - $status = new Resque_Job_Status($this->payload['id']); - return $status->get(); - } + /** + * Return the status of the current job. + * + * @return int The status of the job as one of the Resque_Job_Status constants. + */ + public function getStatus() + { + $status = new Resque_Job_Status($this->payload['id']); + return $status->get(); + } - /** - * Get the arguments supplied to this job. - * - * @return array Array of arguments. - */ - public function getArguments() - { - if (!isset($this->payload['args'])) { - return array(); - } + /** + * Get the arguments supplied to this job. + * + * @return array Array of arguments. + */ + public function getArguments() + { + if (!isset($this->payload['args'])) { + return array(); + } - return $this->payload['args'][0]; - } + return $this->payload['args'][0]; + } - /** - * Get the instantiated object for this job that will be performing work. - * @return Resque_JobInterface Instance of the object that this job belongs to. - * @throws Resque_Exception - */ - public function getInstance() - { - if (!is_null($this->instance)) { - return $this->instance; - } + /** + * Get the instantiated object for this job that will be performing work. + * @return Resque_JobInterface Instance of the object that this job belongs to. + * @throws Resque_Exception + */ + public function getInstance() + { + if (!is_null($this->instance)) { + return $this->instance; + } $this->instance = $this->getJobFactory()->create($this->payload['class'], $this->getArguments(), $this->queue); $this->instance->job = $this; return $this->instance; - } + } - /** - * Actually execute a job by calling the perform method on the class - * associated with the job with the supplied arguments. - * - * @return bool - * @throws Resque_Exception When the job's class could not be found or it does not contain a perform method. - */ - public function perform() - { - try { - Resque_Event::trigger('beforePerform', $this); + /** + * Actually execute a job by calling the perform method on the class + * associated with the job with the supplied arguments. + * + * @return bool + * @throws Resque_Exception When the job's class could not be found or it does not contain a perform method. + */ + public function perform() + { + try { + Resque_Event::trigger('beforePerform', $this); - $instance = $this->getInstance(); - if(method_exists($instance, 'setUp')) { - $instance->setUp(); - } + $instance = $this->getInstance(); + if (method_exists($instance, 'setUp')) { + $instance->setUp(); + } - $instance->perform(); + $instance->perform(); - if(method_exists($instance, 'tearDown')) { - $instance->tearDown(); - } + if (method_exists($instance, 'tearDown')) { + $instance->tearDown(); + } - Resque_Event::trigger('afterPerform', $this); - } - // beforePerform/setUp have said don't perform this job. Return. - catch(Resque_Job_DontPerform $e) { - return false; - } + Resque_Event::trigger('afterPerform', $this); + } // beforePerform/setUp have said don't perform this job. Return. + catch (Resque_Job_DontPerform $e) { + return false; + } - return true; - } + return true; + } - /** - * Mark the current job as having failed. - * - * @param $exception - */ - public function fail($exception) - { - Resque_Event::trigger('onFailure', array( - 'exception' => $exception, - 'job' => $this, - )); + /** + * Mark the current job as having failed. + * + * @param $exception + */ + public function fail($exception) + { + Resque_Event::trigger('onFailure', array( + 'exception' => $exception, + 'job' => $this, + )); - $this->updateStatus(Resque_Job_Status::STATUS_FAILED); - Resque_Failure::create( - $this->payload, - $exception, - $this->worker, - $this->queue - ); - Resque_Stat::incr('failed'); - Resque_Stat::incr('failed:' . $this->worker); - } + $this->updateStatus(Resque_Job_Status::STATUS_FAILED); + Resque_Failure::create( + $this->payload, + $exception, + $this->worker, + $this->queue + ); + Resque_Stat::incr('failed'); + Resque_Stat::incr('failed:' . $this->worker); + } - /** - * Re-queue the current job. - * @return string - */ - public function recreate() - { - $status = new Resque_Job_Status($this->payload['id']); - $monitor = false; - if($status->isTracking()) { - $monitor = true; - } + /** + * Re-queue the current job. + * @return string + */ + public function recreate() + { + $status = new Resque_Job_Status($this->payload['id']); + $monitor = false; + if ($status->isTracking()) { + $monitor = true; + } - return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor); - } + return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor); + } - /** - * Generate a string representation used to describe the current job. - * - * @return string The string representation of the job. - */ - public function __toString() - { - $name = array( - 'Job{' . $this->queue .'}' - ); - if(!empty($this->payload['id'])) { - $name[] = 'ID: ' . $this->payload['id']; - } - $name[] = $this->payload['class']; - if(!empty($this->payload['args'])) { - $name[] = json_encode($this->payload['args']); - } - return '(' . implode(' | ', $name) . ')'; - } + /** + * Generate a string representation used to describe the current job. + * + * @return string The string representation of the job. + */ + public function __toString() + { + $name = array( + 'Job{' . $this->queue . '}' + ); + if (!empty($this->payload['id'])) { + $name[] = 'ID: ' . $this->payload['id']; + } + $name[] = $this->payload['class']; + if (!empty($this->payload['args'])) { + $name[] = json_encode($this->payload['args']); + } + return '(' . implode(' | ', $name) . ')'; + } - /** - * @param Resque_Job_FactoryInterface $jobFactory - * @return Resque_Job - */ - public function setJobFactory(Resque_Job_FactoryInterface $jobFactory) - { - $this->jobFactory = $jobFactory; + /** + * @param Resque_Job_FactoryInterface $jobFactory + * @return Resque_Job + */ + public function setJobFactory(Resque_Job_FactoryInterface $jobFactory) + { + $this->jobFactory = $jobFactory; - return $this; - } + return $this; + } /** * @return Resque_Job_FactoryInterface diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php index f07739e..b68f4f3 100644 --- a/lib/Resque/Redis.php +++ b/lib/Resque/Redis.php @@ -239,8 +239,7 @@ class Resque_Redis foreach ($args[0] AS $i => $v) { $args[0][$i] = self::$defaultNamespace . $v; } - } - else { + } else { $args[0] = self::$defaultNamespace . $args[0]; } } diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index cb2c92d..37f0fad 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -11,7 +11,6 @@ use Psr\Log\LoggerInterface; * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ - class Resque_Worker { /** diff --git a/test/Resque/Tests/WorkerTest.php b/test/Resque/Tests/WorkerTest.php index eb1ff35..3dc4e2f 100644 --- a/test/Resque/Tests/WorkerTest.php +++ b/test/Resque/Tests/WorkerTest.php @@ -3,271 +3,271 @@ /** * Resque_Worker tests. * - * @package Resque/Tests - * @author Chris Boulton - * @license http://www.opensource.org/licenses/mit-license.php + * @package Resque/Tests + * @author Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase { - public function testWorkerRegistersInList() - { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); - $worker->registerWorker(); + public function testWorkerRegistersInList() + { + $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); + $worker->registerWorker(); - // Make sure the worker is in the list - $this->assertTrue((bool)$this->redis->sismember('resque:workers', (string)$worker)); - } + // Make sure the worker is in the list + $this->assertTrue((bool)$this->redis->sismember('resque:workers', (string)$worker)); + } - public function testGetAllWorkers() - { - $num = 3; - // Register a few workers - for($i = 0; $i < $num; ++$i) { - $worker = new Resque_Worker('queue_' . $i); - $worker->setLogger(new Resque_Log()); - $worker->registerWorker(); - } + public function testGetAllWorkers() + { + $num = 3; + // Register a few workers + for ($i = 0; $i < $num; ++$i) { + $worker = new Resque_Worker('queue_' . $i); + $worker->setLogger(new Resque_Log()); + $worker->registerWorker(); + } - // Now try to get them - $this->assertEquals($num, count(Resque_Worker::all())); - } + // Now try to get them + $this->assertEquals($num, count(Resque_Worker::all())); + } - public function testGetWorkerById() - { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); - $worker->registerWorker(); + public function testGetWorkerById() + { + $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); + $worker->registerWorker(); - $newWorker = Resque_Worker::find((string)$worker); - $this->assertEquals((string)$worker, (string)$newWorker); - } + $newWorker = Resque_Worker::find((string)$worker); + $this->assertEquals((string)$worker, (string)$newWorker); + } - public function testInvalidWorkerDoesNotExist() - { - $this->assertFalse(Resque_Worker::exists('blah')); - } + public function testInvalidWorkerDoesNotExist() + { + $this->assertFalse(Resque_Worker::exists('blah')); + } - public function testWorkerCanUnregister() - { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); - $worker->registerWorker(); - $worker->unregisterWorker(); + public function testWorkerCanUnregister() + { + $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); + $worker->registerWorker(); + $worker->unregisterWorker(); - $this->assertFalse(Resque_Worker::exists((string)$worker)); - $this->assertEquals(array(), Resque_Worker::all()); - $this->assertEquals(array(), $this->redis->smembers('resque:workers')); - } + $this->assertFalse(Resque_Worker::exists((string)$worker)); + $this->assertEquals(array(), Resque_Worker::all()); + $this->assertEquals(array(), $this->redis->smembers('resque:workers')); + } - public function testPausedWorkerDoesNotPickUpJobs() - { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); - $worker->pauseProcessing(); - Resque::enqueue('jobs', 'Test_Job'); - $worker->work(0); - $worker->work(0); - $this->assertEquals(0, Resque_Stat::get('processed')); - } + public function testPausedWorkerDoesNotPickUpJobs() + { + $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); + $worker->pauseProcessing(); + Resque::enqueue('jobs', 'Test_Job'); + $worker->work(0); + $worker->work(0); + $this->assertEquals(0, Resque_Stat::get('processed')); + } - public function testResumedWorkerPicksUpJobs() - { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); - $worker->pauseProcessing(); - Resque::enqueue('jobs', 'Test_Job'); - $worker->work(0); - $this->assertEquals(0, Resque_Stat::get('processed')); - $worker->unPauseProcessing(); - $worker->work(0); - $this->assertEquals(1, Resque_Stat::get('processed')); - } + public function testResumedWorkerPicksUpJobs() + { + $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); + $worker->pauseProcessing(); + Resque::enqueue('jobs', 'Test_Job'); + $worker->work(0); + $this->assertEquals(0, Resque_Stat::get('processed')); + $worker->unPauseProcessing(); + $worker->work(0); + $this->assertEquals(1, Resque_Stat::get('processed')); + } - public function testWorkerCanWorkOverMultipleQueues() - { - $worker = new Resque_Worker(array( - 'queue1', - 'queue2' - )); - $worker->setLogger(new Resque_Log()); - $worker->registerWorker(); - Resque::enqueue('queue1', 'Test_Job_1'); - Resque::enqueue('queue2', 'Test_Job_2'); + public function testWorkerCanWorkOverMultipleQueues() + { + $worker = new Resque_Worker(array( + 'queue1', + 'queue2' + )); + $worker->setLogger(new Resque_Log()); + $worker->registerWorker(); + Resque::enqueue('queue1', 'Test_Job_1'); + Resque::enqueue('queue2', 'Test_Job_2'); - $job = $worker->reserve(); - $this->assertEquals('queue1', $job->queue); + $job = $worker->reserve(); + $this->assertEquals('queue1', $job->queue); - $job = $worker->reserve(); - $this->assertEquals('queue2', $job->queue); - } + $job = $worker->reserve(); + $this->assertEquals('queue2', $job->queue); + } - public function testWorkerWorksQueuesInSpecifiedOrder() - { - $worker = new Resque_Worker(array( - 'high', - 'medium', - 'low' - )); - $worker->setLogger(new Resque_Log()); - $worker->registerWorker(); + public function testWorkerWorksQueuesInSpecifiedOrder() + { + $worker = new Resque_Worker(array( + 'high', + 'medium', + 'low' + )); + $worker->setLogger(new Resque_Log()); + $worker->registerWorker(); - // Queue the jobs in a different order - Resque::enqueue('low', 'Test_Job_1'); - Resque::enqueue('high', 'Test_Job_2'); - Resque::enqueue('medium', 'Test_Job_3'); + // Queue the jobs in a different order + Resque::enqueue('low', 'Test_Job_1'); + Resque::enqueue('high', 'Test_Job_2'); + Resque::enqueue('medium', 'Test_Job_3'); - // Now check we get the jobs back in the right order - $job = $worker->reserve(); - $this->assertEquals('high', $job->queue); + // Now check we get the jobs back in the right order + $job = $worker->reserve(); + $this->assertEquals('high', $job->queue); - $job = $worker->reserve(); - $this->assertEquals('medium', $job->queue); + $job = $worker->reserve(); + $this->assertEquals('medium', $job->queue); - $job = $worker->reserve(); - $this->assertEquals('low', $job->queue); - } + $job = $worker->reserve(); + $this->assertEquals('low', $job->queue); + } - public function testWildcardQueueWorkerWorksAllQueues() - { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); - $worker->registerWorker(); + public function testWildcardQueueWorkerWorksAllQueues() + { + $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); + $worker->registerWorker(); - Resque::enqueue('queue1', 'Test_Job_1'); - Resque::enqueue('queue2', 'Test_Job_2'); + Resque::enqueue('queue1', 'Test_Job_1'); + Resque::enqueue('queue2', 'Test_Job_2'); - $job = $worker->reserve(); - $this->assertEquals('queue1', $job->queue); + $job = $worker->reserve(); + $this->assertEquals('queue1', $job->queue); - $job = $worker->reserve(); - $this->assertEquals('queue2', $job->queue); - } + $job = $worker->reserve(); + $this->assertEquals('queue2', $job->queue); + } - public function testWorkerDoesNotWorkOnUnknownQueues() - { - $worker = new Resque_Worker('queue1'); - $worker->setLogger(new Resque_Log()); - $worker->registerWorker(); - Resque::enqueue('queue2', 'Test_Job'); + public function testWorkerDoesNotWorkOnUnknownQueues() + { + $worker = new Resque_Worker('queue1'); + $worker->setLogger(new Resque_Log()); + $worker->registerWorker(); + Resque::enqueue('queue2', 'Test_Job'); - $this->assertFalse($worker->reserve()); - } + $this->assertFalse($worker->reserve()); + } - public function testWorkerClearsItsStatusWhenNotWorking() - { - Resque::enqueue('jobs', 'Test_Job'); - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); - $job = $worker->reserve(); - $worker->workingOn($job); - $worker->doneWorking(); - $this->assertEquals(array(), $worker->job()); - } + public function testWorkerClearsItsStatusWhenNotWorking() + { + Resque::enqueue('jobs', 'Test_Job'); + $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); + $job = $worker->reserve(); + $worker->workingOn($job); + $worker->doneWorking(); + $this->assertEquals(array(), $worker->job()); + } - public function testWorkerRecordsWhatItIsWorkingOn() - { - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); - $worker->registerWorker(); + public function testWorkerRecordsWhatItIsWorkingOn() + { + $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); + $worker->registerWorker(); - $payload = array( - 'class' => 'Test_Job' - ); - $job = new Resque_Job('jobs', $payload); - $worker->workingOn($job); + $payload = array( + 'class' => 'Test_Job' + ); + $job = new Resque_Job('jobs', $payload); + $worker->workingOn($job); - $job = $worker->job(); - $this->assertEquals('jobs', $job['queue']); - if(!isset($job['run_at'])) { - $this->fail('Job does not have run_at time'); - } - $this->assertEquals($payload, $job['payload']); - } + $job = $worker->job(); + $this->assertEquals('jobs', $job['queue']); + if (!isset($job['run_at'])) { + $this->fail('Job does not have run_at time'); + } + $this->assertEquals($payload, $job['payload']); + } - public function testWorkerErasesItsStatsWhenShutdown() - { - Resque::enqueue('jobs', 'Test_Job'); - Resque::enqueue('jobs', 'Invalid_Job'); + public function testWorkerErasesItsStatsWhenShutdown() + { + Resque::enqueue('jobs', 'Test_Job'); + Resque::enqueue('jobs', 'Invalid_Job'); - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); - $worker->work(0); - $worker->work(0); + $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); + $worker->work(0); + $worker->work(0); - $this->assertEquals(0, $worker->getStat('processed')); - $this->assertEquals(0, $worker->getStat('failed')); - } + $this->assertEquals(0, $worker->getStat('processed')); + $this->assertEquals(0, $worker->getStat('failed')); + } - public function testWorkerCleansUpDeadWorkersOnStartup() - { - // Register a good worker - $goodWorker = new Resque_Worker('jobs'); - $goodWorker->setLogger(new Resque_Log()); - $goodWorker->registerWorker(); - $workerId = explode(':', $goodWorker); + public function testWorkerCleansUpDeadWorkersOnStartup() + { + // Register a good worker + $goodWorker = new Resque_Worker('jobs'); + $goodWorker->setLogger(new Resque_Log()); + $goodWorker->registerWorker(); + $workerId = explode(':', $goodWorker); - // Register some bad workers - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); - $worker->setId($workerId[0].':1:jobs'); - $worker->registerWorker(); + // Register some bad workers + $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); + $worker->setId($workerId[0] . ':1:jobs'); + $worker->registerWorker(); - $worker = new Resque_Worker(array('high', 'low')); - $worker->setLogger(new Resque_Log()); - $worker->setId($workerId[0].':2:high,low'); - $worker->registerWorker(); + $worker = new Resque_Worker(array('high', 'low')); + $worker->setLogger(new Resque_Log()); + $worker->setId($workerId[0] . ':2:high,low'); + $worker->registerWorker(); - $this->assertEquals(3, count(Resque_Worker::all())); + $this->assertEquals(3, count(Resque_Worker::all())); - $goodWorker->pruneDeadWorkers(); + $goodWorker->pruneDeadWorkers(); - // There should only be $goodWorker left now - $this->assertEquals(1, count(Resque_Worker::all())); - } + // There should only be $goodWorker left now + $this->assertEquals(1, count(Resque_Worker::all())); + } - public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers() - { - // Register a bad worker on this machine - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); - $workerId = explode(':', $worker); - $worker->setId($workerId[0].':1:jobs'); - $worker->registerWorker(); + public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers() + { + // Register a bad worker on this machine + $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); + $workerId = explode(':', $worker); + $worker->setId($workerId[0] . ':1:jobs'); + $worker->registerWorker(); - // Register some other false workers - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); - $worker->setId('my.other.host:1:jobs'); - $worker->registerWorker(); + // Register some other false workers + $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); + $worker->setId('my.other.host:1:jobs'); + $worker->registerWorker(); - $this->assertEquals(2, count(Resque_Worker::all())); + $this->assertEquals(2, count(Resque_Worker::all())); - $worker->pruneDeadWorkers(); + $worker->pruneDeadWorkers(); - // my.other.host should be left - $workers = Resque_Worker::all(); - $this->assertEquals(1, count($workers)); - $this->assertEquals((string)$worker, (string)$workers[0]); - } + // my.other.host should be left + $workers = Resque_Worker::all(); + $this->assertEquals(1, count($workers)); + $this->assertEquals((string)$worker, (string)$workers[0]); + } - public function testWorkerFailsUncompletedJobsOnExit() - { - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); - $worker->registerWorker(); + public function testWorkerFailsUncompletedJobsOnExit() + { + $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); + $worker->registerWorker(); - $payload = array( - 'class' => 'Test_Job' - ); - $job = new Resque_Job('jobs', $payload); + $payload = array( + 'class' => 'Test_Job' + ); + $job = new Resque_Job('jobs', $payload); - $worker->workingOn($job); - $worker->unregisterWorker(); + $worker->workingOn($job); + $worker->unregisterWorker(); - $this->assertEquals(1, Resque_Stat::get('failed')); - } + $this->assertEquals(1, Resque_Stat::get('failed')); + } public function testBlockingListPop() { @@ -275,18 +275,17 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase return; $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); Resque::enqueue('jobs', 'Test_Job_1'); Resque::enqueue('jobs', 'Test_Job_2'); $i = 1; - while($job = $worker->reserve(true, 2)) - { + while ($job = $worker->reserve(true, 2)) { $this->assertEquals('Test_Job_' . $i, $job->payload['class']); - if($i == 2) { + if ($i == 2) { break; }