add beforeEnqueue hook that allows the enqueue to be cancelled

This commit is contained in:
Chris Boulton 2014-09-18 23:20:04 +10:00
parent c335bc3555
commit a95c24b32e
5 changed files with 106 additions and 29 deletions

View File

@ -413,6 +413,18 @@ Called whenever a job fails. Arguments passed (in this order) include:
* Exception - The exception that was thrown when the job failed
* Resque_Job - The job that failed
#### beforeEnqueue ####
Called immediately before a job is enqueued using the `Resque::enqueue` method.
Arguments passed (in this order) include:
* Class - string containing the name of the job to be enqueued
* Arguments - array of arguments for the job
* Queue - string containing the name of the queue the job is to be enqueued in
* ID - string containing the token of the job to be enqueued
You can prevent enqueing of the job by throwing an exception of `Resque_Job_DontCreate`.
#### afterEnqueue ####
Called after a job has been queued using the `Resque::enqueue` method. Arguments passed

View File

@ -201,17 +201,24 @@ class Resque
*/
public static function enqueue($queue, $class, $args = null, $trackStatus = false)
{
$result = Resque_Job::create($queue, $class, $args, $trackStatus);
if ($result) {
Resque_Event::trigger('afterEnqueue', array(
'class' => $class,
'args' => $args,
'queue' => $queue,
'id' => $result,
));
$id = Resque::generateJobId();
$hookParams = array(
'class' => $class,
'args' => $args,
'queue' => $queue,
'id' => $id,
);
try {
Resque_Event::trigger('beforeEnqueue', $hookParams);
}
catch(Resque_Job_DontCreate $e) {
return $id;
}
return $result;
Resque_Job::create($queue, $class, $args, $trackStatus, $id);
Resque_Event::trigger('afterEnqueue', $hookParams);
return $id;
}
/**
@ -341,5 +348,15 @@ class Resque
$result = self::redis()->del('queue:' . $queue);
return ($result == 1) ? $counter : 0;
}
/*
* Generate an identifier to attach to a job for status tracking.
*
* @return string
*/
public static function generateJobId()
{
return md5(uniqid('', true));
}
}

View File

@ -50,14 +50,17 @@ class Resque_Job
*
* @return string
*/
public static function create($queue, $class, $args = null, $monitor = false)
public static function create($queue, $class, $args = null, $monitor = false, $id = null)
{
if (is_null($id)) {
$id = Resque::generateJobId();
}
if($args !== null && !is_array($args)) {
throw new InvalidArgumentException(
'Supplied $args must be an array.'
);
}
$id = md5(uniqid('', true));
Resque::push($queue, array(
'class' => $class,
'args' => array($args),

View File

@ -0,0 +1,12 @@
<?php
/**
* Exception to be thrown if while enqueuing a job it should not be created.
*
* @package Resque/Job
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Job_DontCreate extends Exception
{
}

View File

@ -9,11 +9,11 @@
class Resque_Tests_EventTest extends Resque_Tests_TestCase
{
private $callbacksHit = array();
public function setUp()
{
Test_Job::$called = false;
// Register a worker to test with
$this->worker = new Resque_Worker('jobs');
$this->worker->setLogger(new Resque_Log());
@ -38,7 +38,7 @@ class Resque_Tests_EventTest extends Resque_Tests_TestCase
$job->worker = $this->worker;
return $job;
}
public function eventCallbackProvider()
{
return array(
@ -47,7 +47,7 @@ class Resque_Tests_EventTest extends Resque_Tests_TestCase
array('afterFork', 'afterForkEventCallback'),
);
}
/**
* @dataProvider eventCallbackProvider
*/
@ -58,10 +58,10 @@ class Resque_Tests_EventTest extends Resque_Tests_TestCase
$job = $this->getEventTestJob();
$this->worker->perform($job);
$this->worker->work(0);
$this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called');
}
public function testBeforeForkEventCallbackFires()
{
$event = 'beforeFork';
@ -76,6 +76,18 @@ class Resque_Tests_EventTest extends Resque_Tests_TestCase
$this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called');
}
public function testBeforeEnqueueEventCallbackFires()
{
$event = 'beforeEnqueue';
$callback = 'beforeEnqueueEventCallback';
Resque_Event::listen($event, array($this, $callback));
Resque::enqueue('jobs', 'Test_Job', array(
'somevar'
));
$this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called');
}
public function testBeforePerformEventCanStopWork()
{
$callback = 'beforePerformEventDontPerformCallback';
@ -87,23 +99,34 @@ class Resque_Tests_EventTest extends Resque_Tests_TestCase
$this->assertContains($callback, $this->callbacksHit, $callback . ' callback was not called');
$this->assertFalse(Test_Job::$called, 'Job was still performed though Resque_Job_DontPerform was thrown');
}
public function testBeforeEnqueueEventStopsJobCreation()
{
$callback = 'beforeEnqueueEventDontCreateCallback';
Resque_Event::listen('beforeEnqueue', array($this, $callback));
Resque_Event::listen('afterEnqueue', array($this, 'afterEnqueueEventCallback'));
Resque::enqueue('test_job', 'TestClass');
$this->assertContains($callback, $this->callbacksHit, $callback . ' callback was not called');
$this->assertNotContains('afterEnqueueEventCallback', $this->callbacksHit, 'afterEnqueue was still called, even though it should not have been');
}
public function testAfterEnqueueEventCallbackFires()
{
$callback = 'afterEnqueueEventCallback';
$event = 'afterEnqueue';
$event = 'afterEnqueue';
Resque_Event::listen($event, array($this, $callback));
Resque::enqueue('jobs', 'Test_Job', array(
'somevar'
));
));
$this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called');
}
public function testStopListeningRemovesListener()
{
$callback = 'beforePerformEventCallback';
$event = 'beforePerform';
$event = 'beforePerform';
Resque_Event::listen($event, array($this, $callback));
Resque_Event::stopListening($event, array($this, $callback));
@ -112,18 +135,23 @@ class Resque_Tests_EventTest extends Resque_Tests_TestCase
$this->worker->perform($job);
$this->worker->work(0);
$this->assertNotContains($callback, $this->callbacksHit,
$this->assertNotContains($callback, $this->callbacksHit,
$event . ' callback (' . $callback .') was called though Resque_Event::stopListening was called'
);
}
public function beforePerformEventDontPerformCallback($instance)
{
$this->callbacksHit[] = __FUNCTION__;
throw new Resque_Job_DontPerform;
}
public function beforeEnqueueEventDontCreateCallback($queue, $class, $args, $track = false)
{
$this->callbacksHit[] = __FUNCTION__;
throw new Resque_Job_DontCreate;
}
public function assertValidEventCallback($function, $job)
{
$this->callbacksHit[] = $function;
@ -133,7 +161,7 @@ class Resque_Tests_EventTest extends Resque_Tests_TestCase
$args = $job->getArguments();
$this->assertEquals($args[0], 'somevar');
}
public function afterEnqueueEventCallback($class, $args)
{
$this->callbacksHit[] = __FUNCTION__;
@ -142,12 +170,17 @@ class Resque_Tests_EventTest extends Resque_Tests_TestCase
'somevar',
), $args);
}
public function beforeEnqueueEventCallback($job)
{
$this->callbacksHit[] = __FUNCTION__;
}
public function beforePerformEventCallback($job)
{
$this->assertValidEventCallback(__FUNCTION__, $job);
}
public function afterPerformEventCallback($job)
{
$this->assertValidEventCallback(__FUNCTION__, $job);
@ -157,7 +190,7 @@ class Resque_Tests_EventTest extends Resque_Tests_TestCase
{
$this->assertValidEventCallback(__FUNCTION__, $job);
}
public function afterForkEventCallback($job)
{
$this->assertValidEventCallback(__FUNCTION__, $job);