diff --git a/lib/Resque.php b/lib/Resque.php index 6b067ae..0b621bf 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -253,41 +253,42 @@ class Resque */ private static function removeItems($queue, $items = Array()) { - $counter = 0; - $originalQueue = 'queue:'. $queue; - $tempQueue = $originalQueue. ':temp:'. time(); - $requeueQueue = $tempQueue. ':requeue'; - - // move each item from original queue to temp queue and process it - $finished = false; - while(!$finished) { - $string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue); - - if(!empty($string)) { - if(self::matchItem($string, $items)) { - $counter++; - } else { - self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue); - } - } else { - $finished = true; + $counter = 0; + $originalQueue = 'queue:'. $queue; + $tempQueue = $originalQueue. ':temp:'. time(); + $requeueQueue = $tempQueue. ':requeue'; + + // move each item from original queue to temp queue and process it + $finished = false; + while (!$finished) { + $string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue); + + if (!empty($string)) { + if(self::matchItem($string, $items)) { + self::redis()->rpop($tempQueue); + $counter++; + } else { + self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue); + } + } else { + $finished = true; + } } - } - // move back from temp queue to original queue - $finished = false; - while(!$finished) { - $string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue); - if (empty($string)) { - $finished = true; + // move back from temp queue to original queue + $finished = false; + while (!$finished) { + $string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue); + if (empty($string)) { + $finished = true; + } } - } - // remove temp queue and requeue queue - self::redis()->del($requeueQueue); - self::redis()->del($tempQueue); - - return $counter; + // remove temp queue and requeue queue + self::redis()->del($requeueQueue); + self::redis()->del($tempQueue); + + return $counter; } /** diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index d0faa53..0636d8b 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -298,6 +298,29 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase $this->assertEquals(Resque::dequeue($queue, $test), 1); #$this->assertEquals(Resque::size($queue), 1); } + + public function testDequeueSeveralItemsWithArgs() + { + // GIVEN + $queue = 'jobs'; + $args = array('foo' => 1, 'bar' => 10); + $removeArgs = array('foo' => 1, 'bar' => 2); + Resque::enqueue($queue, 'Test_Job_Dequeue9', $args); + Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs); + Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs); + $this->assertEquals(Resque::size($queue), 3); + + // WHEN + $test = array('Test_Job_Dequeue9' => $removeArgs); + $removedItems = Resque::dequeue($queue, $test); + + // THEN + $this->assertEquals($removedItems, 2); + $this->assertEquals(Resque::size($queue), 1); + $item = Resque::pop($queue); + $this->assertInternalType('array', $item['args']); + $this->assertEquals(10, $item['args'][0]['bar'], 'Wrong items were dequeued from queue!'); + } public function testDequeueItemWithUnorderedArg() {