mirror of
https://github.com/idanoo/php-resque
synced 2025-07-01 05:32:20 +00:00
Initial commit
This commit is contained in:
commit
cb4205d508
37 changed files with 2808 additions and 0 deletions
564
lib/Resque/Worker.php
Normal file
564
lib/Resque/Worker.php
Normal file
|
@ -0,0 +1,564 @@
|
|||
<?php
|
||||
require_once 'Stat.php';
|
||||
require_once 'Job.php';
|
||||
require_once 'Job/DirtyExitException.php';
|
||||
|
||||
/**
|
||||
* Resque worker that handles checking queues for jobs, fetching them
|
||||
* off the queues, running them and handling the result.
|
||||
*
|
||||
* @package Resque/Worker
|
||||
* @author Chris Boulton <chris.boulton@interspire.com>
|
||||
* @copyright (c) 2010 Chris Boulton
|
||||
* @license http://www.opensource.org/licenses/mit-license.php
|
||||
*/
|
||||
class Resque_Worker
|
||||
{
|
||||
const LOG_NONE = 0;
|
||||
const LOG_NORMAL = 1;
|
||||
const LOG_VERBOSE = 2;
|
||||
|
||||
/**
|
||||
* @var int Current log level of this worker.
|
||||
*/
|
||||
public $logLevel = 0;
|
||||
|
||||
/**
|
||||
* @var array Array of all associated queues for this worker.
|
||||
*/
|
||||
private $queues = array();
|
||||
|
||||
/**
|
||||
* @var string The hostname of this worker.
|
||||
*/
|
||||
private $hostname;
|
||||
|
||||
/**
|
||||
* @var boolean True if on the next iteration, the worker should shutdown.
|
||||
*/
|
||||
private $shutdown = false;
|
||||
|
||||
/**
|
||||
* @var boolean True if this worker is paused.
|
||||
*/
|
||||
private $paused = false;
|
||||
|
||||
/**
|
||||
* @var string String identifying this worker.
|
||||
*/
|
||||
private $id;
|
||||
|
||||
/**
|
||||
* @var Resque_Job Current job, if any, being processed by this worker.
|
||||
*/
|
||||
private $currentJob = null;
|
||||
|
||||
/**
|
||||
* Return all workers known to Resque as instantiated instances.
|
||||
*/
|
||||
public static function all()
|
||||
{
|
||||
$workers = Resque::redis()->smembers('workers');
|
||||
if(!is_array($workers)) {
|
||||
$workers = array();
|
||||
}
|
||||
|
||||
$instances = array();
|
||||
foreach($workers as $workerId) {
|
||||
$instances[] = self::find($workerId);
|
||||
}
|
||||
return $instances;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a worker ID, check if it is registered/valid.
|
||||
*
|
||||
* @param string $workerId ID of the worker.
|
||||
* @return boolean True if the worker exists, false if not.
|
||||
*/
|
||||
public static function exists($workerId)
|
||||
{
|
||||
return (bool)Resque::redis()->sismember('workers', $workerId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a worker ID, find it and return an instantiated worker class for it.
|
||||
*
|
||||
* @param string $workerId The ID of the worker.
|
||||
* @return Resque_Worker Instance of the worker. False if the worker does not exist.
|
||||
*/
|
||||
public static function find($workerId)
|
||||
{
|
||||
if(!self::exists($workerId)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
list($hostname, $pid, $queues) = explode(':', $workerId, 3);
|
||||
$queues = explode(',', $queues);
|
||||
$worker = new self($queues);
|
||||
$worker->setId($workerId);
|
||||
return $worker;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the ID of this worker to a given ID string.
|
||||
*
|
||||
* @param string $workerId ID for the worker.
|
||||
*/
|
||||
public function setId($workerId)
|
||||
{
|
||||
$this->id = $workerId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiate a new worker, given a list of queues that it should be working
|
||||
* on. The list of queues should be supplied in the priority that they should
|
||||
* be checked for jobs (first come, first served)
|
||||
*
|
||||
* Passing a single '*' allows the worker to work on all queues in alphabetical
|
||||
* order. You can easily add new queues dynamically and have them worked on using
|
||||
* this method.
|
||||
*
|
||||
* @param string|array $queues String with a single queue name, array with multiple.
|
||||
*/
|
||||
public function __construct($queues)
|
||||
{
|
||||
if(!is_array($queues)) {
|
||||
$queues = array($queues);
|
||||
}
|
||||
|
||||
$this->queues = $queues;
|
||||
if(function_exists('gethostname')) {
|
||||
$hostname = gethostname();
|
||||
}
|
||||
else {
|
||||
$hostname = php_uname('n');
|
||||
}
|
||||
$this->hostname = $hostname;
|
||||
$this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues);
|
||||
}
|
||||
|
||||
/**
|
||||
* The primary loop for a worker which when called on an instance starts
|
||||
* the worker's life cycle.
|
||||
*
|
||||
* Queues are checked every $interval (seconds) for new jobs.
|
||||
*
|
||||
* @param int $interval How often to check for new jobs across the queues.
|
||||
*/
|
||||
public function work($interval = 5)
|
||||
{
|
||||
$this->updateProcLine('Starting');
|
||||
$this->startup();
|
||||
|
||||
while(true) {
|
||||
if($this->shutdown) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Attempt to find and reserve a job
|
||||
$job = false;
|
||||
if(!$this->paused) {
|
||||
$job = $this->reserve();
|
||||
}
|
||||
|
||||
if(!$job) {
|
||||
// For an interval of 0, break now - helps with unit testing etc
|
||||
if($interval == 0) {
|
||||
break;
|
||||
}
|
||||
// If no job was found, we sleep for $interval before continuing and checking again
|
||||
$this->log('Sleeping for ' . $interval, true);
|
||||
if($this->paused) {
|
||||
$this->updateProcLine('Paused');
|
||||
}
|
||||
else {
|
||||
$this->updateProcLine('Waiting for ' . implode(',', $this->queues));
|
||||
}
|
||||
sleep($interval);
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->log('got ' . $job);
|
||||
$this->workingOn($job);
|
||||
|
||||
$this->child = $this->fork();
|
||||
|
||||
// Forked and we're the child. Run the job.
|
||||
if($this->child === 0 || $this->child === false) {
|
||||
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
|
||||
$this->updateProcLine($status);
|
||||
$this->log($status, self::LOG_VERBOSE);
|
||||
$this->perform($job);
|
||||
if($this->child === 0) {
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
if($this->child > 0) {
|
||||
// Parent process, sit and wait
|
||||
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
|
||||
$this->updateProcLine($status);
|
||||
$this->log($status, self::LOG_VERBOSE);
|
||||
|
||||
// Wait until the child process finishes before continuing
|
||||
pcntl_wait($status);
|
||||
$exitStatus = pcntl_wexitstatus($status);
|
||||
if($exitStatus !== 0) {
|
||||
$job->fail(new Resque_Job_DirtyExitException(
|
||||
'Job exited with exit code ' . $exitStatus
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
$this->child = null;
|
||||
$this->doneWorking();
|
||||
}
|
||||
|
||||
$this->unregisterWorker();
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a single job.
|
||||
*
|
||||
* @param object|null $job The job to be processed.
|
||||
*/
|
||||
public function perform(Resque_Job $job)
|
||||
{
|
||||
try {
|
||||
$job->perform();
|
||||
}
|
||||
catch(Exception $e) {
|
||||
$this->log($job . ' failed: ' . $e->getMessage());
|
||||
$job->fail($e);
|
||||
return;
|
||||
}
|
||||
|
||||
$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
|
||||
$this->log('done ' . $job);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to find a job from the top of one of the queues for this worker.
|
||||
*
|
||||
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
|
||||
*/
|
||||
public function reserve()
|
||||
{
|
||||
$queues = $this->queues();
|
||||
if(!is_array($queues)) {
|
||||
return;
|
||||
}
|
||||
foreach($queues as $queue) {
|
||||
$this->log('Checking ' . $queue, self::LOG_VERBOSE);
|
||||
$job = Resque_Job::reserve($queue);
|
||||
if($job) {
|
||||
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
|
||||
return $job;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an array containing all of the queues that this worker should use
|
||||
* when searching for jobs.
|
||||
*
|
||||
* If * is found in the list of queues, every queue will be searched in
|
||||
* alphabetic order.
|
||||
*
|
||||
* @return array Array of associated queues.
|
||||
*/
|
||||
public function queues()
|
||||
{
|
||||
if(!in_array('*', $this->queues)) {
|
||||
return $this->queues;
|
||||
}
|
||||
|
||||
$queues = Resque::queues();
|
||||
sort($queues);
|
||||
return $queues;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to fork a child process from the parent to run a job in.
|
||||
*
|
||||
* Return values are those of pcntl_fork().
|
||||
*
|
||||
* @return int -1 if the fork failed, 0 for the forked child, the PID of the child for the parent.
|
||||
*/
|
||||
private function fork()
|
||||
{
|
||||
if(!function_exists('pcntl_fork')) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$pid = pcntl_fork();
|
||||
if($pid === -1) {
|
||||
throw new RuntimeException('Unable to fork child worker.');
|
||||
}
|
||||
|
||||
return $pid;
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform necessary actions to start a worker.
|
||||
*/
|
||||
private function startup()
|
||||
{
|
||||
$this->registerSigHandlers();
|
||||
$this->pruneDeadWorkers();
|
||||
$this->registerWorker();
|
||||
}
|
||||
|
||||
/**
|
||||
* On supported systems (with the PECL proctitle module installed), update
|
||||
* the name of the currently running process to indicate the current state
|
||||
* of a worker.
|
||||
*
|
||||
* @param string $status The updated process title.
|
||||
*/
|
||||
private function updateProcLine($status)
|
||||
{
|
||||
if(function_exists('setproctitle')) {
|
||||
setproctitle('resque-' . Resque::VERSION . ': ' . $status);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register signal handlers that a worker should respond to.
|
||||
*
|
||||
* TERM: Shutdown immediately and stop processing jobs.
|
||||
* INT: Shutdown immediately and stop processing jobs.
|
||||
* QUIT: Shutdown after the current job finishes processing.
|
||||
* USR1: Kill the forked child immediately and continue processing jobs.
|
||||
*/
|
||||
private function registerSigHandlers()
|
||||
{
|
||||
if(!function_exists('pcntl_signal')) {
|
||||
return;
|
||||
}
|
||||
|
||||
declare(ticks = 1);
|
||||
pcntl_signal(SIGTERM, array($this, 'shutDownNow'));
|
||||
pcntl_signal(SIGINT, array($this, 'shutDownNow'));
|
||||
pcntl_signal(SIGQUIT, array($this, 'shutdown'));
|
||||
pcntl_signal(SIGUSR1, array($this, 'killChild'));
|
||||
pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
|
||||
pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
|
||||
$this->log('Registered signals', self::LOG_VERBOSE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Signal handler callback for USR2, pauses processing of new jobs.
|
||||
*/
|
||||
public function pauseProcessing()
|
||||
{
|
||||
$this->log('USR2 received; pausing job processing');
|
||||
$this->paused = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Signal handler callback for CONT, resumes worker allowing it to pick
|
||||
* up new jobs.
|
||||
*/
|
||||
public function unPauseProcessing()
|
||||
{
|
||||
$this->log('CONT received; resuming job processing');
|
||||
$this->paused = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule a worker for shutdown. Will finish processing the current job
|
||||
* and when the timeout interval is reached, the worker will shut down.
|
||||
*/
|
||||
public function shutdown()
|
||||
{
|
||||
$this->shutdown = true;
|
||||
$this->log('Exiting...');
|
||||
}
|
||||
|
||||
/**
|
||||
* Force an immediate shutdown of the worker, killing any child jobs
|
||||
* currently running.
|
||||
*/
|
||||
public function shutdownNow()
|
||||
{
|
||||
$this->shutdown();
|
||||
$this->killChild();
|
||||
}
|
||||
|
||||
/**
|
||||
* Kill a forked child job immediately. The job it is processing will not
|
||||
* be completed.
|
||||
*/
|
||||
public function killChild()
|
||||
{
|
||||
if(!$this->child) {
|
||||
$this->log('No child to kill.', self::LOG_VERBOSE);
|
||||
return;
|
||||
}
|
||||
|
||||
$this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
|
||||
if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
|
||||
$this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
|
||||
posix_kill($this->child, SIGKILL);
|
||||
$this->child = null;
|
||||
}
|
||||
else {
|
||||
$this->log('Child ' . $this->child . ' not found, restarting.', self::LOG_VERBOSE);
|
||||
$this->shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Look for any workers which should be running on this server and if
|
||||
* they're not, remove them from Redis.
|
||||
*
|
||||
* This is a form of garbage collection to handle cases where the
|
||||
* server may have been killed and the Resque workers did not die gracefully
|
||||
* and therefore leave state information in Redis.
|
||||
*/
|
||||
public function pruneDeadWorkers()
|
||||
{
|
||||
$workerPids = $this->workerPids();
|
||||
if(empty($workerPids)) {
|
||||
continue;
|
||||
}
|
||||
$workers = self::all();
|
||||
foreach($workers as $worker) {
|
||||
list($host, $pid, $queues) = explode(':', (string)$worker, 3);
|
||||
if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
|
||||
continue;
|
||||
}
|
||||
$this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE);
|
||||
$worker->unregisterWorker();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an array of process IDs for all of the Resque workers currently
|
||||
* running on this machine.
|
||||
*
|
||||
* @return array Array of Resque worker process IDs.
|
||||
*/
|
||||
public function workerPids()
|
||||
{
|
||||
$pids = array();
|
||||
exec('ps -A -o pid,command | grep [r]esque', $cmdOutput);
|
||||
foreach($cmdOutput as $line) {
|
||||
list($pids[],) = explode(' ', $line, 2);
|
||||
}
|
||||
return $pids;
|
||||
}
|
||||
|
||||
/**
|
||||
* Register this worker in Redis.
|
||||
*/
|
||||
public function registerWorker()
|
||||
{
|
||||
Resque::redis()->sadd('workers', $this);
|
||||
Resque::redis()->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y'));
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister this worker in Redis. (shutdown etc)
|
||||
*/
|
||||
public function unregisterWorker()
|
||||
{
|
||||
if(is_object($this->currentJob)) {
|
||||
$this->currentJob->fail(new Resque_Job_DirtyExitException);
|
||||
}
|
||||
|
||||
$id = (string)$this;
|
||||
Resque::redis()->srem('workers', $id);
|
||||
Resque::redis()->del('worker:' . $id);
|
||||
Resque::redis()->del('worker:' . $id . ':started');
|
||||
Resque_Stat::clear('processed:' . $id);
|
||||
Resque_Stat::clear('failed:' . $id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tell Redis which job we're currently working on.
|
||||
*
|
||||
* @param object $job Resque_Job instance containing the job we're working on.
|
||||
*/
|
||||
public function workingOn(Resque_Job $job)
|
||||
{
|
||||
$job->worker = $this;
|
||||
$this->currentJob = $job;
|
||||
$job->updateStatus(Resque_Job_Status::STATUS_RUNNING);
|
||||
$data = json_encode(array(
|
||||
'queue' => $job->queue,
|
||||
'run_at' => strftime('%a %b %d %H:%M:%S %Z %Y'),
|
||||
'payload' => $job->payload
|
||||
));
|
||||
Resque::redis()->set('worker:' . $job->worker, $data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify Redis that we've finished working on a job, clearing the working
|
||||
* state and incrementing the job stats.
|
||||
*/
|
||||
public function doneWorking()
|
||||
{
|
||||
$this->currentJob = null;
|
||||
Resque_Stat::incr('processed');
|
||||
Resque_Stat::incr('processed:' . (string)$this);
|
||||
Resque::redis()->del('worker:' . (string)$this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a string representation of this worker.
|
||||
*
|
||||
* @return string String identifier for this worker instance.
|
||||
*/
|
||||
public function __toString()
|
||||
{
|
||||
return $this->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Output a given log message to STDOUT.
|
||||
*
|
||||
* @param string $message Message to output.
|
||||
*/
|
||||
public function log($message)
|
||||
{
|
||||
if($this->logLevel == self::LOG_NORMAL) {
|
||||
fwrite(STDOUT, "*** " . $message . "\n");
|
||||
}
|
||||
else if($this->logLevel == self::LOG_VERBOSE) {
|
||||
fwrite(STDOUT, "** [" . strftime('%T %Y-%m-%d') . "] " . $message . "\n");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an object describing the job this worker is currently working on.
|
||||
*
|
||||
* @return object Object with details of current job.
|
||||
*/
|
||||
public function job()
|
||||
{
|
||||
$job = Resque::redis()->get('worker:' . $this);
|
||||
if(!$job) {
|
||||
return new stdClass;
|
||||
}
|
||||
else {
|
||||
return json_decode($job);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a statistic belonging to this worker.
|
||||
*
|
||||
* @param string $stat Statistic to fetch.
|
||||
* @return int Statistic value.
|
||||
*/
|
||||
public function getStat($stat)
|
||||
{
|
||||
return Resque_Stat::get($stat . ':' . $this);
|
||||
}
|
||||
}
|
||||
?>
|
Loading…
Add table
Add a link
Reference in a new issue