Basic support for blocking list pop

This commit is contained in:
Ruud Kamphuis 2012-08-24 16:51:03 +02:00
parent 64cd1874ed
commit 41bf8c1c26
5 changed files with 38 additions and 29 deletions

View File

@ -14,6 +14,8 @@ class Resque
{ {
const VERSION = '1.0'; const VERSION = '1.0';
const DEFAULT_INTERVAL = 5;
/** /**
* @var Resque_Redis Instance of Resque_Redis that talks to redis. * @var Resque_Redis Instance of Resque_Redis that talks to redis.
*/ */
@ -115,14 +117,19 @@ class Resque
* @param string $queue The name of the queue to fetch an item from. * @param string $queue The name of the queue to fetch an item from.
* @return array Decoded item from the queue. * @return array Decoded item from the queue.
*/ */
public static function pop($queue) public static function pop($queue, $interval = null)
{ {
$item = self::redis()->lpop('queue:' . $queue); if($interval == null) {
$item = self::redis()->lpop('queue:' . $queue);
} else {
$item = self::redis()->blpop('queue:' . $queue, $interval ?: Resque::DEFAULT_INTERVAL);
}
if(!$item) { if(!$item) {
return; return;
} }
return json_decode($item, true); return json_decode($interval == 0 ? $item : $item[1], true);
} }
/** /**
@ -168,10 +175,10 @@ class Resque
* @param string $queue Queue to fetch next available job from. * @param string $queue Queue to fetch next available job from.
* @return Resque_Job Instance of Resque_Job to be processed, false if none or error. * @return Resque_Job Instance of Resque_Job to be processed, false if none or error.
*/ */
public static function reserve($queue) public static function reserve($queue, $interval = null)
{ {
require_once dirname(__FILE__) . '/Resque/Job.php'; require_once dirname(__FILE__) . '/Resque/Job.php';
return Resque_Job::reserve($queue); return Resque_Job::reserve($queue, $interval);
} }
/** /**

View File

@ -83,9 +83,9 @@ class Resque_Job
* @param string $queue The name of the queue to check for a job in. * @param string $queue The name of the queue to check for a job in.
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
*/ */
public static function reserve($queue) public static function reserve($queue, $interval = null)
{ {
$payload = Resque::pop($queue); $payload = Resque::pop($queue, $interval);
if(!is_array($payload)) { if(!is_array($payload)) {
return false; return false;
} }
@ -156,7 +156,7 @@ class Resque_Job
); );
} }
$this->instance = new $this->payload['class'](); $this->instance = new $this->payload['class'];
$this->instance->job = $this; $this->instance->job = $this;
$this->instance->args = $this->getArguments(); $this->instance->args = $this->getArguments();
$this->instance->queue = $this->queue; $this->instance->queue = $this->queue;

View File

@ -50,6 +50,7 @@ class Resque_Redis extends Redisent
'lset', 'lset',
'lrem', 'lrem',
'lpop', 'lpop',
'blpop',
'rpop', 'rpop',
'sadd', 'sadd',
'srem', 'srem',

View File

@ -50,6 +50,7 @@ class Resque_RedisCluster extends RedisentCluster
'lset', 'lset',
'lrem', 'lrem',
'lpop', 'lpop',
'blpop',
'rpop', 'rpop',
'sadd', 'sadd',
'srem', 'srem',

View File

@ -153,7 +153,7 @@ class Resque_Worker
* *
* @param int $interval How often to check for new jobs across the queues. * @param int $interval How often to check for new jobs across the queues.
*/ */
public function work($interval = 5) public function work($interval = Resque::DEFAULT_INTERVAL)
{ {
$this->updateProcLine('Starting'); $this->updateProcLine('Starting');
$this->startup(); $this->startup();
@ -166,25 +166,25 @@ class Resque_Worker
// Attempt to find and reserve a job // Attempt to find and reserve a job
$job = false; $job = false;
if(!$this->paused) { if(!$this->paused) {
$job = $this->reserve(); $job = $this->reserve($interval);
} }
if(!$job) { $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
// For an interval of 0, break now - helps with unit testing etc
if($interval == 0) { if(!$job) {
break; // For an interval of 0, break now - helps with unit testing etc
} if($interval == 0) {
// If no job was found, we sleep for $interval before continuing and checking again break;
$this->log('Sleeping for ' . $interval, true); }
if($this->paused) {
$this->updateProcLine('Paused'); // If no job was found, we sleep for $interval before continuing and checking again
} if($this->paused) {
else { $this->updateProcLine('Paused');
$this->updateProcLine('Waiting for ' . implode(',', $this->queues)); usleep($interval * 1000000); //it's paused, so don't hog redis with requests.
} }
usleep($interval * 1000000);
continue; continue;
} }
$this->log('got ' . $job); $this->log('got ' . $job);
Resque_Event::trigger('beforeFork', $job); Resque_Event::trigger('beforeFork', $job);
@ -252,15 +252,15 @@ class Resque_Worker
* *
* @return object|boolean Instance of Resque_Job if a job is found, false if not. * @return object|boolean Instance of Resque_Job if a job is found, false if not.
*/ */
public function reserve() public function reserve($interval = null)
{ {
$queues = $this->queues(); $queues = $this->queues();
if(!is_array($queues)) { if(!is_array($queues)) {
return; return;
} }
foreach($queues as $queue) { foreach($queues as $queue) {
$this->log('Checking ' . $queue, self::LOG_VERBOSE); $this->log('Checking ' . $queue . ' with interval ' . $interval, self::LOG_VERBOSE);
$job = Resque_Job::reserve($queue); $job = Resque_Job::reserve($queue, $interval);
if($job) { if($job) {
$this->log('Found job on ' . $queue, self::LOG_VERBOSE); $this->log('Found job on ' . $queue, self::LOG_VERBOSE);
return $job; return $job;