From c2c4d06f7b72a3d8c68c61ef443fbb0b00e87200 Mon Sep 17 00:00:00 2001 From: Ruud Kamphuis Date: Wed, 13 Mar 2013 12:41:32 +0100 Subject: [PATCH] Working blocking list pop :) --- bin/resque | 106 +------ bin/resque.php | 105 +++++++ composer.lock | 416 +++++++++++++++++++++++++++- demo/bad_job.php | 3 +- demo/check_status.php | 3 +- demo/job.php | 3 +- demo/long_job.php | 3 +- demo/php_error_job.php | 3 +- demo/queue.php | 3 +- demo/resque.php | 3 +- lib/Resque.php | 17 +- lib/Resque/Job.php | 13 +- lib/Resque/Worker.php | 99 ++++--- test/Resque/Tests/JobStatusTest.php | 6 + 14 files changed, 606 insertions(+), 177 deletions(-) mode change 100644 => 100755 bin/resque create mode 100644 bin/resque.php diff --git a/bin/resque b/bin/resque old mode 100644 new mode 100755 index 2cb5ffd..5475c98 --- a/bin/resque +++ b/bin/resque @@ -1,105 +1,3 @@ +#!/usr/bin/env php 1) { - $count = $COUNT; -} - -if($count > 1) { - for($i = 0; $i < $count; ++$i) { - $pid = Resque::fork(); - if($pid == -1) { - die("Could not fork worker ".$i."\n"); - } - // Child, start the worker - else if(!$pid) { - $queues = explode(',', $QUEUE); - $worker = new Resque_Worker($queues); - $worker->logLevel = $logLevel; - fwrite(STDOUT, '*** Starting worker '.$worker."\n"); - $worker->work($interval); - break; - } - } -} -// Start a single worker -else { - $queues = explode(',', $QUEUE); - $worker = new Resque_Worker($queues); - $worker->logLevel = $logLevel; - - $PIDFILE = getenv('PIDFILE'); - if ($PIDFILE) { - file_put_contents($PIDFILE, getmypid()) or - die('Could not write PID information to ' . $PIDFILE); - } - - fwrite(STDOUT, '*** Starting worker '.$worker."\n"); - $worker->work($interval); -} -?> +include("resque.php"); \ No newline at end of file diff --git a/bin/resque.php b/bin/resque.php new file mode 100644 index 0000000..3fc788f --- /dev/null +++ b/bin/resque.php @@ -0,0 +1,105 @@ + 1) { + $count = $COUNT; +} + +if($count > 1) { + for($i = 0; $i < $count; ++$i) { + $pid = Resque::fork(); + if($pid == -1) { + die("Could not fork worker ".$i."\n"); + } + // Child, start the worker + else if(!$pid) { + $queues = explode(',', $QUEUE); + $worker = new Resque_Worker($queues); + $worker->logLevel = $logLevel; + fwrite(STDOUT, '*** Starting worker '.$worker."\n"); + $worker->work($interval, $BLOCKING); + break; + } + } +} +// Start a single worker +else { + $queues = explode(',', $QUEUE); + $worker = new Resque_Worker($queues); + $worker->logLevel = $logLevel; + + $PIDFILE = getenv('PIDFILE'); + if ($PIDFILE) { + file_put_contents($PIDFILE, getmypid()) or + die('Could not write PID information to ' . $PIDFILE); + } + + fwrite(STDOUT, '*** Starting worker '.$worker."\n"); + $worker->work($interval, $BLOCKING); +} \ No newline at end of file diff --git a/composer.lock b/composer.lock index 877c243..760d4cf 100644 --- a/composer.lock +++ b/composer.lock @@ -18,7 +18,6 @@ "require": { "php": ">=5.3.0" }, - "time": "2013-01-12 10:15:31", "type": "library", "autoload": { "classmap": [ @@ -39,15 +38,424 @@ "homepage": "https://github.com/colinmollenhour/credis", "support": { "source": "https://github.com/chrisboulton/credis/tree/master" - } + }, + "time": "2013-01-12 10:15:31" + } + ], + "packages-dev": [ + { + "name": "phpunit/php-code-coverage", + "version": "1.2.9", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/php-code-coverage.git", + "reference": "1.2.9" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/1.2.9", + "reference": "1.2.9", + "shasum": "" + }, + "require": { + "php": ">=5.3.3", + "phpunit/php-file-iterator": ">=1.3.0@stable", + "phpunit/php-text-template": ">=1.1.1@stable", + "phpunit/php-token-stream": ">=1.1.3@stable" + }, + "suggest": { + "ext-dom": "*", + "ext-xdebug": ">=2.0.5" + }, + "type": "library", + "autoload": { + "classmap": [ + "PHP/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Library that provides collection, processing, and rendering functionality for PHP code coverage information.", + "homepage": "https://github.com/sebastianbergmann/php-code-coverage", + "keywords": [ + "coverage", + "testing", + "xunit" + ], + "time": "2013-02-26 18:55:56" + }, + { + "name": "phpunit/php-file-iterator", + "version": "1.3.3", + "source": { + "type": "git", + "url": "git://github.com/sebastianbergmann/php-file-iterator.git", + "reference": "1.3.3" + }, + "dist": { + "type": "zip", + "url": "https://github.com/sebastianbergmann/php-file-iterator/zipball/1.3.3", + "reference": "1.3.3", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "type": "library", + "autoload": { + "classmap": [ + "File/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "FilterIterator implementation that filters files based on a list of suffixes.", + "homepage": "http://www.phpunit.de/", + "keywords": [ + "filesystem", + "iterator" + ], + "time": "2012-10-11 04:44:38" + }, + { + "name": "phpunit/php-text-template", + "version": "1.1.4", + "source": { + "type": "git", + "url": "git://github.com/sebastianbergmann/php-text-template.git", + "reference": "1.1.4" + }, + "dist": { + "type": "zip", + "url": "https://github.com/sebastianbergmann/php-text-template/zipball/1.1.4", + "reference": "1.1.4", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "type": "library", + "autoload": { + "classmap": [ + "Text/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Simple template engine.", + "homepage": "https://github.com/sebastianbergmann/php-text-template/", + "keywords": [ + "template" + ], + "time": "2012-10-31 11:15:28" + }, + { + "name": "phpunit/php-timer", + "version": "1.0.4", + "source": { + "type": "git", + "url": "git://github.com/sebastianbergmann/php-timer.git", + "reference": "1.0.4" + }, + "dist": { + "type": "zip", + "url": "https://github.com/sebastianbergmann/php-timer/zipball/1.0.4", + "reference": "1.0.4", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "type": "library", + "autoload": { + "classmap": [ + "PHP/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Utility class for timing", + "homepage": "http://www.phpunit.de/", + "keywords": [ + "timer" + ], + "time": "2012-10-11 04:45:58" + }, + { + "name": "phpunit/php-token-stream", + "version": "1.1.5", + "source": { + "type": "git", + "url": "git://github.com/sebastianbergmann/php-token-stream.git", + "reference": "1.1.5" + }, + "dist": { + "type": "zip", + "url": "https://github.com/sebastianbergmann/php-token-stream/zipball/1.1.5", + "reference": "1.1.5", + "shasum": "" + }, + "require": { + "ext-tokenizer": "*", + "php": ">=5.3.3" + }, + "type": "library", + "autoload": { + "classmap": [ + "PHP/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Wrapper around PHP's tokenizer extension.", + "homepage": "http://www.phpunit.de/", + "keywords": [ + "tokenizer" + ], + "time": "2012-10-11 04:47:14" + }, + { + "name": "phpunit/phpunit", + "version": "3.7.18", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/phpunit.git", + "reference": "3.7.18" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/3.7.18", + "reference": "3.7.18", + "shasum": "" + }, + "require": { + "ext-dom": "*", + "ext-pcre": "*", + "ext-reflection": "*", + "ext-spl": "*", + "php": ">=5.3.3", + "phpunit/php-code-coverage": ">=1.2.1,<1.3.0", + "phpunit/php-file-iterator": ">=1.3.1", + "phpunit/php-text-template": ">=1.1.1", + "phpunit/php-timer": ">=1.0.2,<1.1.0", + "phpunit/phpunit-mock-objects": ">=1.2.0,<1.3.0", + "symfony/yaml": ">=2.2.0" + }, + "require-dev": { + "pear-pear/pear": "1.9.4" + }, + "suggest": { + "ext-json": "*", + "ext-simplexml": "*", + "ext-tokenizer": "*", + "phpunit/php-invoker": ">=1.1.0,<1.2.0" + }, + "bin": [ + "composer/bin/phpunit" + ], + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.7.x-dev" + } + }, + "autoload": { + "classmap": [ + "PHPUnit/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "", + "../../symfony/yaml/" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sebastian@phpunit.de", + "role": "lead" + } + ], + "description": "The PHP Unit Testing framework.", + "homepage": "http://www.phpunit.de/", + "keywords": [ + "phpunit", + "testing", + "xunit" + ], + "time": "2013-03-07 21:45:39" + }, + { + "name": "phpunit/phpunit-mock-objects", + "version": "1.2.3", + "source": { + "type": "git", + "url": "git://github.com/sebastianbergmann/phpunit-mock-objects.git", + "reference": "1.2.3" + }, + "dist": { + "type": "zip", + "url": "https://github.com/sebastianbergmann/phpunit-mock-objects/archive/1.2.3.zip", + "reference": "1.2.3", + "shasum": "" + }, + "require": { + "php": ">=5.3.3", + "phpunit/php-text-template": ">=1.1.1@stable" + }, + "suggest": { + "ext-soap": "*" + }, + "type": "library", + "autoload": { + "classmap": [ + "PHPUnit/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Mock Object library for PHPUnit", + "homepage": "https://github.com/sebastianbergmann/phpunit-mock-objects/", + "keywords": [ + "mock", + "xunit" + ], + "time": "2013-01-13 10:24:48" + }, + { + "name": "symfony/yaml", + "version": "v2.2.0", + "target-dir": "Symfony/Component/Yaml", + "source": { + "type": "git", + "url": "https://github.com/symfony/Yaml.git", + "reference": "v2.2.0-RC3" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/Yaml/zipball/v2.2.0-RC3", + "reference": "v2.2.0-RC3", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.2-dev" + } + }, + "autoload": { + "psr-0": { + "Symfony\\Component\\Yaml\\": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Fabien Potencier", + "email": "fabien@symfony.com" + }, + { + "name": "Symfony Community", + "homepage": "http://symfony.com/contributors" + } + ], + "description": "Symfony Yaml Component", + "homepage": "http://symfony.com", + "time": "2013-01-27 16:49:19" } ], - "packages-dev": null, "aliases": [ ], "minimum-stability": "stable", "stability-flags": { "colinmollenhour/credis": 20 - } + }, + "platform": { + "php": ">=5.3.0" + }, + "platform-dev": [ + + ] } diff --git a/demo/bad_job.php b/demo/bad_job.php index bc12620..cd719cc 100644 --- a/demo/bad_job.php +++ b/demo/bad_job.php @@ -5,5 +5,4 @@ class Bad_PHP_Job { throw new Exception('Unable to run this job!'); } -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/check_status.php b/demo/check_status.php index 061a83a..645bf6d 100644 --- a/demo/check_status.php +++ b/demo/check_status.php @@ -17,5 +17,4 @@ echo "Tracking status of ".$argv[1].". Press [break] to stop.\n\n"; while(true) { fwrite(STDOUT, "Status of ".$argv[1]." is: ".$status->get()."\n"); sleep(1); -} -?> +} \ No newline at end of file diff --git a/demo/job.php b/demo/job.php index ec3d143..ddda893 100644 --- a/demo/job.php +++ b/demo/job.php @@ -7,5 +7,4 @@ class PHP_Job sleep(1); fwrite(STDOUT, 'Job ended!' . PHP_EOL); } -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/long_job.php b/demo/long_job.php index 8c9f0f9..1cfe5cb 100644 --- a/demo/long_job.php +++ b/demo/long_job.php @@ -5,5 +5,4 @@ class Long_PHP_Job { sleep(600); } -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/php_error_job.php b/demo/php_error_job.php index 93bf2bc..9824405 100644 --- a/demo/php_error_job.php +++ b/demo/php_error_job.php @@ -5,5 +5,4 @@ class PHP_Error_Job { callToUndefinedFunction(); } -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/queue.php b/demo/queue.php index 393df87..52f2f0b 100644 --- a/demo/queue.php +++ b/demo/queue.php @@ -15,5 +15,4 @@ $args = array( ); $jobId = Resque::enqueue($argv[1], $argv[2], $args, true); -echo "Queued job ".$jobId."\n\n"; -?> \ No newline at end of file +echo "Queued job ".$jobId."\n\n"; \ No newline at end of file diff --git a/demo/resque.php b/demo/resque.php index cdaf756..3dfa5a9 100644 --- a/demo/resque.php +++ b/demo/resque.php @@ -4,5 +4,4 @@ require 'bad_job.php'; require 'job.php'; require 'php_error_job.php'; -require '../bin/resque'; -?> \ No newline at end of file +require '../bin/resque.php'; \ No newline at end of file diff --git a/lib/Resque.php b/lib/Resque.php index 2e459f0..d65680b 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -122,20 +122,21 @@ class Resque } /** - * Pop an item off the end of the specified queue, decode it and - * return it. + * Pop an item off the end of the specified queues, using blocking list pop, + * decode it and return it. * - * @param string $queue The name of the queue to fetch an item from. - * @return array Decoded item from the queue. + * @param array $queues + * @param int $timeout + * @return null|array Decoded item from the queue. */ - public static function blpop($queues, $interval = null) + public static function blpop(array $queues, $timeout) { $list = array(); foreach($queues AS $queue) { $list[] = 'queue:' . $queue; } - $item = self::redis()->blpop($list, $interval ? (int)$interval : Resque::DEFAULT_INTERVAL); + $item = self::redis()->blpop($list, (int)$timeout); if(!$item) { return; @@ -186,9 +187,9 @@ class Resque * @param string $queue Queue to fetch next available job from. * @return Resque_Job Instance of Resque_Job to be processed, false if none or error. */ - public static function reserve($queue, $interval = null) + public static function reserve($queue) { - return Resque_Job::reserve($queue, $interval); + return Resque_Job::reserve($queue); } /** diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 0b1f01e..ff1dd5c 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -89,21 +89,20 @@ class Resque_Job } /** - * Find the next available job from the specified queue and return an - * instance of Resque_Job for it. + * Find the next available job from the specified queues using blocking list pop + * and return an instance of Resque_Job for it. * - * @param string $queue The name of the queue to check for a job in. + * @param array $queues + * @param int $timeout * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. */ - public static function reserveBlocking($queues, $interval = null) + public static function reserveBlocking(array $queues, $timeout = null) { - $payload = Resque::blpop($queues, $interval); + $payload = Resque::blpop($queues, $timeout); if(!is_array($payload)) { return false; } - var_dump($payload); - return new Resque_Job($payload->queue, $payload); } diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 03487f5..3a730d8 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -147,13 +147,52 @@ class Resque_Worker * * @param int $interval How often to check for new jobs across the queues. */ - public function work($interval = Resque::DEFAULT_INTERVAL) + public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) { $this->updateProcLine('Starting'); $this->startup(); - while($job = $this->reserveBlocking($interval)) { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); + while(true) { + if($this->shutdown) { + break; + } + + // Attempt to find and reserve a job + $job = false; + if(!$this->paused) { + if($blocking === true) { + $this->log('Starting blocking with timeout of ' . $interval, self::LOG_VERBOSE); + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); + } else { + $this->log('Starting with interval of ' . $interval, self::LOG_VERBOSE); + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); + } + + $job = $this->reserve($blocking, $interval); + } + + if(!$job) { + // For an interval of 0, break now - helps with unit testing etc + if($interval == 0) { + break; + } + + if($blocking === false) + { + // If no job was found, we sleep for $interval before continuing and checking again + $this->log('Sleeping for ' . $interval, self::LOG_VERBOSE); + if($this->paused) { + $this->updateProcLine('Paused'); + } + else { + $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); + } + + usleep($interval * 1000000); + } + + continue; + } $this->log('got ' . $job); Resque_Event::trigger('beforeFork', $job); @@ -190,14 +229,6 @@ class Resque_Worker $this->child = null; $this->doneWorking(); - - if($this->shutdown) { - break; - } - - if($this->paused) { - break; - } } $this->unregisterWorker(); @@ -225,44 +256,32 @@ class Resque_Worker } /** - * Attempt to find a job from the top of one of the queues for this worker. - * - * @return object|boolean Instance of Resque_Job if a job is found, false if not. + * @param bool $blocking + * @param int $timeout + * @return object|boolean Instance of Resque_Job if a job is found, false if not. */ - public function reserve() + public function reserve($blocking = false, $timeout = null) { $queues = $this->queues(); if(!is_array($queues)) { return; } - foreach($queues as $queue) { - $this->log('Checking ' . $queue, self::LOG_VERBOSE); - $job = Resque_Job::reserve($queue); + + if($blocking === true) { + $job = Resque_Job::reserveBlocking($queues, $timeout); if($job) { - $this->log('Found job on ' . $queue, self::LOG_VERBOSE); + $this->log('Found job on ' . $job->queue, self::LOG_VERBOSE); return $job; } - } - - return false; - } - - /** - * Attempt to find a job from the top of one of the queues for this worker. - * - * @return object|boolean Instance of Resque_Job if a job is found, false if not. - */ - public function reserveBlocking($interval = null) - { - $queues = $this->queues(); - if(!is_array($queues)) { - return; - } - - $job = Resque_Job::reserveBlocking($queues, $interval); - if($job) { - $this->log('Found job on ' . $job->queue, self::LOG_VERBOSE); - return $job; + } else { + foreach($queues as $queue) { + $this->log('Checking ' . $queue, self::LOG_VERBOSE); + $job = Resque_Job::reserve($queue); + if($job) { + $this->log('Found job on ' . $queue, self::LOG_VERBOSE); + return $job; + } + } } return false; diff --git a/test/Resque/Tests/JobStatusTest.php b/test/Resque/Tests/JobStatusTest.php index 3a12b4b..68a25ff 100644 --- a/test/Resque/Tests/JobStatusTest.php +++ b/test/Resque/Tests/JobStatusTest.php @@ -8,6 +8,11 @@ */ class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase { + /** + * @var \Resque_Worker + */ + protected $worker; + public function setUp() { parent::setUp(); @@ -36,6 +41,7 @@ class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase $status = new Resque_Job_Status($token); $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $status->get()); } + public function testRunningJobReturnsRunningStatus() { $token = Resque::enqueue('jobs', 'Failing_Job', null, true);