fix compatibility with phpredis

* implement a fork helper method that closes the connection to redis before forking (instead of resetting after)
  to work around bugs with phpredis/socket fork handling
* phpredis does not automatically typecast to string, so worker name must be typecasted when registering
This commit is contained in:
Chris Boulton 2013-01-13 02:59:06 +11:00
parent f082ec872e
commit 6800fbe5ac
3 changed files with 31 additions and 39 deletions

View File

@ -68,7 +68,7 @@ if(!empty($COUNT) && $COUNT > 1) {
if($count > 1) { if($count > 1) {
for($i = 0; $i < $count; ++$i) { for($i = 0; $i < $count; ++$i) {
$pid = pcntl_fork(); $pid = Resque::fork();
if($pid == -1) { if($pid == -1) {
die("Could not fork worker ".$i."\n"); die("Could not fork worker ".$i."\n");
} }

View File

@ -26,12 +26,6 @@ class Resque
*/ */
protected static $redisDatabase = 0; protected static $redisDatabase = 0;
/**
* @var int PID of current process. Used to detect changes when forking
* and implement "thread" safety to avoid race conditions.
*/
protected static $pid = null;
/** /**
* Given a host/port combination separated by a colon, set it as * Given a host/port combination separated by a colon, set it as
* the redis server that Resque will talk to. * the redis server that Resque will talk to.
@ -54,15 +48,7 @@ class Resque
*/ */
public static function redis() public static function redis()
{ {
// Detect when the PID of the current process has changed (from a fork, etc) if (self::$redis !== null) {
// and force a reconnect to redis.
$pid = getmypid();
if (self::$pid !== $pid) {
self::$redis = null;
self::$pid = $pid;
}
if(!is_null(self::$redis)) {
return self::$redis; return self::$redis;
} }
@ -75,6 +61,33 @@ class Resque
return self::$redis; return self::$redis;
} }
/**
* fork() helper method for php-resque that handles issues PHP socket
* and phpredis have with passing around sockets between child/parent
* processes.
*
* Will close connection to Redis before forking.
*
* @return int Return vars as per pcntl_fork()
*/
public static function fork()
{
if(!function_exists('pcntl_fork')) {
return -1;
}
// Close the connection to Redis before forking.
// This is a workaround for issues phpredis has.
self::$redis = null;
$pid = pcntl_fork();
if($pid === -1) {
throw new RuntimeException('Unable to fork child worker.');
}
return $pid;
}
/** /**
* Push a job to the end of a specific queue. If the queue does not * Push a job to the end of a specific queue. If the queue does not
* exist, then create it as well. * exist, then create it as well.

View File

@ -184,7 +184,7 @@ class Resque_Worker
Resque_Event::trigger('beforeFork', $job); Resque_Event::trigger('beforeFork', $job);
$this->workingOn($job); $this->workingOn($job);
$this->child = $this->fork(); $this->child = Resque::fork();
// Forked and we're the child. Run the job. // Forked and we're the child. Run the job.
if ($this->child === 0 || $this->child === false) { if ($this->child === 0 || $this->child === false) {
@ -286,27 +286,6 @@ class Resque_Worker
return $queues; return $queues;
} }
/**
* Attempt to fork a child process from the parent to run a job in.
*
* Return values are those of pcntl_fork().
*
* @return int -1 if the fork failed, 0 for the forked child, the PID of the child for the parent.
*/
private function fork()
{
if(!function_exists('pcntl_fork')) {
return false;
}
$pid = pcntl_fork();
if($pid === -1) {
throw new RuntimeException('Unable to fork child worker.');
}
return $pid;
}
/** /**
* Perform necessary actions to start a worker. * Perform necessary actions to start a worker.
*/ */
@ -474,7 +453,7 @@ class Resque_Worker
*/ */
public function registerWorker() public function registerWorker()
{ {
Resque::redis()->sadd('workers', $this); Resque::redis()->sadd('workers', (string)$this);
Resque::redis()->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y')); Resque::redis()->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y'));
} }