diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 28161c9..a6c3985 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -86,7 +86,7 @@ class Resque_Worker */ public static function find($workerId) { - if(!self::exists($workerId) || false === strpos($workerId, ":")) { + if(!self::exists($workerId) || false === strpos($workerId, ":")) { return false; } @@ -148,83 +148,83 @@ class Resque_Worker $this->updateProcLine('Starting'); $this->startup(); - while(true) { - if($this->shutdown) { - break; - } + while(true) { + if($this->shutdown) { + break; + } - // Attempt to find and reserve a job - $job = false; - if(!$this->paused) { - if($blocking === true) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); - } else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); - } + // Attempt to find and reserve a job + $job = false; + if(!$this->paused) { + if($blocking === true) { + $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); + } else { + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); + } - $job = $this->reserve($blocking, $interval); - } + $job = $this->reserve($blocking, $interval); + } - if(!$job) { - // For an interval of 0, break now - helps with unit testing etc - if($interval == 0) { - break; - } + if(!$job) { + // For an interval of 0, break now - helps with unit testing etc + if($interval == 0) { + break; + } - if($blocking === false) - { - // If no job was found, we sleep for $interval before continuing and checking again - $this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval)); - if($this->paused) { - $this->updateProcLine('Paused'); - } - else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); - } + if($blocking === false) + { + // If no job was found, we sleep for $interval before continuing and checking again + $this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval)); + if($this->paused) { + $this->updateProcLine('Paused'); + } + else { + $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); + } - usleep($interval * 1000000); - } + usleep($interval * 1000000); + } - continue; - } + continue; + } - $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job)); - Resque_Event::trigger('beforeFork', $job); - $this->workingOn($job); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job)); + Resque_Event::trigger('beforeFork', $job); + $this->workingOn($job); - $this->child = Resque::fork(); + $this->child = Resque::fork(); - // Forked and we're the child. Run the job. - if ($this->child === 0 || $this->child === false) { - $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); - $this->updateProcLine($status); - $this->logger->log(Psr\Log\LogLevel::INFO, $status); - $this->perform($job); - if ($this->child === 0) { - exit(0); - } - } + // Forked and we're the child. Run the job. + if ($this->child === 0 || $this->child === false) { + $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); + $this->updateProcLine($status); + $this->logger->log(Psr\Log\LogLevel::INFO, $status); + $this->perform($job); + if ($this->child === 0) { + exit(0); + } + } - if($this->child > 0) { - // Parent process, sit and wait - $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); - $this->updateProcLine($status); - $this->logger->log(Psr\Log\LogLevel::INFO, $status); + if($this->child > 0) { + // Parent process, sit and wait + $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); + $this->updateProcLine($status); + $this->logger->log(Psr\Log\LogLevel::INFO, $status); - // Wait until the child process finishes before continuing - pcntl_wait($status); - $exitStatus = pcntl_wexitstatus($status); - if($exitStatus !== 0) { - $job->fail(new Resque_Job_DirtyExitException( - 'Job exited with exit code ' . $exitStatus - )); - } - } + // Wait until the child process finishes before continuing + pcntl_wait($status); + $exitStatus = pcntl_wexitstatus($status); + if($exitStatus !== 0) { + $job->fail(new Resque_Job_DirtyExitException( + 'Job exited with exit code ' . $exitStatus + )); + } + } - $this->child = null; - $this->doneWorking(); - } + $this->child = null; + $this->doneWorking(); + } $this->unregisterWorker(); } @@ -241,46 +241,46 @@ class Resque_Worker $job->perform(); } catch(Exception $e) { - $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e->getMessage())); + $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e->getMessage())); $job->fail($e); return; } $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE); - $this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job)); + $this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job)); } - /** - * @param bool $blocking - * @param int $timeout - * @return object|boolean Instance of Resque_Job if a job is found, false if not. - */ - public function reserve($blocking = false, $timeout = null) - { - $queues = $this->queues(); - if(!is_array($queues)) { - return; - } + /** + * @param bool $blocking + * @param int $timeout + * @return object|boolean Instance of Resque_Job if a job is found, false if not. + */ + public function reserve($blocking = false, $timeout = null) + { + $queues = $this->queues(); + if(!is_array($queues)) { + return; + } - if($blocking === true) { - $job = Resque_Job::reserveBlocking($queues, $timeout); - if($job) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); - return $job; - } - } else { - foreach($queues as $queue) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue)); - $job = Resque_Job::reserve($queue); - if($job) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); - return $job; - } - } - } + if($blocking === true) { + $job = Resque_Job::reserveBlocking($queues, $timeout); + if($job) { + $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); + return $job; + } + } else { + foreach($queues as $queue) { + $this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue)); + $job = Resque_Job::reserve($queue); + if($job) { + $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); + return $job; + } + } + } - return false; - } + return false; + } /** * Return an array containing all of the queues that this worker should use @@ -351,7 +351,7 @@ class Resque_Worker pcntl_signal(SIGUSR2, array($this, 'pauseProcessing')); pcntl_signal(SIGCONT, array($this, 'unPauseProcessing')); pcntl_signal(SIGPIPE, array($this, 'reestablishRedisConnection')); - $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals'); + $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals'); } /** @@ -359,7 +359,7 @@ class Resque_Worker */ public function pauseProcessing() { - $this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing'); $this->paused = true; } @@ -369,7 +369,7 @@ class Resque_Worker */ public function unPauseProcessing() { - $this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing'); $this->paused = false; } @@ -379,7 +379,7 @@ class Resque_Worker */ public function reestablishRedisConnection() { - $this->logger->log(Psr\Log\LogLevel::NOTICE, 'SIGPIPE received; attempting to reconnect'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'SIGPIPE received; attempting to reconnect'); Resque::redis()->establishConnection(); } @@ -390,7 +390,7 @@ class Resque_Worker public function shutdown() { $this->shutdown = true; - $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down'); } /** @@ -410,18 +410,18 @@ class Resque_Worker public function killChild() { if(!$this->child) { - $this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.'); + $this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.'); return; } - $this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child)); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child)); if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { - $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child)); + $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child)); posix_kill($this->child, SIGKILL); $this->child = null; } else { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child)); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child)); $this->shutdown(); } } @@ -439,14 +439,14 @@ class Resque_Worker $workerPids = $this->workerPids(); $workers = self::all(); foreach($workers as $worker) { - if (is_object($worker)) { - list($host, $pid, $queues) = explode(':', (string)$worker, 3); - if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { - continue; - } - $this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker)); - $worker->unregisterWorker(); - } + if (is_object($worker)) { + list($host, $pid, $queues) = explode(':', (string)$worker, 3); + if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { + continue; + } + $this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker)); + $worker->unregisterWorker(); + } } }