#218 Remove item from queue as well when args match

This commit is contained in:
Axel K. 2014-11-10 15:38:58 +01:00
parent 226ec33bb0
commit 6cda08de25

View File

@ -253,41 +253,42 @@ class Resque
*/ */
private static function removeItems($queue, $items = Array()) private static function removeItems($queue, $items = Array())
{ {
$counter = 0; $counter = 0;
$originalQueue = 'queue:'. $queue; $originalQueue = 'queue:'. $queue;
$tempQueue = $originalQueue. ':temp:'. time(); $tempQueue = $originalQueue. ':temp:'. time();
$requeueQueue = $tempQueue. ':requeue'; $requeueQueue = $tempQueue. ':requeue';
// move each item from original queue to temp queue and process it // move each item from original queue to temp queue and process it
$finished = false; $finished = false;
while(!$finished) { while (!$finished) {
$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue); $string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
if(!empty($string)) { if (!empty($string)) {
if(self::matchItem($string, $items)) { if(self::matchItem($string, $items)) {
$counter++; self::redis()->rpop($tempQueue);
} else { $counter++;
self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue); } else {
} self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
} else { }
$finished = true; } else {
$finished = true;
}
} }
}
// move back from temp queue to original queue // move back from temp queue to original queue
$finished = false; $finished = false;
while(!$finished) { while (!$finished) {
$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue); $string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue);
if (empty($string)) { if (empty($string)) {
$finished = true; $finished = true;
}
} }
}
// remove temp queue and requeue queue // remove temp queue and requeue queue
self::redis()->del($requeueQueue); self::redis()->del($requeueQueue);
self::redis()->del($tempQueue); self::redis()->del($tempQueue);
return $counter; return $counter;
} }
/** /**