From b60fb0d712b18e1a8ab9e98e2a715e755ef77542 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 09:17:13 +0200 Subject: [PATCH] Some refinements --- src/Console/WorkCommand.php | 19 ++++++++++- src/Job.php | 13 +++++-- src/Queue.php | 24 +++++++++++-- src/ResqueManager.php | 67 +++++++++++++++++++++++++++++++++---- 4 files changed, 110 insertions(+), 13 deletions(-) diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php index e6b8354..1470a5a 100644 --- a/src/Console/WorkCommand.php +++ b/src/Console/WorkCommand.php @@ -2,6 +2,7 @@ namespace Hlgrrnhrdt\Resque\Console; use Illuminate\Console\Command; +use Symfony\Component\Console\Input\InputOption; /** * WorkCommand @@ -14,6 +15,22 @@ class WorkCommand extends Command public function fire() { - // + $queue = $this->option('queue'); + $interval = $this->option('interval'); + $count = $this->option('count'); } + + /** + * {@inheritdoc} + */ + protected function getOptions() + { + return [ + ['queue', null, InputOption::VALUE_IS_ARRAY & InputOption::VALUE_OPTIONAL, '', 'default'], + ['interval', null, InputOption::VALUE_OPTIONAL, '', 5], + ['count', null, InputOption::VALUE_OPTIONAL, '', 1], + ]; + } + + } diff --git a/src/Job.php b/src/Job.php index 4d03fac..cc52b73 100644 --- a/src/Job.php +++ b/src/Job.php @@ -8,15 +8,20 @@ namespace Hlgrrnhrdt\Resque; */ abstract class Job { + /** + * @var \Resque_Job + */ + public $job; + /** * @var string */ - protected $queue = 'default'; + public $queue = 'default'; /** * @var array */ - protected $arguments = []; + public $args = []; /** * @return mixed @@ -31,7 +36,7 @@ abstract class Job */ public function arguments() { - return $this->arguments; + return $this->$args; } /** @@ -44,4 +49,6 @@ abstract class Job $this->queue = $queue; return $this; } + + abstract public function perform(); } diff --git a/src/Queue.php b/src/Queue.php index d8ddc5c..15212fa 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -9,14 +9,32 @@ namespace Hlgrrnhrdt\Resque; class Queue { protected $name; + /** + * @var ResqueManager + */ + private $manager; - public function __construct($name) + /** + * Queue constructor. + * + * @param ResqueManager $manager + */ + public function __construct(ResqueManager $manager) { - $this->name = $name; + $this->manager = $manager; } + /** + * @return \Resque_Job[] + */ public function jobs() { - \Resque::redis()->lrange('queue:' . $this->name, 0, -1); + $result = $this->manager->redis()->lrange('queue:' . $this->name, 0, -1); + $jobs = []; + foreach ($result as $job) { + $jobs[] = new \Resque_Job($this->name, json_decode($job, true)); + } + + return $jobs; } } diff --git a/src/ResqueManager.php b/src/ResqueManager.php index c01b0fb..21a8f4e 100644 --- a/src/ResqueManager.php +++ b/src/ResqueManager.php @@ -1,7 +1,8 @@ resque = $resque; } - public function enqueue(Job $job) + /** + * @param Job $job + * @param bool $trackStatus + * + * @return null|\Resque_Job_Status + */ + public function enqueue(Job $job, $trackStatus = false) { - $this->resque->enqueue($job->queue(), get_class($job), $job->arguments()); + $id = $this->resque->enqueue($job->queue(), get_class($job), $job->arguments(), $trackStatus); + + if (true === $trackStatus) { + return new \Resque_Job_Status($id); + } + + return null; } - public function enqueueOnce(Job $job) + /** + * @param Job $job + * @param bool $trackStatus + * + * @return null|\Resque_Job_Status + */ + public function enqueueOnce(Job $job, $trackStatus = false) { $queue = new Queue($job->queue()); - + + foreach ($queue->jobs() as $queuedJob) { + if ($job->payload['class'] === get_class($queuedJob) && count(array_intersect($queuedJob->getArguments(), + $job->arguments())) === $job->arguments() + ) { + return ($trackStatus) ? new \Resque_Job_Status($job->payload['id']) : null; + } + } + + return $this->enqueue($job, $trackStatus); + } + + /** + * @return \Resque_Redis + */ + public function redis() + { + return $this->resque->redis(); + } + + /** + * @return int + * + * @throws RuntimeException + */ + public function fork() + { + if (false === function_exists('pcntl_fork')) { + return -1; + } + + $pid = pcntl_fork(); + if (-1 === $pid) { + throw new RuntimeException('Unable to fork child worker.'); + } + + return $pid; } }