Add a plugin/event/hook system

This commit is contained in:
chris.boulton 2011-03-27 18:42:46 +11:00
parent 4d4a5ffc89
commit 6f43fcfed8
10 changed files with 431 additions and 20 deletions

View File

@ -227,6 +227,96 @@ A PECL module (<http://pecl.php.net/package/proctitle>) exists that
adds this funcitonality to PHP, so if you'd like process titles updated, adds this funcitonality to PHP, so if you'd like process titles updated,
install the PECL module as well. php-resque will detect and use it. install the PECL module as well. php-resque will detect and use it.
## Event/Hook System ##
php-resque has a basic event system that can be used by your application
to customize how some of the php-resque internals behave.
You listen in on events (as listed below) by registering with `Resque_Event`
and supplying a callback that you would like triggered when the event is
raised:
Resque_Event::listen('eventName', [callback]);
`[callback]` may be anything in PHP that is callable by `call_user_func_array`:
* A string with the name of a function
* An array containing an object and method to call
* An array containing an object and a static method to call
* A closure (PHP 5.3)
Events may pass arguments (documented below), so your callback should accept
these arguments.
You can stop listening to an event by calling `Resque_Event::stopListening`
with the same arguments supplied to `Resque_Event::listen`.
It is up to your application to register event listeners. When enqueuing events
in your application, it should be as easy as making sure php-resque is loaded
and calling `Resque_Event::listen`.
When running workers, if you run workers via the default `resque.php` script,
your `APP_INCLUDE` script should initialize and register any listeners required
for operation. If you have rolled your own worker manager, then it is again your
responsibility to register listeners.
A sample plugin is included in the `extras` directory.
### Events ###
#### beforeFirstFork ####
Called once, as a worker initializes. Argument passed is the instance of `Resque_Worker`
that was just initialized.
#### beforeFork ####
Called before php-resque forks to run a job. Argument passed contains the instance of
`Resque_Job` for the job about to be run.
`beforeFork` is triggered in the **parent** process. Any changes made will be permanent
for as long as the worker lives.
#### afterFork ####
Called after php-resque forks to run a job (but before the job is run). Argument
passed contains the instance of `Resque_Job` for the job about to be run.
`afterFork` is triggered in the child process after forking out to complete a job. Any
changes made will only live as long as the job is being processed.
#### beforePerform ####
Called before the `setUp` and `perform` methods on a job are run. Argument passed
contains the instance of `Resque_Job` about for the job about to be run.
You can prevent execution of the job by throwing an exception of `Resque_Job_DontPerform`.
Any other exceptions thrown will be treated as if they were thrown in a job, causing the
job to fail.
#### afterPerform ####
Called after the `perform` and `tearDown` methods on a job are run. Argument passed
contains the instance of `Resque_Job` that was just run.
Any exceptions thrown will be treated as if they were thrown in a job, causing the job
to be marked as having failed.
#### onFailure ####
Called whenever a job fails. Arguments passed (in this order) include:
* Exception - The exception that was thrown when the job failed
* Resque_Job - The job that failed
#### afterEnqueue ####
Called after a job has been queued using the `Resque::enqueue` method. Arguments passed
(in this order) include:
* Class - string containing the name of the class the job was scheduled in
* Arguments - array of arguments supplied to the job
## Contributors ## ## Contributors ##
* chrisboulton * chrisboulton

View File

@ -1,8 +1,6 @@
* Write tests for: * Write tests for:
* `Resque_Failure` * `Resque_Failure`
* `Resque_Failure_Redis` * `Resque_Failure_Redis`
* Plugin/hook type system similar to Ruby version (when done, implement the
setUp and tearDown methods as a plugin)
* Change to preforking worker model * Change to preforking worker model
* Clean up /bin and /demo * Clean up /bin and /demo
* Add a way to store arbitrary text in job statuses (for things like progress * Add a way to store arbitrary text in job statuses (for things like progress

View File

@ -1,4 +1,5 @@
<?php <?php
require_once dirname(__FILE__) . '/Resque/Event.php';
require_once dirname(__FILE__) . '/Resque/Exception.php'; require_once dirname(__FILE__) . '/Resque/Exception.php';
/** /**
@ -103,7 +104,15 @@ class Resque
public static function enqueue($queue, $class, $args = null, $trackStatus = false) public static function enqueue($queue, $class, $args = null, $trackStatus = false)
{ {
require_once dirname(__FILE__) . '/Resque/Job.php'; require_once dirname(__FILE__) . '/Resque/Job.php';
return Resque_Job::create($queue, $class, $args, $trackStatus); $result = Resque_Job::create($queue, $class, $args, $trackStatus);
if ($result) {
Resque_Event::trigger('afterEnqueue', array(
'class' => $class,
'args' => $args,
));
}
return $result;
} }
/** /**

89
lib/Resque/Event.php Normal file
View File

@ -0,0 +1,89 @@
<?php
/**
* Resque event/plugin system class
*
* @package Resque/Event
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Event
{
/**
* @var array Array containing all registered callbacks, indexked by event name.
*/
private static $events = array();
/**
* Raise a given event with the supplied data.
*
* @param string $event Name of event to be raised.
* @param mixed $data Optional, any data that should be passed to each callback.
* @return true
*/
public static function trigger($event, $data = null)
{
if (!is_array($data)) {
$data = array($data);
}
if (empty(self::$events[$event])) {
return true;
}
foreach (self::$events[$event] as $callback) {
if (!is_callable($callback)) {
continue;
}
call_user_func_array($callback, $data);
}
return true;
}
/**
* Listen in on a given event to have a specified callback fired.
*
* @param string $event Name of event to listen on.
* @param mixed $callback Any callback callable by call_user_func_array.
* @return true
*/
public static function listen($event, $callback)
{
if (!isset(self::$events[$event])) {
self::$events[$event] = array();
}
self::$events[$event][] = $callback;
return true;
}
/**
* Stop a given callback from listening on a specific event.
*
* @param string $event Name of event.
* @param mixed $callback The callback as defined when listen() was called.
* @return true
*/
public static function stopListening($event, $callback)
{
if (!isset(self::$events[$event])) {
return true;
}
$key = array_search($callback, self::$events[$event]);
if ($key !== false) {
unset(self::$events[$key]);
}
return true;
}
/**
* Call all registered listeners.
*/
public static function clearListeners()
{
self::$events = array();
}
}

View File

@ -1,5 +1,7 @@
<?php <?php
require_once dirname(__FILE__) . '/Event.php';
require_once dirname(__FILE__) . '/Job/Status.php'; require_once dirname(__FILE__) . '/Job/Status.php';
require_once dirname(__FILE__) . '/Job/DontPerform.php';
/** /**
* Resque job. * Resque job.
@ -26,6 +28,11 @@ class Resque_Job
*/ */
public $payload; public $payload;
/**
* @var object Instance of the class performing work for this job.
*/
private $instance;
/** /**
* Instantiate a new instance of a job. * Instantiate a new instance of a job.
* *
@ -111,13 +118,30 @@ class Resque_Job
} }
/** /**
* Actually execute a job by calling the perform method on the class * Get the arguments supplied to this job.
* associated with the job with the supplied arguments.
* *
* @throws Resque_Exception When the job's class could not be found or it does not contain a perform method. * @return array Array of arguments.
*/ */
public function perform() public function getArguments()
{ {
if (!isset($this->payload['args'])) {
return array();
}
return $this->payload['args'];
}
/**
* Get the instantiated object for this job that will be performing work.
*
* @return object Instance of the object that this job belongs to.
*/
public function getInstance()
{
if (!is_null($this->instance)) {
return $this->instance;
}
if(!class_exists($this->payload['class'])) { if(!class_exists($this->payload['class'])) {
throw new Resque_Exception( throw new Resque_Exception(
'Could not find job class ' . $this->payload['class'] . '.' 'Could not find job class ' . $this->payload['class'] . '.'
@ -130,8 +154,23 @@ class Resque_Job
); );
} }
$instance = new $this->payload['class']; $this->instance = new $this->payload['class'];
$instance->args = $this->payload['args']; $this->instance->job = $this;
$this->instance->args = $this->getArguments();
return $this->instance;
}
/**
* Actually execute a job by calling the perform method on the class
* associated with the job with the supplied arguments.
*
* @throws Resque_Exception When the job's class could not be found or it does not contain a perform method.
*/
public function perform()
{
$instance = $this->getInstance();
try {
Resque_Event::trigger('beforePerform', $this);
if(method_exists($instance, 'setUp')) { if(method_exists($instance, 'setUp')) {
$instance->setUp(); $instance->setUp();
@ -142,6 +181,24 @@ class Resque_Job
if(method_exists($instance, 'tearDown')) { if(method_exists($instance, 'tearDown')) {
$instance->tearDown(); $instance->tearDown();
} }
Resque_Event::trigger('afterPerform', $this);
}
// beforePerform/setUp have said don't perform this job. Return.
catch(Resque_Job_DontPerform $e) {
return false;
}
// Catch any other exception and raise the onFailure event. Rethrow
// the exception
catch(Exception $e) {
Resque_Event::trigger('onFailure', array(
'exception' => $e,
'job' => $this,
));
throw $e;
}
return true;
} }
/** /**

View File

@ -0,0 +1,13 @@
<?php
/**
* Exception to be thrown if a job should not be performed/run.
*
* @package Resque/Job
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Job_DontPerform extends Exception
{
}

View File

@ -1,5 +1,6 @@
<?php <?php
require_once dirname(__FILE__) . '/Stat.php'; require_once dirname(__FILE__) . '/Stat.php';
require_once dirname(__FILE__) . '/Event.php';
require_once dirname(__FILE__) . '/Job.php'; require_once dirname(__FILE__) . '/Job.php';
require_once dirname(__FILE__) . '/Job/DirtyExitException.php'; require_once dirname(__FILE__) . '/Job/DirtyExitException.php';
@ -180,11 +181,12 @@ class Resque_Worker
else { else {
$this->updateProcLine('Waiting for ' . implode(',', $this->queues)); $this->updateProcLine('Waiting for ' . implode(',', $this->queues));
} }
usleep($interval*1000000); usleep($interval * 1000000);
continue; continue;
} }
$this->log('got ' . $job); $this->log('got ' . $job);
Resque_Event::trigger('beforeFork', $job);
$this->workingOn($job); $this->workingOn($job);
$this->child = $this->fork(); $this->child = $this->fork();
@ -231,6 +233,7 @@ class Resque_Worker
public function perform(Resque_Job $job) public function perform(Resque_Job $job)
{ {
try { try {
Resque_Event::trigger('afterFork', $job);
$job->perform(); $job->perform();
} }
catch(Exception $e) { catch(Exception $e) {
@ -314,6 +317,7 @@ class Resque_Worker
{ {
$this->registerSigHandlers(); $this->registerSigHandlers();
$this->pruneDeadWorkers(); $this->pruneDeadWorkers();
Resque_Event::trigger('beforeFirstFork');
$this->registerWorker(); $this->registerWorker();
} }

View File

@ -0,0 +1,149 @@
<?php
require_once dirname(__FILE__) . '/bootstrap.php';
/**
* Resque_Event tests.
*
* @package Resque/Tests
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Tests_EventTest extends Resque_Tests_TestCase
{
private $callbacksHit = array();
public function setUp()
{
Test_Job::$called = false;
// Register a worker to test with
$this->worker = new Resque_Worker('jobs');
$this->worker->registerWorker();
}
public function tearDown()
{
Resque_Event::clearListeners();
$this->callbacksHit = array();
}
public function getEventTestJob()
{
$payload = array(
'class' => 'Test_Job',
'args' => array(
'somevar',
),
);
$job = new Resque_Job('jobs', $payload);
$job->worker = $this->worker;
return $job;
}
public function eventCallbackProvider()
{
return array(
array('beforePerform', 'beforePerformEventCallback'),
array('afterPerform', 'afterPerformEventCallback'),
array('afterFork', 'afterForkEventCallback'),
);
}
/**
* @dataProvider eventCallbackProvider
*/
public function testEventCallbacksFire($event, $callback)
{
Resque_Event::listen($event, array($this, $callback));
$job = $this->getEventTestJob();
$this->worker->perform($job);
$this->worker->work(0);
$this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called');
}
public function testBeforeForkEventCallbackFires()
{
$event = 'beforeFork';
$callback = 'beforeForkEventCallback';
Resque_Event::listen($event, array($this, $callback));
Resque::enqueue('jobs', 'Test_Job', array(
'somevar'
));
$job = $this->getEventTestJob();
$this->worker->work(0);
$this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called');
}
public function testBeforePerformEventCanStopWork()
{
$callback = 'beforePerformEventDontPerformCallback';
Resque_Event::listen('beforePerform', array($this, $callback));
$job = $this->getEventTestJob();
$this->assertFalse($job->perform());
$this->assertContains($callback, $this->callbacksHit, $callback . ' callback was not called');
$this->assertFalse(Test_Job::$called, 'Job was still performed though Resque_Job_DontPerform was thrown');
}
public function testAfterEnqueueEventCallbackFires()
{
$callback = 'afterEnqueueEventCallback';
$event = 'afterEnqueue';
Resque_Event::listen($event, array($this, $callback));
Resque::enqueue('jobs', 'Test_Job', array(
'somevar'
));
$this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called');
}
public function beforePerformEventDontPerformCallback($instance)
{
$this->callbacksHit[] = __FUNCTION__;
throw new Resque_Job_DontPerform;
}
public function assertValidEventCallback($function, $job)
{
$this->callbacksHit[] = $function;
if (!$job instanceof Resque_Job) {
$this->fail('Callback job argument is not an instance of Resque_Job');
}
$args = $job->getArguments();
$this->assertEquals($args[0], 'somevar');
}
public function afterEnqueueEventCallback($class, $args)
{
$this->callbacksHit[] = __FUNCTION__;
$this->assertEquals('Test_Job', $class);
$this->assertEquals(array(
'somevar',
), $args);
}
public function beforePerformEventCallback($job)
{
$this->assertValidEventCallback(__FUNCTION__, $job);
}
public function afterPerformEventCallback($job)
{
$this->assertValidEventCallback(__FUNCTION__, $job);
}
public function beforeForkEventCallback($job)
{
$this->assertValidEventCallback(__FUNCTION__, $job);
}
public function afterForkEventCallback($job)
{
$this->assertValidEventCallback(__FUNCTION__, $job);
}
}

View File

@ -152,7 +152,7 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase
$this->assertTrue(Test_Job_With_SetUp::$called); $this->assertTrue(Test_Job_With_SetUp::$called);
} }
public function testJobWithTearDownCallbackFiresSetUp() public function testJobWithTearDownCallbackFiresTearDown()
{ {
$payload = array( $payload = array(
'class' => 'Test_Job_With_TearDown', 'class' => 'Test_Job_With_TearDown',

View File

@ -95,9 +95,11 @@ if(function_exists('pcntl_signal')) {
class Test_Job class Test_Job
{ {
public static $called = false;
public function perform() public function perform()
{ {
self::$called = true;
} }
} }