Merge pull request #63 from ruudk/blocking-list-pop

Basic support for blocking list pop
This commit is contained in:
Chris Boulton 2013-06-12 01:09:11 -07:00
commit 59f617e639
16 changed files with 704 additions and 185 deletions

View file

@ -10,6 +10,8 @@ class Resque
{
const VERSION = '1.2';
const DEFAULT_INTERVAL = 5;
/**
* @var Resque_Redis Instance of Resque_Redis that talks to redis.
*/
@ -60,7 +62,7 @@ class Resque
self::$redis = new Resque_Redis($server, self::$redisDatabase);
return self::$redis;
}
/**
* fork() helper method for php-resque that handles issues PHP socket
* and phpredis have with passing around sockets between child/parent
@ -114,7 +116,8 @@ class Resque
*/
public static function pop($queue)
{
$item = self::redis()->lpop('queue:' . $queue);
$item = self::redis()->lpop('queue:' . $queue);
if(!$item) {
return;
}
@ -122,6 +125,40 @@ class Resque
return json_decode($item, true);
}
/**
* Pop an item off the end of the specified queues, using blocking list pop,
* decode it and return it.
*
* @param array $queues
* @param int $timeout
* @return null|array Decoded item from the queue.
*/
public static function blpop(array $queues, $timeout)
{
$list = array();
foreach($queues AS $queue) {
$list[] = 'queue:' . $queue;
}
$item = self::redis()->blpop($list, (int)$timeout);
if(!$item) {
return;
}
/**
* Normally the Resque_Redis class returns queue names without the prefix
* But the blpop is a bit different. It returns the name as prefix:queue:name
* So we need to strip off the prefix:queue: part
*/
$queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:'));
return array(
'queue' => $queue,
'payload' => json_decode($item[1], true)
);
}
/**
* Return the size (number of pending jobs) of the specified queue.
*

View file

@ -58,13 +58,11 @@ class Resque_Job
);
}
$id = md5(uniqid('', true));
if (!Resque::push($queue, array(
Resque::push($queue, array(
'class' => $class,
'args' => array($args),
'id' => $id,
))) {
return false;
}
));
if($monitor) {
Resque_Job_Status::create($id);
@ -73,22 +71,41 @@ class Resque_Job
return $id;
}
/**
* Find the next available job from the specified queue and return an
* instance of Resque_Job for it.
*
* @param string $queue The name of the queue to check for a job in.
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
*/
public static function reserve($queue)
{
$payload = Resque::pop($queue);
if(!is_array($payload)) {
return false;
}
/**
* Find the next available job from the specified queue and return an
* instance of Resque_Job for it.
*
* @param string $queue The name of the queue to check for a job in.
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
*/
public static function reserve($queue)
{
$payload = Resque::pop($queue);
if(!is_array($payload)) {
return false;
}
return new Resque_Job($queue, $payload);
}
return new Resque_Job($queue, $payload);
}
/**
* Find the next available job from the specified queues using blocking list pop
* and return an instance of Resque_Job for it.
*
* @param array $queues
* @param int $timeout
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
*/
public static function reserveBlocking(array $queues, $timeout = null)
{
$item = Resque::blpop($queues, $timeout);
if(!is_array($item)) {
return false;
}
return new Resque_Job($item['queue'], $item['payload']);
}
/**
* Update the status of the current job.
@ -153,7 +170,7 @@ class Resque_Job
);
}
$this->instance = new $this->payload['class']();
$this->instance = new $this->payload['class'];
$this->instance->job = $this;
$this->instance->args = $this->getArguments();
$this->instance->queue = $this->queue;

View file

@ -47,6 +47,7 @@ class Resque_Redis
'lset',
'lrem',
'lpop',
'blpop',
'rpop',
'sadd',
'srem',
@ -142,7 +143,13 @@ class Resque_Redis
*/
public function __call($name, $args) {
if(in_array($name, $this->keyCommands)) {
$args[0] = self::$defaultNamespace . $args[0];
if(is_array($args[0])) {
foreach($args[0] AS $i => $v) {
$args[0][$i] = self::$defaultNamespace . $v;
}
} else {
$args[0] = self::$defaultNamespace . $args[0];
}
}
try {
return $this->driver->__call($name, $args);

View file

@ -147,75 +147,88 @@ class Resque_Worker
*
* @param int $interval How often to check for new jobs across the queues.
*/
public function work($interval = 5)
public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
{
$this->updateProcLine('Starting');
$this->startup();
while(true) {
if($this->shutdown) {
break;
}
while(true) {
if($this->shutdown) {
break;
}
// Attempt to find and reserve a job
$job = false;
if(!$this->paused) {
$job = $this->reserve();
}
// Attempt to find and reserve a job
$job = false;
if(!$this->paused) {
if($blocking === true) {
$this->log('Starting blocking with timeout of ' . $interval, self::LOG_VERBOSE);
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval);
} else {
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
}
if(!$job) {
// For an interval of 0, break now - helps with unit testing etc
if($interval == 0) {
break;
}
// If no job was found, we sleep for $interval before continuing and checking again
$this->log('Sleeping for ' . $interval, self::LOG_VERBOSE);
if($this->paused) {
$this->updateProcLine('Paused');
}
else {
$this->updateProcLine('Waiting for ' . implode(',', $this->queues));
}
usleep($interval * 1000000);
continue;
}
$job = $this->reserve($blocking, $interval);
}
$this->log('got ' . $job);
Resque_Event::trigger('beforeFork', $job);
$this->workingOn($job);
if(!$job) {
// For an interval of 0, break now - helps with unit testing etc
if($interval == 0) {
break;
}
$this->child = Resque::fork();
if($blocking === false)
{
// If no job was found, we sleep for $interval before continuing and checking again
$this->log('Sleeping for ' . $interval, self::LOG_VERBOSE);
if($this->paused) {
$this->updateProcLine('Paused');
}
else {
$this->updateProcLine('Waiting for ' . implode(',', $this->queues));
}
// Forked and we're the child. Run the job.
if ($this->child === 0 || $this->child === false) {
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
$this->perform($job);
if ($this->child === 0) {
exit(0);
}
}
usleep($interval * 1000000);
}
if($this->child > 0) {
// Parent process, sit and wait
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
continue;
}
// Wait until the child process finishes before continuing
pcntl_wait($status);
$exitStatus = pcntl_wexitstatus($status);
if($exitStatus !== 0) {
$job->fail(new Resque_Job_DirtyExitException(
'Job exited with exit code ' . $exitStatus
));
}
}
$this->log('got ' . $job);
Resque_Event::trigger('beforeFork', $job);
$this->workingOn($job);
$this->child = null;
$this->doneWorking();
}
$this->child = Resque::fork();
// Forked and we're the child. Run the job.
if ($this->child === 0 || $this->child === false) {
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
$this->perform($job);
if ($this->child === 0) {
exit(0);
}
}
if($this->child > 0) {
// Parent process, sit and wait
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
// Wait until the child process finishes before continuing
pcntl_wait($status);
$exitStatus = pcntl_wexitstatus($status);
if($exitStatus !== 0) {
$job->fail(new Resque_Job_DirtyExitException(
'Job exited with exit code ' . $exitStatus
));
}
}
$this->child = null;
$this->doneWorking();
}
$this->unregisterWorker();
}
@ -241,28 +254,37 @@ class Resque_Worker
$this->log('done ' . $job);
}
/**
* Attempt to find a job from the top of one of the queues for this worker.
*
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
*/
public function reserve()
{
$queues = $this->queues();
if(!is_array($queues)) {
return;
}
foreach($queues as $queue) {
$this->log('Checking ' . $queue, self::LOG_VERBOSE);
$job = Resque_Job::reserve($queue);
if($job) {
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
return $job;
}
}
/**
* @param bool $blocking
* @param int $timeout
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
*/
public function reserve($blocking = false, $timeout = null)
{
$queues = $this->queues();
if(!is_array($queues)) {
return;
}
return false;
}
if($blocking === true) {
$job = Resque_Job::reserveBlocking($queues, $timeout);
if($job) {
$this->log('Found job on ' . $job->queue, self::LOG_VERBOSE);
return $job;
}
} else {
foreach($queues as $queue) {
$this->log('Checking ' . $queue, self::LOG_VERBOSE);
$job = Resque_Job::reserve($queue);
if($job) {
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
return $job;
}
}
}
return false;
}
/**
* Return an array containing all of the queues that this worker should use