Basic support for blocking list pop

This commit is contained in:
Ruud Kamphuis 2012-08-24 16:51:03 +02:00
parent 37cdec8e11
commit 4d5552867a
4 changed files with 37 additions and 28 deletions

View file

@ -80,9 +80,9 @@ class Resque_Job
* @param string $queue The name of the queue to check for a job in.
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
*/
public static function reserve($queue)
public static function reserve($queue, $interval = null)
{
$payload = Resque::pop($queue);
$payload = Resque::pop($queue, $interval);
if(!is_array($payload)) {
return false;
}
@ -153,7 +153,7 @@ class Resque_Job
);
}
$this->instance = new $this->payload['class']();
$this->instance = new $this->payload['class'];
$this->instance->job = $this;
$this->instance->args = $this->getArguments();
$this->instance->queue = $this->queue;

View file

@ -47,6 +47,7 @@ class Resque_Redis
'lset',
'lrem',
'lpop',
'blpop',
'rpop',
'sadd',
'srem',

View file

@ -147,7 +147,7 @@ class Resque_Worker
*
* @param int $interval How often to check for new jobs across the queues.
*/
public function work($interval = 5)
public function work($interval = Resque::DEFAULT_INTERVAL)
{
$this->updateProcLine('Starting');
$this->startup();
@ -160,25 +160,25 @@ class Resque_Worker
// Attempt to find and reserve a job
$job = false;
if(!$this->paused) {
$job = $this->reserve();
$job = $this->reserve($interval);
}
if(!$job) {
// For an interval of 0, break now - helps with unit testing etc
if($interval == 0) {
break;
}
// If no job was found, we sleep for $interval before continuing and checking again
$this->log('Sleeping for ' . $interval, self::LOG_VERBOSE);
if($this->paused) {
$this->updateProcLine('Paused');
}
else {
$this->updateProcLine('Waiting for ' . implode(',', $this->queues));
}
usleep($interval * 1000000);
continue;
}
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
if(!$job) {
// For an interval of 0, break now - helps with unit testing etc
if($interval == 0) {
break;
}
// If no job was found, we sleep for $interval before continuing and checking again
if($this->paused) {
$this->updateProcLine('Paused');
usleep($interval * 1000000); //it's paused, so don't hog redis with requests.
}
continue;
}
$this->log('got ' . $job);
Resque_Event::trigger('beforeFork', $job);
@ -246,15 +246,15 @@ class Resque_Worker
*
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
*/
public function reserve()
public function reserve($interval = null)
{
$queues = $this->queues();
if(!is_array($queues)) {
return;
}
foreach($queues as $queue) {
$this->log('Checking ' . $queue, self::LOG_VERBOSE);
$job = Resque_Job::reserve($queue);
$this->log('Checking ' . $queue . ' with interval ' . $interval, self::LOG_VERBOSE);
$job = Resque_Job::reserve($queue, $interval);
if($job) {
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
return $job;