Merge pull request #223 from theaxel/master

Fix dequeueing with args
This commit is contained in:
Chris Boulton 2015-02-02 12:34:14 -08:00
commit 1d24d009f2
2 changed files with 55 additions and 31 deletions

View File

@ -253,41 +253,42 @@ class Resque
*/
private static function removeItems($queue, $items = Array())
{
$counter = 0;
$originalQueue = 'queue:'. $queue;
$tempQueue = $originalQueue. ':temp:'. time();
$requeueQueue = $tempQueue. ':requeue';
// move each item from original queue to temp queue and process it
$finished = false;
while(!$finished) {
$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
if(!empty($string)) {
if(self::matchItem($string, $items)) {
$counter++;
} else {
self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
}
} else {
$finished = true;
$counter = 0;
$originalQueue = 'queue:'. $queue;
$tempQueue = $originalQueue. ':temp:'. time();
$requeueQueue = $tempQueue. ':requeue';
// move each item from original queue to temp queue and process it
$finished = false;
while (!$finished) {
$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
if (!empty($string)) {
if(self::matchItem($string, $items)) {
self::redis()->rpop($tempQueue);
$counter++;
} else {
self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
}
} else {
$finished = true;
}
}
}
// move back from temp queue to original queue
$finished = false;
while(!$finished) {
$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue);
if (empty($string)) {
$finished = true;
// move back from temp queue to original queue
$finished = false;
while (!$finished) {
$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue);
if (empty($string)) {
$finished = true;
}
}
}
// remove temp queue and requeue queue
self::redis()->del($requeueQueue);
self::redis()->del($tempQueue);
return $counter;
// remove temp queue and requeue queue
self::redis()->del($requeueQueue);
self::redis()->del($tempQueue);
return $counter;
}
/**

View File

@ -298,6 +298,29 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase
$this->assertEquals(Resque::dequeue($queue, $test), 1);
#$this->assertEquals(Resque::size($queue), 1);
}
public function testDequeueSeveralItemsWithArgs()
{
// GIVEN
$queue = 'jobs';
$args = array('foo' => 1, 'bar' => 10);
$removeArgs = array('foo' => 1, 'bar' => 2);
Resque::enqueue($queue, 'Test_Job_Dequeue9', $args);
Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs);
Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs);
$this->assertEquals(Resque::size($queue), 3);
// WHEN
$test = array('Test_Job_Dequeue9' => $removeArgs);
$removedItems = Resque::dequeue($queue, $test);
// THEN
$this->assertEquals($removedItems, 2);
$this->assertEquals(Resque::size($queue), 1);
$item = Resque::pop($queue);
$this->assertInternalType('array', $item['args']);
$this->assertEquals(10, $item['args'][0]['bar'], 'Wrong items were dequeued from queue!');
}
public function testDequeueItemWithUnorderedArg()
{