From 9e7b3a658e34b33bc6d136c7bab80c2e6cb87129 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 18:03:41 +0200 Subject: [PATCH] Refactor --- composer.json | 4 +- config/resque.php | 2 +- src/Console/WorkCommand.php | 50 +++--------- src/Job.php | 10 ++- src/Queue.php | 35 ++++++--- src/Resque.php | 104 +++++++++++++++++++++++++ src/ResqueManager.php | 138 ---------------------------------- src/ResqueServiceProvider.php | 23 +++--- src/Worker.php | 52 +++++++++++++ 9 files changed, 215 insertions(+), 203 deletions(-) create mode 100644 src/Resque.php delete mode 100644 src/ResqueManager.php create mode 100644 src/Worker.php diff --git a/composer.json b/composer.json index bd9b2ce..bf83714 100644 --- a/composer.json +++ b/composer.json @@ -8,8 +8,8 @@ ], "require": { "illuminate/console": "^5.2", - "chrisboulton/php-resque": "^1.2", - "illuminate/config": "^5.2" + "illuminate/config": "^5.2", + "chrisboulton/php-resque": "dev-master" }, "require-dev": { "phpunit/phpunit": "^5.4" diff --git a/config/resque.php b/config/resque.php index d10dd65..61cf6f3 100644 --- a/config/resque.php +++ b/config/resque.php @@ -21,7 +21,7 @@ return [ | */ - 'queuePrefix' => env('RESQUE_QUEUE_PREFIX', null), + 'prefix' => env('RESQUE_PREFIX', null), 'trackStatus' => env('RESQUE_TRACK_STATUS', false), ]; diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php index 86ab95c..19ec972 100644 --- a/src/Console/WorkCommand.php +++ b/src/Console/WorkCommand.php @@ -1,11 +1,10 @@ manager = $manager; + $this->resque = $resque; } /** @@ -46,12 +45,10 @@ class WorkCommand extends IlluminateCommand $interval = (int)$this->option('interval'); $count = (int)$this->option('count'); - $logLevel = $this->getLogLevel(); - if ($count > 1) { for ($i = 0; $i < $count; $i++) { try { - $pid = $this->manager->fork(); + $pid = $this->resque->fork(); } catch (\RuntimeException $exception) { $this->error($exception->getMessage()); @@ -59,11 +56,11 @@ class WorkCommand extends IlluminateCommand } if (0 === $pid) { - $this->startWorker($queues, $interval, $logLevel); + $this->startWorker($queues, $interval); } } } else { - $this->startWorker($queues, $interval, $logLevel); + $this->startWorker($queues, $interval); } return 0; @@ -71,43 +68,16 @@ class WorkCommand extends IlluminateCommand /** * @param array $queues - * @param int $logLevel * @param int $interval */ - private function startWorker(array $queues, $interval = 5, $logLevel = Resque_Worker::LOG_NONE) + private function startWorker(array $queues, $interval = 5) { - $queues = array_map(function ($queue) { - return $this->manager->getQueueName($queue); - }, $queues); - $worker = new Resque_Worker($queues); - $worker->logLevel = $logLevel; $this->info(sprintf('Starting worker %s', $worker)); $worker->work($interval); } - /** - * @return int - */ - protected function getLogLevel() - { - switch ($this->verbosity) { - case OutputInterface::VERBOSITY_VERBOSE: - $logLevel = Resque_Worker::LOG_NORMAL; - break; - - case OutputInterface::VERBOSITY_VERY_VERBOSE: - $logLevel = Resque_Worker::LOG_VERBOSE; - break; - - default: - $logLevel = Resque_Worker::LOG_NONE; - } - - return $logLevel; - } - /** * {@inheritdoc} */ diff --git a/src/Job.php b/src/Job.php index cc52b73..1216ee7 100644 --- a/src/Job.php +++ b/src/Job.php @@ -36,7 +36,15 @@ abstract class Job */ public function arguments() { - return $this->$args; + return $this->args; + } + + /** + * @return string + */ + public function name() + { + return \get_class($this); } /** diff --git a/src/Queue.php b/src/Queue.php index 15212fa..b386d32 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -8,31 +8,44 @@ namespace Hlgrrnhrdt\Resque; */ class Queue { - protected $name; /** - * @var ResqueManager + * @var string */ - private $manager; + protected $name; /** - * Queue constructor. - * - * @param ResqueManager $manager + * @param string $name */ - public function __construct(ResqueManager $manager) + public function __construct($name) { - $this->manager = $manager; + $this->name = $name; } /** - * @return \Resque_Job[] + * @return string + */ + public function name() + { + return $this->name; + } + + /** + * @return int + */ + public function size() + { + return \Resque::size($this->name); + } + + /** + * @return Job[] */ public function jobs() { - $result = $this->manager->redis()->lrange('queue:' . $this->name, 0, -1); + $result = \Resque::redis()->lrange('queue:' . $this->name, 0, -1); $jobs = []; foreach ($result as $job) { - $jobs[] = new \Resque_Job($this->name, json_decode($job, true)); + $jobs[] = (new \Resque_Job($this->name, \json_decode($job, true)))->getInstance(); } return $jobs; diff --git a/src/Resque.php b/src/Resque.php new file mode 100644 index 0000000..6965d5e --- /dev/null +++ b/src/Resque.php @@ -0,0 +1,104 @@ + + */ +class Resque +{ + /** + * @var string + */ + protected $prefix; + + /** + * @var bool + */ + protected $trackStatus = false; + + /** + * + */ + public function __construct($connection = null) + { + + } + + /** + * @param string $prefix + */ + public function setPrefix($prefix) + { + \Resque_Redis::prefix($prefix); + } + + /** + * @param Job $job + * @param bool $trackStatus + * + * @return null|\Resque_Job_Status + */ + public function enqueue(Job $job, $trackStatus = false) + { + $id = \Resque::enqueue($job->queue(), $job->name(), $job->arguments(), $trackStatus); + + if (true === $trackStatus) { + return new \Resque_Job_Status($id); + } + + return null; + } + + /** + * @param Job $job + * @param bool $trackStatus + * + * @return null|\Resque_Job_Status + */ + public function enqueueOnce(Job $job, $trackStatus = false) + { + $queue = new Queue($job->queue()); + + foreach ($queue->jobs() as $queuedJob) { + if (true === $this->isDuplicateJob($job, $queuedJob)) { + return ($trackStatus) ? new \Resque_Job_Status($queuedJob->job->payload['id']) : null; + } + } + + return $this->enqueue($job, $trackStatus); + } + + /** + * @return \Resque_Redis + */ + public function redis() + { + return \Resque::redis(); + } + + /** + * @return int + * + * @throws RuntimeException + */ + public function fork() + { + return \Resque::fork(); + } + + /** + * @param Job $job + * @param Job $queuedJob + * + * @return bool + */ + private function isDuplicateJob(Job $job, Job $queuedJob) + { + return $job->name() === $queuedJob->name() + && count(array_intersect($job->arguments(), $queuedJob->arguments())) === count($job->arguments()); + } +} diff --git a/src/ResqueManager.php b/src/ResqueManager.php deleted file mode 100644 index 36cb59d..0000000 --- a/src/ResqueManager.php +++ /dev/null @@ -1,138 +0,0 @@ - - */ -class ResqueManager -{ - /** - * @var Resque - */ - protected $resque; - - /** - * @var string - */ - protected $queuePrefix; - - /** - * @var bool - */ - protected $trackStatus = false; - - /** - * ResqueManager constructor. - * - * @param \Resque $resque - * @param string $queuePrefix - * @param bool $trackStatus - */ - public function __construct(Resque $resque, $queuePrefix, $trackStatus = false) - { - $this->resque = $resque; - $this->trackStatus = $trackStatus; - $this->queuePrefix = $queuePrefix; - } - - /** - * @param Job $job - * @param bool $trackStatus - * - * @return null|\Resque_Job_Status - */ - public function enqueue(Job $job, $trackStatus = false) - { - $id = $this->resque->enqueue($this->getQueueNameFromJob($job), get_class($job), $job->arguments(), $trackStatus); - - if (true === $trackStatus) { - return new \Resque_Job_Status($id); - } - - return null; - } - - /** - * @param Job $job - * @param bool $trackStatus - * - * @return null|\Resque_Job_Status - */ - public function enqueueOnce(Job $job, $trackStatus = false) - { - $queue = new Queue($this->getQueueNameFromJob($job)); - - foreach ($queue->jobs() as $queuedJob) { - if (true === $this->isDuplicateJob($job, $queuedJob)) { - return ($trackStatus) ? new \Resque_Job_Status($queuedJob->payload['id']) : null; - } - } - - return $this->enqueue($job, $trackStatus); - } - - /** - * @return \Resque_Redis - */ - public function redis() - { - return $this->resque->redis(); - } - - /** - * @return int - * - * @throws RuntimeException - */ - public function fork() - { - if (false === function_exists('pcntl_fork')) { - return -1; - } - - $pid = pcntl_fork(); - if (-1 === $pid) { - throw new RuntimeException('Unable to fork child worker.'); - } - - return $pid; - } - - /** - * @param \Hlgrrnhrdt\Resque\Job $job - * @param $queuedJob - * - * @return bool - */ - private function isDuplicateJob(Job $job, \Resque_Job $queuedJob) - { - return $queuedJob->payload['class'] === get_class($queuedJob) - && count(array_intersect($queuedJob->getArguments(), $job->arguments())) === count($job->arguments()); - } - - private function getQueueNameFromJob(Job $job) - { - $queue = $job->queue(); - - return $this->getQueueName($queue); - } - - /** - * @param string $queue - * - * @return string - */ - public function getQueueName($queue) - { - if ($this->queuePrefix) { - $queue = implode(':', [$this->queuePrefix, $queue]); - } - - return $queue; - } -} diff --git a/src/ResqueServiceProvider.php b/src/ResqueServiceProvider.php index ec68fc3..81a1fb1 100644 --- a/src/ResqueServiceProvider.php +++ b/src/ResqueServiceProvider.php @@ -3,7 +3,6 @@ namespace Hlgrrnhrdt\Resque; use Hlgrrnhrdt\Resque\Console\WorkCommand; use Illuminate\Support\ServiceProvider; -use Resque; /** * ResqueServiceProvider @@ -12,6 +11,15 @@ use Resque; */ class ResqueServiceProvider extends ServiceProvider { + /** + * + */ + public function boot() + { + $connection = $this->app['config']['resque.connection']; + \Resque::setBackend($connection['server'], $connection['db']); + } + /** * Register the service provider. * @@ -19,19 +27,14 @@ class ResqueServiceProvider extends ServiceProvider */ public function register() { - $this->registerManager(); + $this->registerResque(); $this->registerWorkCommand(); } - protected function registerManager() + protected function registerResque() { - $this->app->singleton('resque.manager', function () { - $config = $this->app['config']['resque.connection']; - - $resque = new Resque(); - $resque->setBackend($config['server'], $config['db']); - - return new ResqueManager($resque, $this->app['config']['resque.trackStatus']); + $this->app->singleton('resque', function () { + return new Resque(); }); } diff --git a/src/Worker.php b/src/Worker.php new file mode 100644 index 0000000..2c281d5 --- /dev/null +++ b/src/Worker.php @@ -0,0 +1,52 @@ + + */ +class Worker +{ + /** + * @var \Resque_Worker + */ + private $worker; + + /** + * @param \Resque_Worker $worker + */ + public function __construct(\Resque_Worker $worker) + { + $this->worker = $worker; + } + + /** + * @return string + */ + public function id() + { + return (string)$this->worker; + } + + /** + * @return bool + */ + public function stop() + { + list(, $pid,) = \explode(':', $this->id()); + return \posix_kill($pid, 3); + } + + /** + * @return Queue[] + */ + public function queues() + { + $queues = \array_map(function ($queue) { + return new Queue($queue); + }, $this->worker->queues()); + + return $queues; + } +}