From 6f43fcfed8b62120f9e1d614e179b43edf949e18 Mon Sep 17 00:00:00 2001 From: "chris.boulton" Date: Sun, 27 Mar 2011 18:42:46 +1100 Subject: [PATCH] Add a plugin/event/hook system --- README.markdown | 90 +++++++++++++++++++ TODO.markdown | 2 - lib/Resque.php | 11 ++- lib/Resque/Event.php | 89 +++++++++++++++++++ lib/Resque/Job.php | 85 +++++++++++++++--- lib/Resque/Job/DontPerform.php | 13 +++ lib/Resque/Worker.php | 6 +- test/Resque/Tests/EventTest.php | 149 ++++++++++++++++++++++++++++++++ test/Resque/Tests/JobTest.php | 2 +- test/Resque/Tests/bootstrap.php | 4 +- 10 files changed, 431 insertions(+), 20 deletions(-) create mode 100644 lib/Resque/Event.php create mode 100644 lib/Resque/Job/DontPerform.php create mode 100644 test/Resque/Tests/EventTest.php diff --git a/README.markdown b/README.markdown index 6c958ba..ab490bd 100644 --- a/README.markdown +++ b/README.markdown @@ -227,6 +227,96 @@ A PECL module () exists that adds this funcitonality to PHP, so if you'd like process titles updated, install the PECL module as well. php-resque will detect and use it. +## Event/Hook System ## + +php-resque has a basic event system that can be used by your application +to customize how some of the php-resque internals behave. + +You listen in on events (as listed below) by registering with `Resque_Event` +and supplying a callback that you would like triggered when the event is +raised: + + Resque_Event::listen('eventName', [callback]); + +`[callback]` may be anything in PHP that is callable by `call_user_func_array`: + +* A string with the name of a function +* An array containing an object and method to call +* An array containing an object and a static method to call +* A closure (PHP 5.3) + +Events may pass arguments (documented below), so your callback should accept +these arguments. + +You can stop listening to an event by calling `Resque_Event::stopListening` +with the same arguments supplied to `Resque_Event::listen`. + +It is up to your application to register event listeners. When enqueuing events +in your application, it should be as easy as making sure php-resque is loaded +and calling `Resque_Event::listen`. + +When running workers, if you run workers via the default `resque.php` script, +your `APP_INCLUDE` script should initialize and register any listeners required +for operation. If you have rolled your own worker manager, then it is again your +responsibility to register listeners. + +A sample plugin is included in the `extras` directory. + +### Events ### + +#### beforeFirstFork #### + +Called once, as a worker initializes. Argument passed is the instance of `Resque_Worker` +that was just initialized. + +#### beforeFork #### + +Called before php-resque forks to run a job. Argument passed contains the instance of +`Resque_Job` for the job about to be run. + +`beforeFork` is triggered in the **parent** process. Any changes made will be permanent +for as long as the worker lives. + +#### afterFork #### + +Called after php-resque forks to run a job (but before the job is run). Argument +passed contains the instance of `Resque_Job` for the job about to be run. + +`afterFork` is triggered in the child process after forking out to complete a job. Any +changes made will only live as long as the job is being processed. + +#### beforePerform #### + +Called before the `setUp` and `perform` methods on a job are run. Argument passed +contains the instance of `Resque_Job` about for the job about to be run. + +You can prevent execution of the job by throwing an exception of `Resque_Job_DontPerform`. +Any other exceptions thrown will be treated as if they were thrown in a job, causing the +job to fail. + +#### afterPerform #### + +Called after the `perform` and `tearDown` methods on a job are run. Argument passed +contains the instance of `Resque_Job` that was just run. + +Any exceptions thrown will be treated as if they were thrown in a job, causing the job +to be marked as having failed. + +#### onFailure #### + +Called whenever a job fails. Arguments passed (in this order) include: + +* Exception - The exception that was thrown when the job failed +* Resque_Job - The job that failed + +#### afterEnqueue #### + +Called after a job has been queued using the `Resque::enqueue` method. Arguments passed +(in this order) include: + +* Class - string containing the name of the class the job was scheduled in +* Arguments - array of arguments supplied to the job + ## Contributors ## * chrisboulton diff --git a/TODO.markdown b/TODO.markdown index d8ee07a..61ea867 100644 --- a/TODO.markdown +++ b/TODO.markdown @@ -1,8 +1,6 @@ * Write tests for: * `Resque_Failure` * `Resque_Failure_Redis` -* Plugin/hook type system similar to Ruby version (when done, implement the -setUp and tearDown methods as a plugin) * Change to preforking worker model * Clean up /bin and /demo * Add a way to store arbitrary text in job statuses (for things like progress diff --git a/lib/Resque.php b/lib/Resque.php index aa48f8d..168d849 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -1,4 +1,5 @@ $class, + 'args' => $args, + )); + } + + return $result; } /** diff --git a/lib/Resque/Event.php b/lib/Resque/Event.php new file mode 100644 index 0000000..2264ae2 --- /dev/null +++ b/lib/Resque/Event.php @@ -0,0 +1,89 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Event +{ + /** + * @var array Array containing all registered callbacks, indexked by event name. + */ + private static $events = array(); + + /** + * Raise a given event with the supplied data. + * + * @param string $event Name of event to be raised. + * @param mixed $data Optional, any data that should be passed to each callback. + * @return true + */ + public static function trigger($event, $data = null) + { + if (!is_array($data)) { + $data = array($data); + } + + if (empty(self::$events[$event])) { + return true; + } + + foreach (self::$events[$event] as $callback) { + if (!is_callable($callback)) { + continue; + } + call_user_func_array($callback, $data); + } + + return true; + } + + /** + * Listen in on a given event to have a specified callback fired. + * + * @param string $event Name of event to listen on. + * @param mixed $callback Any callback callable by call_user_func_array. + * @return true + */ + public static function listen($event, $callback) + { + if (!isset(self::$events[$event])) { + self::$events[$event] = array(); + } + + self::$events[$event][] = $callback; + return true; + } + + /** + * Stop a given callback from listening on a specific event. + * + * @param string $event Name of event. + * @param mixed $callback The callback as defined when listen() was called. + * @return true + */ + public static function stopListening($event, $callback) + { + if (!isset(self::$events[$event])) { + return true; + } + + $key = array_search($callback, self::$events[$event]); + if ($key !== false) { + unset(self::$events[$key]); + } + + return true; + } + + /** + * Call all registered listeners. + */ + public static function clearListeners() + { + self::$events = array(); + } +} \ No newline at end of file diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 86084f6..d2023c5 100644 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -1,5 +1,7 @@ payload['id']); return $status->get(); } - + /** - * Actually execute a job by calling the perform method on the class - * associated with the job with the supplied arguments. + * Get the arguments supplied to this job. * - * @throws Resque_Exception When the job's class could not be found or it does not contain a perform method. + * @return array Array of arguments. */ - public function perform() + public function getArguments() { + if (!isset($this->payload['args'])) { + return array(); + } + + return $this->payload['args']; + } + + /** + * Get the instantiated object for this job that will be performing work. + * + * @return object Instance of the object that this job belongs to. + */ + public function getInstance() + { + if (!is_null($this->instance)) { + return $this->instance; + } + if(!class_exists($this->payload['class'])) { throw new Resque_Exception( 'Could not find job class ' . $this->payload['class'] . '.' @@ -130,18 +154,51 @@ class Resque_Job ); } - $instance = new $this->payload['class']; - $instance->args = $this->payload['args']; + $this->instance = new $this->payload['class']; + $this->instance->job = $this; + $this->instance->args = $this->getArguments(); + return $this->instance; + } - if(method_exists($instance, 'setUp')) { - $instance->setUp(); + /** + * Actually execute a job by calling the perform method on the class + * associated with the job with the supplied arguments. + * + * @throws Resque_Exception When the job's class could not be found or it does not contain a perform method. + */ + public function perform() + { + $instance = $this->getInstance(); + try { + Resque_Event::trigger('beforePerform', $this); + + if(method_exists($instance, 'setUp')) { + $instance->setUp(); + } + + $instance->perform(); + + if(method_exists($instance, 'tearDown')) { + $instance->tearDown(); + } + + Resque_Event::trigger('afterPerform', $this); } - - $instance->perform(); - - if(method_exists($instance, 'tearDown')) { - $instance->tearDown(); + // beforePerform/setUp have said don't perform this job. Return. + catch(Resque_Job_DontPerform $e) { + return false; } + // Catch any other exception and raise the onFailure event. Rethrow + // the exception + catch(Exception $e) { + Resque_Event::trigger('onFailure', array( + 'exception' => $e, + 'job' => $this, + )); + throw $e; + } + + return true; } /** diff --git a/lib/Resque/Job/DontPerform.php b/lib/Resque/Job/DontPerform.php new file mode 100644 index 0000000..91d5c70 --- /dev/null +++ b/lib/Resque/Job/DontPerform.php @@ -0,0 +1,13 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Job_DontPerform extends Exception +{ + +} \ No newline at end of file diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index c59391f..aa834d5 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -1,5 +1,6 @@ updateProcLine('Waiting for ' . implode(',', $this->queues)); } - usleep($interval*1000000); + usleep($interval * 1000000); continue; } $this->log('got ' . $job); + Resque_Event::trigger('beforeFork', $job); $this->workingOn($job); $this->child = $this->fork(); @@ -231,6 +233,7 @@ class Resque_Worker public function perform(Resque_Job $job) { try { + Resque_Event::trigger('afterFork', $job); $job->perform(); } catch(Exception $e) { @@ -314,6 +317,7 @@ class Resque_Worker { $this->registerSigHandlers(); $this->pruneDeadWorkers(); + Resque_Event::trigger('beforeFirstFork'); $this->registerWorker(); } diff --git a/test/Resque/Tests/EventTest.php b/test/Resque/Tests/EventTest.php new file mode 100644 index 0000000..3d2a536 --- /dev/null +++ b/test/Resque/Tests/EventTest.php @@ -0,0 +1,149 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_EventTest extends Resque_Tests_TestCase +{ + private $callbacksHit = array(); + + public function setUp() + { + Test_Job::$called = false; + + // Register a worker to test with + $this->worker = new Resque_Worker('jobs'); + $this->worker->registerWorker(); + } + + public function tearDown() + { + Resque_Event::clearListeners(); + $this->callbacksHit = array(); + } + + public function getEventTestJob() + { + $payload = array( + 'class' => 'Test_Job', + 'args' => array( + 'somevar', + ), + ); + $job = new Resque_Job('jobs', $payload); + $job->worker = $this->worker; + return $job; + } + + public function eventCallbackProvider() + { + return array( + array('beforePerform', 'beforePerformEventCallback'), + array('afterPerform', 'afterPerformEventCallback'), + array('afterFork', 'afterForkEventCallback'), + ); + } + + /** + * @dataProvider eventCallbackProvider + */ + public function testEventCallbacksFire($event, $callback) + { + Resque_Event::listen($event, array($this, $callback)); + + $job = $this->getEventTestJob(); + $this->worker->perform($job); + $this->worker->work(0); + + $this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called'); + } + + public function testBeforeForkEventCallbackFires() + { + $event = 'beforeFork'; + $callback = 'beforeForkEventCallback'; + + Resque_Event::listen($event, array($this, $callback)); + Resque::enqueue('jobs', 'Test_Job', array( + 'somevar' + )); + $job = $this->getEventTestJob(); + $this->worker->work(0); + $this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called'); + } + + public function testBeforePerformEventCanStopWork() + { + $callback = 'beforePerformEventDontPerformCallback'; + Resque_Event::listen('beforePerform', array($this, $callback)); + + $job = $this->getEventTestJob(); + + $this->assertFalse($job->perform()); + $this->assertContains($callback, $this->callbacksHit, $callback . ' callback was not called'); + $this->assertFalse(Test_Job::$called, 'Job was still performed though Resque_Job_DontPerform was thrown'); + } + + public function testAfterEnqueueEventCallbackFires() + { + $callback = 'afterEnqueueEventCallback'; + $event = 'afterEnqueue'; + + Resque_Event::listen($event, array($this, $callback)); + Resque::enqueue('jobs', 'Test_Job', array( + 'somevar' + )); + $this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called'); + } + + public function beforePerformEventDontPerformCallback($instance) + { + $this->callbacksHit[] = __FUNCTION__; + throw new Resque_Job_DontPerform; + } + + public function assertValidEventCallback($function, $job) + { + $this->callbacksHit[] = $function; + if (!$job instanceof Resque_Job) { + $this->fail('Callback job argument is not an instance of Resque_Job'); + } + $args = $job->getArguments(); + $this->assertEquals($args[0], 'somevar'); + } + + public function afterEnqueueEventCallback($class, $args) + { + $this->callbacksHit[] = __FUNCTION__; + $this->assertEquals('Test_Job', $class); + $this->assertEquals(array( + 'somevar', + ), $args); + } + + public function beforePerformEventCallback($job) + { + $this->assertValidEventCallback(__FUNCTION__, $job); + } + + public function afterPerformEventCallback($job) + { + $this->assertValidEventCallback(__FUNCTION__, $job); + } + + public function beforeForkEventCallback($job) + { + $this->assertValidEventCallback(__FUNCTION__, $job); + } + + public function afterForkEventCallback($job) + { + $this->assertValidEventCallback(__FUNCTION__, $job); + } +} \ No newline at end of file diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index 1d1db56..df6187b 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -152,7 +152,7 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase $this->assertTrue(Test_Job_With_SetUp::$called); } - public function testJobWithTearDownCallbackFiresSetUp() + public function testJobWithTearDownCallbackFiresTearDown() { $payload = array( 'class' => 'Test_Job_With_TearDown', diff --git a/test/Resque/Tests/bootstrap.php b/test/Resque/Tests/bootstrap.php index 03f7e56..eb84258 100644 --- a/test/Resque/Tests/bootstrap.php +++ b/test/Resque/Tests/bootstrap.php @@ -95,9 +95,11 @@ if(function_exists('pcntl_signal')) { class Test_Job { + public static $called = false; + public function perform() { - + self::$called = true; } }