From c563d0e4aeb1a608cbf2c7e32211848771d6a747 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Thu, 28 Jul 2016 23:52:05 +0200 Subject: [PATCH 01/23] initial commit --- .gitignore | 3 +++ composer.json | 22 ++++++++++++++++ src/Console/WorkCommand.php | 19 ++++++++++++++ src/Job.php | 47 +++++++++++++++++++++++++++++++++++ src/Queue.php | 22 ++++++++++++++++ src/ResqueManager.php | 33 ++++++++++++++++++++++++ src/ResqueServiceProvider.php | 43 ++++++++++++++++++++++++++++++++ 7 files changed, 189 insertions(+) create mode 100644 .gitignore create mode 100644 composer.json create mode 100644 src/Console/WorkCommand.php create mode 100644 src/Job.php create mode 100644 src/Queue.php create mode 100644 src/ResqueManager.php create mode 100644 src/ResqueServiceProvider.php diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..072705c --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea/ +vendor/ +composer.lock diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..bd9b2ce --- /dev/null +++ b/composer.json @@ -0,0 +1,22 @@ +{ + "name": "hlgrrnhrdt/laravel-resque", + "authors": [ + { + "name": "Holger Reinhardt", + "email": "hlgrrnhrdt@gmail.com" + } + ], + "require": { + "illuminate/console": "^5.2", + "chrisboulton/php-resque": "^1.2", + "illuminate/config": "^5.2" + }, + "require-dev": { + "phpunit/phpunit": "^5.4" + }, + "autoload": { + "psr-4": { + "Hlgrrnhrdt\\Resque\\": "src/" + } + } +} diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php new file mode 100644 index 0000000..e6b8354 --- /dev/null +++ b/src/Console/WorkCommand.php @@ -0,0 +1,19 @@ + + */ +class WorkCommand extends Command +{ + protected $name = 'resque:work'; + + public function fire() + { + // + } +} diff --git a/src/Job.php b/src/Job.php new file mode 100644 index 0000000..4d03fac --- /dev/null +++ b/src/Job.php @@ -0,0 +1,47 @@ + + */ +abstract class Job +{ + /** + * @var string + */ + protected $queue = 'default'; + + /** + * @var array + */ + protected $arguments = []; + + /** + * @return mixed + */ + public function queue() + { + return $this->queue; + } + + /** + * @return array + */ + public function arguments() + { + return $this->arguments; + } + + /** + * @param string $queue + * + * @return Job + */ + public function onQueue($queue) + { + $this->queue = $queue; + return $this; + } +} diff --git a/src/Queue.php b/src/Queue.php new file mode 100644 index 0000000..d8ddc5c --- /dev/null +++ b/src/Queue.php @@ -0,0 +1,22 @@ + + */ +class Queue +{ + protected $name; + + public function __construct($name) + { + $this->name = $name; + } + + public function jobs() + { + \Resque::redis()->lrange('queue:' . $this->name, 0, -1); + } +} diff --git a/src/ResqueManager.php b/src/ResqueManager.php new file mode 100644 index 0000000..c01b0fb --- /dev/null +++ b/src/ResqueManager.php @@ -0,0 +1,33 @@ + + */ +class ResqueManager +{ + /** + * @var PhpResque + */ + private $resque; + + public function __construct(PhpResque $resque) + { + $this->resque = $resque; + } + + public function enqueue(Job $job) + { + $this->resque->enqueue($job->queue(), get_class($job), $job->arguments()); + } + + public function enqueueOnce(Job $job) + { + $queue = new Queue($job->queue()); + + } +} diff --git a/src/ResqueServiceProvider.php b/src/ResqueServiceProvider.php new file mode 100644 index 0000000..c7c5774 --- /dev/null +++ b/src/ResqueServiceProvider.php @@ -0,0 +1,43 @@ + + */ +class ResqueServiceProvider extends ServiceProvider +{ + /** + * Register the service provider. + * + * @return void + */ + public function register() + { + $this->registerManager(); + $this->registerWorkCommand(); + } + + protected function registerManager() + { + $this->app->singleton(ResqueManager::class, function ($app) { + $config = $app['config']['resque.connection']; + $resque = new Resque(); + $resque->setBackend(implode(':', [$config['host'], $config['port']]), $config['db']); + return new ResqueManager($resque); + }); + } + + protected function registerWorkCommand() + { + $this->app->singleton('command.resque.work', function () { + return new WorkCommand(); + }); + } +} From 7bb3c9ef85c0f24367203ef491b83509abf24acd Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Thu, 28 Jul 2016 23:55:02 +0200 Subject: [PATCH 02/23] create README.md --- README.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ + From b60fb0d712b18e1a8ab9e98e2a715e755ef77542 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 09:17:13 +0200 Subject: [PATCH 03/23] Some refinements --- src/Console/WorkCommand.php | 19 ++++++++++- src/Job.php | 13 +++++-- src/Queue.php | 24 +++++++++++-- src/ResqueManager.php | 67 +++++++++++++++++++++++++++++++++---- 4 files changed, 110 insertions(+), 13 deletions(-) diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php index e6b8354..1470a5a 100644 --- a/src/Console/WorkCommand.php +++ b/src/Console/WorkCommand.php @@ -2,6 +2,7 @@ namespace Hlgrrnhrdt\Resque\Console; use Illuminate\Console\Command; +use Symfony\Component\Console\Input\InputOption; /** * WorkCommand @@ -14,6 +15,22 @@ class WorkCommand extends Command public function fire() { - // + $queue = $this->option('queue'); + $interval = $this->option('interval'); + $count = $this->option('count'); } + + /** + * {@inheritdoc} + */ + protected function getOptions() + { + return [ + ['queue', null, InputOption::VALUE_IS_ARRAY & InputOption::VALUE_OPTIONAL, '', 'default'], + ['interval', null, InputOption::VALUE_OPTIONAL, '', 5], + ['count', null, InputOption::VALUE_OPTIONAL, '', 1], + ]; + } + + } diff --git a/src/Job.php b/src/Job.php index 4d03fac..cc52b73 100644 --- a/src/Job.php +++ b/src/Job.php @@ -8,15 +8,20 @@ namespace Hlgrrnhrdt\Resque; */ abstract class Job { + /** + * @var \Resque_Job + */ + public $job; + /** * @var string */ - protected $queue = 'default'; + public $queue = 'default'; /** * @var array */ - protected $arguments = []; + public $args = []; /** * @return mixed @@ -31,7 +36,7 @@ abstract class Job */ public function arguments() { - return $this->arguments; + return $this->$args; } /** @@ -44,4 +49,6 @@ abstract class Job $this->queue = $queue; return $this; } + + abstract public function perform(); } diff --git a/src/Queue.php b/src/Queue.php index d8ddc5c..15212fa 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -9,14 +9,32 @@ namespace Hlgrrnhrdt\Resque; class Queue { protected $name; + /** + * @var ResqueManager + */ + private $manager; - public function __construct($name) + /** + * Queue constructor. + * + * @param ResqueManager $manager + */ + public function __construct(ResqueManager $manager) { - $this->name = $name; + $this->manager = $manager; } + /** + * @return \Resque_Job[] + */ public function jobs() { - \Resque::redis()->lrange('queue:' . $this->name, 0, -1); + $result = $this->manager->redis()->lrange('queue:' . $this->name, 0, -1); + $jobs = []; + foreach ($result as $job) { + $jobs[] = new \Resque_Job($this->name, json_decode($job, true)); + } + + return $jobs; } } diff --git a/src/ResqueManager.php b/src/ResqueManager.php index c01b0fb..21a8f4e 100644 --- a/src/ResqueManager.php +++ b/src/ResqueManager.php @@ -1,7 +1,8 @@ resque = $resque; } - public function enqueue(Job $job) + /** + * @param Job $job + * @param bool $trackStatus + * + * @return null|\Resque_Job_Status + */ + public function enqueue(Job $job, $trackStatus = false) { - $this->resque->enqueue($job->queue(), get_class($job), $job->arguments()); + $id = $this->resque->enqueue($job->queue(), get_class($job), $job->arguments(), $trackStatus); + + if (true === $trackStatus) { + return new \Resque_Job_Status($id); + } + + return null; } - public function enqueueOnce(Job $job) + /** + * @param Job $job + * @param bool $trackStatus + * + * @return null|\Resque_Job_Status + */ + public function enqueueOnce(Job $job, $trackStatus = false) { $queue = new Queue($job->queue()); - + + foreach ($queue->jobs() as $queuedJob) { + if ($job->payload['class'] === get_class($queuedJob) && count(array_intersect($queuedJob->getArguments(), + $job->arguments())) === $job->arguments() + ) { + return ($trackStatus) ? new \Resque_Job_Status($job->payload['id']) : null; + } + } + + return $this->enqueue($job, $trackStatus); + } + + /** + * @return \Resque_Redis + */ + public function redis() + { + return $this->resque->redis(); + } + + /** + * @return int + * + * @throws RuntimeException + */ + public function fork() + { + if (false === function_exists('pcntl_fork')) { + return -1; + } + + $pid = pcntl_fork(); + if (-1 === $pid) { + throw new RuntimeException('Unable to fork child worker.'); + } + + return $pid; } } From afceda3d95e1f340cd23f9632e61eb1f93a6cf62 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 13:49:06 +0200 Subject: [PATCH 04/23] improvements to config --- src/Console/WorkCommand.php | 104 +++++++++++++++++++++++++++++++--- src/ResqueManager.php | 35 +++++++++++- src/ResqueServiceProvider.php | 13 +++-- 3 files changed, 137 insertions(+), 15 deletions(-) diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php index 1470a5a..391c099 100644 --- a/src/Console/WorkCommand.php +++ b/src/Console/WorkCommand.php @@ -1,23 +1,105 @@ + * @author Holger Reinhardt */ -class WorkCommand extends Command +class WorkCommand extends IlluminateCommand { + /** + * The console command name. + * + * @var string + */ protected $name = 'resque:work'; + /** + * @var \Hlgrrnhrdt\Resque\ResqueManager + */ + private $manager; + + /** + * @param \Hlgrrnhrdt\Resque\ResqueManager $manager + */ + public function __construct(ResqueManager $manager) + { + parent::__construct(); + $this->manager = $manager; + } + + /** + * Execute the console command. + * + * @return int + */ public function fire() { $queue = $this->option('queue'); - $interval = $this->option('interval'); - $count = $this->option('count'); + $interval = (int)$this->option('interval'); + $count = (int)$this->option('count'); + + $queues = explode(',', $queue); + $logLevel = $this->getLogLevel(); + + if ($count > 1) { + for ($i = 0; $i < $count; $i++) { + try { + $pid = $this->manager->fork(); + } catch (\RuntimeException $exception) { + $this->error($exception->getMessage()); + + return 1; + } + + if (0 === $pid) { + $this->startWorker($queues, $interval, $logLevel); + } + } + } else { + $this->startWorker($queues, $interval, $logLevel); + } + + return 0; + } + + /** + * @param array $queues + * @param int $logLevel + * @param int $interval + */ + private function startWorker(array $queues, $interval = 5, $logLevel = Resque_Worker::LOG_NONE) + { + $worker = $this->manager->startWorker($queues, $interval, $logLevel); + $this->info(sprintf('Starting worker %s', $worker)); + } + + /** + * @return int + */ + protected function getLogLevel() + { + switch ($this->verbosity) { + case OutputInterface::VERBOSITY_VERBOSE: + $logLevel = Resque_Worker::LOG_NORMAL; + break; + + case OutputInterface::VERBOSITY_VERY_VERBOSE: + $logLevel = Resque_Worker::LOG_VERBOSE; + break; + + default: + $logLevel = Resque_Worker::LOG_NONE; + } + + return $logLevel; } /** @@ -26,9 +108,15 @@ class WorkCommand extends Command protected function getOptions() { return [ - ['queue', null, InputOption::VALUE_IS_ARRAY & InputOption::VALUE_OPTIONAL, '', 'default'], - ['interval', null, InputOption::VALUE_OPTIONAL, '', 5], - ['count', null, InputOption::VALUE_OPTIONAL, '', 1], + [ + 'queue', + null, + InputOption::VALUE_IS_ARRAY & InputOption::VALUE_OPTIONAL, + 'The queue to work on', + 'default', + ], + ['interval', null, InputOption::VALUE_OPTIONAL, 'The queue to work on', 5], + ['count', null, InputOption::VALUE_OPTIONAL, 'The queue to work on', 1], ]; } diff --git a/src/ResqueManager.php b/src/ResqueManager.php index 21a8f4e..fc5f20b 100644 --- a/src/ResqueManager.php +++ b/src/ResqueManager.php @@ -2,6 +2,7 @@ namespace Hlgrrnhrdt\Resque; use Resque; +use Resque_Worker; use RuntimeException; /** @@ -12,13 +13,25 @@ use RuntimeException; class ResqueManager { /** - * @var PhpResque + * @var Resque */ - private $resque; + protected $resque; - public function __construct(Resque $resque) + /** + * @var bool + */ + protected $trackStatus = false; + + /** + * ResqueManager constructor. + * + * @param \Resque $resque + * @param bool $trackStatus + */ + public function __construct(Resque $resque, $trackStatus = false) { $this->resque = $resque; + $this->trackStatus = $trackStatus; } /** @@ -85,4 +98,20 @@ class ResqueManager return $pid; } + + /** + * @param array $queues + * @param int $interval + * @param int $logLevel + * + * @return \Resque_Worker + */ + public function startWorker(array $queues, $interval = 5, $logLevel = Resque_Worker::LOG_NONE) + { + $worker = new Resque_Worker($queues); + $worker->logLevel = $logLevel; + $worker->work($interval); + + return $worker; + } } diff --git a/src/ResqueServiceProvider.php b/src/ResqueServiceProvider.php index c7c5774..0b5b60f 100644 --- a/src/ResqueServiceProvider.php +++ b/src/ResqueServiceProvider.php @@ -3,6 +3,7 @@ namespace Hlgrrnhrdt\Resque; use Config; use Hlgrrnhrdt\Resque\Console\WorkCommand; +use Illuminate\Contracts\Foundation\Application; use Illuminate\Support\ServiceProvider; use Resque; @@ -24,20 +25,24 @@ class ResqueServiceProvider extends ServiceProvider $this->registerWorkCommand(); } + + protected function registerManager() { - $this->app->singleton(ResqueManager::class, function ($app) { + $this->app->singleton('resque.manager', function (Application $app) { $config = $app['config']['resque.connection']; + $resque = new Resque(); $resque->setBackend(implode(':', [$config['host'], $config['port']]), $config['db']); - return new ResqueManager($resque); + + return new ResqueManager($resque, $app['config']['resque.trackStatus']); }); } protected function registerWorkCommand() { - $this->app->singleton('command.resque.work', function () { - return new WorkCommand(); + $this->app->singleton('command.resque.work', function (Application $app) { + return new WorkCommand($app->make('resque.manager')); }); } } From 8bb8f723dcde188aa479671310ec5e3d3b4994e1 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 13:54:25 +0200 Subject: [PATCH 05/23] add config file --- config/resque.php | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 config/resque.php diff --git a/config/resque.php b/config/resque.php new file mode 100644 index 0000000..342269d --- /dev/null +++ b/config/resque.php @@ -0,0 +1,27 @@ + [ + 'host' => env('RESQUE_REDIS_HOST', 'localhost'), + 'port' => env('RESQUE_REDIS_PORT', 6379), + 'db' => env('RESQUE_REDIS_DB', 0), + ], + + /* + |-------------------------------------------------------------------------- + | Track Status + |-------------------------------------------------------------------------- + | + */ + + 'trackStatus' => env('RESQUE_TRACK_STATUS', false), + +]; From c2e84864944b98e4dd7a92a9d0a6507a3f529761 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 14:02:56 +0200 Subject: [PATCH 06/23] add command --- src/ResqueServiceProvider.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ResqueServiceProvider.php b/src/ResqueServiceProvider.php index 0b5b60f..b9e839d 100644 --- a/src/ResqueServiceProvider.php +++ b/src/ResqueServiceProvider.php @@ -1,7 +1,6 @@ app->singleton('command.resque.work', function (Application $app) { return new WorkCommand($app->make('resque.manager')); }); + $this->commands('command.resque.work'); } } From 02f2712fabd9dcc0c610745179faf380ea08bcce Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 14:05:07 +0200 Subject: [PATCH 07/23] remove Application type hinting --- src/ResqueServiceProvider.php | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/ResqueServiceProvider.php b/src/ResqueServiceProvider.php index b9e839d..9d7a81f 100644 --- a/src/ResqueServiceProvider.php +++ b/src/ResqueServiceProvider.php @@ -2,7 +2,6 @@ namespace Hlgrrnhrdt\Resque; use Hlgrrnhrdt\Resque\Console\WorkCommand; -use Illuminate\Contracts\Foundation\Application; use Illuminate\Support\ServiceProvider; use Resque; @@ -24,24 +23,22 @@ class ResqueServiceProvider extends ServiceProvider $this->registerWorkCommand(); } - - protected function registerManager() { - $this->app->singleton('resque.manager', function (Application $app) { - $config = $app['config']['resque.connection']; + $this->app->singleton('resque.manager', function () { + $config = $this->app['config']['resque.connection']; $resque = new Resque(); $resque->setBackend(implode(':', [$config['host'], $config['port']]), $config['db']); - return new ResqueManager($resque, $app['config']['resque.trackStatus']); + return new ResqueManager($resque, $this->app['config']['resque.trackStatus']); }); } protected function registerWorkCommand() { - $this->app->singleton('command.resque.work', function (Application $app) { - return new WorkCommand($app->make('resque.manager')); + $this->app->singleton('command.resque.work', function () { + return new WorkCommand($this->app->make('resque.manager')); }); $this->commands('command.resque.work'); } From 8d88957e283e9f9f4b000183c7dd267860778a66 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 14:14:52 +0200 Subject: [PATCH 08/23] set proper option mode --- src/Console/WorkCommand.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php index 391c099..9622c78 100644 --- a/src/Console/WorkCommand.php +++ b/src/Console/WorkCommand.php @@ -111,7 +111,7 @@ class WorkCommand extends IlluminateCommand [ 'queue', null, - InputOption::VALUE_IS_ARRAY & InputOption::VALUE_OPTIONAL, + InputOption::VALUE_IS_ARRAY | InputOption::VALUE_OPTIONAL, 'The queue to work on', 'default', ], From 5fd6530f479813bb489f8f3ff32cf74bd7a7ded6 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 14:22:47 +0200 Subject: [PATCH 09/23] fix queue option on WorkCommand --- src/Console/WorkCommand.php | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php index 9622c78..7fcf699 100644 --- a/src/Console/WorkCommand.php +++ b/src/Console/WorkCommand.php @@ -46,7 +46,6 @@ class WorkCommand extends IlluminateCommand $interval = (int)$this->option('interval'); $count = (int)$this->option('count'); - $queues = explode(',', $queue); $logLevel = $this->getLogLevel(); if ($count > 1) { @@ -60,11 +59,11 @@ class WorkCommand extends IlluminateCommand } if (0 === $pid) { - $this->startWorker($queues, $interval, $logLevel); + $this->startWorker($queue, $interval, $logLevel); } } } else { - $this->startWorker($queues, $interval, $logLevel); + $this->startWorker($queue, $interval, $logLevel); } return 0; From e230e66b48632c1efc8ab6b89a6c7ade0969a08f Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 14:24:23 +0200 Subject: [PATCH 10/23] fix queue option on WorkCommand --- src/Console/WorkCommand.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php index 7fcf699..464d221 100644 --- a/src/Console/WorkCommand.php +++ b/src/Console/WorkCommand.php @@ -112,7 +112,7 @@ class WorkCommand extends IlluminateCommand null, InputOption::VALUE_IS_ARRAY | InputOption::VALUE_OPTIONAL, 'The queue to work on', - 'default', + ['default'], ], ['interval', null, InputOption::VALUE_OPTIONAL, 'The queue to work on', 5], ['count', null, InputOption::VALUE_OPTIONAL, 'The queue to work on', 1], From 0663765b4769a3360ca1f791d0e8043bcfa23139 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 15:02:27 +0200 Subject: [PATCH 11/23] fix detect duplicate job --- config/resque.php | 3 +-- src/Console/WorkCommand.php | 5 ++++- src/ResqueManager.php | 22 ++++++++-------------- src/ResqueServiceProvider.php | 2 +- 4 files changed, 14 insertions(+), 18 deletions(-) diff --git a/config/resque.php b/config/resque.php index 342269d..a1cfa11 100644 --- a/config/resque.php +++ b/config/resque.php @@ -10,8 +10,7 @@ return [ */ 'connection' => [ - 'host' => env('RESQUE_REDIS_HOST', 'localhost'), - 'port' => env('RESQUE_REDIS_PORT', 6379), + 'server' => env('RESQUE_REDIS_SERVER', 'localhost:6379'), 'db' => env('RESQUE_REDIS_DB', 0), ], diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php index 464d221..e7769bc 100644 --- a/src/Console/WorkCommand.php +++ b/src/Console/WorkCommand.php @@ -76,8 +76,11 @@ class WorkCommand extends IlluminateCommand */ private function startWorker(array $queues, $interval = 5, $logLevel = Resque_Worker::LOG_NONE) { - $worker = $this->manager->startWorker($queues, $interval, $logLevel); + $worker = new Resque_Worker($queues); + $worker->logLevel = $logLevel; + $this->info(sprintf('Starting worker %s', $worker)); + $worker->work($interval); } /** diff --git a/src/ResqueManager.php b/src/ResqueManager.php index fc5f20b..3d9fe03 100644 --- a/src/ResqueManager.php +++ b/src/ResqueManager.php @@ -62,10 +62,8 @@ class ResqueManager $queue = new Queue($job->queue()); foreach ($queue->jobs() as $queuedJob) { - if ($job->payload['class'] === get_class($queuedJob) && count(array_intersect($queuedJob->getArguments(), - $job->arguments())) === $job->arguments() - ) { - return ($trackStatus) ? new \Resque_Job_Status($job->payload['id']) : null; + if (true === $this->isDuplicateJob($job, $queuedJob)) { + return ($trackStatus) ? new \Resque_Job_Status($queuedJob->payload['id']) : null; } } @@ -100,18 +98,14 @@ class ResqueManager } /** - * @param array $queues - * @param int $interval - * @param int $logLevel + * @param \Hlgrrnhrdt\Resque\Job $job + * @param $queuedJob * - * @return \Resque_Worker + * @return bool */ - public function startWorker(array $queues, $interval = 5, $logLevel = Resque_Worker::LOG_NONE) + private function isDuplicateJob(Job $job, \Resque_Job $queuedJob) { - $worker = new Resque_Worker($queues); - $worker->logLevel = $logLevel; - $worker->work($interval); - - return $worker; + return $queuedJob->payload['class'] === get_class($queuedJob) + && count(array_intersect($queuedJob->getArguments(), $job->arguments())) === count($job->arguments()); } } diff --git a/src/ResqueServiceProvider.php b/src/ResqueServiceProvider.php index 9d7a81f..ec68fc3 100644 --- a/src/ResqueServiceProvider.php +++ b/src/ResqueServiceProvider.php @@ -29,7 +29,7 @@ class ResqueServiceProvider extends ServiceProvider $config = $this->app['config']['resque.connection']; $resque = new Resque(); - $resque->setBackend(implode(':', [$config['host'], $config['port']]), $config['db']); + $resque->setBackend($config['server'], $config['db']); return new ResqueManager($resque, $this->app['config']['resque.trackStatus']); }); From 8d6ebce6b904feadb532f56e18045c4b978caf13 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 15:11:13 +0200 Subject: [PATCH 12/23] allow queue to be prefixed --- config/resque.php | 1 + src/ResqueManager.php | 22 +++++++++++++++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/config/resque.php b/config/resque.php index a1cfa11..d10dd65 100644 --- a/config/resque.php +++ b/config/resque.php @@ -21,6 +21,7 @@ return [ | */ + 'queuePrefix' => env('RESQUE_QUEUE_PREFIX', null), 'trackStatus' => env('RESQUE_TRACK_STATUS', false), ]; diff --git a/src/ResqueManager.php b/src/ResqueManager.php index 3d9fe03..3195ad3 100644 --- a/src/ResqueManager.php +++ b/src/ResqueManager.php @@ -2,7 +2,6 @@ namespace Hlgrrnhrdt\Resque; use Resque; -use Resque_Worker; use RuntimeException; /** @@ -17,6 +16,11 @@ class ResqueManager */ protected $resque; + /** + * @var string + */ + protected $queuePrefix; + /** * @var bool */ @@ -26,12 +30,14 @@ class ResqueManager * ResqueManager constructor. * * @param \Resque $resque + * @param string $queuePrefix * @param bool $trackStatus */ - public function __construct(Resque $resque, $trackStatus = false) + public function __construct(Resque $resque, $queuePrefix, $trackStatus = false) { $this->resque = $resque; $this->trackStatus = $trackStatus; + $this->queuePrefix = $queuePrefix; } /** @@ -42,7 +48,7 @@ class ResqueManager */ public function enqueue(Job $job, $trackStatus = false) { - $id = $this->resque->enqueue($job->queue(), get_class($job), $job->arguments(), $trackStatus); + $id = $this->resque->enqueue($this->getQueueName($job), get_class($job), $job->arguments(), $trackStatus); if (true === $trackStatus) { return new \Resque_Job_Status($id); @@ -108,4 +114,14 @@ class ResqueManager return $queuedJob->payload['class'] === get_class($queuedJob) && count(array_intersect($queuedJob->getArguments(), $job->arguments())) === count($job->arguments()); } + + private function getQueueName(Job $job) + { + $queue = $job->queue(); + if ($this->queuePrefix) { + $queue = implode(':', [$this->queuePrefix, $queue]); + } + + return $queue; + } } From fd166bbe821c4f951264b16c8e5cbce394f7fd6a Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 15:15:35 +0200 Subject: [PATCH 13/23] Apply queue prefix on everything --- src/Console/WorkCommand.php | 10 +++++++--- src/ResqueManager.php | 17 ++++++++++++++--- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php index e7769bc..325ba38 100644 --- a/src/Console/WorkCommand.php +++ b/src/Console/WorkCommand.php @@ -42,7 +42,7 @@ class WorkCommand extends IlluminateCommand */ public function fire() { - $queue = $this->option('queue'); + $queues = $this->option('queue'); $interval = (int)$this->option('interval'); $count = (int)$this->option('count'); @@ -59,11 +59,11 @@ class WorkCommand extends IlluminateCommand } if (0 === $pid) { - $this->startWorker($queue, $interval, $logLevel); + $this->startWorker($queues, $interval, $logLevel); } } } else { - $this->startWorker($queue, $interval, $logLevel); + $this->startWorker($queues, $interval, $logLevel); } return 0; @@ -76,6 +76,10 @@ class WorkCommand extends IlluminateCommand */ private function startWorker(array $queues, $interval = 5, $logLevel = Resque_Worker::LOG_NONE) { + $queues = array_walk($queues, function ($queue) { + return $this->manager->getQueue($queue); + }); + $worker = new Resque_Worker($queues); $worker->logLevel = $logLevel; diff --git a/src/ResqueManager.php b/src/ResqueManager.php index 3195ad3..a54af90 100644 --- a/src/ResqueManager.php +++ b/src/ResqueManager.php @@ -48,7 +48,7 @@ class ResqueManager */ public function enqueue(Job $job, $trackStatus = false) { - $id = $this->resque->enqueue($this->getQueueName($job), get_class($job), $job->arguments(), $trackStatus); + $id = $this->resque->enqueue($this->getQueueFromJob($job), get_class($job), $job->arguments(), $trackStatus); if (true === $trackStatus) { return new \Resque_Job_Status($id); @@ -65,7 +65,7 @@ class ResqueManager */ public function enqueueOnce(Job $job, $trackStatus = false) { - $queue = new Queue($job->queue()); + $queue = new Queue($this->getQueueFromJob($job)); foreach ($queue->jobs() as $queuedJob) { if (true === $this->isDuplicateJob($job, $queuedJob)) { @@ -115,9 +115,20 @@ class ResqueManager && count(array_intersect($queuedJob->getArguments(), $job->arguments())) === count($job->arguments()); } - private function getQueueName(Job $job) + private function getQueueFromJob(Job $job) { $queue = $job->queue(); + + return $this->getQueue($queue); + } + + /** + * @param string $queue + * + * @return string + */ + public function getQueue($queue) + { if ($this->queuePrefix) { $queue = implode(':', [$this->queuePrefix, $queue]); } From 175123695d32cbb7a4ca6d6a1725ee578e0fd433 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 15:23:33 +0200 Subject: [PATCH 14/23] Fix queue name prefixing --- src/Console/WorkCommand.php | 6 +++--- src/ResqueManager.php | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php index 325ba38..86ab95c 100644 --- a/src/Console/WorkCommand.php +++ b/src/Console/WorkCommand.php @@ -76,9 +76,9 @@ class WorkCommand extends IlluminateCommand */ private function startWorker(array $queues, $interval = 5, $logLevel = Resque_Worker::LOG_NONE) { - $queues = array_walk($queues, function ($queue) { - return $this->manager->getQueue($queue); - }); + $queues = array_map(function ($queue) { + return $this->manager->getQueueName($queue); + }, $queues); $worker = new Resque_Worker($queues); $worker->logLevel = $logLevel; diff --git a/src/ResqueManager.php b/src/ResqueManager.php index a54af90..36cb59d 100644 --- a/src/ResqueManager.php +++ b/src/ResqueManager.php @@ -48,7 +48,7 @@ class ResqueManager */ public function enqueue(Job $job, $trackStatus = false) { - $id = $this->resque->enqueue($this->getQueueFromJob($job), get_class($job), $job->arguments(), $trackStatus); + $id = $this->resque->enqueue($this->getQueueNameFromJob($job), get_class($job), $job->arguments(), $trackStatus); if (true === $trackStatus) { return new \Resque_Job_Status($id); @@ -65,7 +65,7 @@ class ResqueManager */ public function enqueueOnce(Job $job, $trackStatus = false) { - $queue = new Queue($this->getQueueFromJob($job)); + $queue = new Queue($this->getQueueNameFromJob($job)); foreach ($queue->jobs() as $queuedJob) { if (true === $this->isDuplicateJob($job, $queuedJob)) { @@ -115,11 +115,11 @@ class ResqueManager && count(array_intersect($queuedJob->getArguments(), $job->arguments())) === count($job->arguments()); } - private function getQueueFromJob(Job $job) + private function getQueueNameFromJob(Job $job) { $queue = $job->queue(); - return $this->getQueue($queue); + return $this->getQueueName($queue); } /** @@ -127,7 +127,7 @@ class ResqueManager * * @return string */ - public function getQueue($queue) + public function getQueueName($queue) { if ($this->queuePrefix) { $queue = implode(':', [$this->queuePrefix, $queue]); From 9e7b3a658e34b33bc6d136c7bab80c2e6cb87129 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 18:03:41 +0200 Subject: [PATCH 15/23] Refactor --- composer.json | 4 +- config/resque.php | 2 +- src/Console/WorkCommand.php | 50 +++--------- src/Job.php | 10 ++- src/Queue.php | 35 ++++++--- src/Resque.php | 104 +++++++++++++++++++++++++ src/ResqueManager.php | 138 ---------------------------------- src/ResqueServiceProvider.php | 23 +++--- src/Worker.php | 52 +++++++++++++ 9 files changed, 215 insertions(+), 203 deletions(-) create mode 100644 src/Resque.php delete mode 100644 src/ResqueManager.php create mode 100644 src/Worker.php diff --git a/composer.json b/composer.json index bd9b2ce..bf83714 100644 --- a/composer.json +++ b/composer.json @@ -8,8 +8,8 @@ ], "require": { "illuminate/console": "^5.2", - "chrisboulton/php-resque": "^1.2", - "illuminate/config": "^5.2" + "illuminate/config": "^5.2", + "chrisboulton/php-resque": "dev-master" }, "require-dev": { "phpunit/phpunit": "^5.4" diff --git a/config/resque.php b/config/resque.php index d10dd65..61cf6f3 100644 --- a/config/resque.php +++ b/config/resque.php @@ -21,7 +21,7 @@ return [ | */ - 'queuePrefix' => env('RESQUE_QUEUE_PREFIX', null), + 'prefix' => env('RESQUE_PREFIX', null), 'trackStatus' => env('RESQUE_TRACK_STATUS', false), ]; diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php index 86ab95c..19ec972 100644 --- a/src/Console/WorkCommand.php +++ b/src/Console/WorkCommand.php @@ -1,11 +1,10 @@ manager = $manager; + $this->resque = $resque; } /** @@ -46,12 +45,10 @@ class WorkCommand extends IlluminateCommand $interval = (int)$this->option('interval'); $count = (int)$this->option('count'); - $logLevel = $this->getLogLevel(); - if ($count > 1) { for ($i = 0; $i < $count; $i++) { try { - $pid = $this->manager->fork(); + $pid = $this->resque->fork(); } catch (\RuntimeException $exception) { $this->error($exception->getMessage()); @@ -59,11 +56,11 @@ class WorkCommand extends IlluminateCommand } if (0 === $pid) { - $this->startWorker($queues, $interval, $logLevel); + $this->startWorker($queues, $interval); } } } else { - $this->startWorker($queues, $interval, $logLevel); + $this->startWorker($queues, $interval); } return 0; @@ -71,43 +68,16 @@ class WorkCommand extends IlluminateCommand /** * @param array $queues - * @param int $logLevel * @param int $interval */ - private function startWorker(array $queues, $interval = 5, $logLevel = Resque_Worker::LOG_NONE) + private function startWorker(array $queues, $interval = 5) { - $queues = array_map(function ($queue) { - return $this->manager->getQueueName($queue); - }, $queues); - $worker = new Resque_Worker($queues); - $worker->logLevel = $logLevel; $this->info(sprintf('Starting worker %s', $worker)); $worker->work($interval); } - /** - * @return int - */ - protected function getLogLevel() - { - switch ($this->verbosity) { - case OutputInterface::VERBOSITY_VERBOSE: - $logLevel = Resque_Worker::LOG_NORMAL; - break; - - case OutputInterface::VERBOSITY_VERY_VERBOSE: - $logLevel = Resque_Worker::LOG_VERBOSE; - break; - - default: - $logLevel = Resque_Worker::LOG_NONE; - } - - return $logLevel; - } - /** * {@inheritdoc} */ diff --git a/src/Job.php b/src/Job.php index cc52b73..1216ee7 100644 --- a/src/Job.php +++ b/src/Job.php @@ -36,7 +36,15 @@ abstract class Job */ public function arguments() { - return $this->$args; + return $this->args; + } + + /** + * @return string + */ + public function name() + { + return \get_class($this); } /** diff --git a/src/Queue.php b/src/Queue.php index 15212fa..b386d32 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -8,31 +8,44 @@ namespace Hlgrrnhrdt\Resque; */ class Queue { - protected $name; /** - * @var ResqueManager + * @var string */ - private $manager; + protected $name; /** - * Queue constructor. - * - * @param ResqueManager $manager + * @param string $name */ - public function __construct(ResqueManager $manager) + public function __construct($name) { - $this->manager = $manager; + $this->name = $name; } /** - * @return \Resque_Job[] + * @return string + */ + public function name() + { + return $this->name; + } + + /** + * @return int + */ + public function size() + { + return \Resque::size($this->name); + } + + /** + * @return Job[] */ public function jobs() { - $result = $this->manager->redis()->lrange('queue:' . $this->name, 0, -1); + $result = \Resque::redis()->lrange('queue:' . $this->name, 0, -1); $jobs = []; foreach ($result as $job) { - $jobs[] = new \Resque_Job($this->name, json_decode($job, true)); + $jobs[] = (new \Resque_Job($this->name, \json_decode($job, true)))->getInstance(); } return $jobs; diff --git a/src/Resque.php b/src/Resque.php new file mode 100644 index 0000000..6965d5e --- /dev/null +++ b/src/Resque.php @@ -0,0 +1,104 @@ + + */ +class Resque +{ + /** + * @var string + */ + protected $prefix; + + /** + * @var bool + */ + protected $trackStatus = false; + + /** + * + */ + public function __construct($connection = null) + { + + } + + /** + * @param string $prefix + */ + public function setPrefix($prefix) + { + \Resque_Redis::prefix($prefix); + } + + /** + * @param Job $job + * @param bool $trackStatus + * + * @return null|\Resque_Job_Status + */ + public function enqueue(Job $job, $trackStatus = false) + { + $id = \Resque::enqueue($job->queue(), $job->name(), $job->arguments(), $trackStatus); + + if (true === $trackStatus) { + return new \Resque_Job_Status($id); + } + + return null; + } + + /** + * @param Job $job + * @param bool $trackStatus + * + * @return null|\Resque_Job_Status + */ + public function enqueueOnce(Job $job, $trackStatus = false) + { + $queue = new Queue($job->queue()); + + foreach ($queue->jobs() as $queuedJob) { + if (true === $this->isDuplicateJob($job, $queuedJob)) { + return ($trackStatus) ? new \Resque_Job_Status($queuedJob->job->payload['id']) : null; + } + } + + return $this->enqueue($job, $trackStatus); + } + + /** + * @return \Resque_Redis + */ + public function redis() + { + return \Resque::redis(); + } + + /** + * @return int + * + * @throws RuntimeException + */ + public function fork() + { + return \Resque::fork(); + } + + /** + * @param Job $job + * @param Job $queuedJob + * + * @return bool + */ + private function isDuplicateJob(Job $job, Job $queuedJob) + { + return $job->name() === $queuedJob->name() + && count(array_intersect($job->arguments(), $queuedJob->arguments())) === count($job->arguments()); + } +} diff --git a/src/ResqueManager.php b/src/ResqueManager.php deleted file mode 100644 index 36cb59d..0000000 --- a/src/ResqueManager.php +++ /dev/null @@ -1,138 +0,0 @@ - - */ -class ResqueManager -{ - /** - * @var Resque - */ - protected $resque; - - /** - * @var string - */ - protected $queuePrefix; - - /** - * @var bool - */ - protected $trackStatus = false; - - /** - * ResqueManager constructor. - * - * @param \Resque $resque - * @param string $queuePrefix - * @param bool $trackStatus - */ - public function __construct(Resque $resque, $queuePrefix, $trackStatus = false) - { - $this->resque = $resque; - $this->trackStatus = $trackStatus; - $this->queuePrefix = $queuePrefix; - } - - /** - * @param Job $job - * @param bool $trackStatus - * - * @return null|\Resque_Job_Status - */ - public function enqueue(Job $job, $trackStatus = false) - { - $id = $this->resque->enqueue($this->getQueueNameFromJob($job), get_class($job), $job->arguments(), $trackStatus); - - if (true === $trackStatus) { - return new \Resque_Job_Status($id); - } - - return null; - } - - /** - * @param Job $job - * @param bool $trackStatus - * - * @return null|\Resque_Job_Status - */ - public function enqueueOnce(Job $job, $trackStatus = false) - { - $queue = new Queue($this->getQueueNameFromJob($job)); - - foreach ($queue->jobs() as $queuedJob) { - if (true === $this->isDuplicateJob($job, $queuedJob)) { - return ($trackStatus) ? new \Resque_Job_Status($queuedJob->payload['id']) : null; - } - } - - return $this->enqueue($job, $trackStatus); - } - - /** - * @return \Resque_Redis - */ - public function redis() - { - return $this->resque->redis(); - } - - /** - * @return int - * - * @throws RuntimeException - */ - public function fork() - { - if (false === function_exists('pcntl_fork')) { - return -1; - } - - $pid = pcntl_fork(); - if (-1 === $pid) { - throw new RuntimeException('Unable to fork child worker.'); - } - - return $pid; - } - - /** - * @param \Hlgrrnhrdt\Resque\Job $job - * @param $queuedJob - * - * @return bool - */ - private function isDuplicateJob(Job $job, \Resque_Job $queuedJob) - { - return $queuedJob->payload['class'] === get_class($queuedJob) - && count(array_intersect($queuedJob->getArguments(), $job->arguments())) === count($job->arguments()); - } - - private function getQueueNameFromJob(Job $job) - { - $queue = $job->queue(); - - return $this->getQueueName($queue); - } - - /** - * @param string $queue - * - * @return string - */ - public function getQueueName($queue) - { - if ($this->queuePrefix) { - $queue = implode(':', [$this->queuePrefix, $queue]); - } - - return $queue; - } -} diff --git a/src/ResqueServiceProvider.php b/src/ResqueServiceProvider.php index ec68fc3..81a1fb1 100644 --- a/src/ResqueServiceProvider.php +++ b/src/ResqueServiceProvider.php @@ -3,7 +3,6 @@ namespace Hlgrrnhrdt\Resque; use Hlgrrnhrdt\Resque\Console\WorkCommand; use Illuminate\Support\ServiceProvider; -use Resque; /** * ResqueServiceProvider @@ -12,6 +11,15 @@ use Resque; */ class ResqueServiceProvider extends ServiceProvider { + /** + * + */ + public function boot() + { + $connection = $this->app['config']['resque.connection']; + \Resque::setBackend($connection['server'], $connection['db']); + } + /** * Register the service provider. * @@ -19,19 +27,14 @@ class ResqueServiceProvider extends ServiceProvider */ public function register() { - $this->registerManager(); + $this->registerResque(); $this->registerWorkCommand(); } - protected function registerManager() + protected function registerResque() { - $this->app->singleton('resque.manager', function () { - $config = $this->app['config']['resque.connection']; - - $resque = new Resque(); - $resque->setBackend($config['server'], $config['db']); - - return new ResqueManager($resque, $this->app['config']['resque.trackStatus']); + $this->app->singleton('resque', function () { + return new Resque(); }); } diff --git a/src/Worker.php b/src/Worker.php new file mode 100644 index 0000000..2c281d5 --- /dev/null +++ b/src/Worker.php @@ -0,0 +1,52 @@ + + */ +class Worker +{ + /** + * @var \Resque_Worker + */ + private $worker; + + /** + * @param \Resque_Worker $worker + */ + public function __construct(\Resque_Worker $worker) + { + $this->worker = $worker; + } + + /** + * @return string + */ + public function id() + { + return (string)$this->worker; + } + + /** + * @return bool + */ + public function stop() + { + list(, $pid,) = \explode(':', $this->id()); + return \posix_kill($pid, 3); + } + + /** + * @return Queue[] + */ + public function queues() + { + $queues = \array_map(function ($queue) { + return new Queue($queue); + }, $this->worker->queues()); + + return $queues; + } +} From 0519a18b389ccf6c71063dca6e095fb00f17180f Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Fri, 29 Jul 2016 18:04:55 +0200 Subject: [PATCH 16/23] prefix settable --- src/ResqueServiceProvider.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ResqueServiceProvider.php b/src/ResqueServiceProvider.php index 81a1fb1..50db87c 100644 --- a/src/ResqueServiceProvider.php +++ b/src/ResqueServiceProvider.php @@ -34,7 +34,8 @@ class ResqueServiceProvider extends ServiceProvider protected function registerResque() { $this->app->singleton('resque', function () { - return new Resque(); + $prefix = $connection = $this->app['config']['resque.prefix']; + return (new Resque())->setPrefix($prefix); }); } From a0868851ea10888154411975e279292a6dfd6b3d Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Sat, 30 Jul 2016 00:00:27 +0200 Subject: [PATCH 17/23] refactor config --- config/resque.php | 6 +++--- src/Resque.php | 12 ++---------- src/ResqueServiceProvider.php | 22 +++++++++++++++++----- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/config/resque.php b/config/resque.php index 61cf6f3..926755d 100644 --- a/config/resque.php +++ b/config/resque.php @@ -10,8 +10,9 @@ return [ */ 'connection' => [ - 'server' => env('RESQUE_REDIS_SERVER', 'localhost:6379'), - 'db' => env('RESQUE_REDIS_DB', 0), + 'host' => env('RESQUE_REDIS_HOST', 'localhost'), + 'port' => env('RESQUE_REDIS_PORT', 6379), + 'database' => env('RESQUE_REDIS_DATABASE', 0), ], /* @@ -22,6 +23,5 @@ return [ */ 'prefix' => env('RESQUE_PREFIX', null), - 'trackStatus' => env('RESQUE_TRACK_STATUS', false), ]; diff --git a/src/Resque.php b/src/Resque.php index 6965d5e..4d7f596 100644 --- a/src/Resque.php +++ b/src/Resque.php @@ -20,14 +20,6 @@ class Resque */ protected $trackStatus = false; - /** - * - */ - public function __construct($connection = null) - { - - } - /** * @param string $prefix */ @@ -64,7 +56,7 @@ class Resque $queue = new Queue($job->queue()); foreach ($queue->jobs() as $queuedJob) { - if (true === $this->isDuplicateJob($job, $queuedJob)) { + if (true === $this->isSameJob($job, $queuedJob)) { return ($trackStatus) ? new \Resque_Job_Status($queuedJob->job->payload['id']) : null; } } @@ -96,7 +88,7 @@ class Resque * * @return bool */ - private function isDuplicateJob(Job $job, Job $queuedJob) + protected function isSameJob(Job $job, Job $queuedJob) { return $job->name() === $queuedJob->name() && count(array_intersect($job->arguments(), $queuedJob->arguments())) === count($job->arguments()); diff --git a/src/ResqueServiceProvider.php b/src/ResqueServiceProvider.php index 50db87c..72c97f1 100644 --- a/src/ResqueServiceProvider.php +++ b/src/ResqueServiceProvider.php @@ -16,8 +16,7 @@ class ResqueServiceProvider extends ServiceProvider */ public function boot() { - $connection = $this->app['config']['resque.connection']; - \Resque::setBackend($connection['server'], $connection['db']); + $this->setRedisConfig(); } /** @@ -33,8 +32,8 @@ class ResqueServiceProvider extends ServiceProvider protected function registerResque() { - $this->app->singleton('resque', function () { - $prefix = $connection = $this->app['config']['resque.prefix']; + $this->app->singleton(Resque::class, function () { + $prefix = $this->app['config']['resque.prefix']; return (new Resque())->setPrefix($prefix); }); } @@ -42,8 +41,21 @@ class ResqueServiceProvider extends ServiceProvider protected function registerWorkCommand() { $this->app->singleton('command.resque.work', function () { - return new WorkCommand($this->app->make('resque.manager')); + return new WorkCommand($this->app->make(Resque::class)); }); $this->commands('command.resque.work'); } + + protected function setRedisConfig() + { + $config = $this->app['config']['resque.connection']; + + $host = isset($config['host']) ? $config['host'] : 'localhost'; + $port = isset($config['port']) ? $config['port'] : 6379; + $database = isset($config['database']) ? $config['database'] : 0; + + $server = implode(':', [$host, $port]); + + \Resque::setBackend($server, $database); + } } From f558730863535c96b178e7f4ebcc1e4e39ddd095 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Sat, 30 Jul 2016 00:30:29 +0200 Subject: [PATCH 18/23] changed php-resque package --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index bf83714..570bf22 100644 --- a/composer.json +++ b/composer.json @@ -9,7 +9,7 @@ "require": { "illuminate/console": "^5.2", "illuminate/config": "^5.2", - "chrisboulton/php-resque": "dev-master" + "danhunsaker/php-resque": "^1.3" }, "require-dev": { "phpunit/phpunit": "^5.4" From b99863c0ff48c769925151b817b990b339aea71c Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Sat, 30 Jul 2016 00:35:52 +0200 Subject: [PATCH 19/23] utilize fluent interface --- src/Resque.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Resque.php b/src/Resque.php index 4d7f596..1fe3715 100644 --- a/src/Resque.php +++ b/src/Resque.php @@ -22,10 +22,13 @@ class Resque /** * @param string $prefix + * + * @return Resque */ public function setPrefix($prefix) { \Resque_Redis::prefix($prefix); + return $this; } /** From 6f941a12cf2a0c76a000b1c663f3af0586b29a58 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Sat, 30 Jul 2016 01:43:10 +0200 Subject: [PATCH 20/23] remove possibility to run multiple workers with one command --- src/Console/WorkCommand.php | 44 +++++++++++-------------------------- 1 file changed, 13 insertions(+), 31 deletions(-) diff --git a/src/Console/WorkCommand.php b/src/Console/WorkCommand.php index 19ec972..9b01c38 100644 --- a/src/Console/WorkCommand.php +++ b/src/Console/WorkCommand.php @@ -20,6 +20,13 @@ class WorkCommand extends IlluminateCommand */ protected $name = 'resque:work'; + /** + * The console command description. + * + * @var string + */ + protected $description = 'Run a resque worker'; + /** * @var \Hlgrrnhrdt\Resque\Resque */ @@ -41,27 +48,12 @@ class WorkCommand extends IlluminateCommand */ public function fire() { - $queues = $this->option('queue'); + $queue = $this->option('queue'); $interval = (int)$this->option('interval'); - $count = (int)$this->option('count'); - if ($count > 1) { - for ($i = 0; $i < $count; $i++) { - try { - $pid = $this->resque->fork(); - } catch (\RuntimeException $exception) { - $this->error($exception->getMessage()); + $queues = explode(',', $queue); - return 1; - } - - if (0 === $pid) { - $this->startWorker($queues, $interval); - } - } - } else { - $this->startWorker($queues, $interval); - } + $this->startWorker($queues, $interval); return 0; } @@ -73,8 +65,7 @@ class WorkCommand extends IlluminateCommand private function startWorker(array $queues, $interval = 5) { $worker = new Resque_Worker($queues); - - $this->info(sprintf('Starting worker %s', $worker)); + $this->info(\sprintf('Starting worker %s', $worker)); $worker->work($interval); } @@ -84,17 +75,8 @@ class WorkCommand extends IlluminateCommand protected function getOptions() { return [ - [ - 'queue', - null, - InputOption::VALUE_IS_ARRAY | InputOption::VALUE_OPTIONAL, - 'The queue to work on', - ['default'], - ], - ['interval', null, InputOption::VALUE_OPTIONAL, 'The queue to work on', 5], - ['count', null, InputOption::VALUE_OPTIONAL, 'The queue to work on', 1], + ['queue', null, InputOption::VALUE_OPTIONAL, 'The queue to work on', 'default'], + ['interval', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 5], ]; } - - } From cff14e320f29a833f0f556f2fba062cd6406d24b Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Sat, 30 Jul 2016 01:49:45 +0200 Subject: [PATCH 21/23] set default prefix --- config/resque.php | 2 +- src/ResqueServiceProvider.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/resque.php b/config/resque.php index 926755d..76bd51a 100644 --- a/config/resque.php +++ b/config/resque.php @@ -22,6 +22,6 @@ return [ | */ - 'prefix' => env('RESQUE_PREFIX', null), + 'prefix' => env('RESQUE_PREFIX', 'resque'), ]; diff --git a/src/ResqueServiceProvider.php b/src/ResqueServiceProvider.php index 72c97f1..16ae019 100644 --- a/src/ResqueServiceProvider.php +++ b/src/ResqueServiceProvider.php @@ -34,7 +34,7 @@ class ResqueServiceProvider extends ServiceProvider { $this->app->singleton(Resque::class, function () { $prefix = $this->app['config']['resque.prefix']; - return (new Resque())->setPrefix($prefix); + return (new Resque())->setPrefix($prefix ?: 'resque'); }); } From 90588406a00383b44dd34084c832d4842084213a Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Sun, 31 Jul 2016 22:58:04 +0200 Subject: [PATCH 22/23] Fix comparing jobs --- src/Job.php | 12 +++++++++++- src/Resque.php | 14 +------------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/Job.php b/src/Job.php index 1216ee7..6a63bbc 100644 --- a/src/Job.php +++ b/src/Job.php @@ -36,7 +36,7 @@ abstract class Job */ public function arguments() { - return $this->args; + return $this->args ?: []; } /** @@ -58,5 +58,15 @@ abstract class Job return $this; } + /** + * @param Job $job + * + * @return bool + */ + public function equals(Job $job) + { + return $this->name() === $job->name() && $this->arguments() === $job->arguments(); + } + abstract public function perform(); } diff --git a/src/Resque.php b/src/Resque.php index 1fe3715..2dd5b26 100644 --- a/src/Resque.php +++ b/src/Resque.php @@ -59,7 +59,7 @@ class Resque $queue = new Queue($job->queue()); foreach ($queue->jobs() as $queuedJob) { - if (true === $this->isSameJob($job, $queuedJob)) { + if (true === $job->equals($queuedJob)) { return ($trackStatus) ? new \Resque_Job_Status($queuedJob->job->payload['id']) : null; } } @@ -84,16 +84,4 @@ class Resque { return \Resque::fork(); } - - /** - * @param Job $job - * @param Job $queuedJob - * - * @return bool - */ - protected function isSameJob(Job $job, Job $queuedJob) - { - return $job->name() === $queuedJob->name() - && count(array_intersect($job->arguments(), $queuedJob->arguments())) === count($job->arguments()); - } } From c905c734bbf2e8f27feeea98d421a2045a427b89 Mon Sep 17 00:00:00 2001 From: Holger Reinhardt Date: Mon, 1 Aug 2016 09:04:46 +0200 Subject: [PATCH 23/23] add installation instrcutions to README --- README.md | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/README.md b/README.md index 8b13789..ee41b9e 100644 --- a/README.md +++ b/README.md @@ -1 +1,40 @@ +## Features +This package provides tools for the following, and more: + +- Add jobs to queue + +## Installation + +Via composer + +``` bash +$ composer require hlgrrnhrdt/laravel-resque +``` + + + +Copy the config the file ```config/resque.php``` from the package to your config folder. + +### Laravel + + +``` php +'providers' => [ + Hlgrrnhrdt\Resque\ResqueServiceProvider::class +] +``` + +### Lumen + +Open ```bootstrap/app.php``` and register the required service provider + +``` php +$app->register(Hlgrrnhrdt\Resque\ResqueServiceProvider::class); +``` + +and load the config with +``` php + +$app->configure('resque'); +```