Merge branch 'master' of git://github.com/chrisboulton/php-resque

This commit is contained in:
humancopy 2011-10-23 09:47:33 +02:00
commit 599295ef9c
8 changed files with 34 additions and 10 deletions

View File

@ -1,4 +1,11 @@
## 1.1 (2011-02-26) ## ## 1.2 (Unreleased) ##
* Allow alternate redis database to be selected when calling setBackend by supplying a second argument (patrickbajao)
* Use `require_once` when including php-resque after the app has been included in the sample resque.php to prevent include conflicts (andrewjshults)
* Wrap job arguments in an array to improve compatibility with ruby resque (warezthebeef)
* Fix a bug where the worker would spin out of control taking the server with it, if the redis connection was interrupted even briefly. Use SIGPIPE to trap this scenario cleanly. (d11wtq)
## 1.1 (2011-03-27) ##
* Update Redisent library for Redis 2.2 compatibility. Redis 2.2 is now required. (thedotedge) * Update Redisent library for Redis 2.2 compatibility. Redis 2.2 is now required. (thedotedge)
* Trim output of `ps` to remove any prepended whitespace (KevBurnsJr) * Trim output of `ps` to remove any prepended whitespace (KevBurnsJr)

View File

@ -47,7 +47,7 @@ Jobs are queued as follows:
require_once 'lib/Resque.php'; require_once 'lib/Resque.php';
// Required if redis is located elsewhere // Required if redis is located elsewhere
Resque::setBackend('localhost', 6379); Resque::setBackend('localhost:6379');
$args = array( $args = array(
'name' => 'Chris' 'name' => 'Chris'

View File

@ -49,6 +49,10 @@ class Redisent {
function __construct($host, $port = 6379) { function __construct($host, $port = 6379) {
$this->host = $host; $this->host = $host;
$this->port = $port; $this->port = $port;
$this->establishConnection();
}
function establishConnection() {
$this->__sock = fsockopen($this->host, $this->port, $errno, $errstr); $this->__sock = fsockopen($this->host, $this->port, $errno, $errstr);
if (!$this->__sock) { if (!$this->__sock) {
throw new Exception("{$errno} - {$errstr}"); throw new Exception("{$errno} - {$errstr}");

View File

@ -26,7 +26,7 @@ class Resque
* @param mixed $server Host/port combination separated by a colon, or * @param mixed $server Host/port combination separated by a colon, or
* a nested array of servers with host/port pairs. * a nested array of servers with host/port pairs.
*/ */
public static function setBackend($server) public static function setBackend($server, $database = 0)
{ {
if(is_array($server)) { if(is_array($server)) {
require_once dirname(__FILE__) . '/Resque/RedisCluster.php'; require_once dirname(__FILE__) . '/Resque/RedisCluster.php';
@ -37,6 +37,8 @@ class Resque
require_once dirname(__FILE__) . '/Resque/Redis.php'; require_once dirname(__FILE__) . '/Resque/Redis.php';
self::$redis = new Resque_Redis($host, $port); self::$redis = new Resque_Redis($host, $port);
} }
self::redis()->select($database);
} }
/** /**

View File

@ -63,7 +63,7 @@ class Resque_Job
$id = md5(uniqid('', true)); $id = md5(uniqid('', true));
Resque::push($queue, array( Resque::push($queue, array(
'class' => $class, 'class' => $class,
'args' => $args, 'args' => array($args),
'id' => $id, 'id' => $id,
)); ));
@ -128,7 +128,7 @@ class Resque_Job
return array(); return array();
} }
return $this->payload['args']; return $this->payload['args'][0];
} }
/** /**
@ -248,4 +248,4 @@ class Resque_Job
return '(' . implode(' | ', $name) . ')'; return '(' . implode(' | ', $name) . ')';
} }
} }
?> ?>

View File

@ -358,6 +358,7 @@ class Resque_Worker
pcntl_signal(SIGUSR1, array($this, 'killChild')); pcntl_signal(SIGUSR1, array($this, 'killChild'));
pcntl_signal(SIGUSR2, array($this, 'pauseProcessing')); pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
pcntl_signal(SIGCONT, array($this, 'unPauseProcessing')); pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
pcntl_signal(SIGPIPE, array($this, 'reestablishRedisConnection'));
$this->log('Registered signals', self::LOG_VERBOSE); $this->log('Registered signals', self::LOG_VERBOSE);
} }
@ -380,6 +381,16 @@ class Resque_Worker
$this->paused = false; $this->paused = false;
} }
/**
* Signal handler for SIGPIPE, in the event the redis connection has gone away.
* Attempts to reconnect to redis, or raises an Exception.
*/
public function reestablishRedisConnection()
{
$this->log('SIGPIPE received; attempting to reconnect');
Resque::redis()->establishConnection();
}
/** /**
* Schedule a worker for shutdown. Will finish processing the current job * Schedule a worker for shutdown. Will finish processing the current job
* and when the timeout interval is reached, the worker will shut down. * and when the timeout interval is reached, the worker will shut down.

View File

@ -13,8 +13,8 @@ if($APP_INCLUDE) {
require_once $APP_INCLUDE; require_once $APP_INCLUDE;
} }
require 'lib/Resque.php'; require_once 'lib/Resque.php';
require 'lib/Resque/Worker.php'; require_once 'lib/Resque/Worker.php';
$REDIS_BACKEND = getenv('REDIS_BACKEND'); $REDIS_BACKEND = getenv('REDIS_BACKEND');
if(!empty($REDIS_BACKEND)) { if(!empty($REDIS_BACKEND)) {

View File

@ -65,7 +65,7 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase
Resque::enqueue('jobs', 'Test_Job', $args); Resque::enqueue('jobs', 'Test_Job', $args);
$job = Resque_Job::reserve('jobs'); $job = Resque_Job::reserve('jobs');
$this->assertEquals($args, $job->payload['args']); $this->assertEquals($args, $job->getArguments());
} }
public function testAfterJobIsReservedItIsRemoved() public function testAfterJobIsReservedItIsRemoved()
@ -97,7 +97,7 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase
$newJob = Resque_Job::reserve('jobs'); $newJob = Resque_Job::reserve('jobs');
$this->assertEquals($job->payload['class'], $newJob->payload['class']); $this->assertEquals($job->payload['class'], $newJob->payload['class']);
$this->assertEquals($job->payload['args'], $newJob->payload['args']); $this->assertEquals($job->payload['args'], $newJob->getArguments());
} }
public function testFailedJobExceptionsAreCaught() public function testFailedJobExceptionsAreCaught()