Added basic logging class for a fallback

Corrected unit tests for new code

Add partial unit coverage for logging class

Add newline to logTest file
This commit is contained in:
Rockstar04 2013-06-25 10:06:37 -07:00
parent 7aa9abdb2e
commit 726e58a297
8 changed files with 161 additions and 56 deletions

View file

@ -9,14 +9,10 @@
*/
class Resque_Worker
{
const LOG_NONE = 0;
const LOG_NORMAL = 1;
const LOG_VERBOSE = 2;
/**
* @var int Current log level of this worker.
*/
public $logLevel = 0;
* @var LoggerInterface Logging object that impliments the PSR-3 LoggerInterface
*/
public $logger;
/**
* @var array Array of all associated queues for this worker.
@ -161,7 +157,7 @@ class Resque_Worker
$job = false;
if(!$this->paused) {
if($blocking === true) {
$this->log('Starting blocking with timeout of ' . $interval, self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval));
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval);
} else {
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
@ -179,7 +175,7 @@ class Resque_Worker
if($blocking === false)
{
// If no job was found, we sleep for $interval before continuing and checking again
$this->log('Sleeping for ' . $interval, self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval));
if($this->paused) {
$this->updateProcLine('Paused');
}
@ -193,7 +189,7 @@ class Resque_Worker
continue;
}
$this->log('got ' . $job);
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job));
Resque_Event::trigger('beforeFork', $job);
$this->workingOn($job);
@ -203,7 +199,7 @@ class Resque_Worker
if ($this->child === 0 || $this->child === false) {
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::INFO, $status);
$this->perform($job);
if ($this->child === 0) {
exit(0);
@ -214,7 +210,7 @@ class Resque_Worker
// Parent process, sit and wait
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::INFO, $status);
// Wait until the child process finishes before continuing
pcntl_wait($status);
@ -245,13 +241,13 @@ class Resque_Worker
$job->perform();
}
catch(Exception $e) {
$this->log($job . ' failed: ' . $e->getMessage());
$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e->getMessage()));
$job->fail($e);
return;
}
$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
$this->log('done ' . $job);
$this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job));
}
/**
@ -269,15 +265,15 @@ class Resque_Worker
if($blocking === true) {
$job = Resque_Job::reserveBlocking($queues, $timeout);
if($job) {
$this->log('Found job on ' . $job->queue, self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
return $job;
}
} else {
foreach($queues as $queue) {
$this->log('Checking ' . $queue, self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue));
$job = Resque_Job::reserve($queue);
if($job) {
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
return $job;
}
}
@ -355,7 +351,7 @@ class Resque_Worker
pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
pcntl_signal(SIGPIPE, array($this, 'reestablishRedisConnection'));
$this->log('Registered signals', self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals');
}
/**
@ -363,7 +359,7 @@ class Resque_Worker
*/
public function pauseProcessing()
{
$this->log('USR2 received; pausing job processing');
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing');
$this->paused = true;
}
@ -373,7 +369,7 @@ class Resque_Worker
*/
public function unPauseProcessing()
{
$this->log('CONT received; resuming job processing');
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing');
$this->paused = false;
}
@ -383,7 +379,7 @@ class Resque_Worker
*/
public function reestablishRedisConnection()
{
$this->log('SIGPIPE received; attempting to reconnect');
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'SIGPIPE received; attempting to reconnect');
Resque::redis()->establishConnection();
}
@ -394,7 +390,7 @@ class Resque_Worker
public function shutdown()
{
$this->shutdown = true;
$this->log('Exiting...');
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down');
}
/**
@ -414,18 +410,18 @@ class Resque_Worker
public function killChild()
{
if(!$this->child) {
$this->log('No child to kill.', self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.');
return;
}
$this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child));
if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
$this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child));
posix_kill($this->child, SIGKILL);
$this->child = null;
}
else {
$this->log('Child ' . $this->child . ' not found, restarting.', self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child));
$this->shutdown();
}
}
@ -448,7 +444,7 @@ class Resque_Worker
if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
continue;
}
$this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE);
$this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker));
$worker->unregisterWorker();
}
}
@ -536,26 +532,6 @@ class Resque_Worker
return $this->id;
}
/**
* Output a given log message to STDOUT.
*
* @param string $message Message to output.
* @param int $logLevel The logging level to capture
*/
public function log($message, $logLevel = self::LOG_NORMAL)
{
if ($logLevel > $this->logLevel) {
return;
}
if ($this->logLevel == self::LOG_NORMAL) {
fwrite(STDOUT, "*** " . $message . "\n");
return;
}
fwrite(STDOUT, "** [" . strftime('%T %Y-%m-%d') . "] " . $message . "\n");
}
/**
* Return an object describing the job this worker is currently working on.
*
@ -582,5 +558,15 @@ class Resque_Worker
{
return Resque_Stat::get($stat . ':' . $this);
}
/**
* Inject the logging object into the worker
*
* @param Psr\Log\LoggerInterface $logger
*/
public function setLogger(Psr\Log\LoggerInterface $logger)
{
$this->logger = $logger;
}
}
?>