Convert indentation in the Worker class.... to . . . . tabs

This was painful for me as a PSR 1-2 follower
This commit is contained in:
Trent Petersen 2013-07-10 10:07:19 -05:00
parent 5cef885a06
commit 25a804d93d

View File

@ -86,7 +86,7 @@ class Resque_Worker
*/ */
public static function find($workerId) public static function find($workerId)
{ {
if(!self::exists($workerId) || false === strpos($workerId, ":")) { if(!self::exists($workerId) || false === strpos($workerId, ":")) {
return false; return false;
} }
@ -148,83 +148,83 @@ class Resque_Worker
$this->updateProcLine('Starting'); $this->updateProcLine('Starting');
$this->startup(); $this->startup();
while(true) { while(true) {
if($this->shutdown) { if($this->shutdown) {
break; break;
} }
// Attempt to find and reserve a job // Attempt to find and reserve a job
$job = false; $job = false;
if(!$this->paused) { if(!$this->paused) {
if($blocking === true) { if($blocking === true) {
$this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval));
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval);
} else { } else {
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
} }
$job = $this->reserve($blocking, $interval); $job = $this->reserve($blocking, $interval);
} }
if(!$job) { if(!$job) {
// For an interval of 0, break now - helps with unit testing etc // For an interval of 0, break now - helps with unit testing etc
if($interval == 0) { if($interval == 0) {
break; break;
} }
if($blocking === false) if($blocking === false)
{ {
// If no job was found, we sleep for $interval before continuing and checking again // If no job was found, we sleep for $interval before continuing and checking again
$this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval)); $this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval));
if($this->paused) { if($this->paused) {
$this->updateProcLine('Paused'); $this->updateProcLine('Paused');
} }
else { else {
$this->updateProcLine('Waiting for ' . implode(',', $this->queues)); $this->updateProcLine('Waiting for ' . implode(',', $this->queues));
} }
usleep($interval * 1000000); usleep($interval * 1000000);
} }
continue; continue;
} }
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job)); $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job));
Resque_Event::trigger('beforeFork', $job); Resque_Event::trigger('beforeFork', $job);
$this->workingOn($job); $this->workingOn($job);
$this->child = Resque::fork(); $this->child = Resque::fork();
// Forked and we're the child. Run the job. // Forked and we're the child. Run the job.
if ($this->child === 0 || $this->child === false) { if ($this->child === 0 || $this->child === false) {
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status); $this->updateProcLine($status);
$this->logger->log(Psr\Log\LogLevel::INFO, $status); $this->logger->log(Psr\Log\LogLevel::INFO, $status);
$this->perform($job); $this->perform($job);
if ($this->child === 0) { if ($this->child === 0) {
exit(0); exit(0);
} }
} }
if($this->child > 0) { if($this->child > 0) {
// Parent process, sit and wait // Parent process, sit and wait
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
$this->updateProcLine($status); $this->updateProcLine($status);
$this->logger->log(Psr\Log\LogLevel::INFO, $status); $this->logger->log(Psr\Log\LogLevel::INFO, $status);
// Wait until the child process finishes before continuing // Wait until the child process finishes before continuing
pcntl_wait($status); pcntl_wait($status);
$exitStatus = pcntl_wexitstatus($status); $exitStatus = pcntl_wexitstatus($status);
if($exitStatus !== 0) { if($exitStatus !== 0) {
$job->fail(new Resque_Job_DirtyExitException( $job->fail(new Resque_Job_DirtyExitException(
'Job exited with exit code ' . $exitStatus 'Job exited with exit code ' . $exitStatus
)); ));
} }
} }
$this->child = null; $this->child = null;
$this->doneWorking(); $this->doneWorking();
} }
$this->unregisterWorker(); $this->unregisterWorker();
} }
@ -241,46 +241,46 @@ class Resque_Worker
$job->perform(); $job->perform();
} }
catch(Exception $e) { catch(Exception $e) {
$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e->getMessage())); $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e->getMessage()));
$job->fail($e); $job->fail($e);
return; return;
} }
$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE); $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
$this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job)); $this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job));
} }
/** /**
* @param bool $blocking * @param bool $blocking
* @param int $timeout * @param int $timeout
* @return object|boolean Instance of Resque_Job if a job is found, false if not. * @return object|boolean Instance of Resque_Job if a job is found, false if not.
*/ */
public function reserve($blocking = false, $timeout = null) public function reserve($blocking = false, $timeout = null)
{ {
$queues = $this->queues(); $queues = $this->queues();
if(!is_array($queues)) { if(!is_array($queues)) {
return; return;
} }
if($blocking === true) { if($blocking === true) {
$job = Resque_Job::reserveBlocking($queues, $timeout); $job = Resque_Job::reserveBlocking($queues, $timeout);
if($job) { if($job) {
$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
return $job; return $job;
} }
} else { } else {
foreach($queues as $queue) { foreach($queues as $queue) {
$this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue)); $this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue));
$job = Resque_Job::reserve($queue); $job = Resque_Job::reserve($queue);
if($job) { if($job) {
$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
return $job; return $job;
} }
} }
} }
return false; return false;
} }
/** /**
* Return an array containing all of the queues that this worker should use * Return an array containing all of the queues that this worker should use
@ -351,7 +351,7 @@ class Resque_Worker
pcntl_signal(SIGUSR2, array($this, 'pauseProcessing')); pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
pcntl_signal(SIGCONT, array($this, 'unPauseProcessing')); pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
pcntl_signal(SIGPIPE, array($this, 'reestablishRedisConnection')); pcntl_signal(SIGPIPE, array($this, 'reestablishRedisConnection'));
$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals'); $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals');
} }
/** /**
@ -359,7 +359,7 @@ class Resque_Worker
*/ */
public function pauseProcessing() public function pauseProcessing()
{ {
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing'); $this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing');
$this->paused = true; $this->paused = true;
} }
@ -369,7 +369,7 @@ class Resque_Worker
*/ */
public function unPauseProcessing() public function unPauseProcessing()
{ {
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing'); $this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing');
$this->paused = false; $this->paused = false;
} }
@ -379,7 +379,7 @@ class Resque_Worker
*/ */
public function reestablishRedisConnection() public function reestablishRedisConnection()
{ {
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'SIGPIPE received; attempting to reconnect'); $this->logger->log(Psr\Log\LogLevel::NOTICE, 'SIGPIPE received; attempting to reconnect');
Resque::redis()->establishConnection(); Resque::redis()->establishConnection();
} }
@ -390,7 +390,7 @@ class Resque_Worker
public function shutdown() public function shutdown()
{ {
$this->shutdown = true; $this->shutdown = true;
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down'); $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down');
} }
/** /**
@ -410,18 +410,18 @@ class Resque_Worker
public function killChild() public function killChild()
{ {
if(!$this->child) { if(!$this->child) {
$this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.'); $this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.');
return; return;
} }
$this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child)); $this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child));
if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child)); $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child));
posix_kill($this->child, SIGKILL); posix_kill($this->child, SIGKILL);
$this->child = null; $this->child = null;
} }
else { else {
$this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child)); $this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child));
$this->shutdown(); $this->shutdown();
} }
} }
@ -439,14 +439,14 @@ class Resque_Worker
$workerPids = $this->workerPids(); $workerPids = $this->workerPids();
$workers = self::all(); $workers = self::all();
foreach($workers as $worker) { foreach($workers as $worker) {
if (is_object($worker)) { if (is_object($worker)) {
list($host, $pid, $queues) = explode(':', (string)$worker, 3); list($host, $pid, $queues) = explode(':', (string)$worker, 3);
if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
continue; continue;
} }
$this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker)); $this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker));
$worker->unregisterWorker(); $worker->unregisterWorker();
} }
} }
} }