mirror of
https://github.com/idanoo/php-resque
synced 2025-07-01 13:42:22 +00:00
- Updated travis builds to run on PHP 7.0, 7.1 and 7.2.
- Added ability to specify multiple log levels. [DEBUG/INFO/NOTICE/WARNING/ERROR/CRITICAL/ALERT/EMERGENCY] - Default is now . - Removed VERBOSE / VVERBOSE flags. - Enabled date/time logging by default.
This commit is contained in:
parent
177f8e3c19
commit
0d0c0d0a7e
8 changed files with 68 additions and 68 deletions
|
@ -21,7 +21,7 @@ class Resque_Worker
|
|||
/**
|
||||
* @var array Array of all associated queues for this worker.
|
||||
*/
|
||||
private $queues = array();
|
||||
private $queues = [];
|
||||
|
||||
/**
|
||||
* @var string The hostname of this worker.
|
||||
|
@ -69,7 +69,7 @@ class Resque_Worker
|
|||
$this->logger = new Resque_Log();
|
||||
|
||||
if (!is_array($queues)) {
|
||||
$queues = array($queues);
|
||||
$queues = [$queues];
|
||||
}
|
||||
|
||||
$this->queues = $queues;
|
||||
|
@ -86,10 +86,10 @@ class Resque_Worker
|
|||
{
|
||||
$workers = Resque::redis()->smembers('workers');
|
||||
if (!is_array($workers)) {
|
||||
$workers = array();
|
||||
$workers = [];
|
||||
}
|
||||
|
||||
$instances = array();
|
||||
$instances = [];
|
||||
foreach ($workers as $workerId) {
|
||||
$instances[] = self::find($workerId);
|
||||
}
|
||||
|
@ -158,7 +158,7 @@ class Resque_Worker
|
|||
$job = false;
|
||||
if (!$this->paused) {
|
||||
if ($blocking === true) {
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval));
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', ['interval' => $interval]);
|
||||
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval);
|
||||
} else {
|
||||
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
|
||||
|
@ -175,7 +175,7 @@ class Resque_Worker
|
|||
|
||||
if ($blocking === false) {
|
||||
// If no job was found, we sleep for $interval before continuing and checking again
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval));
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', ['interval' => $interval]);
|
||||
if ($this->paused) {
|
||||
$this->updateProcLine('Paused');
|
||||
} else {
|
||||
|
@ -188,7 +188,7 @@ class Resque_Worker
|
|||
continue;
|
||||
}
|
||||
|
||||
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job));
|
||||
$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', ['job' => $job]);
|
||||
Resque_Event::trigger('beforeFork', $job);
|
||||
$this->workingOn($job);
|
||||
|
||||
|
@ -239,13 +239,13 @@ class Resque_Worker
|
|||
Resque_Event::trigger('afterFork', $job);
|
||||
$job->perform();
|
||||
} catch (Exception $e) {
|
||||
$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e));
|
||||
$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', ['job' => $job, 'stack' => $e]);
|
||||
$job->fail($e);
|
||||
return;
|
||||
}
|
||||
|
||||
$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
|
||||
$this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job));
|
||||
$this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', ['job' => $job]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -257,21 +257,21 @@ class Resque_Worker
|
|||
{
|
||||
$queues = $this->queues();
|
||||
if (!is_array($queues)) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
if ($blocking === true) {
|
||||
$job = Resque_Job::reserveBlocking($queues, $timeout);
|
||||
if ($job) {
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', ['queue' => $job->queue]);
|
||||
return $job;
|
||||
}
|
||||
} else {
|
||||
foreach ($queues as $queue) {
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue));
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', ['queue' => $queue]);
|
||||
$job = Resque_Job::reserve($queue);
|
||||
if ($job) {
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', ['queue' => $job->queue]);
|
||||
return $job;
|
||||
}
|
||||
}
|
||||
|
@ -344,12 +344,12 @@ class Resque_Worker
|
|||
return;
|
||||
}
|
||||
|
||||
pcntl_signal(SIGTERM, array($this, 'shutDownNow'));
|
||||
pcntl_signal(SIGINT, array($this, 'shutDownNow'));
|
||||
pcntl_signal(SIGQUIT, array($this, 'shutdown'));
|
||||
pcntl_signal(SIGUSR1, array($this, 'killChild'));
|
||||
pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
|
||||
pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
|
||||
pcntl_signal(SIGTERM, [$this, 'shutDownNow']);
|
||||
pcntl_signal(SIGINT, [$this, 'shutDownNow']);
|
||||
pcntl_signal(SIGQUIT, [$this, 'shutdown']);
|
||||
pcntl_signal(SIGUSR1, [$this, 'killChild']);
|
||||
pcntl_signal(SIGUSR2, [$this, 'pauseProcessing']);
|
||||
pcntl_signal(SIGCONT, [$this, 'unPauseProcessing']);
|
||||
$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals');
|
||||
}
|
||||
|
||||
|
@ -403,13 +403,13 @@ class Resque_Worker
|
|||
return;
|
||||
}
|
||||
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child));
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', ['child' => $this->child]);
|
||||
if (exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
|
||||
$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child));
|
||||
$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', ['child' => $this->child]);
|
||||
posix_kill($this->child, SIGKILL);
|
||||
$this->child = null;
|
||||
} else {
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child));
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', ['child' => $this->child]);
|
||||
$this->shutdown();
|
||||
}
|
||||
}
|
||||
|
@ -432,7 +432,7 @@ class Resque_Worker
|
|||
if ($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
|
||||
continue;
|
||||
}
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker));
|
||||
$this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', ['worker' => (string)$worker]);
|
||||
$worker->unregisterWorker();
|
||||
}
|
||||
}
|
||||
|
@ -446,7 +446,7 @@ class Resque_Worker
|
|||
*/
|
||||
public function workerPids()
|
||||
{
|
||||
$pids = array();
|
||||
$pids = [];
|
||||
exec('ps -A -o pid,command | grep [r]esque', $cmdOutput);
|
||||
foreach ($cmdOutput as $line) {
|
||||
list($pids[],) = explode(' ', trim($line), 2);
|
||||
|
@ -490,11 +490,11 @@ class Resque_Worker
|
|||
$job->worker = $this;
|
||||
$this->currentJob = $job;
|
||||
$job->updateStatus(Resque_Job_Status::STATUS_RUNNING);
|
||||
$data = json_encode(array(
|
||||
$data = json_encode([
|
||||
'queue' => $job->queue,
|
||||
'run_at' => strftime('%a %b %d %H:%M:%S %Z %Y'),
|
||||
'payload' => $job->payload
|
||||
));
|
||||
]);
|
||||
Resque::redis()->set('worker:' . $job->worker, $data);
|
||||
}
|
||||
|
||||
|
@ -523,13 +523,13 @@ class Resque_Worker
|
|||
/**
|
||||
* Return an object describing the job this worker is currently working on.
|
||||
*
|
||||
* @return object Object with details of current job.
|
||||
* @return array Array with details of current job.
|
||||
*/
|
||||
public function job()
|
||||
{
|
||||
$job = Resque::redis()->get('worker:' . $this);
|
||||
if (!$job) {
|
||||
return array();
|
||||
return [];
|
||||
} else {
|
||||
return json_decode($job, true);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue