diff --git a/README.md b/README.md index 068a57e..a254bc0 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,31 @@ class My_Job } ``` +### Dequeueing Jobs ### + +This method can be used to conveniently remove a job from a queue. + +```php +// Removes job class 'My_Job' of queue 'default' +Resque::dequeue('default', ['My_Job']); + +// Removes job class 'My_Job' with Job ID '087df5819a790ac666c9608e2234b21e' of queue 'default' +Resuque::dequeue('default', ['My_Job' => '087df5819a790ac666c9608e2234b21e']); + +// Removes job class 'My_Job' with arguments of queue 'default' +Resque::dequeue('default', ['My_Job' => array('foo' => 1, 'bar' => 2)]); + +// Removes multiple jobs +Resque::dequeue('default', ['My_Job', 'My_Job2']); +``` + +If no jobs are given, this method will dequeue all jobs matching the provided queue. + +```php +// Removes all jobs of queue 'default' +Resque::dequeue('default'); +``` + ### Tracking Job Statuses ### php-resque has the ability to perform basic status tracking of a queued diff --git a/lib/Resque.php b/lib/Resque.php index 0f4b94e..a26eaa6 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -120,39 +120,55 @@ class Resque return json_decode($item, true); } - /** - * Pop an item off the end of the specified queues, using blocking list pop, - * decode it and return it. - * - * @param array $queues - * @param int $timeout - * @return null|array Decoded item from the queue. - */ - public static function blpop(array $queues, $timeout) - { - $list = array(); - foreach($queues AS $queue) { - $list[] = 'queue:' . $queue; - } + /** + * Remove items of the specified queue + * + * @param string $queue The name of the queue to fetch an item from. + * @param array $items + * @return integer number of deleted items + */ + public static function dequeue($queue, $items = Array()) + { + if(count($items) > 0) { + return self::removeItems($queue, $items); + } else { + return self::removeList($queue); + } + } + + /** + * Pop an item off the end of the specified queues, using blocking list pop, + * decode it and return it. + * + * @param array $queues + * @param int $timeout + * @return null|array Decoded item from the queue. + */ + public static function blpop(array $queues, $timeout) + { + $list = array(); + foreach($queues AS $queue) { + $list[] = 'queue:' . $queue; + } - $item = self::redis()->blpop($list, (int)$timeout); + $item = self::redis()->blpop($list, (int)$timeout); - if(!$item) { - return; - } + if(!$item) { + return; + } - /** - * Normally the Resque_Redis class returns queue names without the prefix - * But the blpop is a bit different. It returns the name as prefix:queue:name - * So we need to strip off the prefix:queue: part - */ - $queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:')); + /** + * Normally the Resque_Redis class returns queue names without the prefix + * But the blpop is a bit different. It returns the name as prefix:queue:name + * So we need to strip off the prefix:queue: part + */ + $queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:')); - return array( - 'queue' => $queue, - 'payload' => json_decode($item[1], true) - ); - } + return array( + 'queue' => $queue, + 'payload' => json_decode($item[1], true) + ); + } /** * Return the size (number of pending jobs) of the specified queue. @@ -215,4 +231,108 @@ class Resque } return $queues; } + + /** + * Remove Items from the queue + * Safely moving each item to a temporary queue before processing it + * If the Job matches, counts otherwise puts it in a requeue_queue + * which at the end eventually be copied back into the original queue + * + * @private + * + * @param string $queue The name of the queue + * @param array $items + * @return integer number of deleted items + */ + private static function removeItems($queue, $items = Array()) + { + $counter = 0; + $originalQueue = 'queue:'. $queue; + $tempQueue = $originalQueue. ':temp:'. time(); + $requeueQueue = $tempQueue. ':requeue'; + + // move each item from original queue to temp queue and process it + $finished = false; + while(!$finished) { + $string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue); + + if(!empty($string)) { + if(self::matchItem($string, $items)) { + $counter++; + } else { + self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue); + } + } else { + $finished = true; + } + } + + // move back from temp queue to original queue + $finished = false; + while(!$finished) { + $string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue); + if (empty($string)) { + $finished = true; + } + } + + // remove temp queue and requeue queue + self::redis()->del($requeueQueue); + self::redis()->del($tempQueue); + + return $counter; + } + + /** + * matching item + * item can be ['class'] or ['class' => 'id'] or ['class' => {:foo => 1, :bar => 2}] + * @private + * + * @params string $string redis result in json + * @params $items + * + * @return (bool) + */ + private static function matchItem($string, $items) + { + $decoded = json_decode($string, true); + + foreach($items as $key => $val) { + # class name only ex: item[0] = ['class'] + if (is_numeric($key)) { + if($decoded['class'] == $val) { + return true; + } + # class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}] + } elseif (is_array($val)) { + $decodedArgs = (array)$decoded['args'][0]; + if ($decoded['class'] == $key && + count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) == 0) { + return true; + } + # class name with ID, example: item[0] = ['class' => 'id'] + } else { + if ($decoded['class'] == $key && $decoded['id'] == $val) { + return true; + } + } + } + return false; + } + + /** + * Remove List + * + * @private + * + * @params string $queue the name of the queue + * @return integer number of deleted items belongs to this list + */ + private static function removeList($queue) + { + $counter = self::size($queue); + $result = self::redis()->del('queue:' . $queue); + return ($result == 1) ? $counter : 0; + } } + diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php index 574e085..59ca215 100644 --- a/lib/Resque/Redis.php +++ b/lib/Resque/Redis.php @@ -77,7 +77,8 @@ class Resque_Redis 'zscore', 'zremrangebyscore', 'sort', - 'rename' + 'rename', + 'rpoplpush' ); // sinterstore // sunion @@ -86,7 +87,6 @@ class Resque_Redis // sdiffstore // sinter // smove - // rpoplpush // mget // msetnx // mset diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index 0c09696..d0faa53 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -180,4 +180,149 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase Resque_Redis::prefix('resque'); $this->assertEquals(Resque::size($queue), 0); } -} \ No newline at end of file + + public function testDequeueAll() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue'); + Resque::enqueue($queue, 'Test_Job_Dequeue'); + $this->assertEquals(Resque::size($queue), 2); + $this->assertEquals(Resque::dequeue($queue), 2); + $this->assertEquals(Resque::size($queue), 0); + } + + public function testDequeueMakeSureNotDeleteOthers() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue'); + Resque::enqueue($queue, 'Test_Job_Dequeue'); + $other_queue = 'other_jobs'; + Resque::enqueue($other_queue, 'Test_Job_Dequeue'); + Resque::enqueue($other_queue, 'Test_Job_Dequeue'); + $this->assertEquals(Resque::size($queue), 2); + $this->assertEquals(Resque::size($other_queue), 2); + $this->assertEquals(Resque::dequeue($queue), 2); + $this->assertEquals(Resque::size($queue), 0); + $this->assertEquals(Resque::size($other_queue), 2); + } + + public function testDequeueSpecificItem() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue1'); + Resque::enqueue($queue, 'Test_Job_Dequeue2'); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue2'); + $this->assertEquals(Resque::dequeue($queue, $test), 1); + $this->assertEquals(Resque::size($queue), 1); + } + + public function testDequeueSpecificMultipleItems() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue1'); + Resque::enqueue($queue, 'Test_Job_Dequeue2'); + Resque::enqueue($queue, 'Test_Job_Dequeue3'); + $this->assertEquals(Resque::size($queue), 3); + $test = array('Test_Job_Dequeue2', 'Test_Job_Dequeue3'); + $this->assertEquals(Resque::dequeue($queue, $test), 2); + $this->assertEquals(Resque::size($queue), 1); + } + + public function testDequeueNonExistingItem() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue1'); + Resque::enqueue($queue, 'Test_Job_Dequeue2'); + Resque::enqueue($queue, 'Test_Job_Dequeue3'); + $this->assertEquals(Resque::size($queue), 3); + $test = array('Test_Job_Dequeue4'); + $this->assertEquals(Resque::dequeue($queue, $test), 0); + $this->assertEquals(Resque::size($queue), 3); + } + + public function testDequeueNonExistingItem2() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue1'); + Resque::enqueue($queue, 'Test_Job_Dequeue2'); + Resque::enqueue($queue, 'Test_Job_Dequeue3'); + $this->assertEquals(Resque::size($queue), 3); + $test = array('Test_Job_Dequeue4', 'Test_Job_Dequeue1'); + $this->assertEquals(Resque::dequeue($queue, $test), 1); + $this->assertEquals(Resque::size($queue), 2); + } + + public function testDequeueItemID() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue'); + $qid = Resque::enqueue($queue, 'Test_Job_Dequeue'); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue' => $qid); + $this->assertEquals(Resque::dequeue($queue, $test), 1); + $this->assertEquals(Resque::size($queue), 1); + } + + public function testDequeueWrongItemID() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue'); + $qid = Resque::enqueue($queue, 'Test_Job_Dequeue'); + $this->assertEquals(Resque::size($queue), 2); + #qid right but class name is wrong + $test = array('Test_Job_Dequeue1' => $qid); + $this->assertEquals(Resque::dequeue($queue, $test), 0); + $this->assertEquals(Resque::size($queue), 2); + } + + public function testDequeueWrongItemID2() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue'); + $qid = Resque::enqueue($queue, 'Test_Job_Dequeue'); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue' => 'r4nD0mH4sh3dId'); + $this->assertEquals(Resque::dequeue($queue, $test), 0); + $this->assertEquals(Resque::size($queue), 2); + } + + public function testDequeueItemWithArg() + { + $queue = 'jobs'; + $arg = array('foo' => 1, 'bar' => 2); + Resque::enqueue($queue, 'Test_Job_Dequeue9'); + Resque::enqueue($queue, 'Test_Job_Dequeue9', $arg); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue9' => $arg); + $this->assertEquals(Resque::dequeue($queue, $test), 1); + #$this->assertEquals(Resque::size($queue), 1); + } + + public function testDequeueItemWithUnorderedArg() + { + $queue = 'jobs'; + $arg = array('foo' => 1, 'bar' => 2); + $arg2 = array('bar' => 2, 'foo' => 1); + Resque::enqueue($queue, 'Test_Job_Dequeue'); + Resque::enqueue($queue, 'Test_Job_Dequeue', $arg); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue' => $arg2); + $this->assertEquals(Resque::dequeue($queue, $test), 1); + $this->assertEquals(Resque::size($queue), 1); + } + + public function testDequeueItemWithiWrongArg() + { + $queue = 'jobs'; + $arg = array('foo' => 1, 'bar' => 2); + $arg2 = array('foo' => 2, 'bar' => 3); + Resque::enqueue($queue, 'Test_Job_Dequeue'); + Resque::enqueue($queue, 'Test_Job_Dequeue', $arg); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue' => $arg2); + $this->assertEquals(Resque::dequeue($queue, $test), 0); + $this->assertEquals(Resque::size($queue), 2); + } + +}