This commit is contained in:
Ruud Kamphuis 2013-03-12 11:18:37 +01:00
parent ff0d2bc655
commit 132531e1a2
7 changed files with 150 additions and 102 deletions

View File

@ -1,4 +1,3 @@
#!/usr/bin/env php
<?php <?php
// Find and initialize Composer // Find and initialize Composer

View File

@ -3,8 +3,9 @@ class PHP_Job
{ {
public function perform() public function perform()
{ {
sleep(120); fwrite(STDOUT, 'Start job! -> ');
fwrite(STDOUT, 'Hello!'); sleep(1);
fwrite(STDOUT, 'Job ended!' . PHP_EOL);
} }
} }
?> ?>

View File

@ -14,6 +14,6 @@ $args = array(
), ),
); );
$jobId = Resque::enqueue('default', $argv[1], $args, true); $jobId = Resque::enqueue($argv[1], $argv[2], $args, true);
echo "Queued job ".$jobId."\n\n"; echo "Queued job ".$jobId."\n\n";
?> ?>

View File

@ -110,21 +110,40 @@ class Resque
* @param string $queue The name of the queue to fetch an item from. * @param string $queue The name of the queue to fetch an item from.
* @return array Decoded item from the queue. * @return array Decoded item from the queue.
*/ */
public static function pop($queue, $interval = null) public static function pop($queue)
{ {
if($interval == null) { $item = self::redis()->lpop('queue:' . $queue);
$item = self::redis()->lpop('queue:' . $queue);
} else {
$item = self::redis()->blpop('queue:' . $queue, $interval ? $interval : Resque::DEFAULT_INTERVAL);
}
if(!$item) { if(!$item) {
return; return;
} }
return json_decode($interval == 0 ? $item : $item[1], true); return json_decode($item, true);
} }
/**
* Pop an item off the end of the specified queue, decode it and
* return it.
*
* @param string $queue The name of the queue to fetch an item from.
* @return array Decoded item from the queue.
*/
public static function blpop($queues, $interval = null)
{
$list = array();
foreach($queues AS $queue) {
$list[] = 'queue:' . $queue;
}
$item = self::redis()->blpop($list, $interval ? (int)$interval : Resque::DEFAULT_INTERVAL);
if(!$item) {
return;
}
return json_decode($item[1], true);
}
/** /**
* Return the size (number of pending jobs) of the specified queue. * Return the size (number of pending jobs) of the specified queue.
* *

View File

@ -71,22 +71,41 @@ class Resque_Job
return $id; return $id;
} }
/** /**
* Find the next available job from the specified queue and return an * Find the next available job from the specified queue and return an
* instance of Resque_Job for it. * instance of Resque_Job for it.
* *
* @param string $queue The name of the queue to check for a job in. * @param string $queue The name of the queue to check for a job in.
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
*/ */
public static function reserve($queue, $interval = null) public static function reserve($queue)
{ {
$payload = Resque::pop($queue, $interval); $payload = Resque::pop($queue);
if(!is_array($payload)) { if(!is_array($payload)) {
return false; return false;
} }
return new Resque_Job($queue, $payload); return new Resque_Job($queue, $payload);
} }
/**
* Find the next available job from the specified queue and return an
* instance of Resque_Job for it.
*
* @param string $queue The name of the queue to check for a job in.
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
*/
public static function reserveBlocking($queues, $interval = null)
{
$payload = Resque::blpop($queues, $interval);
if(!is_array($payload)) {
return false;
}
var_dump($payload);
return new Resque_Job($payload->queue, $payload);
}
/** /**
* Update the status of the current job. * Update the status of the current job.

View File

@ -143,7 +143,13 @@ class Resque_Redis
*/ */
public function __call($name, $args) { public function __call($name, $args) {
if(in_array($name, $this->keyCommands)) { if(in_array($name, $this->keyCommands)) {
$args[0] = self::$defaultNamespace . $args[0]; if(is_array($args[0])) {
foreach($args[0] AS $i => $v) {
$args[0][$i] = self::$defaultNamespace . $v;
}
} else {
$args[0] = self::$defaultNamespace . $args[0];
}
} }
try { try {
return $this->driver->__call($name, $args); return $this->driver->__call($name, $args);

View File

@ -152,70 +152,53 @@ class Resque_Worker
$this->updateProcLine('Starting'); $this->updateProcLine('Starting');
$this->startup(); $this->startup();
while(true) { while($job = $this->reserveBlocking($interval)) {
if($this->shutdown) {
break;
}
// Attempt to find and reserve a job
$job = false;
if(!$this->paused) {
$job = $this->reserve($interval);
}
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
if(!$job) { $this->log('got ' . $job);
// For an interval of 0, break now - helps with unit testing etc Resque_Event::trigger('beforeFork', $job);
if($interval == 0) { $this->workingOn($job);
break;
}
// If no job was found, we sleep for $interval before continuing and checking again $this->child = Resque::fork();
if($this->paused) {
$this->updateProcLine('Paused');
usleep($interval * 1000000); //it's paused, so don't hog redis with requests.
}
continue; // Forked and we're the child. Run the job.
if ($this->child === 0 || $this->child === false) {
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
$this->perform($job);
if ($this->child === 0) {
exit(0);
}
} }
$this->log('got ' . $job); if($this->child > 0) {
Resque_Event::trigger('beforeFork', $job); // Parent process, sit and wait
$this->workingOn($job); $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
$this->child = Resque::fork(); // Wait until the child process finishes before continuing
pcntl_wait($status);
$exitStatus = pcntl_wexitstatus($status);
if($exitStatus !== 0) {
$job->fail(new Resque_Job_DirtyExitException(
'Job exited with exit code ' . $exitStatus
));
}
}
// Forked and we're the child. Run the job. $this->child = null;
if ($this->child === 0 || $this->child === false) { $this->doneWorking();
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
$this->perform($job);
if ($this->child === 0) {
exit(0);
}
}
if($this->child > 0) { if($this->shutdown) {
// Parent process, sit and wait break;
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); }
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
// Wait until the child process finishes before continuing if($this->paused) {
pcntl_wait($status); break;
$exitStatus = pcntl_wexitstatus($status); }
if($exitStatus !== 0) { }
$job->fail(new Resque_Job_DirtyExitException(
'Job exited with exit code ' . $exitStatus
));
}
}
$this->child = null;
$this->doneWorking();
}
$this->unregisterWorker(); $this->unregisterWorker();
} }
@ -241,28 +224,49 @@ class Resque_Worker
$this->log('done ' . $job); $this->log('done ' . $job);
} }
/** /**
* Attempt to find a job from the top of one of the queues for this worker. * Attempt to find a job from the top of one of the queues for this worker.
* *
* @return object|boolean Instance of Resque_Job if a job is found, false if not. * @return object|boolean Instance of Resque_Job if a job is found, false if not.
*/ */
public function reserve($interval = null) public function reserve()
{ {
$queues = $this->queues(); $queues = $this->queues();
if(!is_array($queues)) { if(!is_array($queues)) {
return; return;
} }
foreach($queues as $queue) { foreach($queues as $queue) {
$this->log('Checking ' . $queue . ' with interval ' . $interval, self::LOG_VERBOSE); $this->log('Checking ' . $queue, self::LOG_VERBOSE);
$job = Resque_Job::reserve($queue, $interval); $job = Resque_Job::reserve($queue);
if($job) { if($job) {
$this->log('Found job on ' . $queue, self::LOG_VERBOSE); $this->log('Found job on ' . $queue, self::LOG_VERBOSE);
return $job; return $job;
} }
} }
return false; return false;
} }
/**
* Attempt to find a job from the top of one of the queues for this worker.
*
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
*/
public function reserveBlocking($interval = null)
{
$queues = $this->queues();
if(!is_array($queues)) {
return;
}
$job = Resque_Job::reserveBlocking($queues, $interval);
if($job) {
$this->log('Found job on ' . $job->queue, self::LOG_VERBOSE);
return $job;
}
return false;
}
/** /**
* Return an array containing all of the queues that this worker should use * Return an array containing all of the queues that this worker should use