mirror of
https://github.com/idanoo/php-resque.git
synced 2024-11-22 00:11:53 +00:00
Merge pull request #115 from Rockstar04/logging
PSR-3 Compliant Logging
This commit is contained in:
commit
468f1ce78a
25
bin/resque
25
bin/resque
@ -39,15 +39,15 @@ if(!empty($REDIS_BACKEND)) {
|
||||
Resque::setBackend($REDIS_BACKEND, $REDIS_BACKEND_DB);
|
||||
}
|
||||
|
||||
$logLevel = 0;
|
||||
$logLevel = false;
|
||||
$LOGGING = getenv('LOGGING');
|
||||
$VERBOSE = getenv('VERBOSE');
|
||||
$VVERBOSE = getenv('VVERBOSE');
|
||||
if(!empty($LOGGING) || !empty($VERBOSE)) {
|
||||
$logLevel = Resque_Worker::LOG_NORMAL;
|
||||
$logLevel = true;
|
||||
}
|
||||
else if(!empty($VVERBOSE)) {
|
||||
$logLevel = Resque_Worker::LOG_VERBOSE;
|
||||
$logLevel = true;
|
||||
}
|
||||
|
||||
$APP_INCLUDE = getenv('APP_INCLUDE');
|
||||
@ -59,6 +59,12 @@ if($APP_INCLUDE) {
|
||||
require_once $APP_INCLUDE;
|
||||
}
|
||||
|
||||
// See if the APP_INCLUDE containes a logger object,
|
||||
// If none exists, fallback to internal logger
|
||||
if (!isset($logger) && !is_object($logger)) {
|
||||
$logger = new Resque_Log($logLevel);
|
||||
}
|
||||
|
||||
$BLOCKING = getenv('BLOCKING') !== FALSE;
|
||||
|
||||
$interval = 5;
|
||||
@ -75,7 +81,7 @@ if(!empty($COUNT) && $COUNT > 1) {
|
||||
|
||||
$PREFIX = getenv('PREFIX');
|
||||
if(!empty($PREFIX)) {
|
||||
fwrite(STDOUT, '*** Prefix set to '.$PREFIX."\n");
|
||||
$logger->log(Psr\Log\LogLevel::INFO, 'Prefix set to {prefix}', array('prefix' => $PREFIX));
|
||||
Resque_Redis::prefix($PREFIX);
|
||||
}
|
||||
|
||||
@ -83,14 +89,15 @@ if($count > 1) {
|
||||
for($i = 0; $i < $count; ++$i) {
|
||||
$pid = Resque::fork();
|
||||
if($pid == -1) {
|
||||
die("Could not fork worker ".$i."\n");
|
||||
$logger->log(Psr\Log\LogLevel::EMERGENCY, 'Could not fork worker {count}', array('count' => $i));
|
||||
die();
|
||||
}
|
||||
// Child, start the worker
|
||||
else if(!$pid) {
|
||||
$queues = explode(',', $QUEUE);
|
||||
$worker = new Resque_Worker($queues);
|
||||
$worker->logLevel = $logLevel;
|
||||
fwrite(STDOUT, '*** Starting worker '.$worker."\n");
|
||||
$worker->setLogger($logger);
|
||||
$logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker));
|
||||
$worker->work($interval, $BLOCKING);
|
||||
break;
|
||||
}
|
||||
@ -100,7 +107,7 @@ if($count > 1) {
|
||||
else {
|
||||
$queues = explode(',', $QUEUE);
|
||||
$worker = new Resque_Worker($queues);
|
||||
$worker->logLevel = $logLevel;
|
||||
$worker->setLogger($logger);
|
||||
|
||||
$PIDFILE = getenv('PIDFILE');
|
||||
if ($PIDFILE) {
|
||||
@ -108,7 +115,7 @@ else {
|
||||
die('Could not write PID information to ' . $PIDFILE);
|
||||
}
|
||||
|
||||
fwrite(STDOUT, '*** Starting worker '.$worker."\n");
|
||||
$logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker));
|
||||
$worker->work($interval, $BLOCKING);
|
||||
}
|
||||
?>
|
@ -20,7 +20,8 @@
|
||||
"require": {
|
||||
"php": ">=5.3.0",
|
||||
"ext-pcntl": "*",
|
||||
"colinmollenhour/credis": "1.2.*"
|
||||
"colinmollenhour/credis": "1.2.*",
|
||||
"psr/log": "1.0.0"
|
||||
},
|
||||
"suggest": {
|
||||
"ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.",
|
||||
|
77
composer.lock
generated
77
composer.lock
generated
@ -3,7 +3,7 @@
|
||||
"This file locks the dependencies of your project to a known state",
|
||||
"Read more about it at http://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file"
|
||||
],
|
||||
"hash": "1f551c3cdade1b7ff7d9e32a44eeb3dc",
|
||||
"hash": "646010a06695709794f1bc38392e392f",
|
||||
"packages": [
|
||||
{
|
||||
"name": "colinmollenhour/credis",
|
||||
@ -42,21 +42,59 @@
|
||||
"description": "Credis is a lightweight interface to the Redis key-value store which wraps the phpredis library when available for better performance.",
|
||||
"homepage": "https://github.com/colinmollenhour/credis",
|
||||
"time": "2013-03-19 02:57:04"
|
||||
},
|
||||
{
|
||||
"name": "psr/log",
|
||||
"version": "1.0.0",
|
||||
"source": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/php-fig/log",
|
||||
"reference": "1.0.0"
|
||||
},
|
||||
"dist": {
|
||||
"type": "zip",
|
||||
"url": "https://github.com/php-fig/log/archive/1.0.0.zip",
|
||||
"reference": "1.0.0",
|
||||
"shasum": ""
|
||||
},
|
||||
"type": "library",
|
||||
"autoload": {
|
||||
"psr-0": {
|
||||
"Psr\\Log\\": ""
|
||||
}
|
||||
},
|
||||
"notification-url": "https://packagist.org/downloads/",
|
||||
"license": [
|
||||
"MIT"
|
||||
],
|
||||
"authors": [
|
||||
{
|
||||
"name": "PHP-FIG",
|
||||
"homepage": "http://www.php-fig.org/"
|
||||
}
|
||||
],
|
||||
"description": "Common interface for logging libraries",
|
||||
"keywords": [
|
||||
"log",
|
||||
"psr",
|
||||
"psr-3"
|
||||
],
|
||||
"time": "2012-12-21 11:40:51"
|
||||
}
|
||||
],
|
||||
"packages-dev": [
|
||||
{
|
||||
"name": "phpunit/php-code-coverage",
|
||||
"version": "1.2.9",
|
||||
"version": "1.2.11",
|
||||
"source": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/sebastianbergmann/php-code-coverage.git",
|
||||
"reference": "1.2.9"
|
||||
"reference": "1.2.11"
|
||||
},
|
||||
"dist": {
|
||||
"type": "zip",
|
||||
"url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/1.2.9",
|
||||
"reference": "1.2.9",
|
||||
"url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/1.2.11",
|
||||
"reference": "1.2.11",
|
||||
"shasum": ""
|
||||
},
|
||||
"require": {
|
||||
@ -65,6 +103,9 @@
|
||||
"phpunit/php-text-template": ">=1.1.1@stable",
|
||||
"phpunit/php-token-stream": ">=1.1.3@stable"
|
||||
},
|
||||
"require-dev": {
|
||||
"phpunit/phpunit": "3.7.*"
|
||||
},
|
||||
"suggest": {
|
||||
"ext-dom": "*",
|
||||
"ext-xdebug": ">=2.0.5"
|
||||
@ -96,7 +137,7 @@
|
||||
"testing",
|
||||
"xunit"
|
||||
],
|
||||
"time": "2013-02-26 18:55:56"
|
||||
"time": "2013-05-23 18:23:24"
|
||||
},
|
||||
{
|
||||
"name": "phpunit/php-file-iterator",
|
||||
@ -278,16 +319,16 @@
|
||||
},
|
||||
{
|
||||
"name": "phpunit/phpunit",
|
||||
"version": "3.7.18",
|
||||
"version": "3.7.21",
|
||||
"source": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/sebastianbergmann/phpunit.git",
|
||||
"reference": "3.7.18"
|
||||
"reference": "3.7.21"
|
||||
},
|
||||
"dist": {
|
||||
"type": "zip",
|
||||
"url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/3.7.18",
|
||||
"reference": "3.7.18",
|
||||
"url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/3.7.21",
|
||||
"reference": "3.7.21",
|
||||
"shasum": ""
|
||||
},
|
||||
"require": {
|
||||
@ -301,7 +342,7 @@
|
||||
"phpunit/php-text-template": ">=1.1.1",
|
||||
"phpunit/php-timer": ">=1.0.2,<1.1.0",
|
||||
"phpunit/phpunit-mock-objects": ">=1.2.0,<1.3.0",
|
||||
"symfony/yaml": ">=2.2.0"
|
||||
"symfony/yaml": ">=2.0,<3.0"
|
||||
},
|
||||
"require-dev": {
|
||||
"pear-pear/pear": "1.9.4"
|
||||
@ -348,7 +389,7 @@
|
||||
"testing",
|
||||
"xunit"
|
||||
],
|
||||
"time": "2013-03-07 21:45:39"
|
||||
"time": "2013-05-23 18:54:29"
|
||||
},
|
||||
{
|
||||
"name": "phpunit/phpunit-mock-objects",
|
||||
@ -401,17 +442,17 @@
|
||||
},
|
||||
{
|
||||
"name": "symfony/yaml",
|
||||
"version": "v2.2.0",
|
||||
"version": "v2.3.1",
|
||||
"target-dir": "Symfony/Component/Yaml",
|
||||
"source": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/symfony/Yaml.git",
|
||||
"reference": "v2.2.0-RC3"
|
||||
"reference": "v2.3.1"
|
||||
},
|
||||
"dist": {
|
||||
"type": "zip",
|
||||
"url": "https://api.github.com/repos/symfony/Yaml/zipball/v2.2.0-RC3",
|
||||
"reference": "v2.2.0-RC3",
|
||||
"url": "https://api.github.com/repos/symfony/Yaml/zipball/v2.3.1",
|
||||
"reference": "v2.3.1",
|
||||
"shasum": ""
|
||||
},
|
||||
"require": {
|
||||
@ -420,7 +461,7 @@
|
||||
"type": "library",
|
||||
"extra": {
|
||||
"branch-alias": {
|
||||
"dev-master": "2.2-dev"
|
||||
"dev-master": "2.3-dev"
|
||||
}
|
||||
},
|
||||
"autoload": {
|
||||
@ -444,7 +485,7 @@
|
||||
],
|
||||
"description": "Symfony Yaml Component",
|
||||
"homepage": "http://symfony.com",
|
||||
"time": "2013-01-27 16:49:19"
|
||||
"time": "2013-05-10 18:12:13"
|
||||
}
|
||||
],
|
||||
"aliases": [
|
||||
|
62
lib/Resque/Log.php
Normal file
62
lib/Resque/Log.php
Normal file
@ -0,0 +1,62 @@
|
||||
<?php
|
||||
/**
|
||||
* Resque default logger PSR-3 compliant
|
||||
*
|
||||
* @package Resque/Stat
|
||||
* @author Chris Boulton <chris@bigcommerce.com>
|
||||
* @license http://www.opensource.org/licenses/mit-license.php
|
||||
*/
|
||||
class Resque_Log extends Psr\Log\AbstractLogger
|
||||
{
|
||||
public $verbose;
|
||||
|
||||
public function __construct($verbose = false) {
|
||||
$this->verbose = $verbose;
|
||||
}
|
||||
|
||||
/**
|
||||
* Logs with an arbitrary level.
|
||||
*
|
||||
* @param mixed $level PSR-3 log level constant, or equivalent string
|
||||
* @param string $message Message to log, may contain a { placeholder }
|
||||
* @param array $context Variables to replace { placeholder }
|
||||
* @return null
|
||||
*/
|
||||
public function log($level, $message, array $context = array())
|
||||
{
|
||||
if ($this->verbose) {
|
||||
fwrite(
|
||||
STDOUT,
|
||||
'[' . $level . '] [' . strftime('%T %Y-%m-%d') . '] ' . $this->interpolate($message, $context) . PHP_EOL
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!($level === Psr\Log\LogLevel::INFO || $level === Psr\Log\LogLevel::DEBUG)) {
|
||||
fwrite(
|
||||
STDOUT,
|
||||
'[' . $level . '] ' . $this->interpolate($message, $context) . PHP_EOL
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fill placeholders with the provided context
|
||||
* @author Jordi Boggiano j.boggiano@seld.be
|
||||
*
|
||||
* @param string $message Message to be logged
|
||||
* @param array $context Array of variables to use in message
|
||||
* @return string
|
||||
*/
|
||||
public function interpolate($message, array $context = array())
|
||||
{
|
||||
// build a replacement array with braces around the context keys
|
||||
$replace = array();
|
||||
foreach ($context as $key => $val) {
|
||||
$replace['{' . $key . '}'] = $val;
|
||||
}
|
||||
|
||||
// interpolate replacement values into the message and return
|
||||
return strtr($message, $replace);
|
||||
}
|
||||
}
|
@ -9,14 +9,10 @@
|
||||
*/
|
||||
class Resque_Worker
|
||||
{
|
||||
const LOG_NONE = 0;
|
||||
const LOG_NORMAL = 1;
|
||||
const LOG_VERBOSE = 2;
|
||||
|
||||
/**
|
||||
* @var int Current log level of this worker.
|
||||
*/
|
||||
public $logLevel = 0;
|
||||
* @var LoggerInterface Logging object that impliments the PSR-3 LoggerInterface
|
||||
*/
|
||||
public $logger;
|
||||
|
||||
/**
|
||||
* @var array Array of all associated queues for this worker.
|
||||
@ -90,7 +86,7 @@ class Resque_Worker
|
||||
*/
|
||||
public static function find($workerId)
|
||||
{
|
||||
if(!self::exists($workerId) || false === strpos($workerId, ":")) {
|
||||
if(!self::exists($workerId) || false === strpos($workerId, ":")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -152,83 +148,83 @@ class Resque_Worker
|
||||
$this->updateProcLine('Starting');
|
||||
$this->startup();
|
||||
|
||||
while(true) {
|
||||
if($this->shutdown) {
|
||||
break;
|
||||
}
|
||||
while(true) {
|
||||
if($this->shutdown) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Attempt to find and reserve a job
|
||||
$job = false;
|
||||
if(!$this->paused) {
|
||||
if($blocking === true) {
|
||||
$this->log('Starting blocking with timeout of ' . $interval, self::LOG_VERBOSE);
|
||||
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval);
|
||||
} else {
|
||||
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
|
||||
}
|
||||
// Attempt to find and reserve a job
|
||||
$job = false;
|
||||
if(!$this->paused) {
|
||||
if($blocking === true) {
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval));
|
||||
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval);
|
||||
} else {
|
||||
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
|
||||
}
|
||||
|
||||
$job = $this->reserve($blocking, $interval);
|
||||
}
|
||||
$job = $this->reserve($blocking, $interval);
|
||||
}
|
||||
|
||||
if(!$job) {
|
||||
// For an interval of 0, break now - helps with unit testing etc
|
||||
if($interval == 0) {
|
||||
break;
|
||||
}
|
||||
if(!$job) {
|
||||
// For an interval of 0, break now - helps with unit testing etc
|
||||
if($interval == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
if($blocking === false)
|
||||
{
|
||||
// If no job was found, we sleep for $interval before continuing and checking again
|
||||
$this->log('Sleeping for ' . $interval, self::LOG_VERBOSE);
|
||||
if($this->paused) {
|
||||
$this->updateProcLine('Paused');
|
||||
}
|
||||
else {
|
||||
$this->updateProcLine('Waiting for ' . implode(',', $this->queues));
|
||||
}
|
||||
if($blocking === false)
|
||||
{
|
||||
// If no job was found, we sleep for $interval before continuing and checking again
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval));
|
||||
if($this->paused) {
|
||||
$this->updateProcLine('Paused');
|
||||
}
|
||||
else {
|
||||
$this->updateProcLine('Waiting for ' . implode(',', $this->queues));
|
||||
}
|
||||
|
||||
usleep($interval * 1000000);
|
||||
}
|
||||
usleep($interval * 1000000);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->log('got ' . $job);
|
||||
Resque_Event::trigger('beforeFork', $job);
|
||||
$this->workingOn($job);
|
||||
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job));
|
||||
Resque_Event::trigger('beforeFork', $job);
|
||||
$this->workingOn($job);
|
||||
|
||||
$this->child = Resque::fork();
|
||||
$this->child = Resque::fork();
|
||||
|
||||
// Forked and we're the child. Run the job.
|
||||
if ($this->child === 0 || $this->child === false) {
|
||||
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
|
||||
$this->updateProcLine($status);
|
||||
$this->log($status, self::LOG_VERBOSE);
|
||||
$this->perform($job);
|
||||
if ($this->child === 0) {
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
// Forked and we're the child. Run the job.
|
||||
if ($this->child === 0 || $this->child === false) {
|
||||
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
|
||||
$this->updateProcLine($status);
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, $status);
|
||||
$this->perform($job);
|
||||
if ($this->child === 0) {
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
if($this->child > 0) {
|
||||
// Parent process, sit and wait
|
||||
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
|
||||
$this->updateProcLine($status);
|
||||
$this->log($status, self::LOG_VERBOSE);
|
||||
if($this->child > 0) {
|
||||
// Parent process, sit and wait
|
||||
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
|
||||
$this->updateProcLine($status);
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, $status);
|
||||
|
||||
// Wait until the child process finishes before continuing
|
||||
pcntl_wait($status);
|
||||
$exitStatus = pcntl_wexitstatus($status);
|
||||
if($exitStatus !== 0) {
|
||||
$job->fail(new Resque_Job_DirtyExitException(
|
||||
'Job exited with exit code ' . $exitStatus
|
||||
));
|
||||
}
|
||||
}
|
||||
// Wait until the child process finishes before continuing
|
||||
pcntl_wait($status);
|
||||
$exitStatus = pcntl_wexitstatus($status);
|
||||
if($exitStatus !== 0) {
|
||||
$job->fail(new Resque_Job_DirtyExitException(
|
||||
'Job exited with exit code ' . $exitStatus
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
$this->child = null;
|
||||
$this->doneWorking();
|
||||
}
|
||||
$this->child = null;
|
||||
$this->doneWorking();
|
||||
}
|
||||
|
||||
$this->unregisterWorker();
|
||||
}
|
||||
@ -245,46 +241,46 @@ class Resque_Worker
|
||||
$job->perform();
|
||||
}
|
||||
catch(Exception $e) {
|
||||
$this->log($job . ' failed: ' . $e->getMessage());
|
||||
$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e->getMessage()));
|
||||
$job->fail($e);
|
||||
return;
|
||||
}
|
||||
|
||||
$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
|
||||
$this->log('done ' . $job);
|
||||
$this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bool $blocking
|
||||
* @param int $timeout
|
||||
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
|
||||
*/
|
||||
public function reserve($blocking = false, $timeout = null)
|
||||
{
|
||||
$queues = $this->queues();
|
||||
if(!is_array($queues)) {
|
||||
return;
|
||||
}
|
||||
/**
|
||||
* @param bool $blocking
|
||||
* @param int $timeout
|
||||
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
|
||||
*/
|
||||
public function reserve($blocking = false, $timeout = null)
|
||||
{
|
||||
$queues = $this->queues();
|
||||
if(!is_array($queues)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if($blocking === true) {
|
||||
$job = Resque_Job::reserveBlocking($queues, $timeout);
|
||||
if($job) {
|
||||
$this->log('Found job on ' . $job->queue, self::LOG_VERBOSE);
|
||||
return $job;
|
||||
}
|
||||
} else {
|
||||
foreach($queues as $queue) {
|
||||
$this->log('Checking ' . $queue, self::LOG_VERBOSE);
|
||||
$job = Resque_Job::reserve($queue);
|
||||
if($job) {
|
||||
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
|
||||
return $job;
|
||||
}
|
||||
}
|
||||
}
|
||||
if($blocking === true) {
|
||||
$job = Resque_Job::reserveBlocking($queues, $timeout);
|
||||
if($job) {
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
|
||||
return $job;
|
||||
}
|
||||
} else {
|
||||
foreach($queues as $queue) {
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue));
|
||||
$job = Resque_Job::reserve($queue);
|
||||
if($job) {
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
|
||||
return $job;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an array containing all of the queues that this worker should use
|
||||
@ -355,7 +351,7 @@ class Resque_Worker
|
||||
pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
|
||||
pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
|
||||
pcntl_signal(SIGPIPE, array($this, 'reestablishRedisConnection'));
|
||||
$this->log('Registered signals', self::LOG_VERBOSE);
|
||||
$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals');
|
||||
}
|
||||
|
||||
/**
|
||||
@ -363,7 +359,7 @@ class Resque_Worker
|
||||
*/
|
||||
public function pauseProcessing()
|
||||
{
|
||||
$this->log('USR2 received; pausing job processing');
|
||||
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing');
|
||||
$this->paused = true;
|
||||
}
|
||||
|
||||
@ -373,7 +369,7 @@ class Resque_Worker
|
||||
*/
|
||||
public function unPauseProcessing()
|
||||
{
|
||||
$this->log('CONT received; resuming job processing');
|
||||
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing');
|
||||
$this->paused = false;
|
||||
}
|
||||
|
||||
@ -383,7 +379,7 @@ class Resque_Worker
|
||||
*/
|
||||
public function reestablishRedisConnection()
|
||||
{
|
||||
$this->log('SIGPIPE received; attempting to reconnect');
|
||||
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'SIGPIPE received; attempting to reconnect');
|
||||
Resque::redis()->establishConnection();
|
||||
}
|
||||
|
||||
@ -394,7 +390,7 @@ class Resque_Worker
|
||||
public function shutdown()
|
||||
{
|
||||
$this->shutdown = true;
|
||||
$this->log('Exiting...');
|
||||
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down');
|
||||
}
|
||||
|
||||
/**
|
||||
@ -414,18 +410,18 @@ class Resque_Worker
|
||||
public function killChild()
|
||||
{
|
||||
if(!$this->child) {
|
||||
$this->log('No child to kill.', self::LOG_VERBOSE);
|
||||
$this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.');
|
||||
return;
|
||||
}
|
||||
|
||||
$this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child));
|
||||
if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
|
||||
$this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
|
||||
$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child));
|
||||
posix_kill($this->child, SIGKILL);
|
||||
$this->child = null;
|
||||
}
|
||||
else {
|
||||
$this->log('Child ' . $this->child . ' not found, restarting.', self::LOG_VERBOSE);
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child));
|
||||
$this->shutdown();
|
||||
}
|
||||
}
|
||||
@ -443,14 +439,14 @@ class Resque_Worker
|
||||
$workerPids = $this->workerPids();
|
||||
$workers = self::all();
|
||||
foreach($workers as $worker) {
|
||||
if (is_object($worker)) {
|
||||
list($host, $pid, $queues) = explode(':', (string)$worker, 3);
|
||||
if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
|
||||
continue;
|
||||
}
|
||||
$this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE);
|
||||
$worker->unregisterWorker();
|
||||
}
|
||||
if (is_object($worker)) {
|
||||
list($host, $pid, $queues) = explode(':', (string)$worker, 3);
|
||||
if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
|
||||
continue;
|
||||
}
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker));
|
||||
$worker->unregisterWorker();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -536,26 +532,6 @@ class Resque_Worker
|
||||
return $this->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Output a given log message to STDOUT.
|
||||
*
|
||||
* @param string $message Message to output.
|
||||
* @param int $logLevel The logging level to capture
|
||||
*/
|
||||
public function log($message, $logLevel = self::LOG_NORMAL)
|
||||
{
|
||||
if ($logLevel > $this->logLevel) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ($this->logLevel == self::LOG_NORMAL) {
|
||||
fwrite(STDOUT, "*** " . $message . "\n");
|
||||
return;
|
||||
}
|
||||
|
||||
fwrite(STDOUT, "** [" . strftime('%T %Y-%m-%d') . "] " . $message . "\n");
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an object describing the job this worker is currently working on.
|
||||
*
|
||||
@ -582,5 +558,15 @@ class Resque_Worker
|
||||
{
|
||||
return Resque_Stat::get($stat . ':' . $this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Inject the logging object into the worker
|
||||
*
|
||||
* @param Psr\Log\LoggerInterface $logger
|
||||
*/
|
||||
public function setLogger(Psr\Log\LoggerInterface $logger)
|
||||
{
|
||||
$this->logger = $logger;
|
||||
}
|
||||
}
|
||||
?>
|
||||
|
@ -16,6 +16,7 @@ class Resque_Tests_EventTest extends Resque_Tests_TestCase
|
||||
|
||||
// Register a worker to test with
|
||||
$this->worker = new Resque_Worker('jobs');
|
||||
$this->worker->setLogger(new Resque_Log());
|
||||
$this->worker->registerWorker();
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,7 @@ class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase
|
||||
|
||||
// Register a worker to test with
|
||||
$this->worker = new Resque_Worker('jobs');
|
||||
$this->worker->setLogger(new Resque_Log());
|
||||
}
|
||||
|
||||
public function testJobStatusCanBeTracked()
|
||||
|
@ -17,6 +17,7 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase
|
||||
|
||||
// Register a worker to test with
|
||||
$this->worker = new Resque_Worker('jobs');
|
||||
$this->worker->setLogger(new Resque_Log());
|
||||
$this->worker->registerWorker();
|
||||
}
|
||||
|
||||
|
31
test/Resque/Tests/LogTest.php
Normal file
31
test/Resque/Tests/LogTest.php
Normal file
@ -0,0 +1,31 @@
|
||||
<?php
|
||||
/**
|
||||
* Resque_Log tests.
|
||||
*
|
||||
* @package Resque/Tests
|
||||
* @author Chris Boulton <chris@bigcommerce.com>
|
||||
* @license http://www.opensource.org/licenses/mit-license.php
|
||||
*/
|
||||
class Resque_Tests_LogTest extends Resque_Tests_TestCase
|
||||
{
|
||||
public function testLogInterpolate()
|
||||
{
|
||||
$logger = new Resque_Log();
|
||||
$actual = $logger->interpolate('string {replace}', array('replace' => 'value'));
|
||||
$expected = 'string value';
|
||||
|
||||
$this->assertEquals($expected, $actual);
|
||||
}
|
||||
|
||||
public function testLogInterpolateMutiple()
|
||||
{
|
||||
$logger = new Resque_Log();
|
||||
$actual = $logger->interpolate(
|
||||
'string {replace1} {replace2}',
|
||||
array('replace1' => 'value1', 'replace2' => 'value2')
|
||||
);
|
||||
$expected = 'string value1 value2';
|
||||
|
||||
$this->assertEquals($expected, $actual);
|
||||
}
|
||||
}
|
@ -11,6 +11,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
public function testWorkerRegistersInList()
|
||||
{
|
||||
$worker = new Resque_Worker('*');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->registerWorker();
|
||||
|
||||
// Make sure the worker is in the list
|
||||
@ -23,6 +24,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
// Register a few workers
|
||||
for($i = 0; $i < $num; ++$i) {
|
||||
$worker = new Resque_Worker('queue_' . $i);
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->registerWorker();
|
||||
}
|
||||
|
||||
@ -33,6 +35,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
public function testGetWorkerById()
|
||||
{
|
||||
$worker = new Resque_Worker('*');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->registerWorker();
|
||||
|
||||
$newWorker = Resque_Worker::find((string)$worker);
|
||||
@ -47,6 +50,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
public function testWorkerCanUnregister()
|
||||
{
|
||||
$worker = new Resque_Worker('*');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->registerWorker();
|
||||
$worker->unregisterWorker();
|
||||
|
||||
@ -58,6 +62,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
public function testPausedWorkerDoesNotPickUpJobs()
|
||||
{
|
||||
$worker = new Resque_Worker('*');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->pauseProcessing();
|
||||
Resque::enqueue('jobs', 'Test_Job');
|
||||
$worker->work(0);
|
||||
@ -68,6 +73,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
public function testResumedWorkerPicksUpJobs()
|
||||
{
|
||||
$worker = new Resque_Worker('*');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->pauseProcessing();
|
||||
Resque::enqueue('jobs', 'Test_Job');
|
||||
$worker->work(0);
|
||||
@ -83,6 +89,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
'queue1',
|
||||
'queue2'
|
||||
));
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->registerWorker();
|
||||
Resque::enqueue('queue1', 'Test_Job_1');
|
||||
Resque::enqueue('queue2', 'Test_Job_2');
|
||||
@ -101,6 +108,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
'medium',
|
||||
'low'
|
||||
));
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->registerWorker();
|
||||
|
||||
// Queue the jobs in a different order
|
||||
@ -122,6 +130,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
public function testWildcardQueueWorkerWorksAllQueues()
|
||||
{
|
||||
$worker = new Resque_Worker('*');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->registerWorker();
|
||||
|
||||
Resque::enqueue('queue1', 'Test_Job_1');
|
||||
@ -137,6 +146,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
public function testWorkerDoesNotWorkOnUnknownQueues()
|
||||
{
|
||||
$worker = new Resque_Worker('queue1');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->registerWorker();
|
||||
Resque::enqueue('queue2', 'Test_Job');
|
||||
|
||||
@ -147,6 +157,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
{
|
||||
Resque::enqueue('jobs', 'Test_Job');
|
||||
$worker = new Resque_Worker('jobs');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$job = $worker->reserve();
|
||||
$worker->workingOn($job);
|
||||
$worker->doneWorking();
|
||||
@ -156,6 +167,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
public function testWorkerRecordsWhatItIsWorkingOn()
|
||||
{
|
||||
$worker = new Resque_Worker('jobs');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->registerWorker();
|
||||
|
||||
$payload = array(
|
||||
@ -178,6 +190,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
Resque::enqueue('jobs', 'Invalid_Job');
|
||||
|
||||
$worker = new Resque_Worker('jobs');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->work(0);
|
||||
$worker->work(0);
|
||||
|
||||
@ -189,15 +202,18 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
{
|
||||
// Register a good worker
|
||||
$goodWorker = new Resque_Worker('jobs');
|
||||
$goodWorker->setLogger(new Resque_Log());
|
||||
$goodWorker->registerWorker();
|
||||
$workerId = explode(':', $goodWorker);
|
||||
|
||||
// Register some bad workers
|
||||
$worker = new Resque_Worker('jobs');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->setId($workerId[0].':1:jobs');
|
||||
$worker->registerWorker();
|
||||
|
||||
$worker = new Resque_Worker(array('high', 'low'));
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->setId($workerId[0].':2:high,low');
|
||||
$worker->registerWorker();
|
||||
|
||||
@ -213,12 +229,14 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
{
|
||||
// Register a bad worker on this machine
|
||||
$worker = new Resque_Worker('jobs');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$workerId = explode(':', $worker);
|
||||
$worker->setId($workerId[0].':1:jobs');
|
||||
$worker->registerWorker();
|
||||
|
||||
// Register some other false workers
|
||||
$worker = new Resque_Worker('jobs');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->setId('my.other.host:1:jobs');
|
||||
$worker->registerWorker();
|
||||
|
||||
@ -235,6 +253,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
public function testWorkerFailsUncompletedJobsOnExit()
|
||||
{
|
||||
$worker = new Resque_Worker('jobs');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->registerWorker();
|
||||
|
||||
$payload = array(
|
||||
@ -251,6 +270,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
|
||||
public function testBlockingListPop()
|
||||
{
|
||||
$worker = new Resque_Worker('jobs');
|
||||
$worker->setLogger(new Resque_Log());
|
||||
$worker->registerWorker();
|
||||
|
||||
Resque::enqueue('jobs', 'Test_Job_1');
|
||||
|
Loading…
Reference in New Issue
Block a user