From 4d5552867a30a314939209ba6b161b60db5c1d5e Mon Sep 17 00:00:00 2001 From: Ruud Kamphuis Date: Fri, 24 Aug 2012 16:51:03 +0200 Subject: [PATCH] Basic support for blocking list pop --- lib/Resque.php | 16 ++++++++++++---- lib/Resque/Job.php | 6 +++--- lib/Resque/Redis.php | 1 + lib/Resque/Worker.php | 42 +++++++++++++++++++++--------------------- 4 files changed, 37 insertions(+), 28 deletions(-) diff --git a/lib/Resque.php b/lib/Resque.php index a463bea..7478fa4 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -10,6 +10,8 @@ class Resque { const VERSION = '1.2'; + const DEFAULT_INTERVAL = 5; + /** * @var Resque_Redis Instance of Resque_Redis that talks to redis. */ @@ -112,14 +114,19 @@ class Resque * @param string $queue The name of the queue to fetch an item from. * @return array Decoded item from the queue. */ - public static function pop($queue) + public static function pop($queue, $interval = null) { - $item = self::redis()->lpop('queue:' . $queue); + if($interval == null) { + $item = self::redis()->lpop('queue:' . $queue); + } else { + $item = self::redis()->blpop('queue:' . $queue, $interval ?: Resque::DEFAULT_INTERVAL); + } + if(!$item) { return; } - return json_decode($item, true); + return json_decode($interval == 0 ? $item : $item[1], true); } /** @@ -164,8 +171,9 @@ class Resque * @param string $queue Queue to fetch next available job from. * @return Resque_Job Instance of Resque_Job to be processed, false if none or error. */ - public static function reserve($queue) + public static function reserve($queue, $interval = null) { + require_once dirname(__FILE__) . '/Resque/Job.php'; return Resque_Job::reserve($queue); } diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 8cedd4b..618b118 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -80,9 +80,9 @@ class Resque_Job * @param string $queue The name of the queue to check for a job in. * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. */ - public static function reserve($queue) + public static function reserve($queue, $interval = null) { - $payload = Resque::pop($queue); + $payload = Resque::pop($queue, $interval); if(!is_array($payload)) { return false; } @@ -153,7 +153,7 @@ class Resque_Job ); } - $this->instance = new $this->payload['class'](); + $this->instance = new $this->payload['class']; $this->instance->job = $this; $this->instance->args = $this->getArguments(); $this->instance->queue = $this->queue; diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php index bbb0643..4447146 100644 --- a/lib/Resque/Redis.php +++ b/lib/Resque/Redis.php @@ -47,6 +47,7 @@ class Resque_Redis 'lset', 'lrem', 'lpop', + 'blpop', 'rpop', 'sadd', 'srem', diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 45852bb..204aae6 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -147,7 +147,7 @@ class Resque_Worker * * @param int $interval How often to check for new jobs across the queues. */ - public function work($interval = 5) + public function work($interval = Resque::DEFAULT_INTERVAL) { $this->updateProcLine('Starting'); $this->startup(); @@ -160,25 +160,25 @@ class Resque_Worker // Attempt to find and reserve a job $job = false; if(!$this->paused) { - $job = $this->reserve(); + $job = $this->reserve($interval); } - if(!$job) { - // For an interval of 0, break now - helps with unit testing etc - if($interval == 0) { - break; - } - // If no job was found, we sleep for $interval before continuing and checking again - $this->log('Sleeping for ' . $interval, self::LOG_VERBOSE); - if($this->paused) { - $this->updateProcLine('Paused'); - } - else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); - } - usleep($interval * 1000000); - continue; - } + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); + + if(!$job) { + // For an interval of 0, break now - helps with unit testing etc + if($interval == 0) { + break; + } + + // If no job was found, we sleep for $interval before continuing and checking again + if($this->paused) { + $this->updateProcLine('Paused'); + usleep($interval * 1000000); //it's paused, so don't hog redis with requests. + } + + continue; + } $this->log('got ' . $job); Resque_Event::trigger('beforeFork', $job); @@ -246,15 +246,15 @@ class Resque_Worker * * @return object|boolean Instance of Resque_Job if a job is found, false if not. */ - public function reserve() + public function reserve($interval = null) { $queues = $this->queues(); if(!is_array($queues)) { return; } foreach($queues as $queue) { - $this->log('Checking ' . $queue, self::LOG_VERBOSE); - $job = Resque_Job::reserve($queue); + $this->log('Checking ' . $queue . ' with interval ' . $interval, self::LOG_VERBOSE); + $job = Resque_Job::reserve($queue, $interval); if($job) { $this->log('Found job on ' . $queue, self::LOG_VERBOSE); return $job;