Apply queue prefix on everything

This commit is contained in:
Holger Reinhardt 2016-07-29 15:15:35 +02:00
parent 8d6ebce6b9
commit fd166bbe82
2 changed files with 21 additions and 6 deletions

View File

@ -42,7 +42,7 @@ class WorkCommand extends IlluminateCommand
*/ */
public function fire() public function fire()
{ {
$queue = $this->option('queue'); $queues = $this->option('queue');
$interval = (int)$this->option('interval'); $interval = (int)$this->option('interval');
$count = (int)$this->option('count'); $count = (int)$this->option('count');
@ -59,11 +59,11 @@ class WorkCommand extends IlluminateCommand
} }
if (0 === $pid) { if (0 === $pid) {
$this->startWorker($queue, $interval, $logLevel); $this->startWorker($queues, $interval, $logLevel);
} }
} }
} else { } else {
$this->startWorker($queue, $interval, $logLevel); $this->startWorker($queues, $interval, $logLevel);
} }
return 0; return 0;
@ -76,6 +76,10 @@ class WorkCommand extends IlluminateCommand
*/ */
private function startWorker(array $queues, $interval = 5, $logLevel = Resque_Worker::LOG_NONE) private function startWorker(array $queues, $interval = 5, $logLevel = Resque_Worker::LOG_NONE)
{ {
$queues = array_walk($queues, function ($queue) {
return $this->manager->getQueue($queue);
});
$worker = new Resque_Worker($queues); $worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel; $worker->logLevel = $logLevel;

View File

@ -48,7 +48,7 @@ class ResqueManager
*/ */
public function enqueue(Job $job, $trackStatus = false) public function enqueue(Job $job, $trackStatus = false)
{ {
$id = $this->resque->enqueue($this->getQueueName($job), get_class($job), $job->arguments(), $trackStatus); $id = $this->resque->enqueue($this->getQueueFromJob($job), get_class($job), $job->arguments(), $trackStatus);
if (true === $trackStatus) { if (true === $trackStatus) {
return new \Resque_Job_Status($id); return new \Resque_Job_Status($id);
@ -65,7 +65,7 @@ class ResqueManager
*/ */
public function enqueueOnce(Job $job, $trackStatus = false) public function enqueueOnce(Job $job, $trackStatus = false)
{ {
$queue = new Queue($job->queue()); $queue = new Queue($this->getQueueFromJob($job));
foreach ($queue->jobs() as $queuedJob) { foreach ($queue->jobs() as $queuedJob) {
if (true === $this->isDuplicateJob($job, $queuedJob)) { if (true === $this->isDuplicateJob($job, $queuedJob)) {
@ -115,9 +115,20 @@ class ResqueManager
&& count(array_intersect($queuedJob->getArguments(), $job->arguments())) === count($job->arguments()); && count(array_intersect($queuedJob->getArguments(), $job->arguments())) === count($job->arguments());
} }
private function getQueueName(Job $job) private function getQueueFromJob(Job $job)
{ {
$queue = $job->queue(); $queue = $job->queue();
return $this->getQueue($queue);
}
/**
* @param string $queue
*
* @return string
*/
public function getQueue($queue)
{
if ($this->queuePrefix) { if ($this->queuePrefix) {
$queue = implode(':', [$this->queuePrefix, $queue]); $queue = implode(':', [$this->queuePrefix, $queue]);
} }