This commit is contained in:
Holger Reinhardt 2016-07-29 18:03:41 +02:00
parent 175123695d
commit 9e7b3a658e
9 changed files with 215 additions and 203 deletions

View File

@ -8,8 +8,8 @@
], ],
"require": { "require": {
"illuminate/console": "^5.2", "illuminate/console": "^5.2",
"chrisboulton/php-resque": "^1.2", "illuminate/config": "^5.2",
"illuminate/config": "^5.2" "chrisboulton/php-resque": "dev-master"
}, },
"require-dev": { "require-dev": {
"phpunit/phpunit": "^5.4" "phpunit/phpunit": "^5.4"

View File

@ -21,7 +21,7 @@ return [
| |
*/ */
'queuePrefix' => env('RESQUE_QUEUE_PREFIX', null), 'prefix' => env('RESQUE_PREFIX', null),
'trackStatus' => env('RESQUE_TRACK_STATUS', false), 'trackStatus' => env('RESQUE_TRACK_STATUS', false),
]; ];

View File

@ -1,11 +1,10 @@
<?php <?php
namespace Hlgrrnhrdt\Resque\Console; namespace Hlgrrnhrdt\Resque\Console;
use Hlgrrnhrdt\Resque\ResqueManager; use Hlgrrnhrdt\Resque\Resque;
use Illuminate\Console\Command as IlluminateCommand; use Illuminate\Console\Command as IlluminateCommand;
use Resque_Worker; use Resque_Worker;
use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
/** /**
* WorkCommand * WorkCommand
@ -22,17 +21,17 @@ class WorkCommand extends IlluminateCommand
protected $name = 'resque:work'; protected $name = 'resque:work';
/** /**
* @var \Hlgrrnhrdt\Resque\ResqueManager * @var \Hlgrrnhrdt\Resque\Resque
*/ */
private $manager; private $resque;
/** /**
* @param \Hlgrrnhrdt\Resque\ResqueManager $manager * @param \Hlgrrnhrdt\Resque\Resque $resque
*/ */
public function __construct(ResqueManager $manager) public function __construct(Resque $resque)
{ {
parent::__construct(); parent::__construct();
$this->manager = $manager; $this->resque = $resque;
} }
/** /**
@ -46,12 +45,10 @@ class WorkCommand extends IlluminateCommand
$interval = (int)$this->option('interval'); $interval = (int)$this->option('interval');
$count = (int)$this->option('count'); $count = (int)$this->option('count');
$logLevel = $this->getLogLevel();
if ($count > 1) { if ($count > 1) {
for ($i = 0; $i < $count; $i++) { for ($i = 0; $i < $count; $i++) {
try { try {
$pid = $this->manager->fork(); $pid = $this->resque->fork();
} catch (\RuntimeException $exception) { } catch (\RuntimeException $exception) {
$this->error($exception->getMessage()); $this->error($exception->getMessage());
@ -59,11 +56,11 @@ class WorkCommand extends IlluminateCommand
} }
if (0 === $pid) { if (0 === $pid) {
$this->startWorker($queues, $interval, $logLevel); $this->startWorker($queues, $interval);
} }
} }
} else { } else {
$this->startWorker($queues, $interval, $logLevel); $this->startWorker($queues, $interval);
} }
return 0; return 0;
@ -71,43 +68,16 @@ class WorkCommand extends IlluminateCommand
/** /**
* @param array $queues * @param array $queues
* @param int $logLevel
* @param int $interval * @param int $interval
*/ */
private function startWorker(array $queues, $interval = 5, $logLevel = Resque_Worker::LOG_NONE) private function startWorker(array $queues, $interval = 5)
{ {
$queues = array_map(function ($queue) {
return $this->manager->getQueueName($queue);
}, $queues);
$worker = new Resque_Worker($queues); $worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel;
$this->info(sprintf('Starting worker %s', $worker)); $this->info(sprintf('Starting worker %s', $worker));
$worker->work($interval); $worker->work($interval);
} }
/**
* @return int
*/
protected function getLogLevel()
{
switch ($this->verbosity) {
case OutputInterface::VERBOSITY_VERBOSE:
$logLevel = Resque_Worker::LOG_NORMAL;
break;
case OutputInterface::VERBOSITY_VERY_VERBOSE:
$logLevel = Resque_Worker::LOG_VERBOSE;
break;
default:
$logLevel = Resque_Worker::LOG_NONE;
}
return $logLevel;
}
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */

View File

@ -36,7 +36,15 @@ abstract class Job
*/ */
public function arguments() public function arguments()
{ {
return $this->$args; return $this->args;
}
/**
* @return string
*/
public function name()
{
return \get_class($this);
} }
/** /**

View File

@ -8,31 +8,44 @@ namespace Hlgrrnhrdt\Resque;
*/ */
class Queue class Queue
{ {
protected $name;
/** /**
* @var ResqueManager * @var string
*/ */
private $manager; protected $name;
/** /**
* Queue constructor. * @param string $name
*
* @param ResqueManager $manager
*/ */
public function __construct(ResqueManager $manager) public function __construct($name)
{ {
$this->manager = $manager; $this->name = $name;
} }
/** /**
* @return \Resque_Job[] * @return string
*/
public function name()
{
return $this->name;
}
/**
* @return int
*/
public function size()
{
return \Resque::size($this->name);
}
/**
* @return Job[]
*/ */
public function jobs() public function jobs()
{ {
$result = $this->manager->redis()->lrange('queue:' . $this->name, 0, -1); $result = \Resque::redis()->lrange('queue:' . $this->name, 0, -1);
$jobs = []; $jobs = [];
foreach ($result as $job) { foreach ($result as $job) {
$jobs[] = new \Resque_Job($this->name, json_decode($job, true)); $jobs[] = (new \Resque_Job($this->name, \json_decode($job, true)))->getInstance();
} }
return $jobs; return $jobs;

104
src/Resque.php Normal file
View File

@ -0,0 +1,104 @@
<?php
namespace Hlgrrnhrdt\Resque;
use RuntimeException;
/**
* Resque
*
* @author Holger Reinhardt <hlgrrnhrdt@gmail.com>
*/
class Resque
{
/**
* @var string
*/
protected $prefix;
/**
* @var bool
*/
protected $trackStatus = false;
/**
*
*/
public function __construct($connection = null)
{
}
/**
* @param string $prefix
*/
public function setPrefix($prefix)
{
\Resque_Redis::prefix($prefix);
}
/**
* @param Job $job
* @param bool $trackStatus
*
* @return null|\Resque_Job_Status
*/
public function enqueue(Job $job, $trackStatus = false)
{
$id = \Resque::enqueue($job->queue(), $job->name(), $job->arguments(), $trackStatus);
if (true === $trackStatus) {
return new \Resque_Job_Status($id);
}
return null;
}
/**
* @param Job $job
* @param bool $trackStatus
*
* @return null|\Resque_Job_Status
*/
public function enqueueOnce(Job $job, $trackStatus = false)
{
$queue = new Queue($job->queue());
foreach ($queue->jobs() as $queuedJob) {
if (true === $this->isDuplicateJob($job, $queuedJob)) {
return ($trackStatus) ? new \Resque_Job_Status($queuedJob->job->payload['id']) : null;
}
}
return $this->enqueue($job, $trackStatus);
}
/**
* @return \Resque_Redis
*/
public function redis()
{
return \Resque::redis();
}
/**
* @return int
*
* @throws RuntimeException
*/
public function fork()
{
return \Resque::fork();
}
/**
* @param Job $job
* @param Job $queuedJob
*
* @return bool
*/
private function isDuplicateJob(Job $job, Job $queuedJob)
{
return $job->name() === $queuedJob->name()
&& count(array_intersect($job->arguments(), $queuedJob->arguments())) === count($job->arguments());
}
}

View File

@ -1,138 +0,0 @@
<?php
namespace Hlgrrnhrdt\Resque;
use Resque;
use RuntimeException;
/**
* Resque
*
* @author Holger Reinhardt <hlgrrnhrdt@gmail.com>
*/
class ResqueManager
{
/**
* @var Resque
*/
protected $resque;
/**
* @var string
*/
protected $queuePrefix;
/**
* @var bool
*/
protected $trackStatus = false;
/**
* ResqueManager constructor.
*
* @param \Resque $resque
* @param string $queuePrefix
* @param bool $trackStatus
*/
public function __construct(Resque $resque, $queuePrefix, $trackStatus = false)
{
$this->resque = $resque;
$this->trackStatus = $trackStatus;
$this->queuePrefix = $queuePrefix;
}
/**
* @param Job $job
* @param bool $trackStatus
*
* @return null|\Resque_Job_Status
*/
public function enqueue(Job $job, $trackStatus = false)
{
$id = $this->resque->enqueue($this->getQueueNameFromJob($job), get_class($job), $job->arguments(), $trackStatus);
if (true === $trackStatus) {
return new \Resque_Job_Status($id);
}
return null;
}
/**
* @param Job $job
* @param bool $trackStatus
*
* @return null|\Resque_Job_Status
*/
public function enqueueOnce(Job $job, $trackStatus = false)
{
$queue = new Queue($this->getQueueNameFromJob($job));
foreach ($queue->jobs() as $queuedJob) {
if (true === $this->isDuplicateJob($job, $queuedJob)) {
return ($trackStatus) ? new \Resque_Job_Status($queuedJob->payload['id']) : null;
}
}
return $this->enqueue($job, $trackStatus);
}
/**
* @return \Resque_Redis
*/
public function redis()
{
return $this->resque->redis();
}
/**
* @return int
*
* @throws RuntimeException
*/
public function fork()
{
if (false === function_exists('pcntl_fork')) {
return -1;
}
$pid = pcntl_fork();
if (-1 === $pid) {
throw new RuntimeException('Unable to fork child worker.');
}
return $pid;
}
/**
* @param \Hlgrrnhrdt\Resque\Job $job
* @param $queuedJob
*
* @return bool
*/
private function isDuplicateJob(Job $job, \Resque_Job $queuedJob)
{
return $queuedJob->payload['class'] === get_class($queuedJob)
&& count(array_intersect($queuedJob->getArguments(), $job->arguments())) === count($job->arguments());
}
private function getQueueNameFromJob(Job $job)
{
$queue = $job->queue();
return $this->getQueueName($queue);
}
/**
* @param string $queue
*
* @return string
*/
public function getQueueName($queue)
{
if ($this->queuePrefix) {
$queue = implode(':', [$this->queuePrefix, $queue]);
}
return $queue;
}
}

View File

@ -3,7 +3,6 @@ namespace Hlgrrnhrdt\Resque;
use Hlgrrnhrdt\Resque\Console\WorkCommand; use Hlgrrnhrdt\Resque\Console\WorkCommand;
use Illuminate\Support\ServiceProvider; use Illuminate\Support\ServiceProvider;
use Resque;
/** /**
* ResqueServiceProvider * ResqueServiceProvider
@ -12,6 +11,15 @@ use Resque;
*/ */
class ResqueServiceProvider extends ServiceProvider class ResqueServiceProvider extends ServiceProvider
{ {
/**
*
*/
public function boot()
{
$connection = $this->app['config']['resque.connection'];
\Resque::setBackend($connection['server'], $connection['db']);
}
/** /**
* Register the service provider. * Register the service provider.
* *
@ -19,19 +27,14 @@ class ResqueServiceProvider extends ServiceProvider
*/ */
public function register() public function register()
{ {
$this->registerManager(); $this->registerResque();
$this->registerWorkCommand(); $this->registerWorkCommand();
} }
protected function registerManager() protected function registerResque()
{ {
$this->app->singleton('resque.manager', function () { $this->app->singleton('resque', function () {
$config = $this->app['config']['resque.connection']; return new Resque();
$resque = new Resque();
$resque->setBackend($config['server'], $config['db']);
return new ResqueManager($resque, $this->app['config']['resque.trackStatus']);
}); });
} }

52
src/Worker.php Normal file
View File

@ -0,0 +1,52 @@
<?php
namespace Hlgrrnhrdt\Resque;
/**
* Worker
*
* @author Holger Reinhardt <holger.reinhardt@aboutyou.de>
*/
class Worker
{
/**
* @var \Resque_Worker
*/
private $worker;
/**
* @param \Resque_Worker $worker
*/
public function __construct(\Resque_Worker $worker)
{
$this->worker = $worker;
}
/**
* @return string
*/
public function id()
{
return (string)$this->worker;
}
/**
* @return bool
*/
public function stop()
{
list(, $pid,) = \explode(':', $this->id());
return \posix_kill($pid, 3);
}
/**
* @return Queue[]
*/
public function queues()
{
$queues = \array_map(function ($queue) {
return new Queue($queue);
}, $this->worker->queues());
return $queues;
}
}