For an overview of how to use php-resque, see `README.md`.

The following is a step-by-step breakdown of how php-resque operates.
Enqueue Job
What happens when you call `Resque::enqueue()`?
1. `Resque::enqueue()` calls `Resque_Job::create()` with the same arguments it received.
2. `Resque_Job::create()` checks that your `$args` (the third argument) are either `null` or in an array
3. `Resque_Job::create()` generates a job ID (a "token" in most of the docs)
4. `Resque_Job::create()` pushes the job to the requested queue (first argument)
5. `Resque_Job::create()`, if status monitoring is enabled for the job (fourth argument), calls `\Resque\Job\Status::create()` with the job ID as its only argument
6. `\Resque\Job\Status::create()` creates a key in Redis with the job ID in its name, and the current status (as well as a couple of timestamps) as its value, then returns control to `Resque_Job::create()`
7. `Resque_Job::create()` returns control to `Resque::enqueue()`, with the job ID as a return value
8. `Resque::enqueue()` triggers the `afterEnqueue` event, then returns control to your application, again with the job ID as its return value
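The enqueue steps above can be sketched without a Redis server. The following is a minimal illustration only, not the library's code: `SketchEnqueueStore`, `sketchEnqueue()`, the `job:<id>:status` key name, the ID format, and the exact status fields are all assumptions made for the example. Only the order of operations follows the text.

```php
<?php
// In-memory stand-in for Redis. Hypothetical; not part of php-resque.
final class SketchEnqueueStore
{
    /** @var array<string, string[]> queue name => JSON-encoded jobs */
    public $queues = [];
    /** @var array<string, string> status key => JSON-encoded status */
    public $statuses = [];
}

// Hypothetical function following the order of steps described for enqueueing.
function sketchEnqueue(SketchEnqueueStore $store, string $queue, string $class, $args = null, bool $trackStatus = false): string
{
    // Step 2: the arguments must be null or an array.
    if ($args !== null && !is_array($args)) {
        throw new InvalidArgumentException('Supplied $args must be null or an array.');
    }

    // Step 3: generate a job ID (the real ID format is an assumption here).
    $id = bin2hex(random_bytes(16));

    // Step 4: push the job (class, args, id) onto the requested queue.
    $store->queues[$queue][] = json_encode(['class' => $class, 'args' => [$args], 'id' => $id]);

    // Steps 5-6: if status monitoring is requested, store a status record
    // keyed by the job ID (key name and field names are assumptions).
    if ($trackStatus) {
        $now = time();
        $store->statuses['job:' . $id . ':status'] = json_encode([
            'status'  => 'WAITING',
            'updated' => $now,
            'started' => $now,
        ]);
    }

    // Step 8 (the afterEnqueue event) is omitted from this sketch.
    return $id;
}

// Usage: enqueue one job with status tracking and print its ID.
$store = new SketchEnqueueStore();
$jobId = sketchEnqueue($store, 'default', 'My_Job', ['name' => 'Chris'], true);
echo $jobId, PHP_EOL;
```

Wrapping the arguments in an outer array mirrors the later description of `getArguments()` returning `args[0]`.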
Workers At Work
How do the workers process the queues?
1. `Resque_Worker::work()`, the main loop of the worker process, calls `Resque_Worker->reserve()` to check for a job
2. `Resque_Worker->reserve()` checks whether to use blocking pops or not (from `BLOCKING`), then acts accordingly:
   - Blocking Pop
     1. `Resque_Worker->reserve()` calls `Resque_Job::reserveBlocking()` with the entire queue list and the timeout (from `INTERVAL`) as arguments
     2. `Resque_Job::reserveBlocking()` calls `Resque::blpop()` (which in turn calls Redis' `blpop`, after prepping the queue list for the call, then processes the response for consistency with other aspects of the library, before finally returning control [and the queue/content of the retrieved job, if any] to `Resque_Job::reserveBlocking()`)
     3. `Resque_Job::reserveBlocking()` checks whether the job content is an array (it should contain the job's type [class], payload [args], and ID), and aborts processing if not
     4. `Resque_Job::reserveBlocking()` creates a new `Resque_Job` object with the queue and content as constructor arguments to initialize the job itself, and returns it, along with control of the process, to `Resque_Worker->reserve()`
   - Queue Polling
     1. `Resque_Worker->reserve()` iterates through the queue list, calling `Resque_Job::reserve()` with the current queue's name as the sole argument on each pass
     2. `Resque_Job::reserve()` passes the queue name on to `Resque::pop()`, which in turn calls Redis' `lpop` with the same argument, then returns control (and the job content, if any) to `Resque_Job::reserve()`
     3. `Resque_Job::reserve()` checks whether the job content is an array (as before, it should contain the job's type [class], payload [args], and ID), and aborts processing if not
     4. `Resque_Job::reserve()` creates a new `Resque_Job` object in the same manner as above, and also returns this object (along with control of the process) to `Resque_Worker->reserve()`
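As a rough illustration of the Queue Polling branch, the sketch below walks a list of queue names and pops the first available job. It uses plain PHP arrays in place of Redis, with `array_shift()` playing the role of `lpop`. The function name `sketchReserve()` and the returned array shape are hypothetical; the real library returns a `Resque_Job` object. The Blocking Pop branch differs mainly in that a single Redis `blpop` call waits on all queues at once instead of looping.

```php
<?php
// Hypothetical polling helper; plain arrays stand in for Redis lists.
// Returns ['queue' => ..., 'payload' => ...] or false when nothing is found.
function sketchReserve(array &$queues, array $queueNames)
{
    foreach ($queueNames as $name) {
        if (empty($queues[$name])) {
            continue; // nothing waiting on this queue, try the next one
        }

        // array_shift() removes the head of the list, like Redis' LPOP.
        $raw = array_shift($queues[$name]);
        $payload = json_decode($raw, true);

        // The job content should decode to an array (class, args, id).
        if (!is_array($payload)) {
            continue; // malformed content: skip it
        }

        return ['queue' => $name, 'payload' => $payload];
    }

    return false; // no job found on any queue
}

// Usage: 'high' is empty, so the job is taken from 'low'.
$queues = [
    'high' => [],
    'low'  => [json_encode(['class' => 'My_Job', 'args' => [[]], 'id' => 'abc123'])],
];
$job = sketchReserve($queues, ['high', 'low']);
var_dump($job !== false);
```

Because the queue list is walked in order, queues listed earlier are always checked first, which is how the queue order acts as a priority in this sketch.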
3. In either case, `Resque_Worker->reserve()` returns the new `Resque_Job` object, along with control, up to `Resque_Worker::work()`; if no job is found, it simply returns `FALSE`
   - No Jobs
     1. If blocking mode is not enabled, `Resque_Worker::work()` sleeps for `INTERVAL` seconds; it calls `usleep()` for this, so fractional seconds are supported
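Since `usleep()` takes microseconds, a fractional `INTERVAL` has to be scaled before the call. A minimal sketch of that conversion follows; the helper name `sketchIntervalToMicroseconds()` is hypothetical, and the library may do the multiplication inline.

```php
<?php
// Hypothetical helper: convert an interval in (possibly fractional) seconds
// to the whole number of microseconds that usleep() expects.
function sketchIntervalToMicroseconds(float $intervalSeconds): int
{
    return (int) round($intervalSeconds * 1000000);
}

// Usage: sleep for a quarter of a second between polls.
usleep(sketchIntervalToMicroseconds(0.25));
```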
   - Job Reserved
     1. `Resque_Worker::work()` triggers a `beforeFork` event
     2. `Resque_Worker::work()` calls `Resque_Worker->workingOn()` with the new `Resque_Job` object as its argument
     3. `Resque_Worker->workingOn()` does some reference assignments to help keep track of the worker/job relationship, then updates the job status from `WAITING` to `RUNNING`
     4. `Resque_Worker->workingOn()` stores the new `Resque_Job` object's payload in a Redis key associated to the worker itself (this is to prevent the job from being lost indefinitely, but does rely on that PID never being allocated on that host to a different worker process), then returns control to `Resque_Worker::work()`
     5. `Resque_Worker::work()` forks a child process to run the actual `perform()`
     6. The next steps differ between the worker and the child, now running in separate processes:
        - Worker
          1. The worker waits for the job process to complete
          2. If the exit status is not 0, the worker calls `Resque_Job->fail()` with a `Resque\Job\DirtyExitException` as its only argument.
          3. `Resque_Job->fail()` triggers an `onFailure` event
          4. `Resque_Job->fail()` updates the job status from `RUNNING` to `FAILED`
          5. `Resque_Job->fail()` calls `Resque_Failure::create()` with the job payload, the `Resque\Job\DirtyExitException`, the internal ID of the worker, and the queue name as arguments
          6. `Resque_Failure::create()` creates a new object of whatever type has been set as the `Resque_Failure` "backend" handler; by default, this is a `ResqueFailureRedis` object, whose constructor simply collects the data passed into `Resque_Failure::create()` and pushes it into Redis in the `failed` queue
          7. `Resque_Job->fail()` increments two failure counters in Redis: one for a total count, and one for the worker
          8. `Resque_Job->fail()` returns control to the worker (still in `Resque_Worker::work()`) without a value
        - Job
          1. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its only argument.
          2. `Resque_Worker->perform()` sets up a `try...catch` block so it can properly handle exceptions by marking jobs as failed (by calling `Resque_Job->fail()`, as above)
             - Inside the `try...catch`, `Resque_Worker->perform()` triggers an `afterFork` event
             - Still inside the `try...catch`, `Resque_Worker->perform()` calls `Resque_Job->perform()` with no arguments
          3. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no arguments
             - If `Resque_Job->getInstance()` has already been called, it returns the existing instance; otherwise:
             - `Resque_Job->getInstance()` checks that the job's class (type) exists and has a `perform()` method; if not, in either case, it throws an exception which will be caught by `Resque_Worker->perform()`
             - `Resque_Job->getInstance()` creates an instance of the job's class, and initializes it with a reference to the `Resque_Job` itself, the job's arguments (which it gets by calling `Resque_Job->getArguments()`, which in turn simply returns the value of `args[0]`, or an empty array if no arguments were passed), and the queue name
             - `Resque_Job->getInstance()` returns control, along with the job class instance, to `Resque_Job->perform()`
          4. `Resque_Job->perform()` sets up its own `try...catch` block to handle `Resque_Job_DontPerform` exceptions; any other exceptions are passed up to `Resque_Worker->perform()`
             - `Resque_Job->perform()` triggers a `beforePerform` event
             - `Resque_Job->perform()` calls `setUp()` on the instance, if it exists
             - `Resque_Job->perform()` calls `perform()` on the instance
             - `Resque_Job->perform()` calls `tearDown()` on the instance, if it exists
             - `Resque_Job->perform()` triggers an `afterPerform` event
             - The `try...catch` block ends, suppressing `Resque_Job_DontPerform` exceptions by returning control, and the value `FALSE`, to `Resque_Worker->perform()`; any other situation returns the value `TRUE` along with control, instead
          5. The `try...catch` block in `Resque_Worker->perform()` ends
          6. `Resque_Worker->perform()` updates the job status from `RUNNING` to `COMPLETE`, then returns control, with no value, to the worker (again still in `Resque_Worker::work()`)
          7. `Resque_Worker::work()` calls `exit(0)` to terminate the job process cleanly
        - SPECIAL CASE: Non-forking OS (Windows)
          1. Same as the job above, except it doesn't call `exit(0)` when done
     7. `Resque_Worker::work()` calls `Resque_Worker->doneWorking()` with no arguments
     8. `Resque_Worker->doneWorking()` increments two processed counters in Redis: one for a total count, and one for the worker
     9. `Resque_Worker->doneWorking()` deletes the Redis key set in `Resque_Worker->workingOn()`, then returns control, with no value, to `Resque_Worker::work()`
     10. `Resque_Worker::work()` returns control to the beginning of the main loop, where it will wait for the next job to become available, and start this process all over again
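The job-side steps (instance checks, the `setUp()` / `perform()` / `tearDown()` sequence, and the two nested `try...catch` blocks) can be sketched in a single process without forking, roughly as the non-forking case behaves. Everything named `Sketch...` or `sketch...` below is hypothetical and stands in for the classes described above. Events, failure backends, and counters are left out, and the status array is a stand-in for the Redis status keys.

```php
<?php
// Hypothetical stand-in for the Resque_Job_DontPerform exception.
class SketchDontPerform extends Exception
{
}

// Hypothetical job class with the optional setUp()/tearDown() hooks.
class SketchEchoJob
{
    public static $log = [];
    public $args = [];

    public function setUp(): void
    {
        self::$log[] = 'setUp';
    }

    public function perform(): void
    {
        self::$log[] = 'perform:' . ($this->args['msg'] ?? '');
    }

    public function tearDown(): void
    {
        self::$log[] = 'tearDown';
    }
}

// Follows the description of Resque_Job->perform(): returns false when the
// job opts out via the "don't perform" exception, true otherwise.
function sketchJobPerform(array $payload): bool
{
    $class = $payload['class'];
    if (!class_exists($class)) {
        throw new RuntimeException("Could not find job class {$class}.");
    }
    if (!method_exists($class, 'perform')) {
        throw new RuntimeException("Job class {$class} has no perform() method.");
    }

    $instance = new $class();
    $instance->args = $payload['args'][0] ?? []; // args[0], or an empty array

    try {
        if (method_exists($instance, 'setUp')) {
            $instance->setUp();
        }
        $instance->perform();
        if (method_exists($instance, 'tearDown')) {
            $instance->tearDown();
        }
    } catch (SketchDontPerform $e) {
        return false; // suppressed, as described for the inner try...catch
    }

    return true;
}

// Follows the description of Resque_Worker->perform(): any other exception
// marks the job FAILED; otherwise it ends up COMPLETE.
function sketchWorkerPerform(array $payload, array &$statuses): void
{
    // In the text, RUNNING is set earlier by workingOn(); done here for brevity.
    $statuses[$payload['id']] = 'RUNNING';
    try {
        sketchJobPerform($payload);
    } catch (Throwable $e) {
        $statuses[$payload['id']] = 'FAILED';
        return;
    }
    $statuses[$payload['id']] = 'COMPLETE';
}

// Usage: run one job and inspect the resulting status.
$statuses = [];
sketchWorkerPerform(['class' => 'SketchEchoJob', 'args' => [['msg' => 'hi']], 'id' => 'job-1'], $statuses);
echo $statuses['job-1'], PHP_EOL;
```

In the forking case the same logic runs in the child process, and the parent additionally treats a non-zero exit status as a failure, which this single-process sketch cannot show.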