diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown index 84ae07c..8f2d66b 100644 --- a/CHANGELOG.markdown +++ b/CHANGELOG.markdown @@ -4,6 +4,13 @@ * Use `require_once` when including php-resque after the app has been included in the sample resque.php to prevent include conflicts (andrewjshults) * Wrap job arguments in an array to improve compatibility with ruby resque (warezthebeef) * Fix a bug where the worker would spin out of control taking the server with it, if the redis connection was interrupted even briefly. Use SIGPIPE to trap this scenario cleanly. (d11wtq) +* Added support of Redis prefix (namespaces) (hlegius) +* When reserving jobs, check if the payload received from popping a queue is a valid object (fix bug whereby jobs are reserved based on an erroneous payload) (salimane) +* Re-enable autoload for class_exists in Job.php (humancopy) +* Fix lost jobs when there is more than one worker process started by the same parent process (salimane) +* Move include for resque before APP_INCLUDE is loaded in, so that way resque is available for the app +* Avoid working with dirty worker IDs (salimane) + ## 1.1 (2011-03-27) ## diff --git a/lib/Resque.php b/lib/Resque.php index 0efa24d..ded12a4 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -19,6 +19,23 @@ class Resque */ public static $redis = null; + /** + * @var mixed Host/port conbination separated by a colon, or a nested + * array of server swith host/port pairs + */ + protected static $redisServer = null; + + /** + * @var int ID of Redis database to select. + */ + protected static $redisDatabase = 0; + + /** + * @var int PID of current process. Used to detect changes when forking + * and implement "thread" safety to avoid race conditions. + */ + protected static $pid = null; + /** * Given a host/port combination separated by a colon, set it as * the redis server that Resque will talk to. @@ -29,17 +46,9 @@ class Resque */ public static function setBackend($server, $database = 0) { - if(is_array($server)) { - require_once dirname(__FILE__) . '/Resque/RedisCluster.php'; - self::$redis = new Resque_RedisCluster($server); - } - else { - list($host, $port) = explode(':', $server); - require_once dirname(__FILE__) . '/Resque/Redis.php'; - self::$redis = new Resque_Redis($host, $port); - } - - self::redis()->select($database); + self::$redisServer = $server; + self::$redisDatabase = $database; + self::$redis = null; } /** @@ -49,10 +58,40 @@ class Resque */ public static function redis() { - if(is_null(self::$redis)) { - self::setBackend('localhost:6379'); + // Detect when the PID of the current process has changed (from a fork, etc) + // and force a reconnect to redis. + $pid = getmypid(); + if (self::$pid !== $pid) { + self::$redis = null; + self::$pid = $pid; } + if(!is_null(self::$redis)) { + return self::$redis; + } + + $server = self::$redisServer; + if (empty($server)) { + $server = 'localhost:6379'; + } + + if(is_array($server)) { + require_once dirname(__FILE__) . '/Resque/RedisCluster.php'; + self::$redis = new Resque_RedisCluster($server); + } + else { + if (strpos($server, 'unix:') === false) { + list($host, $port) = explode(':', $server); + } + else { + $host = $server; + $port = null; + } + require_once dirname(__FILE__) . '/Resque/Redis.php'; + self::$redis = new Resque_Redis($host, $port); + } + + self::$redis->select(self::$redisDatabase); return self::$redis; } diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php index 0a6908f..b6d7522 100644 --- a/lib/Resque/Redis.php +++ b/lib/Resque/Redis.php @@ -39,7 +39,7 @@ class Resque_Redis extends Redisent 'setnx', 'incr', 'incrby', - 'decrby', + 'decr', 'decrby', 'rpush', 'lpush', diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 189cf81..fc2e9d9 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -96,7 +96,7 @@ class Resque_Worker */ public static function find($workerId) { - if(!self::exists($workerId)) { + if(!self::exists($workerId) || false === strpos($workerId, ":")) { return false; } @@ -449,12 +449,14 @@ class Resque_Worker $workerPids = $this->workerPids(); $workers = self::all(); foreach($workers as $worker) { - list($host, $pid, $queues) = explode(':', (string)$worker, 3); - if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { - continue; - } - $this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE); - $worker->unregisterWorker(); + if (is_object($worker)) { + list($host, $pid, $queues) = explode(':', (string)$worker, 3); + if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { + continue; + } + $this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE); + $worker->unregisterWorker(); + } } } diff --git a/test/Resque/Tests/bootstrap.php b/test/Resque/Tests/bootstrap.php index eb84258..ad709c6 100644 --- a/test/Resque/Tests/bootstrap.php +++ b/test/Resque/Tests/bootstrap.php @@ -21,6 +21,7 @@ require_once CWD . '/TestCase.php'; // Include Resque require_once RESQUE_LIB . 'Resque.php'; require_once RESQUE_LIB . 'Resque/Worker.php'; +require_once RESQUE_LIB . 'Resque/Redis.php'; // Attempt to start our own redis instance for tesitng. exec('which redis-server', $output, $returnVar);