diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown index 38e57da..84ae07c 100644 --- a/CHANGELOG.markdown +++ b/CHANGELOG.markdown @@ -1,4 +1,11 @@ -## 1.1 (2011-02-26) ## +## 1.2 (Unreleased) ## + +* Allow alternate redis database to be selected when calling setBackend by supplying a second argument (patrickbajao) +* Use `require_once` when including php-resque after the app has been included in the sample resque.php to prevent include conflicts (andrewjshults) +* Wrap job arguments in an array to improve compatibility with ruby resque (warezthebeef) +* Fix a bug where the worker would spin out of control taking the server with it, if the redis connection was interrupted even briefly. Use SIGPIPE to trap this scenario cleanly. (d11wtq) + +## 1.1 (2011-03-27) ## * Update Redisent library for Redis 2.2 compatibility. Redis 2.2 is now required. (thedotedge) * Trim output of `ps` to remove any prepended whitespace (KevBurnsJr) diff --git a/README.markdown b/README.markdown index ab490bd..7066213 100644 --- a/README.markdown +++ b/README.markdown @@ -47,7 +47,7 @@ Jobs are queued as follows: require_once 'lib/Resque.php'; // Required if redis is located elsewhere - Resque::setBackend('localhost', 6379); + Resque::setBackend('localhost:6379'); $args = array( 'name' => 'Chris' diff --git a/lib/Redisent/Redisent.php b/lib/Redisent/Redisent.php index ac70c81..19f5cdf 100644 --- a/lib/Redisent/Redisent.php +++ b/lib/Redisent/Redisent.php @@ -49,6 +49,10 @@ class Redisent { function __construct($host, $port = 6379) { $this->host = $host; $this->port = $port; + $this->establishConnection(); + } + + function establishConnection() { $this->__sock = fsockopen($this->host, $this->port, $errno, $errstr); if (!$this->__sock) { throw new Exception("{$errno} - {$errstr}"); diff --git a/lib/Resque.php b/lib/Resque.php index 168d849..d429ae5 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -26,7 +26,7 @@ class Resque * @param mixed $server Host/port combination separated by a colon, or * a nested array of servers with host/port pairs. */ - public static function setBackend($server) + public static function setBackend($server, $database = 0) { if(is_array($server)) { require_once dirname(__FILE__) . '/Resque/RedisCluster.php'; @@ -37,6 +37,8 @@ class Resque require_once dirname(__FILE__) . '/Resque/Redis.php'; self::$redis = new Resque_Redis($host, $port); } + + self::redis()->select($database); } /** diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 5df0fb6..4667b40 100644 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -63,7 +63,7 @@ class Resque_Job $id = md5(uniqid('', true)); Resque::push($queue, array( 'class' => $class, - 'args' => $args, + 'args' => array($args), 'id' => $id, )); @@ -128,7 +128,7 @@ class Resque_Job return array(); } - return $this->payload['args']; + return $this->payload['args'][0]; } /** @@ -248,4 +248,4 @@ class Resque_Job return '(' . implode(' | ', $name) . ')'; } } -?> \ No newline at end of file +?> diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index a5c70b9..a180d27 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -358,6 +358,7 @@ class Resque_Worker pcntl_signal(SIGUSR1, array($this, 'killChild')); pcntl_signal(SIGUSR2, array($this, 'pauseProcessing')); pcntl_signal(SIGCONT, array($this, 'unPauseProcessing')); + pcntl_signal(SIGPIPE, array($this, 'reestablishRedisConnection')); $this->log('Registered signals', self::LOG_VERBOSE); } @@ -380,6 +381,16 @@ class Resque_Worker $this->paused = false; } + /** + * Signal handler for SIGPIPE, in the event the redis connection has gone away. + * Attempts to reconnect to redis, or raises an Exception. + */ + public function reestablishRedisConnection() + { + $this->log('SIGPIPE received; attempting to reconnect'); + Resque::redis()->establishConnection(); + } + /** * Schedule a worker for shutdown. Will finish processing the current job * and when the timeout interval is reached, the worker will shut down. diff --git a/resque.php b/resque.php index d020622..d85a70e 100644 --- a/resque.php +++ b/resque.php @@ -13,8 +13,8 @@ if($APP_INCLUDE) { require_once $APP_INCLUDE; } -require 'lib/Resque.php'; -require 'lib/Resque/Worker.php'; +require_once 'lib/Resque.php'; +require_once 'lib/Resque/Worker.php'; $REDIS_BACKEND = getenv('REDIS_BACKEND'); if(!empty($REDIS_BACKEND)) { diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index df6187b..1f33fee 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -65,7 +65,7 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase Resque::enqueue('jobs', 'Test_Job', $args); $job = Resque_Job::reserve('jobs'); - $this->assertEquals($args, $job->payload['args']); + $this->assertEquals($args, $job->getArguments()); } public function testAfterJobIsReservedItIsRemoved() @@ -97,7 +97,7 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase $newJob = Resque_Job::reserve('jobs'); $this->assertEquals($job->payload['class'], $newJob->payload['class']); - $this->assertEquals($job->payload['args'], $newJob->payload['args']); + $this->assertEquals($job->payload['args'], $newJob->getArguments()); } public function testFailedJobExceptionsAreCaught()