From 4a97c1462829a4f8592ae56949f22c850875832e Mon Sep 17 00:00:00 2001 From: Wedy Chainy Date: Wed, 17 Sep 2014 10:20:26 +1000 Subject: [PATCH] dequeue --- README.md | 16 ++++ lib/Resque.php | 142 +++++++++++++++++++++++++++------- lib/Resque/Redis.php | 2 +- test/Resque/Tests/JobTest.php | 75 +++++++++++++++++- 4 files changed, 204 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 068a57e..3a1860d 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,22 @@ class My_Job } ``` +### Dequeueing Jobs ### + +This method can be used to conveniently remove a job from a queue. + +```php +// Removes 'My_Job' of queue 'default' +Resque::dequeue('default', ['My_Job']); +``` + +If no jobs are given, this method will dequeue all jobs matching the provided queue. + +```php +// Removes all jobs of queue 'default' +Resque::dequeue('default'); +``` + ### Tracking Job Statuses ### php-resque has the ability to perform basic status tracking of a queued diff --git a/lib/Resque.php b/lib/Resque.php index 0f4b94e..c83b601 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -120,39 +120,55 @@ class Resque return json_decode($item, true); } - /** - * Pop an item off the end of the specified queues, using blocking list pop, - * decode it and return it. - * - * @param array $queues - * @param int $timeout - * @return null|array Decoded item from the queue. - */ - public static function blpop(array $queues, $timeout) - { - $list = array(); - foreach($queues AS $queue) { - $list[] = 'queue:' . $queue; - } + /** + * Remove items of the specified queue + * + * @param string $queue The name of the queue to fetch an item from. + * @param array $items + * @return integer number of deleted items + */ + public static function dequeue($queue, $items = Array()) + { + if(count($items) > 0) { + return self::removeItems($queue, $items); + } else { + return self::removeList($queue); + } + } + + /** + * Pop an item off the end of the specified queues, using blocking list pop, + * decode it and return it. + * + * @param array $queues + * @param int $timeout + * @return null|array Decoded item from the queue. + */ + public static function blpop(array $queues, $timeout) + { + $list = array(); + foreach($queues AS $queue) { + $list[] = 'queue:' . $queue; + } - $item = self::redis()->blpop($list, (int)$timeout); + $item = self::redis()->blpop($list, (int)$timeout); - if(!$item) { - return; - } + if(!$item) { + return; + } - /** - * Normally the Resque_Redis class returns queue names without the prefix - * But the blpop is a bit different. It returns the name as prefix:queue:name - * So we need to strip off the prefix:queue: part - */ - $queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:')); + /** + * Normally the Resque_Redis class returns queue names without the prefix + * But the blpop is a bit different. It returns the name as prefix:queue:name + * So we need to strip off the prefix:queue: part + */ + $queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:')); - return array( - 'queue' => $queue, - 'payload' => json_decode($item[1], true) - ); - } + return array( + 'queue' => $queue, + 'payload' => json_decode($item[1], true) + ); + } /** * Return the size (number of pending jobs) of the specified queue. @@ -215,4 +231,72 @@ class Resque } return $queues; } + + /** + * Remove Items from the queue + * Safely moving each item to a temporary queue before processing it + * If the Job matches, counts otherwise puts it in a requeue_queue + * which at the end eventually be copied back into the original queue + * + * @private + * + * @param string $queue The name of the queue + * @param array $items + * @return integer number of deleted items + */ + private static function removeItems($queue, $items = Array()) + { + $counter = 0; + $originalQueue = 'queue:'. $queue; + $tempQueue = $originalQueue. ':temp:'. time(); + $requeueQueue = $tempQueue. ':requeue'; + + // move each item from original queue to temp queue and process it + $finished = false; + while(!$finished) { + $string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue); + + if(!empty($string)) { + $decoded = json_decode($string, true); + if(in_array($decoded['class'], $items)) { + $counter++; + } else { + self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue); + } + } else { + $finished = true; + } + } + + // move back from temp queue to original queue + $finished = false; + while(!$finished) { + $string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue); + if (empty($string)) { + $finished = true; + } + } + + // remove temp queue and requeue queue + self::redis()->del($requeueQueue); + self::redis()->del($tempQueue); + + return $counter; + } + + /** + * Remove List + * + * @private + * + * @params string $queue the name of the queue + * @return integer number of deleted items belongs to this list + */ + private static function removeList($queue) + { + $counter = self::size($queue); + $result = self::redis()->del('queue:' . $queue); + return ($result == 1) ? $counter : 0; + } } + diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php index 574e085..3d3848d 100644 --- a/lib/Resque/Redis.php +++ b/lib/Resque/Redis.php @@ -78,6 +78,7 @@ class Resque_Redis 'zremrangebyscore', 'sort', 'rename' + 'rpoplpush' ); // sinterstore // sunion @@ -86,7 +87,6 @@ class Resque_Redis // sdiffstore // sinter // smove - // rpoplpush // mget // msetnx // mset diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index 0c09696..f3a6e99 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -180,4 +180,77 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase Resque_Redis::prefix('resque'); $this->assertEquals(Resque::size($queue), 0); } -} \ No newline at end of file + + public function testDequeueAll() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue'); + Resque::enqueue($queue, 'Test_Job_Dequeue'); + $this->assertEquals(Resque::size($queue), 2); + $this->assertEquals(Resque::dequeue($queue), 2); + $this->assertEquals(Resque::size($queue), 0); + } + + public function testDequeueMakeSureNotDeleteOthers() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue'); + Resque::enqueue($queue, 'Test_Job_Dequeue'); + $other_queue = 'other_jobs'; + Resque::enqueue($other_queue, 'Test_Job_Dequeue'); + Resque::enqueue($other_queue, 'Test_Job_Dequeue'); + $this->assertEquals(Resque::size($queue), 2); + $this->assertEquals(Resque::size($other_queue), 2); + $this->assertEquals(Resque::dequeue($queue), 2); + $this->assertEquals(Resque::size($queue), 0); + $this->assertEquals(Resque::size($other_queue), 2); + } + + public function testDequeueSpecificItem() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue1'); + Resque::enqueue($queue, 'Test_Job_Dequeue2'); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue2'); + $this->assertEquals(Resque::dequeue($queue, $test), 1); + $this->assertEquals(Resque::size($queue), 1); + } + + public function testDequeueSpecificMultipleItems() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue1'); + Resque::enqueue($queue, 'Test_Job_Dequeue2'); + Resque::enqueue($queue, 'Test_Job_Dequeue3'); + $this->assertEquals(Resque::size($queue), 3); + $test = array('Test_Job_Dequeue2', 'Test_Job_Dequeue3'); + $this->assertEquals(Resque::dequeue($queue, $test), 2); + $this->assertEquals(Resque::size($queue), 1); + } + + public function testDequeueNonExistingItem() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue1'); + Resque::enqueue($queue, 'Test_Job_Dequeue2'); + Resque::enqueue($queue, 'Test_Job_Dequeue3'); + $this->assertEquals(Resque::size($queue), 3); + $test = array('Test_Job_Dequeue4'); + $this->assertEquals(Resque::dequeue($queue, $test), 0); + $this->assertEquals(Resque::size($queue), 3); + } + + public function testDequeueNonExistingItem2() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue1'); + Resque::enqueue($queue, 'Test_Job_Dequeue2'); + Resque::enqueue($queue, 'Test_Job_Dequeue3'); + $this->assertEquals(Resque::size($queue), 3); + $test = array('Test_Job_Dequeue4', 'Test_Job_Dequeue1'); + $this->assertEquals(Resque::dequeue($queue, $test), 1); + $this->assertEquals(Resque::size($queue), 2); + } + +}