From b8f98eecd2f4cfad157c28a7a26f6122d679ef64 Mon Sep 17 00:00:00 2001 From: Ruud Kamphuis Date: Tue, 12 Mar 2013 11:18:37 +0100 Subject: [PATCH] WIP --- bin/resque | 1 - demo/job.php | 5 +- demo/queue.php | 2 +- lib/Resque.php | 33 +++++++-- lib/Resque/Job.php | 49 ++++++++++---- lib/Resque/Redis.php | 8 ++- lib/Resque/Worker.php | 154 ++++++++++++++++++++++-------------------- 7 files changed, 150 insertions(+), 102 deletions(-) diff --git a/bin/resque b/bin/resque index 2b605ce..fbae329 100644 --- a/bin/resque +++ b/bin/resque @@ -1,4 +1,3 @@ -#!/usr/bin/env php '); + sleep(1); + fwrite(STDOUT, 'Job ended!' . PHP_EOL); } } ?> \ No newline at end of file diff --git a/demo/queue.php b/demo/queue.php index 95ff5b0..393df87 100644 --- a/demo/queue.php +++ b/demo/queue.php @@ -14,6 +14,6 @@ $args = array( ), ); -$jobId = Resque::enqueue('default', $argv[1], $args, true); +$jobId = Resque::enqueue($argv[1], $argv[2], $args, true); echo "Queued job ".$jobId."\n\n"; ?> \ No newline at end of file diff --git a/lib/Resque.php b/lib/Resque.php index 928ef5e..c7ddf4c 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -114,21 +114,40 @@ class Resque * @param string $queue The name of the queue to fetch an item from. * @return array Decoded item from the queue. */ - public static function pop($queue, $interval = null) + public static function pop($queue) { - if($interval == null) { - $item = self::redis()->lpop('queue:' . $queue); - } else { - $item = self::redis()->blpop('queue:' . $queue, $interval ? $interval : Resque::DEFAULT_INTERVAL); - } + $item = self::redis()->lpop('queue:' . $queue); if(!$item) { return; } - return json_decode($interval == 0 ? $item : $item[1], true); + return json_decode($item, true); } + /** + * Pop an item off the end of the specified queue, decode it and + * return it. + * + * @param string $queue The name of the queue to fetch an item from. + * @return array Decoded item from the queue. + */ + public static function blpop($queues, $interval = null) + { + $list = array(); + foreach($queues AS $queue) { + $list[] = 'queue:' . $queue; + } + + $item = self::redis()->blpop($list, $interval ? (int)$interval : Resque::DEFAULT_INTERVAL); + + if(!$item) { + return; + } + + return json_decode($item[1], true); + } + /** * Return the size (number of pending jobs) of the specified queue. * diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 618b118..e2fb60e 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -73,22 +73,41 @@ class Resque_Job return $id; } - /** - * Find the next available job from the specified queue and return an - * instance of Resque_Job for it. - * - * @param string $queue The name of the queue to check for a job in. - * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. - */ - public static function reserve($queue, $interval = null) - { - $payload = Resque::pop($queue, $interval); - if(!is_array($payload)) { - return false; - } + /** + * Find the next available job from the specified queue and return an + * instance of Resque_Job for it. + * + * @param string $queue The name of the queue to check for a job in. + * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. + */ + public static function reserve($queue) + { + $payload = Resque::pop($queue); + if(!is_array($payload)) { + return false; + } - return new Resque_Job($queue, $payload); - } + return new Resque_Job($queue, $payload); + } + + /** + * Find the next available job from the specified queue and return an + * instance of Resque_Job for it. + * + * @param string $queue The name of the queue to check for a job in. + * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. + */ + public static function reserveBlocking($queues, $interval = null) + { + $payload = Resque::blpop($queues, $interval); + if(!is_array($payload)) { + return false; + } + + var_dump($payload); + + return new Resque_Job($payload->queue, $payload); + } /** * Update the status of the current job. diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php index 4447146..6cef3d2 100644 --- a/lib/Resque/Redis.php +++ b/lib/Resque/Redis.php @@ -143,7 +143,13 @@ class Resque_Redis */ public function __call($name, $args) { if(in_array($name, $this->keyCommands)) { - $args[0] = self::$defaultNamespace . $args[0]; + if(is_array($args[0])) { + foreach($args[0] AS $i => $v) { + $args[0][$i] = self::$defaultNamespace . $v; + } + } else { + $args[0] = self::$defaultNamespace . $args[0]; + } } try { return $this->driver->__call($name, $args); diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 204aae6..03487f5 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -152,70 +152,53 @@ class Resque_Worker $this->updateProcLine('Starting'); $this->startup(); - while(true) { - if($this->shutdown) { - break; - } - - // Attempt to find and reserve a job - $job = false; - if(!$this->paused) { - $job = $this->reserve($interval); - } - + while($job = $this->reserveBlocking($interval)) { $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); - if(!$job) { - // For an interval of 0, break now - helps with unit testing etc - if($interval == 0) { - break; - } + $this->log('got ' . $job); + Resque_Event::trigger('beforeFork', $job); + $this->workingOn($job); - // If no job was found, we sleep for $interval before continuing and checking again - if($this->paused) { - $this->updateProcLine('Paused'); - usleep($interval * 1000000); //it's paused, so don't hog redis with requests. - } + $this->child = Resque::fork(); - continue; + // Forked and we're the child. Run the job. + if ($this->child === 0 || $this->child === false) { + $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); + $this->updateProcLine($status); + $this->log($status, self::LOG_VERBOSE); + $this->perform($job); + if ($this->child === 0) { + exit(0); + } } - $this->log('got ' . $job); - Resque_Event::trigger('beforeFork', $job); - $this->workingOn($job); + if($this->child > 0) { + // Parent process, sit and wait + $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); + $this->updateProcLine($status); + $this->log($status, self::LOG_VERBOSE); - $this->child = Resque::fork(); + // Wait until the child process finishes before continuing + pcntl_wait($status); + $exitStatus = pcntl_wexitstatus($status); + if($exitStatus !== 0) { + $job->fail(new Resque_Job_DirtyExitException( + 'Job exited with exit code ' . $exitStatus + )); + } + } - // Forked and we're the child. Run the job. - if ($this->child === 0 || $this->child === false) { - $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); - $this->updateProcLine($status); - $this->log($status, self::LOG_VERBOSE); - $this->perform($job); - if ($this->child === 0) { - exit(0); - } - } + $this->child = null; + $this->doneWorking(); - if($this->child > 0) { - // Parent process, sit and wait - $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); - $this->updateProcLine($status); - $this->log($status, self::LOG_VERBOSE); + if($this->shutdown) { + break; + } - // Wait until the child process finishes before continuing - pcntl_wait($status); - $exitStatus = pcntl_wexitstatus($status); - if($exitStatus !== 0) { - $job->fail(new Resque_Job_DirtyExitException( - 'Job exited with exit code ' . $exitStatus - )); - } - } - - $this->child = null; - $this->doneWorking(); - } + if($this->paused) { + break; + } + } $this->unregisterWorker(); } @@ -241,28 +224,49 @@ class Resque_Worker $this->log('done ' . $job); } - /** - * Attempt to find a job from the top of one of the queues for this worker. - * - * @return object|boolean Instance of Resque_Job if a job is found, false if not. - */ - public function reserve($interval = null) - { - $queues = $this->queues(); - if(!is_array($queues)) { - return; - } - foreach($queues as $queue) { - $this->log('Checking ' . $queue . ' with interval ' . $interval, self::LOG_VERBOSE); - $job = Resque_Job::reserve($queue, $interval); - if($job) { - $this->log('Found job on ' . $queue, self::LOG_VERBOSE); - return $job; - } - } + /** + * Attempt to find a job from the top of one of the queues for this worker. + * + * @return object|boolean Instance of Resque_Job if a job is found, false if not. + */ + public function reserve() + { + $queues = $this->queues(); + if(!is_array($queues)) { + return; + } + foreach($queues as $queue) { + $this->log('Checking ' . $queue, self::LOG_VERBOSE); + $job = Resque_Job::reserve($queue); + if($job) { + $this->log('Found job on ' . $queue, self::LOG_VERBOSE); + return $job; + } + } - return false; - } + return false; + } + + /** + * Attempt to find a job from the top of one of the queues for this worker. + * + * @return object|boolean Instance of Resque_Job if a job is found, false if not. + */ + public function reserveBlocking($interval = null) + { + $queues = $this->queues(); + if(!is_array($queues)) { + return; + } + + $job = Resque_Job::reserveBlocking($queues, $interval); + if($job) { + $this->log('Found job on ' . $job->queue, self::LOG_VERBOSE); + return $job; + } + + return false; + } /** * Return an array containing all of the queues that this worker should use