From 0ac835a630e5081af6bcfe1e6a9a26814b1cb9d1 Mon Sep 17 00:00:00 2001 From: pedroarnal Date: Wed, 25 Jan 2012 00:46:34 +0100 Subject: [PATCH 1/6] Enabling use of unix sockets on Redis backend. --- lib/Resque.php | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/Resque.php b/lib/Resque.php index d0be465..f4dbbd9 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -33,7 +33,13 @@ class Resque self::$redis = new Resque_RedisCluster($server); } else { - list($host, $port) = explode(':', $server); + if (strpos($server, 'unix:') === false) { + list($host, $port) = explode(':', $server); + } + else { + $host = $server; + $port = null; + } require_once dirname(__FILE__) . '/Resque/Redis.php'; self::$redis = new Resque_Redis($host, $port); } From 5b24b471dc393daf0317f9bec98def56f53e52b5 Mon Sep 17 00:00:00 2001 From: Salimane Adjao Moustapha Date: Thu, 2 Feb 2012 15:49:59 +0800 Subject: [PATCH 2/6] avoid working with dirty worker ids --- lib/Resque/Worker.php | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index a180d27..6bef8f3 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -53,7 +53,7 @@ class Resque_Worker * @var Resque_Job Current job, if any, being processed by this worker. */ private $currentJob = null; - + /** * @var int Process ID of child worker processes. */ @@ -95,7 +95,7 @@ class Resque_Worker */ public static function find($workerId) { - if(!self::exists($workerId)) { + if(!self::exists($workerId) || false === strpos($workerId, ":")) { return false; } @@ -447,12 +447,14 @@ class Resque_Worker $workerPids = $this->workerPids(); $workers = self::all(); foreach($workers as $worker) { - list($host, $pid, $queues) = explode(':', (string)$worker, 3); - if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { - continue; - } - $this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE); - $worker->unregisterWorker(); + if (is_object($worker)) { + list($host, $pid, $queues) = explode(':', (string)$worker, 3); + if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { + continue; + } + $this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE); + $worker->unregisterWorker(); + } } } From 782d0317e630a18a343ae958626d79cad2bbe2b4 Mon Sep 17 00:00:00 2001 From: pedroarnal Date: Sat, 18 Feb 2012 17:10:06 +0100 Subject: [PATCH 3/6] Update lib/Resque/Redis.php --- lib/Resque/Redis.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php index 0a6908f..b6d7522 100644 --- a/lib/Resque/Redis.php +++ b/lib/Resque/Redis.php @@ -39,7 +39,7 @@ class Resque_Redis extends Redisent 'setnx', 'incr', 'incrby', - 'decrby', + 'decr', 'decrby', 'rpush', 'lpush', From ebe76658175a7a8c9f190db6747a91f0c91eb988 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Mon, 5 Mar 2012 19:21:43 +1100 Subject: [PATCH 4/6] fix lost jobs when there is more than one worker process started by the same parent process --- lib/Resque.php | 61 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 14 deletions(-) diff --git a/lib/Resque.php b/lib/Resque.php index d0be465..a997356 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -19,6 +19,23 @@ class Resque */ public static $redis = null; + /** + * @var mixed Host/port conbination separated by a colon, or a nested + * array of server swith host/port pairs + */ + protected static $redisServer = null; + + /** + * @var int ID of Redis database to select. + */ + protected static $redisDatabase = 0; + + /** + * @var int PID of current process. Used to detect changes when forking + * and implement "thread" safety to avoid race conditions. + */ + protected static $pid = null; + /** * Given a host/port combination separated by a colon, set it as * the redis server that Resque will talk to. @@ -28,6 +45,35 @@ class Resque */ public static function setBackend($server, $database = 0) { + self::$redisServer = $server; + self::$redisDatabase = $database; + self::$redis = null; + } + + /** + * Return an instance of the Resque_Redis class instantiated for Resque. + * + * @return Resque_Redis Instance of Resque_Redis. + */ + public static function redis() + { + // Detect when the PID of the current process has changed (from a fork, etc) + // and force a reconnect to redis. + $pid = getmypid(); + if (self::$pid !== $pid) { + self::$redis = null; + self::$pid = $pid; + } + + if(!is_null(self::$redis)) { + return self::$redis; + } + + $server = self::$redisServer; + if (empty($server)) { + $server = 'localhost:6379'; + } + if(is_array($server)) { require_once dirname(__FILE__) . '/Resque/RedisCluster.php'; self::$redis = new Resque_RedisCluster($server); @@ -38,20 +84,7 @@ class Resque self::$redis = new Resque_Redis($host, $port); } - self::redis()->select($database); - } - - /** - * Return an instance of the Resque_Redis class instantiated for Resque. - * - * @return Resque_Redis Instance of Resque_Redis. - */ - public static function redis() - { - if(is_null(self::$redis)) { - self::setBackend('localhost:6379'); - } - + self::$redis->select(self::$redisDatabase); return self::$redis; } From 8259dc7491346ccaee73ab5c0d80f31fa84435a7 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Mon, 5 Mar 2012 19:35:56 +1100 Subject: [PATCH 5/6] update changelog --- CHANGELOG.markdown | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown index 84ae07c..8f2d66b 100644 --- a/CHANGELOG.markdown +++ b/CHANGELOG.markdown @@ -4,6 +4,13 @@ * Use `require_once` when including php-resque after the app has been included in the sample resque.php to prevent include conflicts (andrewjshults) * Wrap job arguments in an array to improve compatibility with ruby resque (warezthebeef) * Fix a bug where the worker would spin out of control taking the server with it, if the redis connection was interrupted even briefly. Use SIGPIPE to trap this scenario cleanly. (d11wtq) +* Added support of Redis prefix (namespaces) (hlegius) +* When reserving jobs, check if the payload received from popping a queue is a valid object (fix bug whereby jobs are reserved based on an erroneous payload) (salimane) +* Re-enable autoload for class_exists in Job.php (humancopy) +* Fix lost jobs when there is more than one worker process started by the same parent process (salimane) +* Move include for resque before APP_INCLUDE is loaded in, so that way resque is available for the app +* Avoid working with dirty worker IDs (salimane) + ## 1.1 (2011-03-27) ## From e6464f4c792d362d9fbc4c0b4dc9131971f28d0a Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Mon, 5 Mar 2012 20:21:12 +1100 Subject: [PATCH 6/6] Require Redisent/Redis separately --- test/Resque/Tests/bootstrap.php | 1 + 1 file changed, 1 insertion(+) diff --git a/test/Resque/Tests/bootstrap.php b/test/Resque/Tests/bootstrap.php index eb84258..ad709c6 100644 --- a/test/Resque/Tests/bootstrap.php +++ b/test/Resque/Tests/bootstrap.php @@ -21,6 +21,7 @@ require_once CWD . '/TestCase.php'; // Include Resque require_once RESQUE_LIB . 'Resque.php'; require_once RESQUE_LIB . 'Resque/Worker.php'; +require_once RESQUE_LIB . 'Resque/Redis.php'; // Attempt to start our own redis instance for tesitng. exec('which redis-server', $output, $returnVar);