From 41bf8c1c26708232252d85310402163c563e7e7e Mon Sep 17 00:00:00 2001 From: Ruud Kamphuis Date: Fri, 24 Aug 2012 16:51:03 +0200 Subject: [PATCH 1/6] Basic support for blocking list pop --- lib/Resque.php | 17 ++++++++++----- lib/Resque/Job.php | 6 +++--- lib/Resque/Redis.php | 1 + lib/Resque/RedisCluster.php | 1 + lib/Resque/Worker.php | 42 ++++++++++++++++++------------------- 5 files changed, 38 insertions(+), 29 deletions(-) diff --git a/lib/Resque.php b/lib/Resque.php index 9bc8143..4fe5a16 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -14,6 +14,8 @@ class Resque { const VERSION = '1.0'; + const DEFAULT_INTERVAL = 5; + /** * @var Resque_Redis Instance of Resque_Redis that talks to redis. */ @@ -115,14 +117,19 @@ class Resque * @param string $queue The name of the queue to fetch an item from. * @return array Decoded item from the queue. */ - public static function pop($queue) + public static function pop($queue, $interval = null) { - $item = self::redis()->lpop('queue:' . $queue); + if($interval == null) { + $item = self::redis()->lpop('queue:' . $queue); + } else { + $item = self::redis()->blpop('queue:' . $queue, $interval ?: Resque::DEFAULT_INTERVAL); + } + if(!$item) { return; } - return json_decode($item, true); + return json_decode($interval == 0 ? $item : $item[1], true); } /** @@ -168,10 +175,10 @@ class Resque * @param string $queue Queue to fetch next available job from. * @return Resque_Job Instance of Resque_Job to be processed, false if none or error. */ - public static function reserve($queue) + public static function reserve($queue, $interval = null) { require_once dirname(__FILE__) . '/Resque/Job.php'; - return Resque_Job::reserve($queue); + return Resque_Job::reserve($queue, $interval); } /** diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 0d275a1..6af0219 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -83,9 +83,9 @@ class Resque_Job * @param string $queue The name of the queue to check for a job in. * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. */ - public static function reserve($queue) + public static function reserve($queue, $interval = null) { - $payload = Resque::pop($queue); + $payload = Resque::pop($queue, $interval); if(!is_array($payload)) { return false; } @@ -156,7 +156,7 @@ class Resque_Job ); } - $this->instance = new $this->payload['class'](); + $this->instance = new $this->payload['class']; $this->instance->job = $this; $this->instance->args = $this->getArguments(); $this->instance->queue = $this->queue; diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php index b6d7522..e1f077f 100644 --- a/lib/Resque/Redis.php +++ b/lib/Resque/Redis.php @@ -50,6 +50,7 @@ class Resque_Redis extends Redisent 'lset', 'lrem', 'lpop', + 'blpop', 'rpop', 'sadd', 'srem', diff --git a/lib/Resque/RedisCluster.php b/lib/Resque/RedisCluster.php index 39c3f5c..e884653 100644 --- a/lib/Resque/RedisCluster.php +++ b/lib/Resque/RedisCluster.php @@ -50,6 +50,7 @@ class Resque_RedisCluster extends RedisentCluster 'lset', 'lrem', 'lpop', + 'blpop', 'rpop', 'sadd', 'srem', diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 31f2f3c..58571be 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -153,7 +153,7 @@ class Resque_Worker * * @param int $interval How often to check for new jobs across the queues. */ - public function work($interval = 5) + public function work($interval = Resque::DEFAULT_INTERVAL) { $this->updateProcLine('Starting'); $this->startup(); @@ -166,25 +166,25 @@ class Resque_Worker // Attempt to find and reserve a job $job = false; if(!$this->paused) { - $job = $this->reserve(); + $job = $this->reserve($interval); } - if(!$job) { - // For an interval of 0, break now - helps with unit testing etc - if($interval == 0) { - break; - } - // If no job was found, we sleep for $interval before continuing and checking again - $this->log('Sleeping for ' . $interval, true); - if($this->paused) { - $this->updateProcLine('Paused'); - } - else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); - } - usleep($interval * 1000000); - continue; - } + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); + + if(!$job) { + // For an interval of 0, break now - helps with unit testing etc + if($interval == 0) { + break; + } + + // If no job was found, we sleep for $interval before continuing and checking again + if($this->paused) { + $this->updateProcLine('Paused'); + usleep($interval * 1000000); //it's paused, so don't hog redis with requests. + } + + continue; + } $this->log('got ' . $job); Resque_Event::trigger('beforeFork', $job); @@ -252,15 +252,15 @@ class Resque_Worker * * @return object|boolean Instance of Resque_Job if a job is found, false if not. */ - public function reserve() + public function reserve($interval = null) { $queues = $this->queues(); if(!is_array($queues)) { return; } foreach($queues as $queue) { - $this->log('Checking ' . $queue, self::LOG_VERBOSE); - $job = Resque_Job::reserve($queue); + $this->log('Checking ' . $queue . ' with interval ' . $interval, self::LOG_VERBOSE); + $job = Resque_Job::reserve($queue, $interval); if($job) { $this->log('Found job on ' . $queue, self::LOG_VERBOSE); return $job; From 5eff86d3c3c54eedcfeecc515e02805d70d66ac2 Mon Sep 17 00:00:00 2001 From: Ruud Kamphuis Date: Mon, 10 Sep 2012 10:23:18 +0200 Subject: [PATCH 2/6] Fixed bug --- lib/Resque.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Resque.php b/lib/Resque.php index 4fe5a16..29249f0 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -122,7 +122,7 @@ class Resque if($interval == null) { $item = self::redis()->lpop('queue:' . $queue); } else { - $item = self::redis()->blpop('queue:' . $queue, $interval ?: Resque::DEFAULT_INTERVAL); + $item = self::redis()->blpop('queue:' . $queue, $interval ? $interval : Resque::DEFAULT_INTERVAL); } if(!$item) { From ff0d2bc655339fa7d2cc284e97a36cc5bf96a910 Mon Sep 17 00:00:00 2001 From: Ruud Kamphuis Date: Tue, 12 Mar 2013 09:55:03 +0100 Subject: [PATCH 3/6] Added test for blpop --- test/Resque/Tests/WorkerTest.php | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/test/Resque/Tests/WorkerTest.php b/test/Resque/Tests/WorkerTest.php index b2f0e00..b0001c9 100644 --- a/test/Resque/Tests/WorkerTest.php +++ b/test/Resque/Tests/WorkerTest.php @@ -247,4 +247,27 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase $this->assertEquals(1, Resque_Stat::get('failed')); } + + public function testBlockingListPop() + { + $worker = new Resque_Worker('jobs'); + $worker->registerWorker(); + + Resque::enqueue('jobs', 'Test_Job_1'); + Resque::enqueue('jobs', 'Test_Job_2'); + + $i = 1; + while($job = $worker->reserve(60)) + { + $this->assertEquals('Test_Job_' . $i, $job->payload['class']); + + if($i == 2) { + break; + } + + $i++; + } + + $this->assertEquals(2, $i); + } } \ No newline at end of file From 132531e1a2a2d1e0905c353914e5b0511e1bb77b Mon Sep 17 00:00:00 2001 From: Ruud Kamphuis Date: Tue, 12 Mar 2013 11:18:37 +0100 Subject: [PATCH 4/6] WIP --- bin/resque | 1 - demo/job.php | 5 +- demo/queue.php | 2 +- lib/Resque.php | 33 +++++++-- lib/Resque/Job.php | 49 ++++++++++---- lib/Resque/Redis.php | 8 ++- lib/Resque/Worker.php | 154 ++++++++++++++++++++++-------------------- 7 files changed, 150 insertions(+), 102 deletions(-) diff --git a/bin/resque b/bin/resque index 78befad..2cb5ffd 100644 --- a/bin/resque +++ b/bin/resque @@ -1,4 +1,3 @@ -#!/usr/bin/env php '); + sleep(1); + fwrite(STDOUT, 'Job ended!' . PHP_EOL); } } ?> \ No newline at end of file diff --git a/demo/queue.php b/demo/queue.php index 95ff5b0..393df87 100644 --- a/demo/queue.php +++ b/demo/queue.php @@ -14,6 +14,6 @@ $args = array( ), ); -$jobId = Resque::enqueue('default', $argv[1], $args, true); +$jobId = Resque::enqueue($argv[1], $argv[2], $args, true); echo "Queued job ".$jobId."\n\n"; ?> \ No newline at end of file diff --git a/lib/Resque.php b/lib/Resque.php index bd29e84..2e459f0 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -110,21 +110,40 @@ class Resque * @param string $queue The name of the queue to fetch an item from. * @return array Decoded item from the queue. */ - public static function pop($queue, $interval = null) + public static function pop($queue) { - if($interval == null) { - $item = self::redis()->lpop('queue:' . $queue); - } else { - $item = self::redis()->blpop('queue:' . $queue, $interval ? $interval : Resque::DEFAULT_INTERVAL); - } + $item = self::redis()->lpop('queue:' . $queue); if(!$item) { return; } - return json_decode($interval == 0 ? $item : $item[1], true); + return json_decode($item, true); } + /** + * Pop an item off the end of the specified queue, decode it and + * return it. + * + * @param string $queue The name of the queue to fetch an item from. + * @return array Decoded item from the queue. + */ + public static function blpop($queues, $interval = null) + { + $list = array(); + foreach($queues AS $queue) { + $list[] = 'queue:' . $queue; + } + + $item = self::redis()->blpop($list, $interval ? (int)$interval : Resque::DEFAULT_INTERVAL); + + if(!$item) { + return; + } + + return json_decode($item[1], true); + } + /** * Return the size (number of pending jobs) of the specified queue. * diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 5066c27..0b1f01e 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -71,22 +71,41 @@ class Resque_Job return $id; } - /** - * Find the next available job from the specified queue and return an - * instance of Resque_Job for it. - * - * @param string $queue The name of the queue to check for a job in. - * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. - */ - public static function reserve($queue, $interval = null) - { - $payload = Resque::pop($queue, $interval); - if(!is_array($payload)) { - return false; - } + /** + * Find the next available job from the specified queue and return an + * instance of Resque_Job for it. + * + * @param string $queue The name of the queue to check for a job in. + * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. + */ + public static function reserve($queue) + { + $payload = Resque::pop($queue); + if(!is_array($payload)) { + return false; + } - return new Resque_Job($queue, $payload); - } + return new Resque_Job($queue, $payload); + } + + /** + * Find the next available job from the specified queue and return an + * instance of Resque_Job for it. + * + * @param string $queue The name of the queue to check for a job in. + * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. + */ + public static function reserveBlocking($queues, $interval = null) + { + $payload = Resque::blpop($queues, $interval); + if(!is_array($payload)) { + return false; + } + + var_dump($payload); + + return new Resque_Job($payload->queue, $payload); + } /** * Update the status of the current job. diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php index 4447146..6cef3d2 100644 --- a/lib/Resque/Redis.php +++ b/lib/Resque/Redis.php @@ -143,7 +143,13 @@ class Resque_Redis */ public function __call($name, $args) { if(in_array($name, $this->keyCommands)) { - $args[0] = self::$defaultNamespace . $args[0]; + if(is_array($args[0])) { + foreach($args[0] AS $i => $v) { + $args[0][$i] = self::$defaultNamespace . $v; + } + } else { + $args[0] = self::$defaultNamespace . $args[0]; + } } try { return $this->driver->__call($name, $args); diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 204aae6..03487f5 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -152,70 +152,53 @@ class Resque_Worker $this->updateProcLine('Starting'); $this->startup(); - while(true) { - if($this->shutdown) { - break; - } - - // Attempt to find and reserve a job - $job = false; - if(!$this->paused) { - $job = $this->reserve($interval); - } - + while($job = $this->reserveBlocking($interval)) { $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); - if(!$job) { - // For an interval of 0, break now - helps with unit testing etc - if($interval == 0) { - break; - } + $this->log('got ' . $job); + Resque_Event::trigger('beforeFork', $job); + $this->workingOn($job); - // If no job was found, we sleep for $interval before continuing and checking again - if($this->paused) { - $this->updateProcLine('Paused'); - usleep($interval * 1000000); //it's paused, so don't hog redis with requests. - } + $this->child = Resque::fork(); - continue; + // Forked and we're the child. Run the job. + if ($this->child === 0 || $this->child === false) { + $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); + $this->updateProcLine($status); + $this->log($status, self::LOG_VERBOSE); + $this->perform($job); + if ($this->child === 0) { + exit(0); + } } - $this->log('got ' . $job); - Resque_Event::trigger('beforeFork', $job); - $this->workingOn($job); + if($this->child > 0) { + // Parent process, sit and wait + $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); + $this->updateProcLine($status); + $this->log($status, self::LOG_VERBOSE); - $this->child = Resque::fork(); + // Wait until the child process finishes before continuing + pcntl_wait($status); + $exitStatus = pcntl_wexitstatus($status); + if($exitStatus !== 0) { + $job->fail(new Resque_Job_DirtyExitException( + 'Job exited with exit code ' . $exitStatus + )); + } + } - // Forked and we're the child. Run the job. - if ($this->child === 0 || $this->child === false) { - $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); - $this->updateProcLine($status); - $this->log($status, self::LOG_VERBOSE); - $this->perform($job); - if ($this->child === 0) { - exit(0); - } - } + $this->child = null; + $this->doneWorking(); - if($this->child > 0) { - // Parent process, sit and wait - $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); - $this->updateProcLine($status); - $this->log($status, self::LOG_VERBOSE); + if($this->shutdown) { + break; + } - // Wait until the child process finishes before continuing - pcntl_wait($status); - $exitStatus = pcntl_wexitstatus($status); - if($exitStatus !== 0) { - $job->fail(new Resque_Job_DirtyExitException( - 'Job exited with exit code ' . $exitStatus - )); - } - } - - $this->child = null; - $this->doneWorking(); - } + if($this->paused) { + break; + } + } $this->unregisterWorker(); } @@ -241,28 +224,49 @@ class Resque_Worker $this->log('done ' . $job); } - /** - * Attempt to find a job from the top of one of the queues for this worker. - * - * @return object|boolean Instance of Resque_Job if a job is found, false if not. - */ - public function reserve($interval = null) - { - $queues = $this->queues(); - if(!is_array($queues)) { - return; - } - foreach($queues as $queue) { - $this->log('Checking ' . $queue . ' with interval ' . $interval, self::LOG_VERBOSE); - $job = Resque_Job::reserve($queue, $interval); - if($job) { - $this->log('Found job on ' . $queue, self::LOG_VERBOSE); - return $job; - } - } + /** + * Attempt to find a job from the top of one of the queues for this worker. + * + * @return object|boolean Instance of Resque_Job if a job is found, false if not. + */ + public function reserve() + { + $queues = $this->queues(); + if(!is_array($queues)) { + return; + } + foreach($queues as $queue) { + $this->log('Checking ' . $queue, self::LOG_VERBOSE); + $job = Resque_Job::reserve($queue); + if($job) { + $this->log('Found job on ' . $queue, self::LOG_VERBOSE); + return $job; + } + } - return false; - } + return false; + } + + /** + * Attempt to find a job from the top of one of the queues for this worker. + * + * @return object|boolean Instance of Resque_Job if a job is found, false if not. + */ + public function reserveBlocking($interval = null) + { + $queues = $this->queues(); + if(!is_array($queues)) { + return; + } + + $job = Resque_Job::reserveBlocking($queues, $interval); + if($job) { + $this->log('Found job on ' . $job->queue, self::LOG_VERBOSE); + return $job; + } + + return false; + } /** * Return an array containing all of the queues that this worker should use From c2c4d06f7b72a3d8c68c61ef443fbb0b00e87200 Mon Sep 17 00:00:00 2001 From: Ruud Kamphuis Date: Wed, 13 Mar 2013 12:41:32 +0100 Subject: [PATCH 5/6] Working blocking list pop :) --- bin/resque | 106 +------ bin/resque.php | 105 +++++++ composer.lock | 416 +++++++++++++++++++++++++++- demo/bad_job.php | 3 +- demo/check_status.php | 3 +- demo/job.php | 3 +- demo/long_job.php | 3 +- demo/php_error_job.php | 3 +- demo/queue.php | 3 +- demo/resque.php | 3 +- lib/Resque.php | 17 +- lib/Resque/Job.php | 13 +- lib/Resque/Worker.php | 99 ++++--- test/Resque/Tests/JobStatusTest.php | 6 + 14 files changed, 606 insertions(+), 177 deletions(-) mode change 100644 => 100755 bin/resque create mode 100644 bin/resque.php diff --git a/bin/resque b/bin/resque old mode 100644 new mode 100755 index 2cb5ffd..5475c98 --- a/bin/resque +++ b/bin/resque @@ -1,105 +1,3 @@ +#!/usr/bin/env php 1) { - $count = $COUNT; -} - -if($count > 1) { - for($i = 0; $i < $count; ++$i) { - $pid = Resque::fork(); - if($pid == -1) { - die("Could not fork worker ".$i."\n"); - } - // Child, start the worker - else if(!$pid) { - $queues = explode(',', $QUEUE); - $worker = new Resque_Worker($queues); - $worker->logLevel = $logLevel; - fwrite(STDOUT, '*** Starting worker '.$worker."\n"); - $worker->work($interval); - break; - } - } -} -// Start a single worker -else { - $queues = explode(',', $QUEUE); - $worker = new Resque_Worker($queues); - $worker->logLevel = $logLevel; - - $PIDFILE = getenv('PIDFILE'); - if ($PIDFILE) { - file_put_contents($PIDFILE, getmypid()) or - die('Could not write PID information to ' . $PIDFILE); - } - - fwrite(STDOUT, '*** Starting worker '.$worker."\n"); - $worker->work($interval); -} -?> +include("resque.php"); \ No newline at end of file diff --git a/bin/resque.php b/bin/resque.php new file mode 100644 index 0000000..3fc788f --- /dev/null +++ b/bin/resque.php @@ -0,0 +1,105 @@ + 1) { + $count = $COUNT; +} + +if($count > 1) { + for($i = 0; $i < $count; ++$i) { + $pid = Resque::fork(); + if($pid == -1) { + die("Could not fork worker ".$i."\n"); + } + // Child, start the worker + else if(!$pid) { + $queues = explode(',', $QUEUE); + $worker = new Resque_Worker($queues); + $worker->logLevel = $logLevel; + fwrite(STDOUT, '*** Starting worker '.$worker."\n"); + $worker->work($interval, $BLOCKING); + break; + } + } +} +// Start a single worker +else { + $queues = explode(',', $QUEUE); + $worker = new Resque_Worker($queues); + $worker->logLevel = $logLevel; + + $PIDFILE = getenv('PIDFILE'); + if ($PIDFILE) { + file_put_contents($PIDFILE, getmypid()) or + die('Could not write PID information to ' . $PIDFILE); + } + + fwrite(STDOUT, '*** Starting worker '.$worker."\n"); + $worker->work($interval, $BLOCKING); +} \ No newline at end of file diff --git a/composer.lock b/composer.lock index 877c243..760d4cf 100644 --- a/composer.lock +++ b/composer.lock @@ -18,7 +18,6 @@ "require": { "php": ">=5.3.0" }, - "time": "2013-01-12 10:15:31", "type": "library", "autoload": { "classmap": [ @@ -39,15 +38,424 @@ "homepage": "https://github.com/colinmollenhour/credis", "support": { "source": "https://github.com/chrisboulton/credis/tree/master" - } + }, + "time": "2013-01-12 10:15:31" + } + ], + "packages-dev": [ + { + "name": "phpunit/php-code-coverage", + "version": "1.2.9", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/php-code-coverage.git", + "reference": "1.2.9" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/1.2.9", + "reference": "1.2.9", + "shasum": "" + }, + "require": { + "php": ">=5.3.3", + "phpunit/php-file-iterator": ">=1.3.0@stable", + "phpunit/php-text-template": ">=1.1.1@stable", + "phpunit/php-token-stream": ">=1.1.3@stable" + }, + "suggest": { + "ext-dom": "*", + "ext-xdebug": ">=2.0.5" + }, + "type": "library", + "autoload": { + "classmap": [ + "PHP/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Library that provides collection, processing, and rendering functionality for PHP code coverage information.", + "homepage": "https://github.com/sebastianbergmann/php-code-coverage", + "keywords": [ + "coverage", + "testing", + "xunit" + ], + "time": "2013-02-26 18:55:56" + }, + { + "name": "phpunit/php-file-iterator", + "version": "1.3.3", + "source": { + "type": "git", + "url": "git://github.com/sebastianbergmann/php-file-iterator.git", + "reference": "1.3.3" + }, + "dist": { + "type": "zip", + "url": "https://github.com/sebastianbergmann/php-file-iterator/zipball/1.3.3", + "reference": "1.3.3", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "type": "library", + "autoload": { + "classmap": [ + "File/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "FilterIterator implementation that filters files based on a list of suffixes.", + "homepage": "http://www.phpunit.de/", + "keywords": [ + "filesystem", + "iterator" + ], + "time": "2012-10-11 04:44:38" + }, + { + "name": "phpunit/php-text-template", + "version": "1.1.4", + "source": { + "type": "git", + "url": "git://github.com/sebastianbergmann/php-text-template.git", + "reference": "1.1.4" + }, + "dist": { + "type": "zip", + "url": "https://github.com/sebastianbergmann/php-text-template/zipball/1.1.4", + "reference": "1.1.4", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "type": "library", + "autoload": { + "classmap": [ + "Text/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Simple template engine.", + "homepage": "https://github.com/sebastianbergmann/php-text-template/", + "keywords": [ + "template" + ], + "time": "2012-10-31 11:15:28" + }, + { + "name": "phpunit/php-timer", + "version": "1.0.4", + "source": { + "type": "git", + "url": "git://github.com/sebastianbergmann/php-timer.git", + "reference": "1.0.4" + }, + "dist": { + "type": "zip", + "url": "https://github.com/sebastianbergmann/php-timer/zipball/1.0.4", + "reference": "1.0.4", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "type": "library", + "autoload": { + "classmap": [ + "PHP/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Utility class for timing", + "homepage": "http://www.phpunit.de/", + "keywords": [ + "timer" + ], + "time": "2012-10-11 04:45:58" + }, + { + "name": "phpunit/php-token-stream", + "version": "1.1.5", + "source": { + "type": "git", + "url": "git://github.com/sebastianbergmann/php-token-stream.git", + "reference": "1.1.5" + }, + "dist": { + "type": "zip", + "url": "https://github.com/sebastianbergmann/php-token-stream/zipball/1.1.5", + "reference": "1.1.5", + "shasum": "" + }, + "require": { + "ext-tokenizer": "*", + "php": ">=5.3.3" + }, + "type": "library", + "autoload": { + "classmap": [ + "PHP/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Wrapper around PHP's tokenizer extension.", + "homepage": "http://www.phpunit.de/", + "keywords": [ + "tokenizer" + ], + "time": "2012-10-11 04:47:14" + }, + { + "name": "phpunit/phpunit", + "version": "3.7.18", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/phpunit.git", + "reference": "3.7.18" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/3.7.18", + "reference": "3.7.18", + "shasum": "" + }, + "require": { + "ext-dom": "*", + "ext-pcre": "*", + "ext-reflection": "*", + "ext-spl": "*", + "php": ">=5.3.3", + "phpunit/php-code-coverage": ">=1.2.1,<1.3.0", + "phpunit/php-file-iterator": ">=1.3.1", + "phpunit/php-text-template": ">=1.1.1", + "phpunit/php-timer": ">=1.0.2,<1.1.0", + "phpunit/phpunit-mock-objects": ">=1.2.0,<1.3.0", + "symfony/yaml": ">=2.2.0" + }, + "require-dev": { + "pear-pear/pear": "1.9.4" + }, + "suggest": { + "ext-json": "*", + "ext-simplexml": "*", + "ext-tokenizer": "*", + "phpunit/php-invoker": ">=1.1.0,<1.2.0" + }, + "bin": [ + "composer/bin/phpunit" + ], + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.7.x-dev" + } + }, + "autoload": { + "classmap": [ + "PHPUnit/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "", + "../../symfony/yaml/" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sebastian@phpunit.de", + "role": "lead" + } + ], + "description": "The PHP Unit Testing framework.", + "homepage": "http://www.phpunit.de/", + "keywords": [ + "phpunit", + "testing", + "xunit" + ], + "time": "2013-03-07 21:45:39" + }, + { + "name": "phpunit/phpunit-mock-objects", + "version": "1.2.3", + "source": { + "type": "git", + "url": "git://github.com/sebastianbergmann/phpunit-mock-objects.git", + "reference": "1.2.3" + }, + "dist": { + "type": "zip", + "url": "https://github.com/sebastianbergmann/phpunit-mock-objects/archive/1.2.3.zip", + "reference": "1.2.3", + "shasum": "" + }, + "require": { + "php": ">=5.3.3", + "phpunit/php-text-template": ">=1.1.1@stable" + }, + "suggest": { + "ext-soap": "*" + }, + "type": "library", + "autoload": { + "classmap": [ + "PHPUnit/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Mock Object library for PHPUnit", + "homepage": "https://github.com/sebastianbergmann/phpunit-mock-objects/", + "keywords": [ + "mock", + "xunit" + ], + "time": "2013-01-13 10:24:48" + }, + { + "name": "symfony/yaml", + "version": "v2.2.0", + "target-dir": "Symfony/Component/Yaml", + "source": { + "type": "git", + "url": "https://github.com/symfony/Yaml.git", + "reference": "v2.2.0-RC3" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/Yaml/zipball/v2.2.0-RC3", + "reference": "v2.2.0-RC3", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.2-dev" + } + }, + "autoload": { + "psr-0": { + "Symfony\\Component\\Yaml\\": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Fabien Potencier", + "email": "fabien@symfony.com" + }, + { + "name": "Symfony Community", + "homepage": "http://symfony.com/contributors" + } + ], + "description": "Symfony Yaml Component", + "homepage": "http://symfony.com", + "time": "2013-01-27 16:49:19" } ], - "packages-dev": null, "aliases": [ ], "minimum-stability": "stable", "stability-flags": { "colinmollenhour/credis": 20 - } + }, + "platform": { + "php": ">=5.3.0" + }, + "platform-dev": [ + + ] } diff --git a/demo/bad_job.php b/demo/bad_job.php index bc12620..cd719cc 100644 --- a/demo/bad_job.php +++ b/demo/bad_job.php @@ -5,5 +5,4 @@ class Bad_PHP_Job { throw new Exception('Unable to run this job!'); } -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/check_status.php b/demo/check_status.php index 061a83a..645bf6d 100644 --- a/demo/check_status.php +++ b/demo/check_status.php @@ -17,5 +17,4 @@ echo "Tracking status of ".$argv[1].". Press [break] to stop.\n\n"; while(true) { fwrite(STDOUT, "Status of ".$argv[1]." is: ".$status->get()."\n"); sleep(1); -} -?> +} \ No newline at end of file diff --git a/demo/job.php b/demo/job.php index ec3d143..ddda893 100644 --- a/demo/job.php +++ b/demo/job.php @@ -7,5 +7,4 @@ class PHP_Job sleep(1); fwrite(STDOUT, 'Job ended!' . PHP_EOL); } -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/long_job.php b/demo/long_job.php index 8c9f0f9..1cfe5cb 100644 --- a/demo/long_job.php +++ b/demo/long_job.php @@ -5,5 +5,4 @@ class Long_PHP_Job { sleep(600); } -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/php_error_job.php b/demo/php_error_job.php index 93bf2bc..9824405 100644 --- a/demo/php_error_job.php +++ b/demo/php_error_job.php @@ -5,5 +5,4 @@ class PHP_Error_Job { callToUndefinedFunction(); } -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/queue.php b/demo/queue.php index 393df87..52f2f0b 100644 --- a/demo/queue.php +++ b/demo/queue.php @@ -15,5 +15,4 @@ $args = array( ); $jobId = Resque::enqueue($argv[1], $argv[2], $args, true); -echo "Queued job ".$jobId."\n\n"; -?> \ No newline at end of file +echo "Queued job ".$jobId."\n\n"; \ No newline at end of file diff --git a/demo/resque.php b/demo/resque.php index cdaf756..3dfa5a9 100644 --- a/demo/resque.php +++ b/demo/resque.php @@ -4,5 +4,4 @@ require 'bad_job.php'; require 'job.php'; require 'php_error_job.php'; -require '../bin/resque'; -?> \ No newline at end of file +require '../bin/resque.php'; \ No newline at end of file diff --git a/lib/Resque.php b/lib/Resque.php index 2e459f0..d65680b 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -122,20 +122,21 @@ class Resque } /** - * Pop an item off the end of the specified queue, decode it and - * return it. + * Pop an item off the end of the specified queues, using blocking list pop, + * decode it and return it. * - * @param string $queue The name of the queue to fetch an item from. - * @return array Decoded item from the queue. + * @param array $queues + * @param int $timeout + * @return null|array Decoded item from the queue. */ - public static function blpop($queues, $interval = null) + public static function blpop(array $queues, $timeout) { $list = array(); foreach($queues AS $queue) { $list[] = 'queue:' . $queue; } - $item = self::redis()->blpop($list, $interval ? (int)$interval : Resque::DEFAULT_INTERVAL); + $item = self::redis()->blpop($list, (int)$timeout); if(!$item) { return; @@ -186,9 +187,9 @@ class Resque * @param string $queue Queue to fetch next available job from. * @return Resque_Job Instance of Resque_Job to be processed, false if none or error. */ - public static function reserve($queue, $interval = null) + public static function reserve($queue) { - return Resque_Job::reserve($queue, $interval); + return Resque_Job::reserve($queue); } /** diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 0b1f01e..ff1dd5c 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -89,21 +89,20 @@ class Resque_Job } /** - * Find the next available job from the specified queue and return an - * instance of Resque_Job for it. + * Find the next available job from the specified queues using blocking list pop + * and return an instance of Resque_Job for it. * - * @param string $queue The name of the queue to check for a job in. + * @param array $queues + * @param int $timeout * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. */ - public static function reserveBlocking($queues, $interval = null) + public static function reserveBlocking(array $queues, $timeout = null) { - $payload = Resque::blpop($queues, $interval); + $payload = Resque::blpop($queues, $timeout); if(!is_array($payload)) { return false; } - var_dump($payload); - return new Resque_Job($payload->queue, $payload); } diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 03487f5..3a730d8 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -147,13 +147,52 @@ class Resque_Worker * * @param int $interval How often to check for new jobs across the queues. */ - public function work($interval = Resque::DEFAULT_INTERVAL) + public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) { $this->updateProcLine('Starting'); $this->startup(); - while($job = $this->reserveBlocking($interval)) { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); + while(true) { + if($this->shutdown) { + break; + } + + // Attempt to find and reserve a job + $job = false; + if(!$this->paused) { + if($blocking === true) { + $this->log('Starting blocking with timeout of ' . $interval, self::LOG_VERBOSE); + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); + } else { + $this->log('Starting with interval of ' . $interval, self::LOG_VERBOSE); + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); + } + + $job = $this->reserve($blocking, $interval); + } + + if(!$job) { + // For an interval of 0, break now - helps with unit testing etc + if($interval == 0) { + break; + } + + if($blocking === false) + { + // If no job was found, we sleep for $interval before continuing and checking again + $this->log('Sleeping for ' . $interval, self::LOG_VERBOSE); + if($this->paused) { + $this->updateProcLine('Paused'); + } + else { + $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); + } + + usleep($interval * 1000000); + } + + continue; + } $this->log('got ' . $job); Resque_Event::trigger('beforeFork', $job); @@ -190,14 +229,6 @@ class Resque_Worker $this->child = null; $this->doneWorking(); - - if($this->shutdown) { - break; - } - - if($this->paused) { - break; - } } $this->unregisterWorker(); @@ -225,44 +256,32 @@ class Resque_Worker } /** - * Attempt to find a job from the top of one of the queues for this worker. - * - * @return object|boolean Instance of Resque_Job if a job is found, false if not. + * @param bool $blocking + * @param int $timeout + * @return object|boolean Instance of Resque_Job if a job is found, false if not. */ - public function reserve() + public function reserve($blocking = false, $timeout = null) { $queues = $this->queues(); if(!is_array($queues)) { return; } - foreach($queues as $queue) { - $this->log('Checking ' . $queue, self::LOG_VERBOSE); - $job = Resque_Job::reserve($queue); + + if($blocking === true) { + $job = Resque_Job::reserveBlocking($queues, $timeout); if($job) { - $this->log('Found job on ' . $queue, self::LOG_VERBOSE); + $this->log('Found job on ' . $job->queue, self::LOG_VERBOSE); return $job; } - } - - return false; - } - - /** - * Attempt to find a job from the top of one of the queues for this worker. - * - * @return object|boolean Instance of Resque_Job if a job is found, false if not. - */ - public function reserveBlocking($interval = null) - { - $queues = $this->queues(); - if(!is_array($queues)) { - return; - } - - $job = Resque_Job::reserveBlocking($queues, $interval); - if($job) { - $this->log('Found job on ' . $job->queue, self::LOG_VERBOSE); - return $job; + } else { + foreach($queues as $queue) { + $this->log('Checking ' . $queue, self::LOG_VERBOSE); + $job = Resque_Job::reserve($queue); + if($job) { + $this->log('Found job on ' . $queue, self::LOG_VERBOSE); + return $job; + } + } } return false; diff --git a/test/Resque/Tests/JobStatusTest.php b/test/Resque/Tests/JobStatusTest.php index 3a12b4b..68a25ff 100644 --- a/test/Resque/Tests/JobStatusTest.php +++ b/test/Resque/Tests/JobStatusTest.php @@ -8,6 +8,11 @@ */ class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase { + /** + * @var \Resque_Worker + */ + protected $worker; + public function setUp() { parent::setUp(); @@ -36,6 +41,7 @@ class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase $status = new Resque_Job_Status($token); $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $status->get()); } + public function testRunningJobReturnsRunningStatus() { $token = Resque::enqueue('jobs', 'Failing_Job', null, true); From 66fec608a1efc499c151e5c7790626fc235ed44f Mon Sep 17 00:00:00 2001 From: Ruud Kamphuis Date: Wed, 13 Mar 2013 13:03:19 +0100 Subject: [PATCH 6/6] Fixed unit test --- lib/Resque/Worker.php | 1 - test/Resque/Tests/WorkerTest.php | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 3a730d8..870d4c8 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -164,7 +164,6 @@ class Resque_Worker $this->log('Starting blocking with timeout of ' . $interval, self::LOG_VERBOSE); $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); } else { - $this->log('Starting with interval of ' . $interval, self::LOG_VERBOSE); $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); } diff --git a/test/Resque/Tests/WorkerTest.php b/test/Resque/Tests/WorkerTest.php index b0001c9..1ab7dd2 100644 --- a/test/Resque/Tests/WorkerTest.php +++ b/test/Resque/Tests/WorkerTest.php @@ -257,7 +257,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase Resque::enqueue('jobs', 'Test_Job_2'); $i = 1; - while($job = $worker->reserve(60)) + while($job = $worker->reserve(true, 1)) { $this->assertEquals('Test_Job_' . $i, $job->payload['class']);