fix lost jobs when there is more than one worker process started by the same parent process

This commit is contained in:
Chris Boulton 2012-03-05 19:21:43 +11:00
parent 4700375d25
commit ebe7665817

View File

@ -19,6 +19,23 @@ class Resque
*/ */
public static $redis = null; public static $redis = null;
/**
* @var mixed Host/port conbination separated by a colon, or a nested
* array of server swith host/port pairs
*/
protected static $redisServer = null;
/**
* @var int ID of Redis database to select.
*/
protected static $redisDatabase = 0;
/**
* @var int PID of current process. Used to detect changes when forking
* and implement "thread" safety to avoid race conditions.
*/
protected static $pid = null;
/** /**
* Given a host/port combination separated by a colon, set it as * Given a host/port combination separated by a colon, set it as
* the redis server that Resque will talk to. * the redis server that Resque will talk to.
@ -28,6 +45,35 @@ class Resque
*/ */
public static function setBackend($server, $database = 0) public static function setBackend($server, $database = 0)
{ {
self::$redisServer = $server;
self::$redisDatabase = $database;
self::$redis = null;
}
/**
* Return an instance of the Resque_Redis class instantiated for Resque.
*
* @return Resque_Redis Instance of Resque_Redis.
*/
public static function redis()
{
// Detect when the PID of the current process has changed (from a fork, etc)
// and force a reconnect to redis.
$pid = getmypid();
if (self::$pid !== $pid) {
self::$redis = null;
self::$pid = $pid;
}
if(!is_null(self::$redis)) {
return self::$redis;
}
$server = self::$redisServer;
if (empty($server)) {
$server = 'localhost:6379';
}
if(is_array($server)) { if(is_array($server)) {
require_once dirname(__FILE__) . '/Resque/RedisCluster.php'; require_once dirname(__FILE__) . '/Resque/RedisCluster.php';
self::$redis = new Resque_RedisCluster($server); self::$redis = new Resque_RedisCluster($server);
@ -38,20 +84,7 @@ class Resque
self::$redis = new Resque_Redis($host, $port); self::$redis = new Resque_Redis($host, $port);
} }
self::redis()->select($database); self::$redis->select(self::$redisDatabase);
}
/**
* Return an instance of the Resque_Redis class instantiated for Resque.
*
* @return Resque_Redis Instance of Resque_Redis.
*/
public static function redis()
{
if(is_null(self::$redis)) {
self::setBackend('localhost:6379');
}
return self::$redis; return self::$redis;
} }