Merge branch 'master' of github.com:chrisboulton/php-resque

This commit is contained in:
chris.boulton 2010-10-02 09:33:08 +10:00
commit 2c26cf8d58
38 changed files with 3026 additions and 0 deletions

18
CHANGELOG.markdown Normal file
View File

@ -0,0 +1,18 @@
## 1.1 (????-??-??) ##
* Change arguments for jobs to be an array as they're easier to work with in
PHP.
* Implement ability to have setUp and tearDown methods for jobs, called before
and after every single run.
* Ability to specify a cluster/multiple redis servers and consistent hash
between them (Thanks dceballos)
* Fix `APP_INCLUDE` environment variable not loading correctly.
* Jobs are no longer defined as static methods, and classes are instantiated
first. This change is NOT backwards compatible and requires job classes are
updated.
* Job arguments are passed to the job class when it is instantiated, and
are accessible by $this->args. This change will break existing job classes
that rely on arguments that have not been updated.
## 1.0 (2010-04-18) ##
* Initial release

20
LICENSE Normal file
View File

@ -0,0 +1,20 @@
(c) 2010 Chris Boulton <chris.boulton@interspire.com>
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

226
README.markdown Normal file
View File

@ -0,0 +1,226 @@
php-resque: PHP Resque Worker (and Enqueue)
===========================================
Resque is a Redis-backed library for creating background jobs, placing
those jobs on multiple queues, and processing them later.
Resque was pioneered and is developed by the fine folks at GitHub (yes,
I am a kiss-ass), and written in Ruby.
What you're seeing here is an almost direct port of the Resque worker
and enqueue system to PHP, which I've thrown together because I'm sure
my PHP developers would have a fit if they had to write a line of Ruby.
For more information on Resque, visit the official GitHub project:
<http://github.com/defunkt/resque/>
And for background information, the launch post on the GitHub blog:
<http://github.com/blog/542-introducing-resque>
The PHP port does NOT include its own web interface for viewing queue
stats, as the data is stored in the exact same expected format as the
Ruby version of Resque.
The PHP port allows for much the same as the Ruby version of Rescue:
* Workers can be distributed between multiple machines
* Includes support for priorities (queues)
* Resilient to memory leaks (fork)
* Expects failure
In addition, it also:
* Has the ability to track the status of jobs
* Will mark a job as failed, if a forked child running a job does
not exit with a status code as 0
* Has built in support for `setUp` and `tearDown` methods, called
pre and post jobs
## Jobs ##
### Queueing Jobs ###
Jobs are queued as follows:
require_once 'lib/Resque.php';
// Required if redis is located elsewhere
Resque::setBackend('localhost', 6379);
$args = array(
'name' => 'Chris'
);
Resque::enqueue('default', 'My_Job', $args);
### Defining Jobs ###
Each job should be in it's own class, and include a `perform` method.
class My_Job
{
public function perform()
{
// Work work work
echo $this->args['name'];
}
}
When the job is run, the class will be instantiated and any arguments
will be set as an array on the instantiated object, and are accessible
via `$this->args`.
Any exception thrown by a job will result in the job failing - be
careful here and make sure you handle the exceptions that shouldn't
result in a job failing.
Jobs can also have `setUp` and `tearDown` methods. If a `setUp` method
is defined, it will be called before the `perform` method is run.
The `tearDown` method if defined, will be called after the job finishes.
class My_Job
{
public function setUp()
{
// ... Set up environment for this job
}
public function perform()
{
// .. Run job
}
public function tearDown()
{
// ... Remove environment for this job
}
}
### Tracking Job Statuses ###
php-resque has the ability to perform basic status tracking of a queued
job. The status information will allow you to check if a job is in the
queue, currently being run, has finished, or failed.
To track the status of a job, pass `true` as the fourth argument to
`Resque::enqueue`. A token used for tracking the job status will be
returned:
$token = Resque::enqueue('default', 'My_Job', $args, true);
echo $token;
To fetch the status of a job:
$status = new Resque_Job_Status($token);
echo $status->get(); // Outputs the status
Job statuses are defined as constants in the `Resque_Job_Status` class.
Valid statuses include:
* `Resque_Job_Status::STATUS_WAITING` - Job is still queued
* `Resque_Job_Status::STATUS_RUNNING` - Job is currently running
* `Resque_Job_Status::STATUS_FAILED` - Job has failed
* `Resque_Job_Status::STATUS_COMPLETE` - Job is complete
* `false` - Failed to fetch the status - is the token valid?
Statuses are available for up to 24 hours after a job has completed
or failed, and are then automatically expired. A status can also
forcefully be expired by calling the `stop()` method on a status
class.
## Workers ##
Workers work in the exact same way as the Ruby workers. For complete
documentation on workers, see the original documentation.
A basic "up-and-running" resque.php file is included that sets up a
running worker environment is included in the root directory.
The exception to the similarities with the Ruby version of resque is
how a worker is initially setup. To work under all environments,
not having a single environment such as with Ruby, the PHP port makes
*no* assumptions about your setup.
To start a worker, it's very similar to the Ruby version:
$ QUEUE=file_serve php resque.php
It's your responsibility to tell the worker which file to include to get
your application underway. You do so by setting the `APP_INCLUDE` environment
variable:
$ QUEUE=file_serve APP_INCLUDE=../application/init.php php resque.php
Getting your application underway also includes telling the worker your job
classes, by means of either an autoloader or including them.
### Logging ###
The port supports the same environment variables for logging to STDOUT.
Setting `VERBOSE` will print basic debugging information and `VVERBOSE`
will print detailed information.
$ VERBOSE QUEUE=file_serve php resque.php
$ VVERBOSE QUEUE=file_serve php resque.php
### Priorities and Queue Lists ###
Similarly, priority and queue list functionality works exactly
the same as the Ruby workers. Multiple queues should be separated with
a comma, and the order that they're supplied in is the order that they're
checked in.
As per the original example:
$ QUEUES=file_serve,warm_cache php resque.php
The `file_serve` queue will always be checked for new jobs on each
iteration before the `warm_cache` queue is checked.
### Running All Queues ###
All queues are supported in the same manner and processed in alphabetical
order:
$ QUEUES=* php resque.php
### Running Multiple Workers ###
Multiple workers ca be launched and automatically worked by supplying
the `COUNT` environment variable:
$ COUNT=5 php resque.php
### Forking ###
Similarly to the Ruby versions, supported platforms will immediately
fork after picking up a job. The forked child will exit as soon as
the job finishes.
The difference with php-resque is that if a forked child does not
exit nicely (PHP error or such), php-resque will automatically fail
the job.
### Signals ###
Signals also work on supported platforms exactly as in the Ruby
version of Resque:
* `QUIT` - Wait for child to finish processing then exit
* `TERM` / `INT` - Immediately kill child then exit
* `USR1` - Immediately kill child but don't exit
* `USR2` - Pause worker, no new jobs will be processed
* `CONT` - Resume worker.
### Process Titles/Statuses ###
The Ruby version of Resque has a nifty feature whereby the process
title of the worker is updated to indicate what the worker is doing,
and any forked children also set their process title with the job
being run. This helps identify running processes on the server and
their resque status.
**PHP does not have this functionality by default.**
A PECL module (<http://pecl.php.net/package/proctitle>) exists that
adds this funcitonality to PHP, so if you'd like process titles updated,
install the PECL module as well. php-resque will detect and use it.

10
TODO.markdown Normal file
View File

@ -0,0 +1,10 @@
* Write tests for:
* `Resque_Failure`
* `Resque_Failure_Redis`
* Plugin/hook type system similar to Ruby version (when done, implement the
setUp and tearDown methods as a plugin)
* Change to preforking worker model
* Clean up /bin and /demo
* Add a way to store arbitrary text in job statuses (for things like progress
indicators)
* Write plugin for Ruby resque that calls setUp and tearDown methods

1
bin/resque Normal file
View File

@ -0,0 +1 @@
#!/bin/sh

17
build.xml Normal file
View File

@ -0,0 +1,17 @@
<project name="php-resque" default="build">
<target name="clean">
<delete dir="${basedir}/build" />
</target>
<target name="prepare">
<mkdir dir="${basedir}/build" />
<mkdir dir="${basedir}/build/logs" />
</target>
<target name="phpunit">
<exec dir="${basedir}" executable="phpunit" failonerror="true">
<arg line="--log-junit ${basedir}/build/logs/phpunit.xml
--coverage-clover ${basedir}/build/logs/clover.xml
--coverage-html ${basedir}/build/coverage" />
</exec>
</target>
<target name="build" depends="clean,prepare,phpunit" />
</project>

9
demo/bad_job.php Normal file
View File

@ -0,0 +1,9 @@
<?php
class Bad_PHP_Job
{
public function perform()
{
throw new Exception('Unable to run this job!');
}
}
?>

20
demo/check_status.php Normal file
View File

@ -0,0 +1,20 @@
<?php
if(empty($argv[1])) {
die('Specify the ID of a job to monitor the status of.');
}
require '../lib/resque.php';
date_default_timezone_set('GMT');
Resque::setBackend('127.0.0.1:6379');
$status = new Resque_Job_Status($argv[1]);
if(!$status->isTracking()) {
die("Resque is not tracking the status of this job.\n");
}
echo "Tracking status of ".$argv[1].". Press [break] to stop.\n\n";
while(true) {
fwrite(STDOUT, "Status of ".$argv[1]." is: ".$status->get()."\n");
sleep(1);
}
?>

10
demo/job.php Normal file
View File

@ -0,0 +1,10 @@
<?php
class PHP_Job
{
public function perform()
{
sleep(120);
fwrite(STDOUT, 'Hello!');
}
}
?>

9
demo/long_job.php Normal file
View File

@ -0,0 +1,9 @@
<?php
class Long_PHP_Job
{
public function perform()
{
sleep(600);
}
}
?>

9
demo/php_error_job.php Normal file
View File

@ -0,0 +1,9 @@
<?php
class PHP_Error_Job
{
public function perform()
{
callToUndefinedFunction();
}
}
?>

19
demo/queue.php Normal file
View File

@ -0,0 +1,19 @@
<?php
if(empty($argv[1])) {
die('Specify the name of a job to add. e.g, php queue.php PHP_Job');
}
require '../lib/Resque.php';
date_default_timezone_set('GMT');
Resque::setBackend('127.0.0.1:6379');
$args = array(
'time' => time(),
'array' => array(
'test' => 'test',
),
);
$jobId = Resque::enqueue('default', $argv[1], $args, true);
echo "Queued job ".$jobId."\n\n";
?>

8
demo/resque.php Normal file
View File

@ -0,0 +1,8 @@
<?php
date_default_timezone_set('GMT');
require 'bad_job.php';
require 'job.php';
require 'php_error_job.php';
require '../resque.php';
?>

22
lib/Redisent/LICENSE Normal file
View File

@ -0,0 +1,22 @@
Copyright (c) 2009 Justin Poliey <jdp34@njit.edu>
Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without
restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1,67 @@
# Redisent
Redisent is a simple, no-nonsense interface to the [Redis](http://code.google.com/p/redis/) key-value store for modest developers.
Due to the way it is implemented, it is flexible and tolerant of changes to the Redis protocol.
## Getting to work
If you're at all familiar with the Redis protocol and PHP objects, you've already mastered Redisent.
All Redisent does is map the Redis protocol to a PHP object, abstract away the nitty-gritty, and make the return values PHP compatible.
require 'redisent.php';
$redis = new Redisent('localhost');
$redis->set('awesome', 'absolutely');
echo sprintf('Is Redisent awesome? %s.\n', $redis->get('awesome'));
You use the exact same command names, and the exact same argument order. **How wonderful.** How about a more complex example?
require 'redisent.php';
$redis = new Redisent('localhost');
$redis->rpush('particles', 'proton');
$redis->rpush('particles', 'electron');
$redis->rpush('particles', 'neutron');
$particles = $redis->lrange('particles', 0, -1);
$particle_count = $redis->llen('particles');
echo "<p>The {$particle_count} particles that make up atoms are:</p>";
echo "<ul>";
foreach ($particles as $particle) {
echo "<li>{$particle}</li>";
}
echo "</ul>";
Be aware that Redis error responses will be wrapped in a RedisException class and thrown, so do be sure to use proper coding techniques.
## Clustering your servers
Redisent also includes a way for developers to fully utilize the scalability of Redis with multiple servers and [consistent hashing](http://en.wikipedia.org/wiki/Consistent_hashing).
Using the RedisentCluster class, you can use Redisent the same way, except that keys will be hashed across multiple servers.
Here is how to set up a cluster:
include 'redisent_cluster.php';
$cluster = new RedisentCluster(array(
array('host' => '127.0.0.1', 'port' => 6379),
array('host' => '127.0.0.1', 'port' => 6380)
));
You can then use Redisent the way you normally would, i.e., `$cluster->set('key', 'value')` or `$cluster->lrange('particles', 0, -1)`.
But what about when you need to use commands that are server specific and do not operate on keys? You can use routing, with the `RedisentCluster::to` method.
To use routing, you need to assign a server an alias in the constructor of the Redis cluster. Aliases are not required on all servers, just the ones you want to be able to access directly.
include 'redisent_cluster.php';
$cluster = new RedisentCluster(array(
'alpha' => array('host' => '127.0.0.1', 'port' => 6379),
array('host' => '127.0.0.1', 'port' => 6380)
));
Now there is an alias of the server running on 127.0.0.1:6379 called **alpha**, and can be interacted with like this:
// get server info
$cluster->to('alpha')->info();
Now you have complete programatic control over your Redis servers.
## About
&copy; 2009 [Justin Poliey](http://justinpoliey.com)

137
lib/Redisent/Redisent.php Normal file
View File

@ -0,0 +1,137 @@
<?php
/**
* Redisent, a Redis interface for the modest
* @author Justin Poliey <jdp34@njit.edu>
* @copyright 2009 Justin Poliey <jdp34@njit.edu>
* @license http://www.opensource.org/licenses/mit-license.php The MIT License
* @package Redisent
*/
define('CRLF', sprintf('%s%s', chr(13), chr(10)));
/**
* Wraps native Redis errors in friendlier PHP exceptions
*/
class RedisException extends Exception {
}
/**
* Redisent, a Redis interface for the modest among us
*/
class Redisent {
/**
* Socket connection to the Redis server
* @var resource
* @access private
*/
private $__sock;
/**
* Redis bulk commands, they are sent in a slightly different format to the server
* @var array
* @access private
*/
private $bulk_cmds = array(
'SET', 'GETSET', 'SETNX', 'ECHO',
'RPUSH', 'LPUSH', 'LSET', 'LREM',
'SADD', 'SREM', 'SMOVE', 'SISMEMBER'
);
/**
* Creates a Redisent connection to the Redis server on host {@link $host} and port {@link $port}.
* @param string $host The hostname of the Redis server
* @param integer $port The port number of the Redis server
*/
function __construct($host, $port = 6379) {
$this->__sock = fsockopen($host, $port, $errno, $errstr);
if (!$this->__sock) {
throw new Exception("{$errno} - {$errstr}");
}
}
function __destruct() {
fclose($this->__sock);
}
function __call($name, $args) {
/* Build the Redis protocol command */
$name = strtoupper($name);
if (in_array($name, $this->bulk_cmds)) {
$value = array_pop($args);
$command = sprintf("%s %s %d%s%s%s", $name, trim(implode(' ', $args)), strlen($value), CRLF, $value, CRLF);
}
else {
$command = sprintf("%s %s%s", $name, trim(implode(' ', $args)), CRLF);
}
/* Open a Redis connection and execute the command */
fwrite($this->__sock, $command);
/* Parse the response based on the reply identifier */
$reply = trim(fgets($this->__sock, 512));
switch (substr($reply, 0, 1)) {
/* Error reply */
case '-':
echo $command."\n";
throw new RedisException(substr(trim($reply), 4));
break;
/* Inline reply */
case '+':
$response = substr(trim($reply), 1);
break;
/* Bulk reply */
case '$':
if ($reply == '$-1') {
$response = null;
break;
}
$read = 0;
$size = substr($reply, 1);
do {
$block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
$response = fread($this->__sock, $block_size);
$read += $block_size;
} while ($read < $size);
fread($this->__sock, 2); /* discard crlf */
break;
/* Multi-bulk reply */
case '*':
$count = substr($reply, 1);
if ($count == '-1') {
return null;
}
$response = array();
for ($i = 0; $i < $count; $i++) {
$bulk_head = trim(fgets($this->__sock, 512));
$size = substr($bulk_head, 1);
if ($size == '-1') {
$response[] = null;
}
else {
$read = 0;
$block = "";
do {
$block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
$block .= fread($this->__sock, $block_size);
$read += $block_size;
} while ($read < $size);
fread($this->__sock, 2); /* discard crlf */
$response[] = $block;
}
}
break;
/* Integer reply */
case ':':
$response = substr(trim($reply), 1);
break;
default:
throw new RedisException("invalid server response: {$reply}");
break;
}
/* Party on */
return $response;
}
}

View File

@ -0,0 +1,138 @@
<?php
/**
* Redisent, a Redis interface for the modest
* @author Justin Poliey <jdp34@njit.edu>
* @copyright 2009 Justin Poliey <jdp34@njit.edu>
* @license http://www.opensource.org/licenses/mit-license.php The MIT License
* @package Redisent
*/
require 'redisent.php';
/**
* A generalized Redisent interface for a cluster of Redis servers
*/
class RedisentCluster {
/**
* Collection of Redisent objects attached to Redis servers
* @var array
* @access private
*/
private $redisents;
/**
* Aliases of Redisent objects attached to Redis servers, used to route commands to specific servers
* @see RedisentCluster::to
* @var array
* @access private
*/
private $aliases;
/**
* Hash ring of Redis server nodes
* @var array
* @access private
*/
private $ring;
/**
* Individual nodes of pointers to Redis servers on the hash ring
* @var array
* @access private
*/
private $nodes;
/**
* Number of replicas of each node to make around the hash ring
* @var integer
* @access private
*/
private $replicas = 128;
/**
* The commands that are not subject to hashing
* @var array
* @access private
*/
private $dont_hash = array(
'RANDOMKEY', 'DBSIZE',
'SELECT', 'MOVE', 'FLUSHDB', 'FLUSHALL',
'SAVE', 'BGSAVE', 'LASTSAVE', 'SHUTDOWN',
'INFO', 'MONITOR', 'SLAVEOF'
);
/**
* Creates a Redisent interface to a cluster of Redis servers
* @param array $servers The Redis servers in the cluster. Each server should be in the format array('host' => hostname, 'port' => port)
*/
function __construct($servers) {
$this->ring = array();
$this->aliases = array();
foreach ($servers as $alias => $server) {
$this->redisents[] = new Redisent($server['host'], $server['port']);
if (is_string($alias)) {
$this->aliases[$alias] = $this->redisents[count($this->redisents)-1];
}
for ($replica = 1; $replica <= $this->replicas; $replica++) {
$this->ring[crc32($server['host'].':'.$server['port'].'-'.$replica)] = $this->redisents[count($this->redisents)-1];
}
}
ksort($this->ring, SORT_NUMERIC);
$this->nodes = array_keys($this->ring);
}
/**
* Routes a command to a specific Redis server aliased by {$alias}.
* @param string $alias The alias of the Redis server
* @return Redisent The Redisent object attached to the Redis server
*/
function to($alias) {
if (isset($this->aliases[$alias])) {
return $this->aliases[$alias];
}
else {
throw new Exception("That Redisent alias does not exist");
}
}
/* Execute a Redis command on the cluster */
function __call($name, $args) {
/* Pick a server node to send the command to */
$name = strtoupper($name);
if (!in_array($name, $this->dont_hash)) {
$node = $this->nextNode(crc32($args[0]));
$redisent = $this->ring[$node];
}
else {
$redisent = $this->redisents[0];
}
/* Execute the command on the server */
return call_user_func_array(array($redisent, $name), $args);
}
/**
* Routes to the proper server node
* @param integer $needle The hash value of the Redis command
* @return Redisent The Redisent object associated with the hash
*/
private function nextNode($needle) {
$haystack = $this->nodes;
while (count($haystack) > 2) {
$try = floor(count($haystack) / 2);
if ($haystack[$try] == $needle) {
return $needle;
}
if ($needle < $haystack[$try]) {
$haystack = array_slice($haystack, 0, $try + 1);
}
if ($needle > $haystack[$try]) {
$haystack = array_slice($haystack, $try + 1);
}
}
return $haystack[count($haystack)-1];
}
}

134
lib/Resque.php Normal file
View File

@ -0,0 +1,134 @@
<?php
require_once dirname(__FILE__) . '/Resque/Exception.php';
/**
* Base Resque class.
*
* @package Resque
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque
{
const VERSION = '1.0';
/**
* @var Resque_Redis Instance of Resque_Redis that talks to redis.
*/
public static $redis = null;
/**
* Given a host/port combination separated by a colon, set it as
* the redis server that Resque will talk to.
*
* @param mixed $server Host/port combination separated by a colon, or
* a nested array of servers with host/port pairs.
*/
public static function setBackend($server)
{
if(is_array($server)) {
require_once dirname(__FILE__) . '/Resque/RedisCluster.php';
self::$redis = new Resque_RedisCluster($server);
}
else {
list($host, $port) = explode(':', $server);
require_once dirname(__FILE__) . '/Resque/Redis.php';
self::$redis = new Resque_Redis($host, $port);
}
}
/**
* Return an instance of the Resque_Redis class instantiated for Resque.
*
* @return Resque_Redis Instance of Resque_Redis.
*/
public static function redis()
{
if(is_null(self::$redis)) {
self::setBackend('localhost:6379');
}
return self::$redis;
}
/**
* Push a job to the end of a specific queue. If the queue does not
* exist, then create it as well.
*
* @param string $queue The name of the queue to add the job to.
* @param object $item Job description as an object to be JSON encoded.
*/
public static function push($queue, $item)
{
self::redis()->sadd('queues', $queue);
self::redis()->rpush('queue:' . $queue, json_encode($item));
}
/**
* Pop an item off the end of the specified queue, decode it and
* return it.
*
* @param string $queue The name of the queue to fetch an item from.
* @return object Decoded item from the queue.
*/
public static function pop($queue)
{
$item = self::redis()->lpop('queue:' . $queue);
if(!$item) {
return;
}
return json_decode($item, true);
}
/**
* Return the size (number of pending jobs) of the specified queue.
*
* @return int The size of the queue.
*/
public static function size($queue)
{
return self::redis()->llen('queue:' . $queue);
}
/**
* Create a new job and save it to the specified queue.
*
* @param string $queue The name of the queue to place the job in.
* @param string $class The name of the class that contains the code to execute the job.
* @param array $args Any optional arguments that should be passed when the job is executed.
* @param boolean $monitor Set to true to be able to monitor the status of a job.
*/
public static function enqueue($queue, $class, $args = null, $trackStatus = false)
{
require_once dirname(__FILE__) . '/Resque/Job.php';
return Resque_Job::create($queue, $class, $args, $trackStatus);
}
/**
* Reserve and return the next available job in the specified queue.
*
* @param string $queue Queue to fetch next available job from.
* @return Resque_Job Instance of Resque_Job to be processed, false if none or error.
*/
public static function reserve($queue)
{
require_once dirname(__FILE__) . '/Resque/Job.php';
return Resque_Job::reserve($queue);
}
/**
* Get an array of all known queues.
*
* @return array Array of queues.
*/
public static function queues()
{
$queues = self::redis()->smembers('queues');
if(!is_array($queues)) {
$queues = array();
}
return $queues;
}
}

13
lib/Resque/Exception.php Normal file
View File

@ -0,0 +1,13 @@
<?php
/**
* Resque exception.
*
* @package Resque
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Exception extends Exception
{
}
?>

59
lib/Resque/Failure.php Normal file
View File

@ -0,0 +1,59 @@
<?php
require_once dirname(__FILE__) . '/Failure/Interface.php';
/**
* Failed Resque job.
*
* @package Resque/Failure
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Failure
{
/**
* @var string Class name representing the backend to pass failed jobs off to.
*/
private static $backend;
/**
* Create a new failed job on the backend.
*
* @param object $payload The contents of the job that has just failed.
* @param object $exception The exception generated when the job failed to run.
* @param object $worker Instance of Resque_Worker that was running this job when it failed.
* @param string $queue The name of the queue that this job was fetched from.
*/
public static function create($payload, Exception $exception, Resque_Worker $worker, $queue)
{
$backend = self::getBackend();
new $backend($payload, $exception, $worker, $queue);
}
/**
* Return an instance of the backend for saving job failures.
*
* @return object Instance of backend object.
*/
public function getBackend()
{
if(self::$backend === null) {
require dirname(__FILE__) . '/Failure/Redis.php';
self::$backend = 'Resque_Failure_Redis';
}
return self::$backend;
}
/**
* Set the backend to use for raised job failures. The supplied backend
* should be the name of a class to be instantiated when a job fails.
* It is your responsibility to have the backend class loaded (or autoloaded)
*
* @param string $backend The class name of the backend to pipe failures to.
*/
public function setBackend($backend)
{
self::$backend = $backend;
}
}

View File

@ -0,0 +1,22 @@
<?php
/**
* Interface that all failure backends should implement.
*
* @package Resque/Failure
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
interface Resque_Failure_Interface
{
/**
* Initialize a failed job class and save it (where appropriate).
*
* @param object $payload Object containing details of the failed job.
* @param object $exception Instance of the exception that was thrown by the failed job.
* @param object $worker Instance of Resque_Worker that received the job.
* @param string $queue The name of the queue the job was fetched from.
*/
public function __construct($payload, $exception, $worker, $queue);
}
?>

View File

@ -0,0 +1,35 @@
<?php
/**
* Redis backend for storing failed Resque jobs.
*
* @package Resque/Failure
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Failure_Redis implements Resque_Failure_Interface
{
/**
* Initialize a failed job class and save it (where appropriate).
*
* @param object $payload Object containing details of the failed job.
* @param object $exception Instance of the exception that was thrown by the failed job.
* @param object $worker Instance of Resque_Worker that received the job.
* @param string $queue The name of the queue the job was fetched from.
*/
public function __construct($payload, $exception, $worker, $queue)
{
$data = new stdClass;
$data->failed_at = strftime('%a %b %d %H:%M:%S %Z %Y');
$data->payload = $payload;
$data->exception = get_class($exception);
$data->error = $exception->getMessage();
$data->backtrace = explode("\n", $exception->getTraceAsString());
$data->worker = (string)$worker;
$data->queue = $queue;
$data = json_encode($data);
Resque::redis()->rpush('failed', $data);
}
}
?>

198
lib/Resque/Job.php Normal file
View File

@ -0,0 +1,198 @@
<?php
require_once dirname(__FILE__) . '/Job/Status.php';
/**
* Resque job.
*
* @package Resque/Job
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Job
{
/**
* @var string The name of the queue that this job belongs to.
*/
public $queue;
/**
* @var Resque_Worker Instance of the Resque worker running this job.
*/
public $worker;
/**
* @var object Object containing details of the job.
*/
public $payload;
/**
* Instantiate a new instance of a job.
*
* @param string $queue The queue that the job belongs to.
* @param object $payload Object containing details of the job.
*/
public function __construct($queue, $payload)
{
$this->queue = $queue;
$this->payload = $payload;
}
/**
* Create a new job and save it to the specified queue.
*
* @param string $queue The name of the queue to place the job in.
* @param string $class The name of the class that contains the code to execute the job.
* @param array $args Any optional arguments that should be passed when the job is executed.
* @param boolean $monitor Set to true to be able to monitor the status of a job.
*/
public static function create($queue, $class, $args = null, $monitor = false)
{
if($args !== null && !is_array($args)) {
throw new InvalidArgumentException(
'Supplied $args must be an array.'
);
}
$id = md5(uniqid('', true));
Resque::push($queue, array(
'class' => $class,
'args' => $args,
'id' => $id,
));
if($monitor) {
Resque_Job_Status::create($id);
}
return $id;
}
/**
* Find the next available job from the specified queue and return an
* instance of Resque_Job for it.
*
* @param string $queue The name of the queue to check for a job in.
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
*/
public static function reserve($queue)
{
$payload = Resque::pop($queue);
if(!$payload) {
return false;
}
return new Resque_Job($queue, $payload);
}
/**
* Update the status of the current job.
*
* @param int $status Status constant from Resque_Job_Status indicating the current status of a job.
*/
public function updateStatus($status)
{
if(empty($this->payload->id)) {
return;
}
$statusInstance = new Resque_Job_Status($this->payload['id']);
$statusInstance->update($status);
}
/**
* Return the status of the current job.
*
* @return int The status of the job as one of the Resque_Job_Status constants.
*/
public function getStatus()
{
$status = new Resque_Job_Status($this->payload['id']);
return $status->get();
}
/**
* Actually execute a job by calling the perform method on the class
* associated with the job with the supplied arguments.
*
* @throws Resque_Exception When the job's class could not be found or it does not contain a perform method.
*/
public function perform()
{
if(!class_exists($this->payload['class'])) {
throw new Resque_Exception(
'Could not find job class ' . $this->payload['class'] . '.'
);
}
if(!method_exists($this->payload['class'], 'perform')) {
throw new Resque_Exception(
'Job class ' . $this->payload['class'] . ' does not contain a perform method.'
);
}
$instance = new $this->payload['class'];
$isntance->args = $this->payload['args'];
if(method_exists($instance, 'setUp')) {
$instance->setUp();
}
$instance->perform();
if(method_exists($instance, 'tearDown')) {
$instance->tearDown();
}
}
/**
* Mark the current job as having failed.
*/
public function fail($exception)
{
$this->updateStatus(Resque_Job_Status::STATUS_FAILED);
require_once dirname(__FILE__) . '/Failure.php';
Resque_Failure::create(
$this->payload,
$exception,
$this->worker,
$this->queue
);
Resque_Stat::incr('failed');
Resque_Stat::incr('failed:' . $this->worker);
}
/**
* Re-queue the current job.
*/
public function recreate()
{
$status = new Resque_Job_Status($this->payload['id']);
$monitor = false;
if($status->isTracking()) {
$monitor = true;
}
return self::create($this->queue, $this->payload['class'], $this->payload['args'], $monitor);
}
/**
* Generate a string representation used to describe the current job.
*
* @return string The string representation of the job.
*/
public function __toString()
{
$name = array(
'Job{' . $this->queue .'}'
);
if(!empty($this->payload['id'])) {
$name[] = 'ID: ' . $this->payload['id'];
}
$name[] = $this->payload['class'];
if(!empty($this->payload['args'])) {
$name[] = json_encode($this->payload['args']);
}
return '(' . implode(' | ', $name) . ')';
}
}
?>

View File

@ -0,0 +1,13 @@
<?php
/**
* Runtime exception class for a job that does not exit cleanly.
*
* @package Resque/Job
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Job_DirtyExitException extends RuntimeException
{
}

144
lib/Resque/Job/Status.php Normal file
View File

@ -0,0 +1,144 @@
<?php
/**
* Status tracker/information for a job.
*
* @package Resque/Job
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Job_Status
{
const STATUS_WAITING = 1;
const STATUS_RUNNING = 2;
const STATUS_FAILED = 3;
const STATUS_COMPLETE = 4;
/**
* @var string The ID of the job this status class refers back to.
*/
private $id;
/**
* @var mixed Cache variable if the status of this job is being monitored or not.
* True/false when checked at least once or null if not checked yet.
*/
private $isTracking = null;
/**
* @var array Array of statuses that are considered final/complete.
*/
private static $completeStatuses = array(
self::STATUS_FAILED,
self::STATUS_COMPLETE
);
/**
* Setup a new instance of the job monitor class for the supplied job ID.
*
* @param string $id The ID of the job to manage the status for.
*/
public function __construct($id)
{
$this->id = $id;
}
/**
* Create a new status monitor item for the supplied job ID. Will create
* all necessary keys in Redis to monitor the status of a job.
*
* @param string $id The ID of the job to monitor the status of.
*/
public static function create($id)
{
$statusPacket = array(
'status' => self::STATUS_WAITING,
'updated' => time(),
'started' => time(),
);
Resque::redis()->set('job:' . $id . ':status', json_encode($statusPacket));
}
/**
* Check if we're actually checking the status of the loaded job status
* instance.
*
* @return boolean True if the status is being monitored, false if not.
*/
public function isTracking()
{
if($this->isTracking === false) {
return false;
}
if(!Resque::redis()->exists((string)$this)) {
$this->isTracking = false;
return false;
}
$this->isTracking = true;
return true;
}
/**
* Update the status indicator for the current job with a new status.
*
* @param int The status of the job (see constants in Resque_Job_Status)
*/
public function update($status)
{
if(!$this->isTracking()) {
return;
}
$statusPacket = array(
'status' => $status,
'updated' => time(),
);
Resque::redis()->set((string)$this, json_encode($statusPacket));
// Expire the status for completed jobs after 24 hours
if(in_array($status, self::$completeStatuses)) {
Resque::redis()->expire((string)$this, 86400);
}
}
/**
* Fetch the status for the job being monitored.
*
* @return mixed False if the status is not being monitored, otherwise the status as
* as an integer, based on the Resque_Job_Status constants.
*/
public function get()
{
if(!$this->isTracking()) {
return false;
}
$statusPacket = json_decode(Resque::redis()->get((string)$this), true);
if(!$statusPacket) {
return false;
}
return $statusPacket['status'];
}
/**
* Stop tracking the status of a job.
*/
public function stop()
{
Resque::redis()->del((string)$this);
}
/**
* Generate a string representation of this object.
*
* @return string String representation of the current job status class.
*/
public function __toString()
{
return 'job:' . $this->id . ':status';
}
}
?>

101
lib/Resque/Redis.php Normal file
View File

@ -0,0 +1,101 @@
<?php
// Third- party apps may have already loaded Resident from elsewhere
// so lets be careful.
if(!class_exists('Redisent')) {
require_once dirname(__FILE__) . '/../Redisent/Redisent.php';
}
/**
* Extended Redisent class used by Resque for all communication with
* redis. Essentially adds namespace support to Redisent.
*
* @package Resque/Redis
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Redis extends Redisent
{
/**
* @var array List of all commands in Redis that supply a key as their
* first argument. Used to prefix keys with the Resque namespace.
*/
private $keyCommands = array(
'exists',
'del',
'type',
'keys',
'expire',
'ttl',
'move',
'set',
'get',
'getset',
'setnx',
'incr',
'incrby',
'decrby',
'decrby',
'rpush',
'lpush',
'llen',
'lrange',
'ltrim',
'lindex',
'lset',
'lrem',
'lpop',
'rpop',
'sadd',
'srem',
'spop',
'scard',
'sismember',
'smembers',
'srandmember',
'zadd',
'zrem',
'zrange',
'zrevrange',
'zrangebyscore',
'zcard',
'zscore',
'zremrangebyscore',
'sort'
);
// sinterstore
// sunion
// sunionstore
// sdiff
// sdiffstore
// sinter
// smove
// rename
// rpoplpush
// mget
// msetnx
// mset
// renamenx
/**
* Magic method to handle all function requests and prefix key based
* operations with the 'resque:' key prefix.
*
* @param string $name The name of the method called.
* @param array $args Array of supplied arguments to the method.
* @return mixed Return value from Resident::call() based on the command.
*/
public function __call($name, $args) {
$args = func_get_args();
if(in_array($name, $this->keyCommands)) {
$args[1][0] = 'resque:' . $args[1][0];
}
try {
return parent::__call($name, $args[1]);
}
catch(RedisException $e) {
return false;
}
}
}
?>

101
lib/Resque/RedisCluster.php Normal file
View File

@ -0,0 +1,101 @@
<?php
// Third- party apps may have already loaded Resident from elsewhere
// so lets be careful.
if(!class_exists('RedisentCluster')) {
require_once dirname(__FILE__) . '/../Redisent/RedisentCluster.php';
}
/**
* Extended Redisent class used by Resque for all communication with
* redis. Essentially adds namespace support to Redisent.
*
* @package Resque/Redis
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_RedisCluster extends RedisentCluster
{
/**
* @var array List of all commands in Redis that supply a key as their
* first argument. Used to prefix keys with the Resque namespace.
*/
private $keyCommands = array(
'exists',
'del',
'type',
'keys',
'expire',
'ttl',
'move',
'set',
'get',
'getset',
'setnx',
'incr',
'incrby',
'decrby',
'decrby',
'rpush',
'lpush',
'llen',
'lrange',
'ltrim',
'lindex',
'lset',
'lrem',
'lpop',
'rpop',
'sadd',
'srem',
'spop',
'scard',
'sismember',
'smembers',
'srandmember',
'zadd',
'zrem',
'zrange',
'zrevrange',
'zrangebyscore',
'zcard',
'zscore',
'zremrangebyscore',
'sort'
);
// sinterstore
// sunion
// sunionstore
// sdiff
// sdiffstore
// sinter
// smove
// rename
// rpoplpush
// mget
// msetnx
// mset
// renamenx
/**
* Magic method to handle all function requests and prefix key based
* operations with the 'resque:' key prefix.
*
* @param string $name The name of the method called.
* @param array $args Array of supplied arguments to the method.
* @return mixed Return value from Resident::call() based on the command.
*/
public function __call($name, $args) {
$args = func_get_args();
if(in_array($name, $this->keyCommands)) {
$args[1][0] = 'resque:' . $args[1][0];
}
try {
return parent::__call($name, $args[1]);
}
catch(RedisException $e) {
return false;
}
}
}
?>

57
lib/Resque/Stat.php Normal file
View File

@ -0,0 +1,57 @@
<?php
/**
* Resque statistic management (jobs processed, failed, etc)
*
* @package Resque/Stat
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Stat
{
/**
* Get the value of the supplied statistic counter for the specified statistic.
*
* @param string $stat The name of the statistic to get the stats for.
* @return mixed Value of the statistic.
*/
public function get($stat)
{
return (int)Resque::redis()->get('stat:' . $stat);
}
/**
* Increment the value of the specified statistic by a certain amount (default is 1)
*
* @param string $stat The name of the statistic to increment.
* @param int $by The amount to increment the statistic by.
* @return boolean True if successful, false if not.
*/
public function incr($stat, $by = 1)
{
return (bool)Resque::redis()->incrby('stat:' . $stat, $by);
}
/**
* Decrement the value of the specified statistic by a certain amount (default is 1)
*
* @param string $stat The name of the statistic to decrement.
* @param int $by The amount to decrement the statistic by.
* @return boolean True if successful, false if not.
*/
public function decr($stat, $by = 1)
{
return (bool)Resque::redis()->decrby('stat:' . $stat, $by);
}
/**
* Delete a statistic with the given name.
*
* @param string $stat The name of the statistic to delete.
* @return boolean True if successful, false if not.
*/
public function clear($stat)
{
return (bool)Resque::redis()->del('stat:' . $stat);
}
}

561
lib/Resque/Worker.php Normal file
View File

@ -0,0 +1,561 @@
<?php
require_once dirname(__FILE__) . '/Stat.php';
require_once dirname(__FILE__) . '/Job.php';
require_once dirname(__FILE__) . '/Job/DirtyExitException.php';
/**
* Resque worker that handles checking queues for jobs, fetching them
* off the queues, running them and handling the result.
*
* @package Resque/Worker
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Worker
{
const LOG_NONE = 0;
const LOG_NORMAL = 1;
const LOG_VERBOSE = 2;
/**
* @var int Current log level of this worker.
*/
public $logLevel = 0;
/**
* @var array Array of all associated queues for this worker.
*/
private $queues = array();
/**
* @var string The hostname of this worker.
*/
private $hostname;
/**
* @var boolean True if on the next iteration, the worker should shutdown.
*/
private $shutdown = false;
/**
* @var boolean True if this worker is paused.
*/
private $paused = false;
/**
* @var string String identifying this worker.
*/
private $id;
/**
* @var Resque_Job Current job, if any, being processed by this worker.
*/
private $currentJob = null;
/**
* Return all workers known to Resque as instantiated instances.
*/
public static function all()
{
$workers = Resque::redis()->smembers('workers');
if(!is_array($workers)) {
$workers = array();
}
$instances = array();
foreach($workers as $workerId) {
$instances[] = self::find($workerId);
}
return $instances;
}
/**
* Given a worker ID, check if it is registered/valid.
*
* @param string $workerId ID of the worker.
* @return boolean True if the worker exists, false if not.
*/
public static function exists($workerId)
{
return (bool)Resque::redis()->sismember('workers', $workerId);
}
/**
* Given a worker ID, find it and return an instantiated worker class for it.
*
* @param string $workerId The ID of the worker.
* @return Resque_Worker Instance of the worker. False if the worker does not exist.
*/
public static function find($workerId)
{
if(!self::exists($workerId)) {
return false;
}
list($hostname, $pid, $queues) = explode(':', $workerId, 3);
$queues = explode(',', $queues);
$worker = new self($queues);
$worker->setId($workerId);
return $worker;
}
/**
* Set the ID of this worker to a given ID string.
*
* @param string $workerId ID for the worker.
*/
public function setId($workerId)
{
$this->id = $workerId;
}
/**
* Instantiate a new worker, given a list of queues that it should be working
* on. The list of queues should be supplied in the priority that they should
* be checked for jobs (first come, first served)
*
* Passing a single '*' allows the worker to work on all queues in alphabetical
* order. You can easily add new queues dynamically and have them worked on using
* this method.
*
* @param string|array $queues String with a single queue name, array with multiple.
*/
public function __construct($queues)
{
if(!is_array($queues)) {
$queues = array($queues);
}
$this->queues = $queues;
if(function_exists('gethostname')) {
$hostname = gethostname();
}
else {
$hostname = php_uname('n');
}
$this->hostname = $hostname;
$this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues);
}
/**
* The primary loop for a worker which when called on an instance starts
* the worker's life cycle.
*
* Queues are checked every $interval (seconds) for new jobs.
*
* @param int $interval How often to check for new jobs across the queues.
*/
public function work($interval = 5)
{
$this->updateProcLine('Starting');
$this->startup();
while(true) {
if($this->shutdown) {
break;
}
// Attempt to find and reserve a job
$job = false;
if(!$this->paused) {
$job = $this->reserve();
}
if(!$job) {
// For an interval of 0, break now - helps with unit testing etc
if($interval == 0) {
break;
}
// If no job was found, we sleep for $interval before continuing and checking again
$this->log('Sleeping for ' . $interval, true);
if($this->paused) {
$this->updateProcLine('Paused');
}
else {
$this->updateProcLine('Waiting for ' . implode(',', $this->queues));
}
sleep($interval);
continue;
}
$this->log('got ' . $job);
$this->workingOn($job);
$this->child = $this->fork();
// Forked and we're the child. Run the job.
if($this->child === 0 || $this->child === false) {
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
$this->perform($job);
if($this->child === 0) {
exit(0);
}
}
if($this->child > 0) {
// Parent process, sit and wait
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
// Wait until the child process finishes before continuing
pcntl_wait($status);
$exitStatus = pcntl_wexitstatus($status);
if($exitStatus !== 0) {
$job->fail(new Resque_Job_DirtyExitException(
'Job exited with exit code ' . $exitStatus
));
}
}
$this->child = null;
$this->doneWorking();
}
$this->unregisterWorker();
}
/**
* Process a single job.
*
* @param object|null $job The job to be processed.
*/
public function perform(Resque_Job $job)
{
try {
$job->perform();
}
catch(Exception $e) {
$this->log($job . ' failed: ' . $e->getMessage());
$job->fail($e);
return;
}
$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
$this->log('done ' . $job);
}
/**
* Attempt to find a job from the top of one of the queues for this worker.
*
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
*/
public function reserve()
{
$queues = $this->queues();
if(!is_array($queues)) {
return;
}
foreach($queues as $queue) {
$this->log('Checking ' . $queue, self::LOG_VERBOSE);
$job = Resque_Job::reserve($queue);
if($job) {
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
return $job;
}
}
return false;
}
/**
* Return an array containing all of the queues that this worker should use
* when searching for jobs.
*
* If * is found in the list of queues, every queue will be searched in
* alphabetic order.
*
* @return array Array of associated queues.
*/
public function queues()
{
if(!in_array('*', $this->queues)) {
return $this->queues;
}
$queues = Resque::queues();
sort($queues);
return $queues;
}
/**
* Attempt to fork a child process from the parent to run a job in.
*
* Return values are those of pcntl_fork().
*
* @return int -1 if the fork failed, 0 for the forked child, the PID of the child for the parent.
*/
private function fork()
{
if(!function_exists('pcntl_fork')) {
return false;
}
$pid = pcntl_fork();
if($pid === -1) {
throw new RuntimeException('Unable to fork child worker.');
}
return $pid;
}
/**
* Perform necessary actions to start a worker.
*/
private function startup()
{
$this->registerSigHandlers();
$this->pruneDeadWorkers();
$this->registerWorker();
}
/**
* On supported systems (with the PECL proctitle module installed), update
* the name of the currently running process to indicate the current state
* of a worker.
*
* @param string $status The updated process title.
*/
private function updateProcLine($status)
{
if(function_exists('setproctitle')) {
setproctitle('resque-' . Resque::VERSION . ': ' . $status);
}
}
/**
* Register signal handlers that a worker should respond to.
*
* TERM: Shutdown immediately and stop processing jobs.
* INT: Shutdown immediately and stop processing jobs.
* QUIT: Shutdown after the current job finishes processing.
* USR1: Kill the forked child immediately and continue processing jobs.
*/
private function registerSigHandlers()
{
if(!function_exists('pcntl_signal')) {
return;
}
declare(ticks = 1);
pcntl_signal(SIGTERM, array($this, 'shutDownNow'));
pcntl_signal(SIGINT, array($this, 'shutDownNow'));
pcntl_signal(SIGQUIT, array($this, 'shutdown'));
pcntl_signal(SIGUSR1, array($this, 'killChild'));
pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
$this->log('Registered signals', self::LOG_VERBOSE);
}
/**
* Signal handler callback for USR2, pauses processing of new jobs.
*/
public function pauseProcessing()
{
$this->log('USR2 received; pausing job processing');
$this->paused = true;
}
/**
* Signal handler callback for CONT, resumes worker allowing it to pick
* up new jobs.
*/
public function unPauseProcessing()
{
$this->log('CONT received; resuming job processing');
$this->paused = false;
}
/**
* Schedule a worker for shutdown. Will finish processing the current job
* and when the timeout interval is reached, the worker will shut down.
*/
public function shutdown()
{
$this->shutdown = true;
$this->log('Exiting...');
}
/**
* Force an immediate shutdown of the worker, killing any child jobs
* currently running.
*/
public function shutdownNow()
{
$this->shutdown();
$this->killChild();
}
/**
* Kill a forked child job immediately. The job it is processing will not
* be completed.
*/
public function killChild()
{
if(!$this->child) {
$this->log('No child to kill.', self::LOG_VERBOSE);
return;
}
$this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
$this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
posix_kill($this->child, SIGKILL);
$this->child = null;
}
else {
$this->log('Child ' . $this->child . ' not found, restarting.', self::LOG_VERBOSE);
$this->shutdown();
}
}
/**
* Look for any workers which should be running on this server and if
* they're not, remove them from Redis.
*
* This is a form of garbage collection to handle cases where the
* server may have been killed and the Resque workers did not die gracefully
* and therefore leave state information in Redis.
*/
public function pruneDeadWorkers()
{
$workerPids = $this->workerPids();
$workers = self::all();
foreach($workers as $worker) {
list($host, $pid, $queues) = explode(':', (string)$worker, 3);
if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
continue;
}
$this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE);
$worker->unregisterWorker();
}
}
/**
* Return an array of process IDs for all of the Resque workers currently
* running on this machine.
*
* @return array Array of Resque worker process IDs.
*/
public function workerPids()
{
$pids = array();
exec('ps -A -o pid,command | grep [r]esque', $cmdOutput);
foreach($cmdOutput as $line) {
list($pids[],) = explode(' ', $line, 2);
}
return $pids;
}
/**
* Register this worker in Redis.
*/
public function registerWorker()
{
Resque::redis()->sadd('workers', $this);
Resque::redis()->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y'));
}
/**
* Unregister this worker in Redis. (shutdown etc)
*/
public function unregisterWorker()
{
if(is_object($this->currentJob)) {
$this->currentJob->fail(new Resque_Job_DirtyExitException);
}
$id = (string)$this;
Resque::redis()->srem('workers', $id);
Resque::redis()->del('worker:' . $id);
Resque::redis()->del('worker:' . $id . ':started');
Resque_Stat::clear('processed:' . $id);
Resque_Stat::clear('failed:' . $id);
}
/**
* Tell Redis which job we're currently working on.
*
* @param object $job Resque_Job instance containing the job we're working on.
*/
public function workingOn(Resque_Job $job)
{
$job->worker = $this;
$this->currentJob = $job;
$job->updateStatus(Resque_Job_Status::STATUS_RUNNING);
$data = json_encode(array(
'queue' => $job->queue,
'run_at' => strftime('%a %b %d %H:%M:%S %Z %Y'),
'payload' => $job->payload
));
Resque::redis()->set('worker:' . $job->worker, $data);
}
/**
* Notify Redis that we've finished working on a job, clearing the working
* state and incrementing the job stats.
*/
public function doneWorking()
{
$this->currentJob = null;
Resque_Stat::incr('processed');
Resque_Stat::incr('processed:' . (string)$this);
Resque::redis()->del('worker:' . (string)$this);
}
/**
* Generate a string representation of this worker.
*
* @return string String identifier for this worker instance.
*/
public function __toString()
{
return $this->id;
}
/**
* Output a given log message to STDOUT.
*
* @param string $message Message to output.
*/
public function log($message)
{
if($this->logLevel == self::LOG_NORMAL) {
fwrite(STDOUT, "*** " . $message . "\n");
}
else if($this->logLevel == self::LOG_VERBOSE) {
fwrite(STDOUT, "** [" . strftime('%T %Y-%m-%d') . "] " . $message . "\n");
}
}
/**
* Return an object describing the job this worker is currently working on.
*
* @return object Object with details of current job.
*/
public function job()
{
$job = Resque::redis()->get('worker:' . $this);
if(!$job) {
return array();
}
else {
return json_decode($job, true);
}
}
/**
* Get a statistic belonging to this worker.
*
* @param string $stat Statistic to fetch.
* @return int Statistic value.
*/
public function getStat($stat)
{
return Resque_Stat::get($stat . ':' . $this);
}
}
?>

23
phpunit.xml Normal file
View File

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit backupGlobals="false"
backupStaticAttributes="false"
colors="false"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false"
syntaxCheck="false"
>
<testsuites>
<testsuite name="Resque Test Suite">
<directory>./test/Resque/</directory>
</testsuite>
</testsuites>
<filter>
<whitelist>
<directory suffix=".php">./lib/Resque/</directory>
</whitelist>
</filter>
</phpunit>

68
resque.php Normal file
View File

@ -0,0 +1,68 @@
<?php
if(empty($_ENV)) {
die("\$_ENV does not seem to be available. Ensure 'E' is in your variables_order php.ini setting\n");
}
if(empty($_ENV['QUEUE'])) {
die("Set QUEUE env var containing the list of queues to work.\n");
}
if(!empty($_ENV['APP_INCLUDE'])) {
if(!file_exists($_ENV['APP_INCLUDE'])) {
die('APP_INCLUDE ('.$_ENV['APP_INCLUDE'].") does not exist.\n");
}
require_once $_ENV['APP_INCLUDE'];
}
require 'lib/Resque.php';
require 'lib/Resque/Worker.php';
if(!empty($_ENV['REDIS_BACKEND'])) {
Resque::setBackend($_ENV['REDIS_BACKEND']);
}
$logLevel = 0;
if(!empty($_ENV['LOGGING']) || !empty($_ENV['VERBOSE'])) {
$logLevel = Resque_Worker::LOG_NORMAL;
}
else if(!empty($_ENV['VVERBOSE'])) {
$logLevel = Resque_Worker::LOG_VERBOSE;
}
$interval = 5;
if(!empty($_ENV['INTERVAL'])) {
$interval = $_ENV['INTERVAL'];
}
$count = 1;
if(!empty($_ENV['COUNT']) && $_ENV['COUNT'] > 1) {
$count = $_ENV['COUNT'];
}
if($count > 1) {
for($i = 0; $i < $count; ++$i) {
$pid = pcntl_fork();
if($pid == -1) {
die("Could not fork worker ".$i."\n");
}
// Child, start the worker
else if(!$pid) {
$queues = explode(',', $_ENV['QUEUE']);
$worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel;
fwrite(STDOUT, '*** Starting worker '.$worker."\n");
$worker->work($interval);
break;
}
}
}
// Start a single worker
else {
$queues = explode(',', $_ENV['QUEUE']);
$worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel;
fwrite(STDOUT, '*** Starting worker '.$worker."\n");
$worker->work($interval);
}
?>

View File

@ -0,0 +1,102 @@
<?php
require_once dirname(__FILE__) . '/bootstrap.php';
/**
* Resque_Job_Status tests.
*
* @package Resque/Tests
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase
{
public function setUp()
{
parent::setUp();
// Register a worker to test with
$this->worker = new Resque_Worker('jobs');
}
public function testJobStatusCanBeTracked()
{
$token = Resque::enqueue('jobs', 'Test_Job', null, true);
$status = new Resque_Job_Status($token);
$this->assertTrue($status->isTracking());
}
public function testJobStatusIsReturnedViaJobInstance()
{
$token = Resque::enqueue('jobs', 'Test_Job', null, true);
$job = Resque_Job::reserve('jobs');
$this->assertEquals(Resque_Job_Status::STATUS_WAITING, $job->getStatus());
}
public function testQueuedJobReturnsQueuedStatus()
{
$token = Resque::enqueue('jobs', 'Test_Job', null, true);
$status = new Resque_Job_Status($token);
$this->assertEquals(Resque_Job_Status::STATUS_WAITING, $status->get());
}
public function testRunningJobReturnsRunningStatus()
{
$token = Resque::enqueue('jobs', 'Failing_Job', null, true);
$job = $this->worker->reserve();
$this->worker->workingOn($job);
$status = new Resque_Job_Status($token);
$this->assertEquals(Resque_Job_Status::STATUS_RUNNING, $status->get());
}
public function testFailedJobReturnsFailedStatus()
{
$token = Resque::enqueue('jobs', 'Failing_Job', null, true);
$this->worker->work(0);
$status = new Resque_Job_Status($token);
$this->assertEquals(Resque_Job_Status::STATUS_FAILED, $status->get());
}
public function testCompletedJobReturnsCompletedStatus()
{
$token = Resque::enqueue('jobs', 'Test_Job', null, true);
$this->worker->work(0);
$status = new Resque_Job_Status($token);
$this->assertEquals(Resque_Job_Status::STATUS_COMPLETE, $status->get());
}
public function testStatusIsNotTrackedWhenToldNotTo()
{
$token = Resque::enqueue('jobs', 'Test_Job', null, false);
$status = new Resque_Job_Status($token);
$this->assertFalse($status->isTracking());
}
public function testStatusTrackingCanBeStopped()
{
Resque_Job_Status::create('test');
$status = new Resque_Job_Status('test');
$this->assertEquals(Resque_Job_Status::STATUS_WAITING, $status->get());
$status->stop();
$this->assertFalse($status->get());
}
public function testRecreatedJobWithTrackingStillTracksStatus()
{
$originalToken = Resque::enqueue('jobs', 'Test_Job', null, true);
$job = $this->worker->reserve();
// Mark this job as being worked on to ensure that the new status is still
// waiting.
$this->worker->workingOn($job);
// Now recreate it
$newToken = $job->recreate();
// Make sure we've got a new job returned
$this->assertNotEquals($originalToken, $newToken);
// Now check the status of the new job
$newJob = Resque_Job::reserve('jobs');
$this->assertEquals(Resque_Job_Status::STATUS_WAITING, $newJob->getStatus());
}
}

View File

@ -0,0 +1,169 @@
<?php
require_once dirname(__FILE__) . '/bootstrap.php';
/**
* Resque_Job tests.
*
* @package Resque/Tests
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Tests_JobTest extends Resque_Tests_TestCase
{
protected $worker;
public function setUp()
{
parent::setUp();
// Register a worker to test with
$this->worker = new Resque_Worker('jobs');
$this->worker->registerWorker();
}
public function testJobCanBeQueued()
{
$this->assertTrue((bool)Resque::enqueue('jobs', 'Test_Job'));
}
public function testQeueuedJobCanBeReserved()
{
Resque::enqueue('jobs', 'Test_Job');
$job = Resque_Job::reserve('jobs');
if($job == false) {
$this->fail('Job could not be reserved.');
}
$this->assertEquals('jobs', $job->queue);
$this->assertEquals('Test_Job', $job->payload['class']);
}
/**
* @expectedException InvalidArgumentException
*/
public function testObjectArgumentsCannotBePassedToJob()
{
$args = new stdClass;
$args->test = 'somevalue';
Resque::enqueue('jobs', 'Test_Job', $args);
}
public function testQueuedJobReturnsExactSamePassedInArguments()
{
$args = array(
'int' => 123,
'numArray' => array(
1,
2,
),
'assocArray' => array(
'key1' => 'value1',
'key2' => 'value2'
),
);
Resque::enqueue('jobs', 'Test_Job', $args);
$job = Resque_Job::reserve('jobs');
$this->assertEquals($args, $job->payload['args']);
}
public function testAfterJobIsReservedItIsRemoved()
{
Resque::enqueue('jobs', 'Test_Job');
Resque_Job::reserve('jobs');
$this->assertFalse(Resque_Job::reserve('jobs'));
}
public function testRecreatedJobMatchesExistingJob()
{
$args = array(
'int' => 123,
'numArray' => array(
1,
2,
),
'assocArray' => array(
'key1' => 'value1',
'key2' => 'value2'
),
);
Resque::enqueue('jobs', 'Test_Job', $args);
$job = Resque_Job::reserve('jobs');
// Now recreate it
$job->recreate();
$newJob = Resque_Job::reserve('jobs');
$this->assertEquals($job->payload['class'], $newJob->payload['class']);
$this->assertEquals($job->payload['args'], $newJob->payload['args']);
}
public function testFailedJobExceptionsAreCaught()
{
$payload = array(
'class' => 'Failing_Job',
'args' => null
);
$job = new Resque_Job('jobs', $payload);
$job->worker = $this->worker;
$this->worker->perform($job);
$this->assertEquals(1, Resque_Stat::get('failed'));
$this->assertEquals(1, Resque_Stat::get('failed:'.$this->worker));
}
/**
* @expectedException Resque_Exception
*/
public function testJobWithoutPerformMethodThrowsException()
{
Resque::enqueue('jobs', 'Test_Job_Without_Perform_Method');
$job = $this->worker->reserve();
$job->worker = $this->worker;
$job->perform();
}
/**
* @expectedException Resque_Exception
*/
public function testInvalidJobThrowsException()
{
Resque::enqueue('jobs', 'Invalid_Job');
$job = $this->worker->reserve();
$job->worker = $this->worker;
$job->perform();
}
public function testJobWithSetUpCallbackFiresSetUp()
{
$payload = array(
'class' => 'Test_Job_With_SetUp',
'args' => array(
'somevar',
'somevar2',
),
);
$job = new Resque_Job('jobs', $payload);
$job->perform();
$this->assertTrue(Test_Job_With_SetUp::$called);
}
public function testJobWithTearDownCallbackFiresSetUp()
{
$payload = array(
'class' => 'Test_Job_With_TearDown',
'args' => array(
'somevar',
'somevar2',
),
);
$job = new Resque_Job('jobs', $payload);
$job->perform();
$this->assertTrue(Test_Job_With_TearDown::$called);
}
}

View File

@ -0,0 +1,52 @@
<?php
require_once dirname(__FILE__) . '/bootstrap.php';
/**
* Resque_Stat tests.
*
* @package Resque/Tests
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Tests_StatTest extends Resque_Tests_TestCase
{
public function testStatCanBeIncremented()
{
Resque_Stat::incr('test_incr');
Resque_Stat::incr('test_incr');
$this->assertEquals(2, $this->redis->get('resque:stat:test_incr'));
}
public function testStatCanBeIncrementedByX()
{
Resque_Stat::incr('test_incrX', 10);
Resque_Stat::incr('test_incrX', 11);
$this->assertEquals(21, $this->redis->get('resque:stat:test_incrX'));
}
public function testStatCanBeDecremented()
{
Resque_Stat::incr('test_decr', 22);
Resque_Stat::decr('test_decr');
$this->assertEquals(21, $this->redis->get('resque:stat:test_decr'));
}
public function testStatCanBeDecrementedByX()
{
Resque_Stat::incr('test_decrX', 22);
Resque_Stat::decr('test_decrX', 11);
$this->assertEquals(11, $this->redis->get('resque:stat:test_decrX'));
}
public function testGetStatByName()
{
Resque_Stat::incr('test_get', 100);
$this->assertEquals(100, Resque_Stat::get('test_get'));
}
public function testGetUnknownStatReturns0()
{
$this->assertEquals(0, Resque_Stat::get('test_get_unknown'));
}
}

View File

@ -0,0 +1,24 @@
<?php
/**
* Resque test case class. Contains setup and teardown methods.
*
* @package Resque/Tests
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Tests_TestCase extends PHPUnit_Framework_TestCase
{
protected $resque;
protected $redis;
public function setUp()
{
$config = file_get_contents(REDIS_CONF);
preg_match('#^\s*port\s+([0-9]+)#m', $config, $matches);
$this->redis = new Redisent('localhost', $matches[1]);
// Flush redis
$this->redis->flushAll();
}
}

View File

@ -0,0 +1,253 @@
<?php
require_once dirname(__FILE__) . '/bootstrap.php';
/**
* Resque_Worker tests.
*
* @package Resque/Tests
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Tests_WorkerTest extends Resque_Tests_TestCase
{
public function testWorkerRegistersInList()
{
$worker = new Resque_Worker('*');
$worker->registerWorker();
// Make sure the worker is in the list
$this->assertTrue((bool)$this->redis->sismember('resque:workers', (string)$worker));
}
public function testGetAllWorkers()
{
$num = 3;
// Register a few workers
for($i = 0; $i < $num; ++$i) {
$worker = new Resque_Worker('queue_' . $i);
$worker->registerWorker();
}
// Now try to get them
$this->assertEquals($num, count(Resque_Worker::all()));
}
public function testGetWorkerById()
{
$worker = new Resque_Worker('*');
$worker->registerWorker();
$newWorker = Resque_Worker::find((string)$worker);
$this->assertEquals((string)$worker, (string)$newWorker);
}
public function testInvalidWorkerDoesNotExist()
{
$this->assertFalse(Resque_Worker::exists('blah'));
}
public function testWorkerCanUnregister()
{
$worker = new Resque_Worker('*');
$worker->registerWorker();
$worker->unregisterWorker();
$this->assertFalse(Resque_Worker::exists((string)$worker));
$this->assertEquals(array(), Resque_Worker::all());
$this->assertEquals(array(), $this->redis->smembers('resque:workers'));
}
public function testPausedWorkerDoesNotPickUpJobs()
{
$worker = new Resque_Worker('*');
$worker->pauseProcessing();
Resque::enqueue('jobs', 'Test_Job');
$worker->work(0);
$worker->work(0);
$this->assertEquals(0, Resque_Stat::get('processed'));
}
public function testResumedWorkerPicksUpJobs()
{
$worker = new Resque_Worker('*');
$worker->pauseProcessing();
Resque::enqueue('jobs', 'Test_Job');
$worker->work(0);
$this->assertEquals(0, Resque_Stat::get('processed'));
$worker->unPauseProcessing();
$worker->work(0);
$this->assertEquals(1, Resque_Stat::get('processed'));
}
public function testWorkerCanWorkOverMultipleQueues()
{
$worker = new Resque_Worker(array(
'queue1',
'queue2'
));
$worker->registerWorker();
Resque::enqueue('queue1', 'Test_Job_1');
Resque::enqueue('queue2', 'Test_Job_2');
$job = $worker->reserve();
$this->assertEquals('queue1', $job->queue);
$job = $worker->reserve();
$this->assertEquals('queue2', $job->queue);
}
public function testWorkerWorksQueuesInSpecifiedOrder()
{
$worker = new Resque_Worker(array(
'high',
'medium',
'low'
));
$worker->registerWorker();
// Queue the jobs in a different order
Resque::enqueue('low', 'Test_Job_1');
Resque::enqueue('high', 'Test_Job_2');
Resque::enqueue('medium', 'Test_Job_3');
// Now check we get the jobs back in the right order
$job = $worker->reserve();
$this->assertEquals('high', $job->queue);
$job = $worker->reserve();
$this->assertEquals('medium', $job->queue);
$job = $worker->reserve();
$this->assertEquals('low', $job->queue);
}
public function testWildcardQueueWorkerWorksAllQueues()
{
$worker = new Resque_Worker('*');
$worker->registerWorker();
Resque::enqueue('queue1', 'Test_Job_1');
Resque::enqueue('queue2', 'Test_Job_2');
$job = $worker->reserve();
$this->assertEquals('queue1', $job->queue);
$job = $worker->reserve();
$this->assertEquals('queue2', $job->queue);
}
public function testWorkerDoesNotWorkOnUnknownQueues()
{
$worker = new Resque_Worker('queue1');
$worker->registerWorker();
Resque::enqueue('queue2', 'Test_Job');
$this->assertFalse($worker->reserve());
}
public function testWorkerClearsItsStatusWhenNotWorking()
{
Resque::enqueue('jobs', 'Test_Job');
$worker = new Resque_Worker('jobs');
$job = $worker->reserve();
$worker->workingOn($job);
$worker->doneWorking();
$this->assertEquals(array(), $worker->job());
}
public function testWorkerRecordsWhatItIsWorkingOn()
{
$worker = new Resque_Worker('jobs');
$worker->registerWorker();
$payload = array(
'class' => 'Test_Job'
);
$job = new Resque_Job('jobs', $payload);
$worker->workingOn($job);
$job = $worker->job();
$this->assertEquals('jobs', $job['queue']);
if(!isset($job['run_at'])) {
$this->fail('Job does not have run_at time');
}
$this->assertEquals($payload, $job['payload']);
}
public function testWorkerErasesItsStatsWhenShutdown()
{
Resque::enqueue('jobs', 'Test_Job');
Resque::enqueue('jobs', 'Invalid_Job');
$worker = new Resque_Worker('jobs');
$worker->work(0);
$worker->work(0);
$this->assertEquals(0, $worker->getStat('processed'));
$this->assertEquals(0, $worker->getStat('failed'));
}
public function testWorkerCleansUpDeadWorkersOnStartup()
{
// Register a good worker
$goodWorker = new Resque_Worker('jobs');
$goodWorker->registerWorker();
$workerId = explode(':', $goodWorker);
// Register some bad workers
$worker = new Resque_Worker('jobs');
$worker->setId($workerId[0].':1:jobs');
$worker->registerWorker();
$worker = new Resque_Worker(array('high', 'low'));
$worker->setId($workerId[0].':2:high,low');
$worker->registerWorker();
$this->assertEquals(3, count(Resque_Worker::all()));
$goodWorker->pruneDeadWorkers();
// There should only be $goodWorker left now
$this->assertEquals(1, count(Resque_Worker::all()));
}
public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers()
{
// Register a bad worker on this machine
$worker = new Resque_Worker('jobs');
$workerId = explode(':', $worker);
$worker->setId($workerId[0].':1:jobs');
$worker->registerWorker();
// Register some other false workers
$worker = new Resque_Worker('jobs');
$worker->setId('my.other.host:1:jobs');
$worker->registerWorker();
$this->assertEquals(2, count(Resque_Worker::all()));
$worker->pruneDeadWorkers();
// my.other.host should be left
$workers = Resque_Worker::all();
$this->assertEquals(1, count($workers));
$this->assertEquals((string)$worker, (string)$workers[0]);
}
public function testWorkerFailsUncompletedJobsOnExit()
{
$worker = new Resque_Worker('jobs');
$worker->registerWorker();
$payload = array(
'class' => 'Test_Job'
);
$job = new Resque_Job('jobs', $payload);
$worker->workingOn($job);
$worker->unregisterWorker();
$this->assertEquals(1, Resque_Stat::get('failed'));
}
}

View File

@ -0,0 +1,149 @@
<?php
/**
* Resque test bootstrap file - sets up a test environment.
*
* @package Resque/Tests
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
define('CWD', dirname(__FILE__));
define('RESQUE_LIB', CWD . '/../../../lib/');
define('TEST_MISC', realpath(CWD . '/../../misc/'));
define('REDIS_CONF', TEST_MISC . '/redis.conf');
// Change to the directory this file lives in. This is important, due to
// how we'll be running redis.
require_once CWD . '/TestCase.php';
// Include Resque
require_once RESQUE_LIB . 'Resque.php';
require_once RESQUE_LIB . 'Resque/Worker.php';
// Attempt to start our own redis instance for tesitng.
exec('which redis-server', $output, $returnVar);
if($returnVar != 0) {
echo "Cannot find redis-server in path. Please make sure redis is installed.\n";
exit(1);
}
exec('cd ' . TEST_MISC . '; redis-server ' . REDIS_CONF, $output, $returnVar);
if($returnVar != 0) {
echo "Cannot start redis-server.\n";
exit(1);
}
// Get redis port from conf
$config = file_get_contents(REDIS_CONF);
if(!preg_match('#^\s*port\s+([0-9]+)#m', $config, $matches)) {
echo "Could not determine redis port from redis.conf";
exit(1);
}
Resque::setBackend('localhost:' . $matches[1]);
// Shutdown
function killRedis()
{
$config = file_get_contents(REDIS_CONF);
if(!preg_match('#^\s*pidfile\s+([^\s]+)#m', $config, $matches)) {
return;
}
$pidFile = TEST_MISC . '/' . $matches[1];
$pid = trim(file_get_contents($pidFile));
posix_kill($pid, 9);
if(is_file($pidFile)) {
unlink($pidFile);
}
// Remove the redis database
if(!preg_match('#^\s*dir\s+([^\s]+)#m', $config, $matches)) {
return;
}
$dir = $matches[1];
if(!preg_match('#^\s*dbfilename\s+([^\s]+)#m', $config, $matches)) {
return;
}
$filename = TEST_MISC . '/' . $dir . '/' . $matches[1];
if(is_file($filename)) {
unlink($filename);
}
}
register_shutdown_function('killRedis');
if(function_exists('pcntl_signal')) {
// Override INT and TERM signals, so they do a clean shutdown and also
// clean up redis-server as well.
function sigint()
{
exit;
}
pcntl_signal(SIGINT, 'sigint');
pcntl_signal(SIGTERM, 'sigint');
}
class Test_Job
{
public function perform()
{
}
}
class Failing_Job_Exception extends Exception
{
}
class Failing_Job
{
public function perform()
{
throw new Failing_Job_Exception('Message!');
}
}
class Test_Job_Without_Perform_Method
{
}
class Test_Job_With_SetUp
{
public static $called = false;
public $args = false;
public function setUp()
{
self::$called = true;
}
public function perform()
{
}
}
class Test_Job_With_TearDown
{
public static $called = false;
public $args = false;
public function perform()
{
}
public function tearDown()
{
self::$called = true;
}
}

8
test/misc/redis.conf Normal file
View File

@ -0,0 +1,8 @@
daemonize yes
pidfile ./redis.pid
port 6379
bind 127.0.0.1
timeout 300
dbfilename dump.rdb
dir ./
loglevel debug