From 7aa9abdb2e6da5d7f84bd6fc7488e97c68c48351 Mon Sep 17 00:00:00 2001 From: Rockstar04 Date: Tue, 25 Jun 2013 08:56:28 -0700 Subject: [PATCH 1/4] Added PSR3 to composer dependencies --- composer.json | 3 +- composer.lock | 77 +++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 61 insertions(+), 19 deletions(-) diff --git a/composer.json b/composer.json index 6ea9b0d..fb868bc 100644 --- a/composer.json +++ b/composer.json @@ -20,7 +20,8 @@ "require": { "php": ">=5.3.0", "ext-pcntl": "*", - "colinmollenhour/credis": "1.2.*" + "colinmollenhour/credis": "1.2.*", + "psr/log": "1.0.0" }, "suggest": { "ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.", diff --git a/composer.lock b/composer.lock index 7140535..1a84e92 100644 --- a/composer.lock +++ b/composer.lock @@ -3,7 +3,7 @@ "This file locks the dependencies of your project to a known state", "Read more about it at http://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file" ], - "hash": "1f551c3cdade1b7ff7d9e32a44eeb3dc", + "hash": "646010a06695709794f1bc38392e392f", "packages": [ { "name": "colinmollenhour/credis", @@ -42,21 +42,59 @@ "description": "Credis is a lightweight interface to the Redis key-value store which wraps the phpredis library when available for better performance.", "homepage": "https://github.com/colinmollenhour/credis", "time": "2013-03-19 02:57:04" + }, + { + "name": "psr/log", + "version": "1.0.0", + "source": { + "type": "git", + "url": "https://github.com/php-fig/log", + "reference": "1.0.0" + }, + "dist": { + "type": "zip", + "url": "https://github.com/php-fig/log/archive/1.0.0.zip", + "reference": "1.0.0", + "shasum": "" + }, + "type": "library", + "autoload": { + "psr-0": { + "Psr\\Log\\": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "http://www.php-fig.org/" + } + ], + "description": "Common interface for logging libraries", + "keywords": [ + "log", + "psr", + "psr-3" + ], + "time": "2012-12-21 11:40:51" } ], "packages-dev": [ { "name": "phpunit/php-code-coverage", - "version": "1.2.9", + "version": "1.2.11", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/php-code-coverage.git", - "reference": "1.2.9" + "reference": "1.2.11" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/1.2.9", - "reference": "1.2.9", + "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/1.2.11", + "reference": "1.2.11", "shasum": "" }, "require": { @@ -65,6 +103,9 @@ "phpunit/php-text-template": ">=1.1.1@stable", "phpunit/php-token-stream": ">=1.1.3@stable" }, + "require-dev": { + "phpunit/phpunit": "3.7.*" + }, "suggest": { "ext-dom": "*", "ext-xdebug": ">=2.0.5" @@ -96,7 +137,7 @@ "testing", "xunit" ], - "time": "2013-02-26 18:55:56" + "time": "2013-05-23 18:23:24" }, { "name": "phpunit/php-file-iterator", @@ -278,16 +319,16 @@ }, { "name": "phpunit/phpunit", - "version": "3.7.18", + "version": "3.7.21", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "3.7.18" + "reference": "3.7.21" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/3.7.18", - "reference": "3.7.18", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/3.7.21", + "reference": "3.7.21", "shasum": "" }, "require": { @@ -301,7 +342,7 @@ "phpunit/php-text-template": ">=1.1.1", "phpunit/php-timer": ">=1.0.2,<1.1.0", "phpunit/phpunit-mock-objects": ">=1.2.0,<1.3.0", - "symfony/yaml": ">=2.2.0" + "symfony/yaml": ">=2.0,<3.0" }, "require-dev": { "pear-pear/pear": "1.9.4" @@ -348,7 +389,7 @@ "testing", "xunit" ], - "time": "2013-03-07 21:45:39" + "time": "2013-05-23 18:54:29" }, { "name": "phpunit/phpunit-mock-objects", @@ -401,17 +442,17 @@ }, { "name": "symfony/yaml", - "version": "v2.2.0", + "version": "v2.3.1", "target-dir": "Symfony/Component/Yaml", "source": { "type": "git", "url": "https://github.com/symfony/Yaml.git", - "reference": "v2.2.0-RC3" + "reference": "v2.3.1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/Yaml/zipball/v2.2.0-RC3", - "reference": "v2.2.0-RC3", + "url": "https://api.github.com/repos/symfony/Yaml/zipball/v2.3.1", + "reference": "v2.3.1", "shasum": "" }, "require": { @@ -420,7 +461,7 @@ "type": "library", "extra": { "branch-alias": { - "dev-master": "2.2-dev" + "dev-master": "2.3-dev" } }, "autoload": { @@ -444,7 +485,7 @@ ], "description": "Symfony Yaml Component", "homepage": "http://symfony.com", - "time": "2013-01-27 16:49:19" + "time": "2013-05-10 18:12:13" } ], "aliases": [ From 726e58a297a22aeb30607f97b01343993696a134 Mon Sep 17 00:00:00 2001 From: Rockstar04 Date: Tue, 25 Jun 2013 10:06:37 -0700 Subject: [PATCH 2/4] Added basic logging class for a fallback Corrected unit tests for new code Add partial unit coverage for logging class Add newline to logTest file --- bin/resque | 21 ++++---- lib/Resque/Log.php | 62 ++++++++++++++++++++++ lib/Resque/Worker.php | 80 ++++++++++++----------------- test/Resque/Tests/EventTest.php | 1 + test/Resque/Tests/JobStatusTest.php | 1 + test/Resque/Tests/JobTest.php | 1 + test/Resque/Tests/LogTest.php | 31 +++++++++++ test/Resque/Tests/WorkerTest.php | 20 ++++++++ 8 files changed, 161 insertions(+), 56 deletions(-) create mode 100644 lib/Resque/Log.php create mode 100644 test/Resque/Tests/LogTest.php diff --git a/bin/resque b/bin/resque index f0f1429..890c529 100755 --- a/bin/resque +++ b/bin/resque @@ -39,17 +39,19 @@ if(!empty($REDIS_BACKEND)) { Resque::setBackend($REDIS_BACKEND, $REDIS_BACKEND_DB); } -$logLevel = 0; +$logLevel = false; $LOGGING = getenv('LOGGING'); $VERBOSE = getenv('VERBOSE'); $VVERBOSE = getenv('VVERBOSE'); if(!empty($LOGGING) || !empty($VERBOSE)) { - $logLevel = Resque_Worker::LOG_NORMAL; + $logLevel = true; } else if(!empty($VVERBOSE)) { - $logLevel = Resque_Worker::LOG_VERBOSE; + $logLevel = true; } +$logger = new Resque_Log($logLevel); + $APP_INCLUDE = getenv('APP_INCLUDE'); if($APP_INCLUDE) { if(!file_exists($APP_INCLUDE)) { @@ -75,7 +77,7 @@ if(!empty($COUNT) && $COUNT > 1) { $PREFIX = getenv('PREFIX'); if(!empty($PREFIX)) { - fwrite(STDOUT, '*** Prefix set to '.$PREFIX."\n"); + $logger->log(Psr\Log\LogLevel::INFO, 'Prefix set to {prefix}', array('prefix' => $PREFIX)); Resque_Redis::prefix($PREFIX); } @@ -83,14 +85,15 @@ if($count > 1) { for($i = 0; $i < $count; ++$i) { $pid = Resque::fork(); if($pid == -1) { - die("Could not fork worker ".$i."\n"); + $logger->log(Psr\Log\LogLevel::EMERGENCY, 'Could not fork worker {count}', array('count' => $i)); + die(); } // Child, start the worker else if(!$pid) { $queues = explode(',', $QUEUE); $worker = new Resque_Worker($queues); - $worker->logLevel = $logLevel; - fwrite(STDOUT, '*** Starting worker '.$worker."\n"); + $worker->setLogger(new Resque_Log($logLevel)); + $logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker)); $worker->work($interval, $BLOCKING); break; } @@ -100,7 +103,7 @@ if($count > 1) { else { $queues = explode(',', $QUEUE); $worker = new Resque_Worker($queues); - $worker->logLevel = $logLevel; + $worker->setLogger(new Resque_Log($logLevel)); $PIDFILE = getenv('PIDFILE'); if ($PIDFILE) { @@ -108,7 +111,7 @@ else { die('Could not write PID information to ' . $PIDFILE); } - fwrite(STDOUT, '*** Starting worker '.$worker."\n"); + $logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker)); $worker->work($interval, $BLOCKING); } ?> \ No newline at end of file diff --git a/lib/Resque/Log.php b/lib/Resque/Log.php new file mode 100644 index 0000000..ce279cc --- /dev/null +++ b/lib/Resque/Log.php @@ -0,0 +1,62 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Log extends Psr\Log\AbstractLogger +{ + public $verbose; + + public function __construct($verbose = false) { + $this->verbose = $verbose; + } + + /** + * Logs with an arbitrary level. + * + * @param mixed $level PSR-3 log level constant, or equivalent string + * @param string $message Message to log, may contain a { placeholder } + * @param array $context Variables to replace { placeholder } + * @return null + */ + public function log($level, $message, array $context = array()) + { + if ($this->verbose) { + fwrite( + STDOUT, + '[' . $level . '] [' . strftime('%T %Y-%m-%d') . '] ' . $this->interpolate($message, $context) . PHP_EOL + ); + return; + } + + if (!($level === Psr\Log\LogLevel::INFO || $level === Psr\Log\LogLevel::DEBUG)) { + fwrite( + STDOUT, + '[' . $level . '] ' . $this->interpolate($message, $context) . PHP_EOL + ); + } + } + + /** + * Fill placeholders with the provided context + * @author Jordi Boggiano j.boggiano@seld.be + * + * @param string $message Message to be logged + * @param array $context Array of variables to use in message + * @return string + */ + public function interpolate($message, array $context = array()) + { + // build a replacement array with braces around the context keys + $replace = array(); + foreach ($context as $key => $val) { + $replace['{' . $key . '}'] = $val; + } + + // interpolate replacement values into the message and return + return strtr($message, $replace); + } +} diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 870d4c8..28161c9 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -9,14 +9,10 @@ */ class Resque_Worker { - const LOG_NONE = 0; - const LOG_NORMAL = 1; - const LOG_VERBOSE = 2; - /** - * @var int Current log level of this worker. - */ - public $logLevel = 0; + * @var LoggerInterface Logging object that impliments the PSR-3 LoggerInterface + */ + public $logger; /** * @var array Array of all associated queues for this worker. @@ -161,7 +157,7 @@ class Resque_Worker $job = false; if(!$this->paused) { if($blocking === true) { - $this->log('Starting blocking with timeout of ' . $interval, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); } else { $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); @@ -179,7 +175,7 @@ class Resque_Worker if($blocking === false) { // If no job was found, we sleep for $interval before continuing and checking again - $this->log('Sleeping for ' . $interval, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval)); if($this->paused) { $this->updateProcLine('Paused'); } @@ -193,7 +189,7 @@ class Resque_Worker continue; } - $this->log('got ' . $job); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job)); Resque_Event::trigger('beforeFork', $job); $this->workingOn($job); @@ -203,7 +199,7 @@ class Resque_Worker if ($this->child === 0 || $this->child === false) { $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); $this->updateProcLine($status); - $this->log($status, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, $status); $this->perform($job); if ($this->child === 0) { exit(0); @@ -214,7 +210,7 @@ class Resque_Worker // Parent process, sit and wait $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); $this->updateProcLine($status); - $this->log($status, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, $status); // Wait until the child process finishes before continuing pcntl_wait($status); @@ -245,13 +241,13 @@ class Resque_Worker $job->perform(); } catch(Exception $e) { - $this->log($job . ' failed: ' . $e->getMessage()); + $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e->getMessage())); $job->fail($e); return; } $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE); - $this->log('done ' . $job); + $this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job)); } /** @@ -269,15 +265,15 @@ class Resque_Worker if($blocking === true) { $job = Resque_Job::reserveBlocking($queues, $timeout); if($job) { - $this->log('Found job on ' . $job->queue, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); return $job; } } else { foreach($queues as $queue) { - $this->log('Checking ' . $queue, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue)); $job = Resque_Job::reserve($queue); if($job) { - $this->log('Found job on ' . $queue, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); return $job; } } @@ -355,7 +351,7 @@ class Resque_Worker pcntl_signal(SIGUSR2, array($this, 'pauseProcessing')); pcntl_signal(SIGCONT, array($this, 'unPauseProcessing')); pcntl_signal(SIGPIPE, array($this, 'reestablishRedisConnection')); - $this->log('Registered signals', self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals'); } /** @@ -363,7 +359,7 @@ class Resque_Worker */ public function pauseProcessing() { - $this->log('USR2 received; pausing job processing'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing'); $this->paused = true; } @@ -373,7 +369,7 @@ class Resque_Worker */ public function unPauseProcessing() { - $this->log('CONT received; resuming job processing'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing'); $this->paused = false; } @@ -383,7 +379,7 @@ class Resque_Worker */ public function reestablishRedisConnection() { - $this->log('SIGPIPE received; attempting to reconnect'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'SIGPIPE received; attempting to reconnect'); Resque::redis()->establishConnection(); } @@ -394,7 +390,7 @@ class Resque_Worker public function shutdown() { $this->shutdown = true; - $this->log('Exiting...'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down'); } /** @@ -414,18 +410,18 @@ class Resque_Worker public function killChild() { if(!$this->child) { - $this->log('No child to kill.', self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.'); return; } - $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child)); if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { - $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child)); posix_kill($this->child, SIGKILL); $this->child = null; } else { - $this->log('Child ' . $this->child . ' not found, restarting.', self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child)); $this->shutdown(); } } @@ -448,7 +444,7 @@ class Resque_Worker if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { continue; } - $this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker)); $worker->unregisterWorker(); } } @@ -536,26 +532,6 @@ class Resque_Worker return $this->id; } - /** - * Output a given log message to STDOUT. - * - * @param string $message Message to output. - * @param int $logLevel The logging level to capture - */ - public function log($message, $logLevel = self::LOG_NORMAL) - { - if ($logLevel > $this->logLevel) { - return; - } - - if ($this->logLevel == self::LOG_NORMAL) { - fwrite(STDOUT, "*** " . $message . "\n"); - return; - } - - fwrite(STDOUT, "** [" . strftime('%T %Y-%m-%d') . "] " . $message . "\n"); - } - /** * Return an object describing the job this worker is currently working on. * @@ -582,5 +558,15 @@ class Resque_Worker { return Resque_Stat::get($stat . ':' . $this); } + + /** + * Inject the logging object into the worker + * + * @param Psr\Log\LoggerInterface $logger + */ + public function setLogger(Psr\Log\LoggerInterface $logger) + { + $this->logger = $logger; + } } ?> diff --git a/test/Resque/Tests/EventTest.php b/test/Resque/Tests/EventTest.php index 880a323..894fd7a 100644 --- a/test/Resque/Tests/EventTest.php +++ b/test/Resque/Tests/EventTest.php @@ -16,6 +16,7 @@ class Resque_Tests_EventTest extends Resque_Tests_TestCase // Register a worker to test with $this->worker = new Resque_Worker('jobs'); + $this->worker->setLogger(new Resque_Log()); $this->worker->registerWorker(); } diff --git a/test/Resque/Tests/JobStatusTest.php b/test/Resque/Tests/JobStatusTest.php index 68a25ff..d751c37 100644 --- a/test/Resque/Tests/JobStatusTest.php +++ b/test/Resque/Tests/JobStatusTest.php @@ -19,6 +19,7 @@ class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase // Register a worker to test with $this->worker = new Resque_Worker('jobs'); + $this->worker->setLogger(new Resque_Log()); } public function testJobStatusCanBeTracked() diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index 688c265..0c09696 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -17,6 +17,7 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase // Register a worker to test with $this->worker = new Resque_Worker('jobs'); + $this->worker->setLogger(new Resque_Log()); $this->worker->registerWorker(); } diff --git a/test/Resque/Tests/LogTest.php b/test/Resque/Tests/LogTest.php new file mode 100644 index 0000000..db97b16 --- /dev/null +++ b/test/Resque/Tests/LogTest.php @@ -0,0 +1,31 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_LogTest extends Resque_Tests_TestCase +{ + public function testLogInterpolate() + { + $logger = new Resque_Log(); + $actual = $logger->interpolate('string {replace}', array('replace' => 'value')); + $expected = 'string value'; + + $this->assertEquals($expected, $actual); + } + + public function testLogInterpolateMutiple() + { + $logger = new Resque_Log(); + $actual = $logger->interpolate( + 'string {replace1} {replace2}', + array('replace1' => 'value1', 'replace2' => 'value2') + ); + $expected = 'string value1 value2'; + + $this->assertEquals($expected, $actual); + } +} diff --git a/test/Resque/Tests/WorkerTest.php b/test/Resque/Tests/WorkerTest.php index 1ab7dd2..93c0621 100644 --- a/test/Resque/Tests/WorkerTest.php +++ b/test/Resque/Tests/WorkerTest.php @@ -11,6 +11,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase public function testWorkerRegistersInList() { $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); // Make sure the worker is in the list @@ -23,6 +24,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase // Register a few workers for($i = 0; $i < $num; ++$i) { $worker = new Resque_Worker('queue_' . $i); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); } @@ -33,6 +35,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase public function testGetWorkerById() { $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); $newWorker = Resque_Worker::find((string)$worker); @@ -47,6 +50,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase public function testWorkerCanUnregister() { $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); $worker->unregisterWorker(); @@ -58,6 +62,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase public function testPausedWorkerDoesNotPickUpJobs() { $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); $worker->pauseProcessing(); Resque::enqueue('jobs', 'Test_Job'); $worker->work(0); @@ -68,6 +73,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase public function testResumedWorkerPicksUpJobs() { $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); $worker->pauseProcessing(); Resque::enqueue('jobs', 'Test_Job'); $worker->work(0); @@ -83,6 +89,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase 'queue1', 'queue2' )); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); Resque::enqueue('queue1', 'Test_Job_1'); Resque::enqueue('queue2', 'Test_Job_2'); @@ -101,6 +108,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase 'medium', 'low' )); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); // Queue the jobs in a different order @@ -122,6 +130,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase public function testWildcardQueueWorkerWorksAllQueues() { $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); Resque::enqueue('queue1', 'Test_Job_1'); @@ -137,6 +146,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase public function testWorkerDoesNotWorkOnUnknownQueues() { $worker = new Resque_Worker('queue1'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); Resque::enqueue('queue2', 'Test_Job'); @@ -147,6 +157,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase { Resque::enqueue('jobs', 'Test_Job'); $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $job = $worker->reserve(); $worker->workingOn($job); $worker->doneWorking(); @@ -156,6 +167,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase public function testWorkerRecordsWhatItIsWorkingOn() { $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); $payload = array( @@ -178,6 +190,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase Resque::enqueue('jobs', 'Invalid_Job'); $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $worker->work(0); $worker->work(0); @@ -189,15 +202,18 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase { // Register a good worker $goodWorker = new Resque_Worker('jobs'); + $goodWorker->setLogger(new Resque_Log()); $goodWorker->registerWorker(); $workerId = explode(':', $goodWorker); // Register some bad workers $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $worker->setId($workerId[0].':1:jobs'); $worker->registerWorker(); $worker = new Resque_Worker(array('high', 'low')); + $worker->setLogger(new Resque_Log()); $worker->setId($workerId[0].':2:high,low'); $worker->registerWorker(); @@ -213,12 +229,14 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase { // Register a bad worker on this machine $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $workerId = explode(':', $worker); $worker->setId($workerId[0].':1:jobs'); $worker->registerWorker(); // Register some other false workers $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $worker->setId('my.other.host:1:jobs'); $worker->registerWorker(); @@ -235,6 +253,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase public function testWorkerFailsUncompletedJobsOnExit() { $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); $payload = array( @@ -251,6 +270,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase public function testBlockingListPop() { $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); Resque::enqueue('jobs', 'Test_Job_1'); From 5cef885a0602d7c09f53e81baeb05529e6676e90 Mon Sep 17 00:00:00 2001 From: Rockstar04 Date: Tue, 25 Jun 2013 12:10:08 -0700 Subject: [PATCH 3/4] Look for a var in the bin/resque script to allow override --- bin/resque | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/bin/resque b/bin/resque index 890c529..b9a10cf 100755 --- a/bin/resque +++ b/bin/resque @@ -50,8 +50,6 @@ else if(!empty($VVERBOSE)) { $logLevel = true; } -$logger = new Resque_Log($logLevel); - $APP_INCLUDE = getenv('APP_INCLUDE'); if($APP_INCLUDE) { if(!file_exists($APP_INCLUDE)) { @@ -61,6 +59,12 @@ if($APP_INCLUDE) { require_once $APP_INCLUDE; } +// See if the APP_INCLUDE containes a logger object, +// If none exists, fallback to internal logger +if (!isset($logger) && !is_object($logger)) { + $logger = new Resque_Log($logLevel); +} + $BLOCKING = getenv('BLOCKING') !== FALSE; $interval = 5; @@ -92,7 +96,7 @@ if($count > 1) { else if(!$pid) { $queues = explode(',', $QUEUE); $worker = new Resque_Worker($queues); - $worker->setLogger(new Resque_Log($logLevel)); + $worker->setLogger($logger); $logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker)); $worker->work($interval, $BLOCKING); break; @@ -103,7 +107,7 @@ if($count > 1) { else { $queues = explode(',', $QUEUE); $worker = new Resque_Worker($queues); - $worker->setLogger(new Resque_Log($logLevel)); + $worker->setLogger($logger); $PIDFILE = getenv('PIDFILE'); if ($PIDFILE) { From 25a804d93da848e69f6ac999b84065e57623fdb4 Mon Sep 17 00:00:00 2001 From: Trent Petersen Date: Wed, 10 Jul 2013 10:07:19 -0500 Subject: [PATCH 4/4] Convert indentation in the Worker class.... to . . . . tabs This was painful for me as a PSR 1-2 follower --- lib/Resque/Worker.php | 228 +++++++++++++++++++++--------------------- 1 file changed, 114 insertions(+), 114 deletions(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 28161c9..a6c3985 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -86,7 +86,7 @@ class Resque_Worker */ public static function find($workerId) { - if(!self::exists($workerId) || false === strpos($workerId, ":")) { + if(!self::exists($workerId) || false === strpos($workerId, ":")) { return false; } @@ -148,83 +148,83 @@ class Resque_Worker $this->updateProcLine('Starting'); $this->startup(); - while(true) { - if($this->shutdown) { - break; - } + while(true) { + if($this->shutdown) { + break; + } - // Attempt to find and reserve a job - $job = false; - if(!$this->paused) { - if($blocking === true) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); - } else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); - } + // Attempt to find and reserve a job + $job = false; + if(!$this->paused) { + if($blocking === true) { + $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); + } else { + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); + } - $job = $this->reserve($blocking, $interval); - } + $job = $this->reserve($blocking, $interval); + } - if(!$job) { - // For an interval of 0, break now - helps with unit testing etc - if($interval == 0) { - break; - } + if(!$job) { + // For an interval of 0, break now - helps with unit testing etc + if($interval == 0) { + break; + } - if($blocking === false) - { - // If no job was found, we sleep for $interval before continuing and checking again - $this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval)); - if($this->paused) { - $this->updateProcLine('Paused'); - } - else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); - } + if($blocking === false) + { + // If no job was found, we sleep for $interval before continuing and checking again + $this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval)); + if($this->paused) { + $this->updateProcLine('Paused'); + } + else { + $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); + } - usleep($interval * 1000000); - } + usleep($interval * 1000000); + } - continue; - } + continue; + } - $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job)); - Resque_Event::trigger('beforeFork', $job); - $this->workingOn($job); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job)); + Resque_Event::trigger('beforeFork', $job); + $this->workingOn($job); - $this->child = Resque::fork(); + $this->child = Resque::fork(); - // Forked and we're the child. Run the job. - if ($this->child === 0 || $this->child === false) { - $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); - $this->updateProcLine($status); - $this->logger->log(Psr\Log\LogLevel::INFO, $status); - $this->perform($job); - if ($this->child === 0) { - exit(0); - } - } + // Forked and we're the child. Run the job. + if ($this->child === 0 || $this->child === false) { + $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); + $this->updateProcLine($status); + $this->logger->log(Psr\Log\LogLevel::INFO, $status); + $this->perform($job); + if ($this->child === 0) { + exit(0); + } + } - if($this->child > 0) { - // Parent process, sit and wait - $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); - $this->updateProcLine($status); - $this->logger->log(Psr\Log\LogLevel::INFO, $status); + if($this->child > 0) { + // Parent process, sit and wait + $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); + $this->updateProcLine($status); + $this->logger->log(Psr\Log\LogLevel::INFO, $status); - // Wait until the child process finishes before continuing - pcntl_wait($status); - $exitStatus = pcntl_wexitstatus($status); - if($exitStatus !== 0) { - $job->fail(new Resque_Job_DirtyExitException( - 'Job exited with exit code ' . $exitStatus - )); - } - } + // Wait until the child process finishes before continuing + pcntl_wait($status); + $exitStatus = pcntl_wexitstatus($status); + if($exitStatus !== 0) { + $job->fail(new Resque_Job_DirtyExitException( + 'Job exited with exit code ' . $exitStatus + )); + } + } - $this->child = null; - $this->doneWorking(); - } + $this->child = null; + $this->doneWorking(); + } $this->unregisterWorker(); } @@ -241,46 +241,46 @@ class Resque_Worker $job->perform(); } catch(Exception $e) { - $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e->getMessage())); + $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e->getMessage())); $job->fail($e); return; } $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE); - $this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job)); + $this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job)); } - /** - * @param bool $blocking - * @param int $timeout - * @return object|boolean Instance of Resque_Job if a job is found, false if not. - */ - public function reserve($blocking = false, $timeout = null) - { - $queues = $this->queues(); - if(!is_array($queues)) { - return; - } + /** + * @param bool $blocking + * @param int $timeout + * @return object|boolean Instance of Resque_Job if a job is found, false if not. + */ + public function reserve($blocking = false, $timeout = null) + { + $queues = $this->queues(); + if(!is_array($queues)) { + return; + } - if($blocking === true) { - $job = Resque_Job::reserveBlocking($queues, $timeout); - if($job) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); - return $job; - } - } else { - foreach($queues as $queue) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue)); - $job = Resque_Job::reserve($queue); - if($job) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); - return $job; - } - } - } + if($blocking === true) { + $job = Resque_Job::reserveBlocking($queues, $timeout); + if($job) { + $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); + return $job; + } + } else { + foreach($queues as $queue) { + $this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue)); + $job = Resque_Job::reserve($queue); + if($job) { + $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); + return $job; + } + } + } - return false; - } + return false; + } /** * Return an array containing all of the queues that this worker should use @@ -351,7 +351,7 @@ class Resque_Worker pcntl_signal(SIGUSR2, array($this, 'pauseProcessing')); pcntl_signal(SIGCONT, array($this, 'unPauseProcessing')); pcntl_signal(SIGPIPE, array($this, 'reestablishRedisConnection')); - $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals'); + $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals'); } /** @@ -359,7 +359,7 @@ class Resque_Worker */ public function pauseProcessing() { - $this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing'); $this->paused = true; } @@ -369,7 +369,7 @@ class Resque_Worker */ public function unPauseProcessing() { - $this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing'); $this->paused = false; } @@ -379,7 +379,7 @@ class Resque_Worker */ public function reestablishRedisConnection() { - $this->logger->log(Psr\Log\LogLevel::NOTICE, 'SIGPIPE received; attempting to reconnect'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'SIGPIPE received; attempting to reconnect'); Resque::redis()->establishConnection(); } @@ -390,7 +390,7 @@ class Resque_Worker public function shutdown() { $this->shutdown = true; - $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down'); } /** @@ -410,18 +410,18 @@ class Resque_Worker public function killChild() { if(!$this->child) { - $this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.'); + $this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.'); return; } - $this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child)); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child)); if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { - $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child)); + $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child)); posix_kill($this->child, SIGKILL); $this->child = null; } else { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child)); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child)); $this->shutdown(); } } @@ -439,14 +439,14 @@ class Resque_Worker $workerPids = $this->workerPids(); $workers = self::all(); foreach($workers as $worker) { - if (is_object($worker)) { - list($host, $pid, $queues) = explode(':', (string)$worker, 3); - if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { - continue; - } - $this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker)); - $worker->unregisterWorker(); - } + if (is_object($worker)) { + list($host, $pid, $queues) = explode(':', (string)$worker, 3); + if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { + continue; + } + $this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker)); + $worker->unregisterWorker(); + } } }