2013-03-13 12:41:32 +01:00

588 lines
16 KiB

* Resque worker that handles checking queues for jobs, fetching them
* off the queues, running them and handling the result.
* @package Resque/Worker
* @author Chris Boulton <>
* @license
class Resque_Worker
// Verbosity levels understood by log(); compared numerically against $logLevel.
const LOG_NONE    = 0;
const LOG_NORMAL  = 1;
const LOG_VERBOSE = 2;

/**
 * @var int Current log level of this worker (one of the LOG_* constants).
 */
public $logLevel = self::LOG_NONE;

/**
 * @var array Array of all associated queues for this worker.
 */
private $queues = array();

/**
 * @var string The hostname of this worker.
 */
private $hostname;

/**
 * @var boolean True if on the next iteration, the worker should shutdown.
 */
private $shutdown = false;

/**
 * @var boolean True if this worker is paused.
 */
private $paused = false;

/**
 * @var string String identifying this worker.
 */
private $id;

/**
 * @var Resque_Job Current job, if any, being processed by this worker.
 */
private $currentJob = null;

/**
 * @var int Process ID of child worker processes.
 */
private $child = null;
/**
 * Build a worker instance for every worker ID registered in Redis.
 *
 * Each member of the 'workers' set is passed through self::find(); whatever
 * find() returns for an ID is appended to the result unchanged.
 *
 * @return array One entry per registered worker ID.
 */
public static function all()
{
    $ids = Resque::redis()->smembers('workers');
    if (!is_array($ids)) {
        // No usable member list came back; treat it as "no workers".
        return array();
    }

    $found = array();
    foreach ($ids as $id) {
        $found[] = self::find($id);
    }

    return $found;
}
/**
 * Check whether a worker ID is a member of the 'workers' set in Redis.
 *
 * @param string $workerId ID of the worker.
 * @return boolean True if the worker is registered, false if not.
 */
public static function exists($workerId)
{
    $isMember = Resque::redis()->sismember('workers', $workerId);

    return (bool)$isMember;
}
/**
 * Given a worker ID, find it and return an instantiated worker for it.
 *
 * Worker IDs have the form "hostname:pid:queue1,queue2" (see the
 * constructor). The queue list is read from the third segment and the
 * requested ID is assigned to the new instance, so the returned object
 * represents the looked-up worker rather than the current process.
 *
 * @param string $workerId The ID of the worker.
 * @return Resque_Worker|boolean Instance of the worker, or false if the ID
 *                               is not registered or contains no ':'.
 */
public static function find($workerId)
{
    if (!self::exists($workerId) || false === strpos($workerId, ':')) {
        return false;
    }

    // Only the queue segment is needed; tolerate IDs with fewer than three
    // segments instead of raising an undefined-offset notice.
    $segments  = explode(':', $workerId, 3);
    $queueList = isset($segments[2]) ? $segments[2] : '';

    $worker = new self(explode(',', $queueList));

    // The constructor derives an ID from the local host and PID; replace it
    // with the ID that was actually asked for.
    $worker->setId($workerId);

    return $worker;
}
/**
 * Replace this worker's identifier, which is also its string form
 * (see __toString()).
 *
 * @param string $workerId ID for the worker.
 */
public function setId($workerId)
{
    $this->id = $workerId;
}
/**
 * Create a worker for the given queues.
 *
 * Queues are kept in the order supplied, which is the order reserve() polls
 * them in. A '*' entry is resolved later by queues() into the queue names
 * reported by Resque::queues().
 *
 * The worker ID is built as "hostname:pid:queue1,queue2" from the local host
 * name, the current process ID and the queue list.
 *
 * @param string|array $queues String with a single queue name, array with multiple.
 */
public function __construct($queues)
{
    // Normalise a single queue name to a one-element list.
    $this->queues = is_array($queues) ? $queues : array($queues);

    // Prefer gethostname() and fall back to uname's node name where it is
    // not available.
    $this->hostname = function_exists('gethostname')
        ? gethostname()
        : php_uname('n');

    $queueList = implode(',', $this->queues);
    $this->id  = $this->hostname . ':' . getmypid() . ':' . $queueList;
}
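/**
 * The primary loop for a worker which, when called on an instance, starts
 * the worker's life cycle.
 *
 * Queues are checked every $interval (seconds) for new jobs.
 *
 * @param int  $interval How often to check for new jobs across the queues.
 * @param bool $blocking When true, jobs are reserved through the blocking
 *                       path and $interval is passed on as its timeout.
 */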
// Main worker loop. Each pass tries to reserve a job (unless paused), forks
// via Resque::fork(), and marks the job as failed when the child's exit
// status is non-zero.
// NOTE(review): several branches below open a block whose body and closing
// brace are not visible in this extract. Restore them from the full file
// before relying on this method; the notes below mark each gap.
public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
while(true) {
// shutdown() sets this flag; it is checked at the top of every pass.
// NOTE(review): branch body not visible here — presumably leaves the loop; confirm.
if($this->shutdown) {
// Attempt to find and reserve a job
$job = false;
// While paused no reservation is attempted, so $job stays false.
if(!$this->paused) {
if($blocking === true) {
$this->log('Starting blocking with timeout of ' . $interval, self::LOG_VERBOSE);
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval);
} else {
$this->log('Starting with interval of ' . $interval, self::LOG_VERBOSE);
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
// $interval doubles as the timeout argument of reserve().
$job = $this->reserve($blocking, $interval);
if(!$job) {
// For an interval of 0, break now - helps with unit testing etc
// NOTE(review): the break itself is not visible in this extract — confirm.
if($interval == 0) {
// Sleeping only applies to the polling mode.
if($blocking === false)
// If no job was found, we sleep for $interval before continuing and checking again
$this->log('Sleeping for ' . $interval, self::LOG_VERBOSE);
// NOTE(review): the paused branch body is not visible here; the else branch
// sets the process title to the list of watched queues.
if($this->paused) {
else {
$this->updateProcLine('Waiting for ' . implode(',', $this->queues));
// usleep() takes microseconds, so the interval in seconds is scaled by 1e6.
usleep($interval * 1000000);
// NOTE(review): nothing visible skips to the next pass when no job was found;
// presumably a continue precedes this point — confirm.
$this->log('got ' . $job);
Resque_Event::trigger('beforeFork', $job);
$this->child = Resque::fork();
// Forked and we're the child. Run the job.
// NOTE(review): presumably Resque::fork() returns false when forking is
// unavailable, in which case the job runs in this process — confirm.
if ($this->child === 0 || $this->child === false) {
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->log($status, self::LOG_VERBOSE);
// NOTE(review): neither the call that runs the job nor the body of the
// following branch is visible in this extract — confirm.
if ($this->child === 0) {
if($this->child > 0) {
// Parent process, sit and wait
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
$this->log($status, self::LOG_VERBOSE);
// Wait until the child process finishes before continuing
// NOTE(review): as shown, $status still holds the 'Forked ...' string assigned
// above. A wait call that fills it with the child's status is expected before
// pcntl_wexitstatus() but is not visible here — confirm.
$exitStatus = pcntl_wexitstatus($status);
// Any non-zero exit code is recorded as a dirty exit of the job.
if($exitStatus !== 0) {
$job->fail(new Resque_Job_DirtyExitException(
'Job exited with exit code ' . $exitStatus
// The child is finished (or never existed); forget its PID.
$this->child = null;
/**
 * Process a single job.
 *
 * @param Resque_Job $job The job to be processed.
 */
// Handles one job inside a try/catch: the 'afterFork' event is triggered
// first, and any Exception is logged as "<job> failed: <message>".
// NOTE(review): the statement that actually executes the job, and whatever
// follows the failure log inside the catch block, are not visible in this
// extract. As shown, 'done <job>' would be logged even after a failure —
// confirm against the full file.
public function perform(Resque_Job $job)
try {
Resque_Event::trigger('afterFork', $job);
catch(Exception $e) {
$this->log($job . ' failed: ' . $e->getMessage());
$this->log('done ' . $job);
/**
 * Attempt to reserve a job from the queues this worker watches.
 *
 * In blocking mode the whole queue list is handed to
 * Resque_Job::reserveBlocking() together with $timeout. Otherwise each queue
 * is polled in order with Resque_Job::reserve() and the first job found is
 * returned.
 *
 * @param bool $blocking True to use the blocking reservation path.
 * @param int  $timeout  Timeout passed to the blocking reservation; unused
 *                       when polling.
 * @return object|boolean Instance of Resque_Job if a job is found, false if not.
 */
public function reserve($blocking = false, $timeout = null)
{
    $queues = $this->queues();
    if (!is_array($queues)) {
        // Nothing iterable to check; report "no job" with the documented
        // false so callers testing !$job behave as for an empty result.
        return false;
    }

    if ($blocking === true) {
        $job = Resque_Job::reserveBlocking($queues, $timeout);
        if ($job) {
            $this->log('Found job on ' . $job->queue, self::LOG_VERBOSE);
            return $job;
        }

        return false;
    }

    foreach ($queues as $queue) {
        $this->log('Checking ' . $queue, self::LOG_VERBOSE);
        $job = Resque_Job::reserve($queue);
        if ($job) {
            $this->log('Found job on ' . $queue, self::LOG_VERBOSE);
            return $job;
        }
    }

    return false;
}
/**
 * Return the queues this worker should use when searching for jobs.
 *
 * If '*' is in the configured list and $fetch is true, the names reported
 * by Resque::queues() are returned in alphabetical order instead. Otherwise
 * the configured list is returned as-is.
 *
 * @param boolean $fetch If true, and the queue is set to *, will fetch
 *                       all queue names from redis.
 * @return array Array of associated queues.
 */
public function queues($fetch = true)
{
    // Strict comparison: only a literal '*' string is the wildcard; a loose
    // match would also accept values such as 0 on older PHP versions.
    if (!$fetch || !in_array('*', $this->queues, true)) {
        return $this->queues;
    }

    $queues = Resque::queues();
    if (is_array($queues)) {
        // Honour the documented alphabetical order for wildcard workers.
        sort($queues);
    }

    return $queues;
}
/**
 * Start-up hook for the worker.
 *
 * Fires the 'beforeFirstFork' event with this worker as its argument.
 *
 * NOTE(review): no caller of the private registerSigHandlers() is visible in
 * this file; confirm whether start-up is expected to invoke it.
 */
private function startup()
{
    $event = 'beforeFirstFork';

    Resque_Event::trigger($event, $this);
}
/**
 * Set the process title to "resque-<version>: <status>".
 *
 * Does nothing unless the setproctitle() function is available.
 *
 * @param string $status The updated process title.
 */
private function updateProcLine($status)
{
    if (!function_exists('setproctitle')) {
        return;
    }

    $title = 'resque-' . Resque::VERSION . ': ' . $status;
    setproctitle($title);
}
/**
 * Register signal handlers that a worker should respond to.
 *
 * TERM: Shutdown immediately and stop processing jobs.
 * INT:  Shutdown immediately and stop processing jobs.
 * QUIT: Shutdown after the current job finishes processing.
 * USR1: Kill the forked child immediately and continue processing jobs.
 * USR2: Pause processing of new jobs.
 * CONT: Resume processing of new jobs.
 * PIPE: Invoke reestablishRedisConnection().
 *
 * Does nothing when pcntl_signal() is not available.
 */
private function registerSigHandlers()
{
    if (!function_exists('pcntl_signal')) {
        // Without pcntl_signal() no handlers can be installed.
        return;
    }

    declare(ticks = 1);

    // Signal => handler method on this worker. Method names are written
    // exactly as declared so the table can be searched for.
    $handlers = array(
        SIGTERM => 'shutdownNow',
        SIGINT  => 'shutdownNow',
        SIGQUIT => 'shutdown',
        SIGUSR1 => 'killChild',
        SIGUSR2 => 'pauseProcessing',
        SIGCONT => 'unPauseProcessing',
        SIGPIPE => 'reestablishRedisConnection',
    );
    foreach ($handlers as $signal => $method) {
        pcntl_signal($signal, array($this, $method));
    }

    $this->log('Registered signals', self::LOG_VERBOSE);
}
/**
 * Signal handler callback for USR2: logs the event and sets the paused
 * flag, which work() checks before reserving a job.
 */
public function pauseProcessing()
{
    $notice = 'USR2 received; pausing job processing';
    $this->log($notice);

    $this->paused = true;
}
/**
 * Signal handler callback for CONT: logs the event and clears the paused
 * flag so the worker reserves jobs again.
 */
public function unPauseProcessing()
{
    $notice = 'CONT received; resuming job processing';
    $this->log($notice);

    $this->paused = false;
}
* Signal handler for SIGPIPE, in the event the redis connection has gone away.
* Attempts to reconnect to redis, or raises an Exception.
// Handler registered for SIGPIPE in registerSigHandlers(). As visible here
// it only logs the event.
// NOTE(review): the reconnect attempt announced by the log message is not
// visible in this extract — confirm against the full file.
public function reestablishRedisConnection()
$this->log('SIGPIPE received; attempting to reconnect');
/**
 * Request a graceful shutdown.
 *
 * Only sets the shutdown flag; work() checks it at the start of each loop
 * iteration, so a job that is currently running is not interrupted.
 */
public function shutdown()
{
    $this->shutdown = true;
}
/**
 * Force an immediate shutdown of the worker, killing any child job
 * currently running.
 *
 * Sets the shutdown flag first so the main loop stops picking up work, then
 * kills the forked child (if any) via killChild().
 */
public function shutdownNow()
{
    $this->shutdown();
    $this->killChild();
}
/**
 * Kill a forked child job immediately. The job it is processing will not
 * be completed.
 *
 * The child PID is first looked up with `ps`. SIGKILL is sent and the stored
 * PID cleared only when the process is reported as present.
 */
public function killChild()
{
    if (!$this->child) {
        $this->log('No child to kill.', self::LOG_VERBOSE);
        // Nothing to look up or signal; stop here rather than running ps
        // and posix_kill() with an empty PID.
        return;
    }

    $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);

    $output     = array();
    $returnCode = 0;
    // The cast keeps the shell command limited to a numeric PID.
    $lastLine = exec('ps -o pid,state -p ' . (int)$this->child, $output, $returnCode);

    if ($lastLine && $returnCode != 1) {
        $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
        posix_kill($this->child, SIGKILL);
        $this->child = null;
        return;
    }

    // NOTE(review): the message says "restarting" but no follow-up action is
    // visible for this branch in the reviewed source — confirm.
    $this->log('Child ' . $this->child . ' not found, restarting.', self::LOG_VERBOSE);
}
/**
 * Look for any workers which should be running on this server and if
 * they're not, remove them from Redis.
 *
 * This is a form of garbage collection to handle cases where the
 * server may have been killed and the Resque workers did not die gracefully
 * and therefore leave state information in Redis.
 *
 * A registered worker is left alone when it belongs to another host, when
 * its PID is among the Resque processes currently running here, or when it
 * is this very process.
 */
public function pruneDeadWorkers()
{
    $livePids = $this->workerPids();
    $ownPid   = getmypid();

    foreach (self::all() as $worker) {
        if (!is_object($worker)) {
            // all() can contain non-object entries for IDs find() rejected.
            continue;
        }

        // Worker IDs are "hostname:pid:queues"; only host and PID matter here.
        $segments = explode(':', (string)$worker, 3);
        $host     = $segments[0];
        $pid      = isset($segments[1]) ? $segments[1] : null;

        if ($host != $this->hostname || in_array($pid, $livePids) || $pid == $ownPid) {
            continue;
        }

        $this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE);
        // Actually remove the stale registration, not just log it.
        $worker->unregisterWorker();
    }
}
/**
 * Return an array of process IDs for all of the Resque workers currently
 * running on this machine.
 *
 * Runs `ps` and keeps the lines matching "resque". The first
 * whitespace-delimited field of each matching line is taken as the PID.
 *
 * @return array Array of Resque worker process IDs (as strings).
 */
public function workerPids()
{
    $pids      = array();
    $cmdOutput = array();

    // The [r] bracket keeps grep from matching its own command line. The
    // pattern is quoted so the shell cannot glob-expand it against a file
    // named "resque" in the working directory.
    exec("ps -A -o pid,command | grep '[r]esque'", $cmdOutput);

    foreach ($cmdOutput as $line) {
        $fields = explode(' ', trim($line), 2);
        $pids[] = $fields[0];
    }

    return $pids;
}
/**
 * Register this worker in Redis: add its ID to the 'workers' set and store
 * the current time under "worker:<id>:started".
 */
public function registerWorker()
{
    $workerId = (string)$this;

    Resque::redis()->sadd('workers', $workerId);

    $startedKey = 'worker:' . $workerId . ':started';
    Resque::redis()->set($startedKey, strftime('%a %b %d %H:%M:%S %Z %Y'));
}
/**
 * Unregister this worker in Redis (shutdown etc).
 *
 * A job still attached to the worker is failed with a
 * Resque_Job_DirtyExitException first. Then the worker's set membership,
 * its state keys and its per-worker statistics are removed.
 */
public function unregisterWorker()
{
    if (is_object($this->currentJob)) {
        $this->currentJob->fail(new Resque_Job_DirtyExitException);
    }

    $workerId = (string)$this;
    Resque::redis()->srem('workers', $workerId);

    // "worker:<id>" holds the current job, "worker:<id>:started" the start time.
    foreach (array('', ':started') as $suffix) {
        Resque::redis()->del('worker:' . $workerId . $suffix);
    }

    foreach (array('processed', 'failed') as $stat) {
        Resque_Stat::clear($stat . ':' . $workerId);
    }
}
/**
 * Record in Redis which job this worker is currently processing.
 *
 * Links job and worker to each other, then stores a JSON document with the
 * job's queue, the current time and the job payload under "worker:<id>".
 *
 * @param object $job Resque_Job instance containing the job we're working on.
 */
public function workingOn(Resque_Job $job)
{
    $job->worker      = $this;
    $this->currentJob = $job;

    $state = array(
        'queue'   => $job->queue,
        'run_at'  => strftime('%a %b %d %H:%M:%S %Z %Y'),
        'payload' => $job->payload
    );
    $encoded = json_encode($state);

    Resque::redis()->set('worker:' . $job->worker, $encoded);
}
/**
 * Mark the current job as finished: detach it from the worker, increment
 * the worker's "processed:<id>" statistic and delete the "worker:<id>"
 * state key.
 */
public function doneWorking()
{
    $this->currentJob = null;

    $workerId = (string)$this;
    Resque_Stat::incr('processed:' . $workerId);
    Resque::redis()->del('worker:' . $workerId);
}
/**
 * String form of the worker, which is its ID.
 *
 * @return string String identifier for this worker instance.
 */
public function __toString()
{
    return $this->id;
}
/**
 * Output a given log message to STDOUT.
 *
 * Messages whose level is above the worker's current $logLevel are
 * dropped. At LOG_NORMAL the message is written as "*** <message>"; at any
 * other level it is prefixed with a "[HH:MM:SS YYYY-MM-DD]" timestamp.
 *
 * @param string $message  Message to output.
 * @param int    $logLevel The logging level to capture (LOG_* constant).
 */
public function log($message, $logLevel = self::LOG_NORMAL)
{
    if ($logLevel > $this->logLevel) {
        // More verbose than the worker is configured for.
        return;
    }

    if ($this->logLevel == self::LOG_NORMAL) {
        fwrite(STDOUT, "*** " . $message . "\n");
        return;
    }

    // date() yields the same "%T %Y-%m-%d" layout without relying on
    // strftime(), which is deprecated as of PHP 8.1.
    fwrite(STDOUT, "** [" . date('H:i:s Y-m-d') . "] " . $message . "\n");
}
/**
 * Describe the job this worker is currently working on.
 *
 * Reads the "worker:<id>" key from Redis and decodes its JSON content into
 * an associative array.
 *
 * @return array Decoded details of the current job, or an empty array when
 *               the key holds nothing.
 */
public function job()
{
    $raw = Resque::redis()->get('worker:' . $this);

    return $raw ? json_decode($raw, true) : array();
}
/**
 * Get a statistic belonging to this worker.
 *
 * The lookup key is "<stat>:<worker id>".
 *
 * @param string $stat Statistic to fetch.
 * @return int Statistic value.
 */
public function getStat($stat)
{
    $key = $stat . ':' . $this;

    return Resque_Stat::get($key);
}