diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown new file mode 100644 index 0000000..de4182d --- /dev/null +++ b/CHANGELOG.markdown @@ -0,0 +1,18 @@ +## 1.1 (????-??-??) ## +* Change arguments for jobs to be an array as they're easier to work with in +PHP. +* Implement ability to have setUp and tearDown methods for jobs, called before +and after every single run. +* Ability to specify a cluster/multiple redis servers and consistent hash +between them (Thanks dceballos) +* Fix `APP_INCLUDE` environment variable not loading correctly. +* Jobs are no longer defined as static methods, and classes are instantiated +first. This change is NOT backwards compatible and requires job classes are +updated. +* Job arguments are passed to the job class when it is instantiated, and +are accessible by $this->args. This change will break existing job classes +that rely on arguments that have not been updated. + +## 1.0 (2010-04-18) ## + +* Initial release \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..6513591 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +(c) 2010 Chris Boulton + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/README.markdown b/README.markdown new file mode 100644 index 0000000..7a19252 --- /dev/null +++ b/README.markdown @@ -0,0 +1,226 @@ +php-resque: PHP Resque Worker (and Enqueue) +=========================================== + +Resque is a Redis-backed library for creating background jobs, placing +those jobs on multiple queues, and processing them later. + +Resque was pioneered and is developed by the fine folks at GitHub (yes, +I am a kiss-ass), and written in Ruby. + +What you're seeing here is an almost direct port of the Resque worker +and enqueue system to PHP, which I've thrown together because I'm sure +my PHP developers would have a fit if they had to write a line of Ruby. + +For more information on Resque, visit the official GitHub project: + + +And for background information, the launch post on the GitHub blog: + + +The PHP port does NOT include its own web interface for viewing queue +stats, as the data is stored in the exact same expected format as the +Ruby version of Resque. + +The PHP port allows for much the same as the Ruby version of Rescue: + +* Workers can be distributed between multiple machines +* Includes support for priorities (queues) +* Resilient to memory leaks (fork) +* Expects failure + +In addition, it also: + +* Has the ability to track the status of jobs +* Will mark a job as failed, if a forked child running a job does +not exit with a status code as 0 +* Has built in support for `setUp` and `tearDown` methods, called +pre and post jobs + +## Jobs ## + +### Queueing Jobs ### + +Jobs are queued as follows: + + require_once 'lib/Resque.php'; + + // Required if redis is located elsewhere + Resque::setBackend('localhost', 6379); + + $args = array( + 'name' => 'Chris' + ); + Resque::enqueue('default', 'My_Job', $args); + +### Defining Jobs ### + +Each job should be in it's own class, and include a `perform` method. + + class My_Job + { + public function perform() + { + // Work work work + echo $this->args['name']; + } + } + +When the job is run, the class will be instantiated and any arguments +will be set as an array on the instantiated object, and are accessible +via `$this->args`. + +Any exception thrown by a job will result in the job failing - be +careful here and make sure you handle the exceptions that shouldn't +result in a job failing. + +Jobs can also have `setUp` and `tearDown` methods. If a `setUp` method +is defined, it will be called before the `perform` method is run. +The `tearDown` method if defined, will be called after the job finishes. + + class My_Job + { + public function setUp() + { + // ... Set up environment for this job + } + + public function perform() + { + // .. Run job + } + + public function tearDown() + { + // ... Remove environment for this job + } + } + +### Tracking Job Statuses ### + +php-resque has the ability to perform basic status tracking of a queued +job. The status information will allow you to check if a job is in the +queue, currently being run, has finished, or failed. + +To track the status of a job, pass `true` as the fourth argument to +`Resque::enqueue`. A token used for tracking the job status will be +returned: + + $token = Resque::enqueue('default', 'My_Job', $args, true); + echo $token; + +To fetch the status of a job: + + $status = new Resque_Job_Status($token); + echo $status->get(); // Outputs the status + +Job statuses are defined as constants in the `Resque_Job_Status` class. +Valid statuses include: + +* `Resque_Job_Status::STATUS_WAITING` - Job is still queued +* `Resque_Job_Status::STATUS_RUNNING` - Job is currently running +* `Resque_Job_Status::STATUS_FAILED` - Job has failed +* `Resque_Job_Status::STATUS_COMPLETE` - Job is complete +* `false` - Failed to fetch the status - is the token valid? + +Statuses are available for up to 24 hours after a job has completed +or failed, and are then automatically expired. A status can also +forcefully be expired by calling the `stop()` method on a status +class. + +## Workers ## + +Workers work in the exact same way as the Ruby workers. For complete +documentation on workers, see the original documentation. + +A basic "up-and-running" resque.php file is included that sets up a +running worker environment is included in the root directory. + +The exception to the similarities with the Ruby version of resque is +how a worker is initially setup. To work under all environments, +not having a single environment such as with Ruby, the PHP port makes +*no* assumptions about your setup. + +To start a worker, it's very similar to the Ruby version: + + $ QUEUE=file_serve php resque.php + +It's your responsibility to tell the worker which file to include to get +your application underway. You do so by setting the `APP_INCLUDE` environment +variable: + + $ QUEUE=file_serve APP_INCLUDE=../application/init.php php resque.php + +Getting your application underway also includes telling the worker your job +classes, by means of either an autoloader or including them. + +### Logging ### + +The port supports the same environment variables for logging to STDOUT. +Setting `VERBOSE` will print basic debugging information and `VVERBOSE` +will print detailed information. + + $ VERBOSE QUEUE=file_serve php resque.php + $ VVERBOSE QUEUE=file_serve php resque.php + +### Priorities and Queue Lists ### + +Similarly, priority and queue list functionality works exactly +the same as the Ruby workers. Multiple queues should be separated with +a comma, and the order that they're supplied in is the order that they're +checked in. + +As per the original example: + + $ QUEUES=file_serve,warm_cache php resque.php + +The `file_serve` queue will always be checked for new jobs on each +iteration before the `warm_cache` queue is checked. + +### Running All Queues ### + +All queues are supported in the same manner and processed in alphabetical +order: + + $ QUEUES=* php resque.php + +### Running Multiple Workers ### + +Multiple workers ca be launched and automatically worked by supplying +the `COUNT` environment variable: + + $ COUNT=5 php resque.php + +### Forking ### + +Similarly to the Ruby versions, supported platforms will immediately +fork after picking up a job. The forked child will exit as soon as +the job finishes. + +The difference with php-resque is that if a forked child does not +exit nicely (PHP error or such), php-resque will automatically fail +the job. + +### Signals ### + +Signals also work on supported platforms exactly as in the Ruby +version of Resque: + +* `QUIT` - Wait for child to finish processing then exit +* `TERM` / `INT` - Immediately kill child then exit +* `USR1` - Immediately kill child but don't exit +* `USR2` - Pause worker, no new jobs will be processed +* `CONT` - Resume worker. + +### Process Titles/Statuses ### + +The Ruby version of Resque has a nifty feature whereby the process +title of the worker is updated to indicate what the worker is doing, +and any forked children also set their process title with the job +being run. This helps identify running processes on the server and +their resque status. + +**PHP does not have this functionality by default.** + +A PECL module () exists that +adds this funcitonality to PHP, so if you'd like process titles updated, +install the PECL module as well. php-resque will detect and use it. \ No newline at end of file diff --git a/TODO.markdown b/TODO.markdown new file mode 100644 index 0000000..d8ee07a --- /dev/null +++ b/TODO.markdown @@ -0,0 +1,10 @@ +* Write tests for: + * `Resque_Failure` + * `Resque_Failure_Redis` +* Plugin/hook type system similar to Ruby version (when done, implement the +setUp and tearDown methods as a plugin) +* Change to preforking worker model +* Clean up /bin and /demo +* Add a way to store arbitrary text in job statuses (for things like progress +indicators) +* Write plugin for Ruby resque that calls setUp and tearDown methods \ No newline at end of file diff --git a/bin/resque b/bin/resque new file mode 100644 index 0000000..1a24852 --- /dev/null +++ b/bin/resque @@ -0,0 +1 @@ +#!/bin/sh diff --git a/build.xml b/build.xml new file mode 100644 index 0000000..7289999 --- /dev/null +++ b/build.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/demo/bad_job.php b/demo/bad_job.php new file mode 100644 index 0000000..bc12620 --- /dev/null +++ b/demo/bad_job.php @@ -0,0 +1,9 @@ + \ No newline at end of file diff --git a/demo/check_status.php b/demo/check_status.php new file mode 100644 index 0000000..c5c194c --- /dev/null +++ b/demo/check_status.php @@ -0,0 +1,20 @@ +isTracking()) { + die("Resque is not tracking the status of this job.\n"); +} + +echo "Tracking status of ".$argv[1].". Press [break] to stop.\n\n"; +while(true) { + fwrite(STDOUT, "Status of ".$argv[1]." is: ".$status->get()."\n"); + sleep(1); +} +?> \ No newline at end of file diff --git a/demo/job.php b/demo/job.php new file mode 100644 index 0000000..5b72c5c --- /dev/null +++ b/demo/job.php @@ -0,0 +1,10 @@ + \ No newline at end of file diff --git a/demo/long_job.php b/demo/long_job.php new file mode 100644 index 0000000..8c9f0f9 --- /dev/null +++ b/demo/long_job.php @@ -0,0 +1,9 @@ + \ No newline at end of file diff --git a/demo/php_error_job.php b/demo/php_error_job.php new file mode 100644 index 0000000..93bf2bc --- /dev/null +++ b/demo/php_error_job.php @@ -0,0 +1,9 @@ + \ No newline at end of file diff --git a/demo/queue.php b/demo/queue.php new file mode 100644 index 0000000..6a94e44 --- /dev/null +++ b/demo/queue.php @@ -0,0 +1,19 @@ + time(), + 'array' => array( + 'test' => 'test', + ), +); + +$jobId = Resque::enqueue('default', $argv[1], $args, true); +echo "Queued job ".$jobId."\n\n"; +?> \ No newline at end of file diff --git a/demo/resque.php b/demo/resque.php new file mode 100644 index 0000000..5af0cf1 --- /dev/null +++ b/demo/resque.php @@ -0,0 +1,8 @@ + \ No newline at end of file diff --git a/lib/Redisent/LICENSE b/lib/Redisent/LICENSE new file mode 100644 index 0000000..385910f --- /dev/null +++ b/lib/Redisent/LICENSE @@ -0,0 +1,22 @@ +Copyright (c) 2009 Justin Poliey + +Permission is hereby granted, free of charge, to any person +obtaining a copy of this software and associated documentation +files (the "Software"), to deal in the Software without +restriction, including without limitation the rights to use, +copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/lib/Redisent/README.markdown b/lib/Redisent/README.markdown new file mode 100644 index 0000000..3edb843 --- /dev/null +++ b/lib/Redisent/README.markdown @@ -0,0 +1,67 @@ +# Redisent + +Redisent is a simple, no-nonsense interface to the [Redis](http://code.google.com/p/redis/) key-value store for modest developers. +Due to the way it is implemented, it is flexible and tolerant of changes to the Redis protocol. + +## Getting to work + +If you're at all familiar with the Redis protocol and PHP objects, you've already mastered Redisent. +All Redisent does is map the Redis protocol to a PHP object, abstract away the nitty-gritty, and make the return values PHP compatible. + + require 'redisent.php'; + $redis = new Redisent('localhost'); + $redis->set('awesome', 'absolutely'); + echo sprintf('Is Redisent awesome? %s.\n', $redis->get('awesome')); + +You use the exact same command names, and the exact same argument order. **How wonderful.** How about a more complex example? + + require 'redisent.php'; + $redis = new Redisent('localhost'); + $redis->rpush('particles', 'proton'); + $redis->rpush('particles', 'electron'); + $redis->rpush('particles', 'neutron'); + $particles = $redis->lrange('particles', 0, -1); + $particle_count = $redis->llen('particles'); + echo "

The {$particle_count} particles that make up atoms are:

"; + echo "
    "; + foreach ($particles as $particle) { + echo "
  • {$particle}
  • "; + } + echo "
"; + +Be aware that Redis error responses will be wrapped in a RedisException class and thrown, so do be sure to use proper coding techniques. + +## Clustering your servers + +Redisent also includes a way for developers to fully utilize the scalability of Redis with multiple servers and [consistent hashing](http://en.wikipedia.org/wiki/Consistent_hashing). +Using the RedisentCluster class, you can use Redisent the same way, except that keys will be hashed across multiple servers. +Here is how to set up a cluster: + + include 'redisent_cluster.php'; + + $cluster = new RedisentCluster(array( + array('host' => '127.0.0.1', 'port' => 6379), + array('host' => '127.0.0.1', 'port' => 6380) + )); + +You can then use Redisent the way you normally would, i.e., `$cluster->set('key', 'value')` or `$cluster->lrange('particles', 0, -1)`. +But what about when you need to use commands that are server specific and do not operate on keys? You can use routing, with the `RedisentCluster::to` method. +To use routing, you need to assign a server an alias in the constructor of the Redis cluster. Aliases are not required on all servers, just the ones you want to be able to access directly. + + include 'redisent_cluster.php'; + + $cluster = new RedisentCluster(array( + 'alpha' => array('host' => '127.0.0.1', 'port' => 6379), + array('host' => '127.0.0.1', 'port' => 6380) + )); + +Now there is an alias of the server running on 127.0.0.1:6379 called **alpha**, and can be interacted with like this: + + // get server info + $cluster->to('alpha')->info(); + +Now you have complete programatic control over your Redis servers. + +## About + +© 2009 [Justin Poliey](http://justinpoliey.com) \ No newline at end of file diff --git a/lib/Redisent/Redisent.php b/lib/Redisent/Redisent.php new file mode 100644 index 0000000..0b9ea6e --- /dev/null +++ b/lib/Redisent/Redisent.php @@ -0,0 +1,137 @@ + + * @copyright 2009 Justin Poliey + * @license http://www.opensource.org/licenses/mit-license.php The MIT License + * @package Redisent + */ + +define('CRLF', sprintf('%s%s', chr(13), chr(10))); + +/** + * Wraps native Redis errors in friendlier PHP exceptions + */ +class RedisException extends Exception { +} + +/** + * Redisent, a Redis interface for the modest among us + */ +class Redisent { + + /** + * Socket connection to the Redis server + * @var resource + * @access private + */ + private $__sock; + + /** + * Redis bulk commands, they are sent in a slightly different format to the server + * @var array + * @access private + */ + private $bulk_cmds = array( + 'SET', 'GETSET', 'SETNX', 'ECHO', + 'RPUSH', 'LPUSH', 'LSET', 'LREM', + 'SADD', 'SREM', 'SMOVE', 'SISMEMBER' + ); + + /** + * Creates a Redisent connection to the Redis server on host {@link $host} and port {@link $port}. + * @param string $host The hostname of the Redis server + * @param integer $port The port number of the Redis server + */ + function __construct($host, $port = 6379) { + $this->__sock = fsockopen($host, $port, $errno, $errstr); + if (!$this->__sock) { + throw new Exception("{$errno} - {$errstr}"); + } + } + + function __destruct() { + fclose($this->__sock); + } + + function __call($name, $args) { + + /* Build the Redis protocol command */ + $name = strtoupper($name); + if (in_array($name, $this->bulk_cmds)) { + $value = array_pop($args); + $command = sprintf("%s %s %d%s%s%s", $name, trim(implode(' ', $args)), strlen($value), CRLF, $value, CRLF); + } + else { + $command = sprintf("%s %s%s", $name, trim(implode(' ', $args)), CRLF); + } + + /* Open a Redis connection and execute the command */ + fwrite($this->__sock, $command); + + /* Parse the response based on the reply identifier */ + $reply = trim(fgets($this->__sock, 512)); + switch (substr($reply, 0, 1)) { + /* Error reply */ + case '-': + echo $command."\n"; + throw new RedisException(substr(trim($reply), 4)); + break; + /* Inline reply */ + case '+': + $response = substr(trim($reply), 1); + break; + /* Bulk reply */ + case '$': + if ($reply == '$-1') { + $response = null; + break; + } + $read = 0; + $size = substr($reply, 1); + do { + $block_size = ($size - $read) > 1024 ? 1024 : ($size - $read); + $response = fread($this->__sock, $block_size); + $read += $block_size; + } while ($read < $size); + fread($this->__sock, 2); /* discard crlf */ + break; + /* Multi-bulk reply */ + case '*': + $count = substr($reply, 1); + if ($count == '-1') { + return null; + } + $response = array(); + for ($i = 0; $i < $count; $i++) { + $bulk_head = trim(fgets($this->__sock, 512)); + $size = substr($bulk_head, 1); + if ($size == '-1') { + $response[] = null; + } + else { + $read = 0; + $block = ""; + do { + $block_size = ($size - $read) > 1024 ? 1024 : ($size - $read); + $block .= fread($this->__sock, $block_size); + $read += $block_size; + } while ($read < $size); + fread($this->__sock, 2); /* discard crlf */ + $response[] = $block; + } + } + break; + /* Integer reply */ + case ':': + $response = substr(trim($reply), 1); + break; + default: + throw new RedisException("invalid server response: {$reply}"); + break; + } + /* Party on */ + return $response; + } + +} \ No newline at end of file diff --git a/lib/Redisent/RedisentCluster.php b/lib/Redisent/RedisentCluster.php new file mode 100644 index 0000000..55645f8 --- /dev/null +++ b/lib/Redisent/RedisentCluster.php @@ -0,0 +1,138 @@ + + * @copyright 2009 Justin Poliey + * @license http://www.opensource.org/licenses/mit-license.php The MIT License + * @package Redisent + */ + +require 'redisent.php'; + +/** + * A generalized Redisent interface for a cluster of Redis servers + */ +class RedisentCluster { + + /** + * Collection of Redisent objects attached to Redis servers + * @var array + * @access private + */ + private $redisents; + + /** + * Aliases of Redisent objects attached to Redis servers, used to route commands to specific servers + * @see RedisentCluster::to + * @var array + * @access private + */ + private $aliases; + + /** + * Hash ring of Redis server nodes + * @var array + * @access private + */ + private $ring; + + /** + * Individual nodes of pointers to Redis servers on the hash ring + * @var array + * @access private + */ + private $nodes; + + /** + * Number of replicas of each node to make around the hash ring + * @var integer + * @access private + */ + private $replicas = 128; + + /** + * The commands that are not subject to hashing + * @var array + * @access private + */ + private $dont_hash = array( + 'RANDOMKEY', 'DBSIZE', + 'SELECT', 'MOVE', 'FLUSHDB', 'FLUSHALL', + 'SAVE', 'BGSAVE', 'LASTSAVE', 'SHUTDOWN', + 'INFO', 'MONITOR', 'SLAVEOF' + ); + + /** + * Creates a Redisent interface to a cluster of Redis servers + * @param array $servers The Redis servers in the cluster. Each server should be in the format array('host' => hostname, 'port' => port) + */ + function __construct($servers) { + $this->ring = array(); + $this->aliases = array(); + foreach ($servers as $alias => $server) { + $this->redisents[] = new Redisent($server['host'], $server['port']); + if (is_string($alias)) { + $this->aliases[$alias] = $this->redisents[count($this->redisents)-1]; + } + for ($replica = 1; $replica <= $this->replicas; $replica++) { + $this->ring[crc32($server['host'].':'.$server['port'].'-'.$replica)] = $this->redisents[count($this->redisents)-1]; + } + } + ksort($this->ring, SORT_NUMERIC); + $this->nodes = array_keys($this->ring); + } + + /** + * Routes a command to a specific Redis server aliased by {$alias}. + * @param string $alias The alias of the Redis server + * @return Redisent The Redisent object attached to the Redis server + */ + function to($alias) { + if (isset($this->aliases[$alias])) { + return $this->aliases[$alias]; + } + else { + throw new Exception("That Redisent alias does not exist"); + } + } + + /* Execute a Redis command on the cluster */ + function __call($name, $args) { + + /* Pick a server node to send the command to */ + $name = strtoupper($name); + if (!in_array($name, $this->dont_hash)) { + $node = $this->nextNode(crc32($args[0])); + $redisent = $this->ring[$node]; + } + else { + $redisent = $this->redisents[0]; + } + + /* Execute the command on the server */ + return call_user_func_array(array($redisent, $name), $args); + } + + /** + * Routes to the proper server node + * @param integer $needle The hash value of the Redis command + * @return Redisent The Redisent object associated with the hash + */ + private function nextNode($needle) { + $haystack = $this->nodes; + while (count($haystack) > 2) { + $try = floor(count($haystack) / 2); + if ($haystack[$try] == $needle) { + return $needle; + } + if ($needle < $haystack[$try]) { + $haystack = array_slice($haystack, 0, $try + 1); + } + if ($needle > $haystack[$try]) { + $haystack = array_slice($haystack, $try + 1); + } + } + return $haystack[count($haystack)-1]; + } + +} \ No newline at end of file diff --git a/lib/Resque.php b/lib/Resque.php new file mode 100644 index 0000000..aa48f8d --- /dev/null +++ b/lib/Resque.php @@ -0,0 +1,134 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque +{ + const VERSION = '1.0'; + + /** + * @var Resque_Redis Instance of Resque_Redis that talks to redis. + */ + public static $redis = null; + + /** + * Given a host/port combination separated by a colon, set it as + * the redis server that Resque will talk to. + * + * @param mixed $server Host/port combination separated by a colon, or + * a nested array of servers with host/port pairs. + */ + public static function setBackend($server) + { + if(is_array($server)) { + require_once dirname(__FILE__) . '/Resque/RedisCluster.php'; + self::$redis = new Resque_RedisCluster($server); + } + else { + list($host, $port) = explode(':', $server); + require_once dirname(__FILE__) . '/Resque/Redis.php'; + self::$redis = new Resque_Redis($host, $port); + } + } + + /** + * Return an instance of the Resque_Redis class instantiated for Resque. + * + * @return Resque_Redis Instance of Resque_Redis. + */ + public static function redis() + { + if(is_null(self::$redis)) { + self::setBackend('localhost:6379'); + } + + return self::$redis; + } + + /** + * Push a job to the end of a specific queue. If the queue does not + * exist, then create it as well. + * + * @param string $queue The name of the queue to add the job to. + * @param object $item Job description as an object to be JSON encoded. + */ + public static function push($queue, $item) + { + self::redis()->sadd('queues', $queue); + self::redis()->rpush('queue:' . $queue, json_encode($item)); + } + + /** + * Pop an item off the end of the specified queue, decode it and + * return it. + * + * @param string $queue The name of the queue to fetch an item from. + * @return object Decoded item from the queue. + */ + public static function pop($queue) + { + $item = self::redis()->lpop('queue:' . $queue); + if(!$item) { + return; + } + + return json_decode($item, true); + } + + /** + * Return the size (number of pending jobs) of the specified queue. + * + * @return int The size of the queue. + */ + public static function size($queue) + { + return self::redis()->llen('queue:' . $queue); + } + + /** + * Create a new job and save it to the specified queue. + * + * @param string $queue The name of the queue to place the job in. + * @param string $class The name of the class that contains the code to execute the job. + * @param array $args Any optional arguments that should be passed when the job is executed. + * @param boolean $monitor Set to true to be able to monitor the status of a job. + */ + public static function enqueue($queue, $class, $args = null, $trackStatus = false) + { + require_once dirname(__FILE__) . '/Resque/Job.php'; + return Resque_Job::create($queue, $class, $args, $trackStatus); + } + + /** + * Reserve and return the next available job in the specified queue. + * + * @param string $queue Queue to fetch next available job from. + * @return Resque_Job Instance of Resque_Job to be processed, false if none or error. + */ + public static function reserve($queue) + { + require_once dirname(__FILE__) . '/Resque/Job.php'; + return Resque_Job::reserve($queue); + } + + /** + * Get an array of all known queues. + * + * @return array Array of queues. + */ + public static function queues() + { + $queues = self::redis()->smembers('queues'); + if(!is_array($queues)) { + $queues = array(); + } + return $queues; + } +} diff --git a/lib/Resque/Exception.php b/lib/Resque/Exception.php new file mode 100644 index 0000000..b288bf4 --- /dev/null +++ b/lib/Resque/Exception.php @@ -0,0 +1,13 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Exception extends Exception +{ +} +?> \ No newline at end of file diff --git a/lib/Resque/Failure.php b/lib/Resque/Failure.php new file mode 100644 index 0000000..56e0068 --- /dev/null +++ b/lib/Resque/Failure.php @@ -0,0 +1,59 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Failure +{ + /** + * @var string Class name representing the backend to pass failed jobs off to. + */ + private static $backend; + + /** + * Create a new failed job on the backend. + * + * @param object $payload The contents of the job that has just failed. + * @param object $exception The exception generated when the job failed to run. + * @param object $worker Instance of Resque_Worker that was running this job when it failed. + * @param string $queue The name of the queue that this job was fetched from. + */ + public static function create($payload, Exception $exception, Resque_Worker $worker, $queue) + { + $backend = self::getBackend(); + new $backend($payload, $exception, $worker, $queue); + } + + /** + * Return an instance of the backend for saving job failures. + * + * @return object Instance of backend object. + */ + public function getBackend() + { + if(self::$backend === null) { + require dirname(__FILE__) . '/Failure/Redis.php'; + self::$backend = 'Resque_Failure_Redis'; + } + + return self::$backend; + } + + /** + * Set the backend to use for raised job failures. The supplied backend + * should be the name of a class to be instantiated when a job fails. + * It is your responsibility to have the backend class loaded (or autoloaded) + * + * @param string $backend The class name of the backend to pipe failures to. + */ + public function setBackend($backend) + { + self::$backend = $backend; + } +} \ No newline at end of file diff --git a/lib/Resque/Failure/Interface.php b/lib/Resque/Failure/Interface.php new file mode 100644 index 0000000..863cd0b --- /dev/null +++ b/lib/Resque/Failure/Interface.php @@ -0,0 +1,22 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +interface Resque_Failure_Interface +{ + /** + * Initialize a failed job class and save it (where appropriate). + * + * @param object $payload Object containing details of the failed job. + * @param object $exception Instance of the exception that was thrown by the failed job. + * @param object $worker Instance of Resque_Worker that received the job. + * @param string $queue The name of the queue the job was fetched from. + */ + public function __construct($payload, $exception, $worker, $queue); +} +?> \ No newline at end of file diff --git a/lib/Resque/Failure/Redis.php b/lib/Resque/Failure/Redis.php new file mode 100644 index 0000000..c81bfc2 --- /dev/null +++ b/lib/Resque/Failure/Redis.php @@ -0,0 +1,35 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ + +class Resque_Failure_Redis implements Resque_Failure_Interface +{ + /** + * Initialize a failed job class and save it (where appropriate). + * + * @param object $payload Object containing details of the failed job. + * @param object $exception Instance of the exception that was thrown by the failed job. + * @param object $worker Instance of Resque_Worker that received the job. + * @param string $queue The name of the queue the job was fetched from. + */ + public function __construct($payload, $exception, $worker, $queue) + { + $data = new stdClass; + $data->failed_at = strftime('%a %b %d %H:%M:%S %Z %Y'); + $data->payload = $payload; + $data->exception = get_class($exception); + $data->error = $exception->getMessage(); + $data->backtrace = explode("\n", $exception->getTraceAsString()); + $data->worker = (string)$worker; + $data->queue = $queue; + $data = json_encode($data); + Resque::redis()->rpush('failed', $data); + } +} +?> \ No newline at end of file diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php new file mode 100644 index 0000000..2831bf4 --- /dev/null +++ b/lib/Resque/Job.php @@ -0,0 +1,198 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Job +{ + /** + * @var string The name of the queue that this job belongs to. + */ + public $queue; + + /** + * @var Resque_Worker Instance of the Resque worker running this job. + */ + public $worker; + + /** + * @var object Object containing details of the job. + */ + public $payload; + + /** + * Instantiate a new instance of a job. + * + * @param string $queue The queue that the job belongs to. + * @param object $payload Object containing details of the job. + */ + public function __construct($queue, $payload) + { + $this->queue = $queue; + $this->payload = $payload; + } + + /** + * Create a new job and save it to the specified queue. + * + * @param string $queue The name of the queue to place the job in. + * @param string $class The name of the class that contains the code to execute the job. + * @param array $args Any optional arguments that should be passed when the job is executed. + * @param boolean $monitor Set to true to be able to monitor the status of a job. + */ + public static function create($queue, $class, $args = null, $monitor = false) + { + if($args !== null && !is_array($args)) { + throw new InvalidArgumentException( + 'Supplied $args must be an array.' + ); + } + $id = md5(uniqid('', true)); + Resque::push($queue, array( + 'class' => $class, + 'args' => $args, + 'id' => $id, + )); + + if($monitor) { + Resque_Job_Status::create($id); + } + + return $id; + } + + /** + * Find the next available job from the specified queue and return an + * instance of Resque_Job for it. + * + * @param string $queue The name of the queue to check for a job in. + * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. + */ + public static function reserve($queue) + { + $payload = Resque::pop($queue); + if(!$payload) { + return false; + } + + return new Resque_Job($queue, $payload); + } + + /** + * Update the status of the current job. + * + * @param int $status Status constant from Resque_Job_Status indicating the current status of a job. + */ + public function updateStatus($status) + { + if(empty($this->payload->id)) { + return; + } + + $statusInstance = new Resque_Job_Status($this->payload['id']); + $statusInstance->update($status); + } + + /** + * Return the status of the current job. + * + * @return int The status of the job as one of the Resque_Job_Status constants. + */ + public function getStatus() + { + $status = new Resque_Job_Status($this->payload['id']); + return $status->get(); + } + + /** + * Actually execute a job by calling the perform method on the class + * associated with the job with the supplied arguments. + * + * @throws Resque_Exception When the job's class could not be found or it does not contain a perform method. + */ + public function perform() + { + if(!class_exists($this->payload['class'])) { + throw new Resque_Exception( + 'Could not find job class ' . $this->payload['class'] . '.' + ); + } + + if(!method_exists($this->payload['class'], 'perform')) { + throw new Resque_Exception( + 'Job class ' . $this->payload['class'] . ' does not contain a perform method.' + ); + } + + $instance = new $this->payload['class']; + $isntance->args = $this->payload['args']; + + if(method_exists($instance, 'setUp')) { + $instance->setUp(); + } + + $instance->perform(); + + if(method_exists($instance, 'tearDown')) { + $instance->tearDown(); + } + } + + /** + * Mark the current job as having failed. + */ + public function fail($exception) + { + $this->updateStatus(Resque_Job_Status::STATUS_FAILED); + require_once dirname(__FILE__) . '/Failure.php'; + Resque_Failure::create( + $this->payload, + $exception, + $this->worker, + $this->queue + ); + Resque_Stat::incr('failed'); + Resque_Stat::incr('failed:' . $this->worker); + } + + /** + * Re-queue the current job. + */ + public function recreate() + { + $status = new Resque_Job_Status($this->payload['id']); + $monitor = false; + if($status->isTracking()) { + $monitor = true; + } + + return self::create($this->queue, $this->payload['class'], $this->payload['args'], $monitor); + } + + /** + * Generate a string representation used to describe the current job. + * + * @return string The string representation of the job. + */ + public function __toString() + { + $name = array( + 'Job{' . $this->queue .'}' + ); + if(!empty($this->payload['id'])) { + $name[] = 'ID: ' . $this->payload['id']; + } + $name[] = $this->payload['class']; + if(!empty($this->payload['args'])) { + $name[] = json_encode($this->payload['args']); + } + return '(' . implode(' | ', $name) . ')'; + } +} +?> \ No newline at end of file diff --git a/lib/Resque/Job/DirtyExitException.php b/lib/Resque/Job/DirtyExitException.php new file mode 100644 index 0000000..b69413a --- /dev/null +++ b/lib/Resque/Job/DirtyExitException.php @@ -0,0 +1,13 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Job_DirtyExitException extends RuntimeException +{ + +} \ No newline at end of file diff --git a/lib/Resque/Job/Status.php b/lib/Resque/Job/Status.php new file mode 100644 index 0000000..e1554b0 --- /dev/null +++ b/lib/Resque/Job/Status.php @@ -0,0 +1,144 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Job_Status +{ + const STATUS_WAITING = 1; + const STATUS_RUNNING = 2; + const STATUS_FAILED = 3; + const STATUS_COMPLETE = 4; + + /** + * @var string The ID of the job this status class refers back to. + */ + private $id; + + /** + * @var mixed Cache variable if the status of this job is being monitored or not. + * True/false when checked at least once or null if not checked yet. + */ + private $isTracking = null; + + /** + * @var array Array of statuses that are considered final/complete. + */ + private static $completeStatuses = array( + self::STATUS_FAILED, + self::STATUS_COMPLETE + ); + + /** + * Setup a new instance of the job monitor class for the supplied job ID. + * + * @param string $id The ID of the job to manage the status for. + */ + public function __construct($id) + { + $this->id = $id; + } + + /** + * Create a new status monitor item for the supplied job ID. Will create + * all necessary keys in Redis to monitor the status of a job. + * + * @param string $id The ID of the job to monitor the status of. + */ + public static function create($id) + { + $statusPacket = array( + 'status' => self::STATUS_WAITING, + 'updated' => time(), + 'started' => time(), + ); + Resque::redis()->set('job:' . $id . ':status', json_encode($statusPacket)); + } + + /** + * Check if we're actually checking the status of the loaded job status + * instance. + * + * @return boolean True if the status is being monitored, false if not. + */ + public function isTracking() + { + if($this->isTracking === false) { + return false; + } + + if(!Resque::redis()->exists((string)$this)) { + $this->isTracking = false; + return false; + } + + $this->isTracking = true; + return true; + } + + /** + * Update the status indicator for the current job with a new status. + * + * @param int The status of the job (see constants in Resque_Job_Status) + */ + public function update($status) + { + if(!$this->isTracking()) { + return; + } + + $statusPacket = array( + 'status' => $status, + 'updated' => time(), + ); + Resque::redis()->set((string)$this, json_encode($statusPacket)); + + // Expire the status for completed jobs after 24 hours + if(in_array($status, self::$completeStatuses)) { + Resque::redis()->expire((string)$this, 86400); + } + } + + /** + * Fetch the status for the job being monitored. + * + * @return mixed False if the status is not being monitored, otherwise the status as + * as an integer, based on the Resque_Job_Status constants. + */ + public function get() + { + if(!$this->isTracking()) { + return false; + } + + $statusPacket = json_decode(Resque::redis()->get((string)$this), true); + if(!$statusPacket) { + return false; + } + + return $statusPacket['status']; + } + + /** + * Stop tracking the status of a job. + */ + public function stop() + { + Resque::redis()->del((string)$this); + } + + /** + * Generate a string representation of this object. + * + * @return string String representation of the current job status class. + */ + public function __toString() + { + return 'job:' . $this->id . ':status'; + } +} +?> \ No newline at end of file diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php new file mode 100644 index 0000000..874bf69 --- /dev/null +++ b/lib/Resque/Redis.php @@ -0,0 +1,101 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Redis extends Redisent +{ + /** + * @var array List of all commands in Redis that supply a key as their + * first argument. Used to prefix keys with the Resque namespace. + */ + private $keyCommands = array( + 'exists', + 'del', + 'type', + 'keys', + 'expire', + 'ttl', + 'move', + 'set', + 'get', + 'getset', + 'setnx', + 'incr', + 'incrby', + 'decrby', + 'decrby', + 'rpush', + 'lpush', + 'llen', + 'lrange', + 'ltrim', + 'lindex', + 'lset', + 'lrem', + 'lpop', + 'rpop', + 'sadd', + 'srem', + 'spop', + 'scard', + 'sismember', + 'smembers', + 'srandmember', + 'zadd', + 'zrem', + 'zrange', + 'zrevrange', + 'zrangebyscore', + 'zcard', + 'zscore', + 'zremrangebyscore', + 'sort' + ); + // sinterstore + // sunion + // sunionstore + // sdiff + // sdiffstore + // sinter + // smove + // rename + // rpoplpush + // mget + // msetnx + // mset + // renamenx + + /** + * Magic method to handle all function requests and prefix key based + * operations with the 'resque:' key prefix. + * + * @param string $name The name of the method called. + * @param array $args Array of supplied arguments to the method. + * @return mixed Return value from Resident::call() based on the command. + */ + public function __call($name, $args) { + $args = func_get_args(); + if(in_array($name, $this->keyCommands)) { + $args[1][0] = 'resque:' . $args[1][0]; + } + try { + return parent::__call($name, $args[1]); + } + catch(RedisException $e) { + return false; + } + } +} +?> \ No newline at end of file diff --git a/lib/Resque/RedisCluster.php b/lib/Resque/RedisCluster.php new file mode 100644 index 0000000..6ac15e3 --- /dev/null +++ b/lib/Resque/RedisCluster.php @@ -0,0 +1,101 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_RedisCluster extends RedisentCluster +{ + /** + * @var array List of all commands in Redis that supply a key as their + * first argument. Used to prefix keys with the Resque namespace. + */ + private $keyCommands = array( + 'exists', + 'del', + 'type', + 'keys', + 'expire', + 'ttl', + 'move', + 'set', + 'get', + 'getset', + 'setnx', + 'incr', + 'incrby', + 'decrby', + 'decrby', + 'rpush', + 'lpush', + 'llen', + 'lrange', + 'ltrim', + 'lindex', + 'lset', + 'lrem', + 'lpop', + 'rpop', + 'sadd', + 'srem', + 'spop', + 'scard', + 'sismember', + 'smembers', + 'srandmember', + 'zadd', + 'zrem', + 'zrange', + 'zrevrange', + 'zrangebyscore', + 'zcard', + 'zscore', + 'zremrangebyscore', + 'sort' + ); + // sinterstore + // sunion + // sunionstore + // sdiff + // sdiffstore + // sinter + // smove + // rename + // rpoplpush + // mget + // msetnx + // mset + // renamenx + + /** + * Magic method to handle all function requests and prefix key based + * operations with the 'resque:' key prefix. + * + * @param string $name The name of the method called. + * @param array $args Array of supplied arguments to the method. + * @return mixed Return value from Resident::call() based on the command. + */ + public function __call($name, $args) { + $args = func_get_args(); + if(in_array($name, $this->keyCommands)) { + $args[1][0] = 'resque:' . $args[1][0]; + } + try { + return parent::__call($name, $args[1]); + } + catch(RedisException $e) { + return false; + } + } +} +?> diff --git a/lib/Resque/Stat.php b/lib/Resque/Stat.php new file mode 100644 index 0000000..7956a4c --- /dev/null +++ b/lib/Resque/Stat.php @@ -0,0 +1,57 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Stat +{ + /** + * Get the value of the supplied statistic counter for the specified statistic. + * + * @param string $stat The name of the statistic to get the stats for. + * @return mixed Value of the statistic. + */ + public function get($stat) + { + return (int)Resque::redis()->get('stat:' . $stat); + } + + /** + * Increment the value of the specified statistic by a certain amount (default is 1) + * + * @param string $stat The name of the statistic to increment. + * @param int $by The amount to increment the statistic by. + * @return boolean True if successful, false if not. + */ + public function incr($stat, $by = 1) + { + return (bool)Resque::redis()->incrby('stat:' . $stat, $by); + } + + /** + * Decrement the value of the specified statistic by a certain amount (default is 1) + * + * @param string $stat The name of the statistic to decrement. + * @param int $by The amount to decrement the statistic by. + * @return boolean True if successful, false if not. + */ + public function decr($stat, $by = 1) + { + return (bool)Resque::redis()->decrby('stat:' . $stat, $by); + } + + /** + * Delete a statistic with the given name. + * + * @param string $stat The name of the statistic to delete. + * @return boolean True if successful, false if not. + */ + public function clear($stat) + { + return (bool)Resque::redis()->del('stat:' . $stat); + } +} \ No newline at end of file diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php new file mode 100644 index 0000000..2f4a1e0 --- /dev/null +++ b/lib/Resque/Worker.php @@ -0,0 +1,561 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Worker +{ + const LOG_NONE = 0; + const LOG_NORMAL = 1; + const LOG_VERBOSE = 2; + + /** + * @var int Current log level of this worker. + */ + public $logLevel = 0; + + /** + * @var array Array of all associated queues for this worker. + */ + private $queues = array(); + + /** + * @var string The hostname of this worker. + */ + private $hostname; + + /** + * @var boolean True if on the next iteration, the worker should shutdown. + */ + private $shutdown = false; + + /** + * @var boolean True if this worker is paused. + */ + private $paused = false; + + /** + * @var string String identifying this worker. + */ + private $id; + + /** + * @var Resque_Job Current job, if any, being processed by this worker. + */ + private $currentJob = null; + + /** + * Return all workers known to Resque as instantiated instances. + */ + public static function all() + { + $workers = Resque::redis()->smembers('workers'); + if(!is_array($workers)) { + $workers = array(); + } + + $instances = array(); + foreach($workers as $workerId) { + $instances[] = self::find($workerId); + } + return $instances; + } + + /** + * Given a worker ID, check if it is registered/valid. + * + * @param string $workerId ID of the worker. + * @return boolean True if the worker exists, false if not. + */ + public static function exists($workerId) + { + return (bool)Resque::redis()->sismember('workers', $workerId); + } + + /** + * Given a worker ID, find it and return an instantiated worker class for it. + * + * @param string $workerId The ID of the worker. + * @return Resque_Worker Instance of the worker. False if the worker does not exist. + */ + public static function find($workerId) + { + if(!self::exists($workerId)) { + return false; + } + + list($hostname, $pid, $queues) = explode(':', $workerId, 3); + $queues = explode(',', $queues); + $worker = new self($queues); + $worker->setId($workerId); + return $worker; + } + + /** + * Set the ID of this worker to a given ID string. + * + * @param string $workerId ID for the worker. + */ + public function setId($workerId) + { + $this->id = $workerId; + } + + /** + * Instantiate a new worker, given a list of queues that it should be working + * on. The list of queues should be supplied in the priority that they should + * be checked for jobs (first come, first served) + * + * Passing a single '*' allows the worker to work on all queues in alphabetical + * order. You can easily add new queues dynamically and have them worked on using + * this method. + * + * @param string|array $queues String with a single queue name, array with multiple. + */ + public function __construct($queues) + { + if(!is_array($queues)) { + $queues = array($queues); + } + + $this->queues = $queues; + if(function_exists('gethostname')) { + $hostname = gethostname(); + } + else { + $hostname = php_uname('n'); + } + $this->hostname = $hostname; + $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues); + } + + /** + * The primary loop for a worker which when called on an instance starts + * the worker's life cycle. + * + * Queues are checked every $interval (seconds) for new jobs. + * + * @param int $interval How often to check for new jobs across the queues. + */ + public function work($interval = 5) + { + $this->updateProcLine('Starting'); + $this->startup(); + + while(true) { + if($this->shutdown) { + break; + } + + // Attempt to find and reserve a job + $job = false; + if(!$this->paused) { + $job = $this->reserve(); + } + + if(!$job) { + // For an interval of 0, break now - helps with unit testing etc + if($interval == 0) { + break; + } + // If no job was found, we sleep for $interval before continuing and checking again + $this->log('Sleeping for ' . $interval, true); + if($this->paused) { + $this->updateProcLine('Paused'); + } + else { + $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); + } + sleep($interval); + continue; + } + + $this->log('got ' . $job); + $this->workingOn($job); + + $this->child = $this->fork(); + + // Forked and we're the child. Run the job. + if($this->child === 0 || $this->child === false) { + $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); + $this->updateProcLine($status); + $this->log($status, self::LOG_VERBOSE); + $this->perform($job); + if($this->child === 0) { + exit(0); + } + } + + if($this->child > 0) { + // Parent process, sit and wait + $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); + $this->updateProcLine($status); + $this->log($status, self::LOG_VERBOSE); + + // Wait until the child process finishes before continuing + pcntl_wait($status); + $exitStatus = pcntl_wexitstatus($status); + if($exitStatus !== 0) { + $job->fail(new Resque_Job_DirtyExitException( + 'Job exited with exit code ' . $exitStatus + )); + } + } + + $this->child = null; + $this->doneWorking(); + } + + $this->unregisterWorker(); + } + + /** + * Process a single job. + * + * @param object|null $job The job to be processed. + */ + public function perform(Resque_Job $job) + { + try { + $job->perform(); + } + catch(Exception $e) { + $this->log($job . ' failed: ' . $e->getMessage()); + $job->fail($e); + return; + } + + $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE); + $this->log('done ' . $job); + } + + /** + * Attempt to find a job from the top of one of the queues for this worker. + * + * @return object|boolean Instance of Resque_Job if a job is found, false if not. + */ + public function reserve() + { + $queues = $this->queues(); + if(!is_array($queues)) { + return; + } + foreach($queues as $queue) { + $this->log('Checking ' . $queue, self::LOG_VERBOSE); + $job = Resque_Job::reserve($queue); + if($job) { + $this->log('Found job on ' . $queue, self::LOG_VERBOSE); + return $job; + } + } + + return false; + } + + /** + * Return an array containing all of the queues that this worker should use + * when searching for jobs. + * + * If * is found in the list of queues, every queue will be searched in + * alphabetic order. + * + * @return array Array of associated queues. + */ + public function queues() + { + if(!in_array('*', $this->queues)) { + return $this->queues; + } + + $queues = Resque::queues(); + sort($queues); + return $queues; + } + + /** + * Attempt to fork a child process from the parent to run a job in. + * + * Return values are those of pcntl_fork(). + * + * @return int -1 if the fork failed, 0 for the forked child, the PID of the child for the parent. + */ + private function fork() + { + if(!function_exists('pcntl_fork')) { + return false; + } + + $pid = pcntl_fork(); + if($pid === -1) { + throw new RuntimeException('Unable to fork child worker.'); + } + + return $pid; + } + + /** + * Perform necessary actions to start a worker. + */ + private function startup() + { + $this->registerSigHandlers(); + $this->pruneDeadWorkers(); + $this->registerWorker(); + } + + /** + * On supported systems (with the PECL proctitle module installed), update + * the name of the currently running process to indicate the current state + * of a worker. + * + * @param string $status The updated process title. + */ + private function updateProcLine($status) + { + if(function_exists('setproctitle')) { + setproctitle('resque-' . Resque::VERSION . ': ' . $status); + } + } + + /** + * Register signal handlers that a worker should respond to. + * + * TERM: Shutdown immediately and stop processing jobs. + * INT: Shutdown immediately and stop processing jobs. + * QUIT: Shutdown after the current job finishes processing. + * USR1: Kill the forked child immediately and continue processing jobs. + */ + private function registerSigHandlers() + { + if(!function_exists('pcntl_signal')) { + return; + } + + declare(ticks = 1); + pcntl_signal(SIGTERM, array($this, 'shutDownNow')); + pcntl_signal(SIGINT, array($this, 'shutDownNow')); + pcntl_signal(SIGQUIT, array($this, 'shutdown')); + pcntl_signal(SIGUSR1, array($this, 'killChild')); + pcntl_signal(SIGUSR2, array($this, 'pauseProcessing')); + pcntl_signal(SIGCONT, array($this, 'unPauseProcessing')); + $this->log('Registered signals', self::LOG_VERBOSE); + } + + /** + * Signal handler callback for USR2, pauses processing of new jobs. + */ + public function pauseProcessing() + { + $this->log('USR2 received; pausing job processing'); + $this->paused = true; + } + + /** + * Signal handler callback for CONT, resumes worker allowing it to pick + * up new jobs. + */ + public function unPauseProcessing() + { + $this->log('CONT received; resuming job processing'); + $this->paused = false; + } + + /** + * Schedule a worker for shutdown. Will finish processing the current job + * and when the timeout interval is reached, the worker will shut down. + */ + public function shutdown() + { + $this->shutdown = true; + $this->log('Exiting...'); + } + + /** + * Force an immediate shutdown of the worker, killing any child jobs + * currently running. + */ + public function shutdownNow() + { + $this->shutdown(); + $this->killChild(); + } + + /** + * Kill a forked child job immediately. The job it is processing will not + * be completed. + */ + public function killChild() + { + if(!$this->child) { + $this->log('No child to kill.', self::LOG_VERBOSE); + return; + } + + $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE); + if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { + $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE); + posix_kill($this->child, SIGKILL); + $this->child = null; + } + else { + $this->log('Child ' . $this->child . ' not found, restarting.', self::LOG_VERBOSE); + $this->shutdown(); + } + } + + /** + * Look for any workers which should be running on this server and if + * they're not, remove them from Redis. + * + * This is a form of garbage collection to handle cases where the + * server may have been killed and the Resque workers did not die gracefully + * and therefore leave state information in Redis. + */ + public function pruneDeadWorkers() + { + $workerPids = $this->workerPids(); + $workers = self::all(); + foreach($workers as $worker) { + list($host, $pid, $queues) = explode(':', (string)$worker, 3); + if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { + continue; + } + $this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE); + $worker->unregisterWorker(); + } + } + + /** + * Return an array of process IDs for all of the Resque workers currently + * running on this machine. + * + * @return array Array of Resque worker process IDs. + */ + public function workerPids() + { + $pids = array(); + exec('ps -A -o pid,command | grep [r]esque', $cmdOutput); + foreach($cmdOutput as $line) { + list($pids[],) = explode(' ', $line, 2); + } + return $pids; + } + + /** + * Register this worker in Redis. + */ + public function registerWorker() + { + Resque::redis()->sadd('workers', $this); + Resque::redis()->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y')); + } + + /** + * Unregister this worker in Redis. (shutdown etc) + */ + public function unregisterWorker() + { + if(is_object($this->currentJob)) { + $this->currentJob->fail(new Resque_Job_DirtyExitException); + } + + $id = (string)$this; + Resque::redis()->srem('workers', $id); + Resque::redis()->del('worker:' . $id); + Resque::redis()->del('worker:' . $id . ':started'); + Resque_Stat::clear('processed:' . $id); + Resque_Stat::clear('failed:' . $id); + } + + /** + * Tell Redis which job we're currently working on. + * + * @param object $job Resque_Job instance containing the job we're working on. + */ + public function workingOn(Resque_Job $job) + { + $job->worker = $this; + $this->currentJob = $job; + $job->updateStatus(Resque_Job_Status::STATUS_RUNNING); + $data = json_encode(array( + 'queue' => $job->queue, + 'run_at' => strftime('%a %b %d %H:%M:%S %Z %Y'), + 'payload' => $job->payload + )); + Resque::redis()->set('worker:' . $job->worker, $data); + } + + /** + * Notify Redis that we've finished working on a job, clearing the working + * state and incrementing the job stats. + */ + public function doneWorking() + { + $this->currentJob = null; + Resque_Stat::incr('processed'); + Resque_Stat::incr('processed:' . (string)$this); + Resque::redis()->del('worker:' . (string)$this); + } + + /** + * Generate a string representation of this worker. + * + * @return string String identifier for this worker instance. + */ + public function __toString() + { + return $this->id; + } + + /** + * Output a given log message to STDOUT. + * + * @param string $message Message to output. + */ + public function log($message) + { + if($this->logLevel == self::LOG_NORMAL) { + fwrite(STDOUT, "*** " . $message . "\n"); + } + else if($this->logLevel == self::LOG_VERBOSE) { + fwrite(STDOUT, "** [" . strftime('%T %Y-%m-%d') . "] " . $message . "\n"); + } + } + + /** + * Return an object describing the job this worker is currently working on. + * + * @return object Object with details of current job. + */ + public function job() + { + $job = Resque::redis()->get('worker:' . $this); + if(!$job) { + return array(); + } + else { + return json_decode($job, true); + } + } + + /** + * Get a statistic belonging to this worker. + * + * @param string $stat Statistic to fetch. + * @return int Statistic value. + */ + public function getStat($stat) + { + return Resque_Stat::get($stat . ':' . $this); + } +} +?> \ No newline at end of file diff --git a/phpunit.xml b/phpunit.xml new file mode 100644 index 0000000..efbc7f2 --- /dev/null +++ b/phpunit.xml @@ -0,0 +1,23 @@ + + + + + ./test/Resque/ + + + + + + ./lib/Resque/ + + + \ No newline at end of file diff --git a/resque.php b/resque.php new file mode 100644 index 0000000..4b7ab44 --- /dev/null +++ b/resque.php @@ -0,0 +1,68 @@ + 1) { + $count = $_ENV['COUNT']; +} + +if($count > 1) { + for($i = 0; $i < $count; ++$i) { + $pid = pcntl_fork(); + if($pid == -1) { + die("Could not fork worker ".$i."\n"); + } + // Child, start the worker + else if(!$pid) { + $queues = explode(',', $_ENV['QUEUE']); + $worker = new Resque_Worker($queues); + $worker->logLevel = $logLevel; + fwrite(STDOUT, '*** Starting worker '.$worker."\n"); + $worker->work($interval); + break; + } + } +} +// Start a single worker +else { + $queues = explode(',', $_ENV['QUEUE']); + $worker = new Resque_Worker($queues); + $worker->logLevel = $logLevel; + fwrite(STDOUT, '*** Starting worker '.$worker."\n"); + $worker->work($interval); +} +?> \ No newline at end of file diff --git a/test/Resque/Tests/JobStatusTest.php b/test/Resque/Tests/JobStatusTest.php new file mode 100644 index 0000000..5f0fd0f --- /dev/null +++ b/test/Resque/Tests/JobStatusTest.php @@ -0,0 +1,102 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase +{ + public function setUp() + { + parent::setUp(); + + // Register a worker to test with + $this->worker = new Resque_Worker('jobs'); + } + + public function testJobStatusCanBeTracked() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $status = new Resque_Job_Status($token); + $this->assertTrue($status->isTracking()); + } + + public function testJobStatusIsReturnedViaJobInstance() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $job = Resque_Job::reserve('jobs'); + $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $job->getStatus()); + } + + public function testQueuedJobReturnsQueuedStatus() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $status = new Resque_Job_Status($token); + $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $status->get()); + } + public function testRunningJobReturnsRunningStatus() + { + $token = Resque::enqueue('jobs', 'Failing_Job', null, true); + $job = $this->worker->reserve(); + $this->worker->workingOn($job); + $status = new Resque_Job_Status($token); + $this->assertEquals(Resque_Job_Status::STATUS_RUNNING, $status->get()); + } + + public function testFailedJobReturnsFailedStatus() + { + $token = Resque::enqueue('jobs', 'Failing_Job', null, true); + $this->worker->work(0); + $status = new Resque_Job_Status($token); + $this->assertEquals(Resque_Job_Status::STATUS_FAILED, $status->get()); + } + + public function testCompletedJobReturnsCompletedStatus() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $this->worker->work(0); + $status = new Resque_Job_Status($token); + $this->assertEquals(Resque_Job_Status::STATUS_COMPLETE, $status->get()); + } + + public function testStatusIsNotTrackedWhenToldNotTo() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, false); + $status = new Resque_Job_Status($token); + $this->assertFalse($status->isTracking()); + } + + public function testStatusTrackingCanBeStopped() + { + Resque_Job_Status::create('test'); + $status = new Resque_Job_Status('test'); + $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $status->get()); + $status->stop(); + $this->assertFalse($status->get()); + } + + public function testRecreatedJobWithTrackingStillTracksStatus() + { + $originalToken = Resque::enqueue('jobs', 'Test_Job', null, true); + $job = $this->worker->reserve(); + + // Mark this job as being worked on to ensure that the new status is still + // waiting. + $this->worker->workingOn($job); + + // Now recreate it + $newToken = $job->recreate(); + + // Make sure we've got a new job returned + $this->assertNotEquals($originalToken, $newToken); + + // Now check the status of the new job + $newJob = Resque_Job::reserve('jobs'); + $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $newJob->getStatus()); + } +} \ No newline at end of file diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php new file mode 100644 index 0000000..1d1db56 --- /dev/null +++ b/test/Resque/Tests/JobTest.php @@ -0,0 +1,169 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_JobTest extends Resque_Tests_TestCase +{ + protected $worker; + + public function setUp() + { + parent::setUp(); + + // Register a worker to test with + $this->worker = new Resque_Worker('jobs'); + $this->worker->registerWorker(); + } + + public function testJobCanBeQueued() + { + $this->assertTrue((bool)Resque::enqueue('jobs', 'Test_Job')); + } + + public function testQeueuedJobCanBeReserved() + { + Resque::enqueue('jobs', 'Test_Job'); + + $job = Resque_Job::reserve('jobs'); + if($job == false) { + $this->fail('Job could not be reserved.'); + } + $this->assertEquals('jobs', $job->queue); + $this->assertEquals('Test_Job', $job->payload['class']); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testObjectArgumentsCannotBePassedToJob() + { + $args = new stdClass; + $args->test = 'somevalue'; + Resque::enqueue('jobs', 'Test_Job', $args); + } + + public function testQueuedJobReturnsExactSamePassedInArguments() + { + $args = array( + 'int' => 123, + 'numArray' => array( + 1, + 2, + ), + 'assocArray' => array( + 'key1' => 'value1', + 'key2' => 'value2' + ), + ); + Resque::enqueue('jobs', 'Test_Job', $args); + $job = Resque_Job::reserve('jobs'); + + $this->assertEquals($args, $job->payload['args']); + } + + public function testAfterJobIsReservedItIsRemoved() + { + Resque::enqueue('jobs', 'Test_Job'); + Resque_Job::reserve('jobs'); + $this->assertFalse(Resque_Job::reserve('jobs')); + } + + public function testRecreatedJobMatchesExistingJob() + { + $args = array( + 'int' => 123, + 'numArray' => array( + 1, + 2, + ), + 'assocArray' => array( + 'key1' => 'value1', + 'key2' => 'value2' + ), + ); + + Resque::enqueue('jobs', 'Test_Job', $args); + $job = Resque_Job::reserve('jobs'); + + // Now recreate it + $job->recreate(); + + $newJob = Resque_Job::reserve('jobs'); + $this->assertEquals($job->payload['class'], $newJob->payload['class']); + $this->assertEquals($job->payload['args'], $newJob->payload['args']); + } + + public function testFailedJobExceptionsAreCaught() + { + $payload = array( + 'class' => 'Failing_Job', + 'args' => null + ); + $job = new Resque_Job('jobs', $payload); + $job->worker = $this->worker; + + $this->worker->perform($job); + + $this->assertEquals(1, Resque_Stat::get('failed')); + $this->assertEquals(1, Resque_Stat::get('failed:'.$this->worker)); + } + + /** + * @expectedException Resque_Exception + */ + public function testJobWithoutPerformMethodThrowsException() + { + Resque::enqueue('jobs', 'Test_Job_Without_Perform_Method'); + $job = $this->worker->reserve(); + $job->worker = $this->worker; + $job->perform(); + } + + /** + * @expectedException Resque_Exception + */ + public function testInvalidJobThrowsException() + { + Resque::enqueue('jobs', 'Invalid_Job'); + $job = $this->worker->reserve(); + $job->worker = $this->worker; + $job->perform(); + } + + public function testJobWithSetUpCallbackFiresSetUp() + { + $payload = array( + 'class' => 'Test_Job_With_SetUp', + 'args' => array( + 'somevar', + 'somevar2', + ), + ); + $job = new Resque_Job('jobs', $payload); + $job->perform(); + + $this->assertTrue(Test_Job_With_SetUp::$called); + } + + public function testJobWithTearDownCallbackFiresSetUp() + { + $payload = array( + 'class' => 'Test_Job_With_TearDown', + 'args' => array( + 'somevar', + 'somevar2', + ), + ); + $job = new Resque_Job('jobs', $payload); + $job->perform(); + + $this->assertTrue(Test_Job_With_TearDown::$called); + } +} \ No newline at end of file diff --git a/test/Resque/Tests/StatTest.php b/test/Resque/Tests/StatTest.php new file mode 100644 index 0000000..6404794 --- /dev/null +++ b/test/Resque/Tests/StatTest.php @@ -0,0 +1,52 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_StatTest extends Resque_Tests_TestCase +{ + public function testStatCanBeIncremented() + { + Resque_Stat::incr('test_incr'); + Resque_Stat::incr('test_incr'); + $this->assertEquals(2, $this->redis->get('resque:stat:test_incr')); + } + + public function testStatCanBeIncrementedByX() + { + Resque_Stat::incr('test_incrX', 10); + Resque_Stat::incr('test_incrX', 11); + $this->assertEquals(21, $this->redis->get('resque:stat:test_incrX')); + } + + public function testStatCanBeDecremented() + { + Resque_Stat::incr('test_decr', 22); + Resque_Stat::decr('test_decr'); + $this->assertEquals(21, $this->redis->get('resque:stat:test_decr')); + } + + public function testStatCanBeDecrementedByX() + { + Resque_Stat::incr('test_decrX', 22); + Resque_Stat::decr('test_decrX', 11); + $this->assertEquals(11, $this->redis->get('resque:stat:test_decrX')); + } + + public function testGetStatByName() + { + Resque_Stat::incr('test_get', 100); + $this->assertEquals(100, Resque_Stat::get('test_get')); + } + + public function testGetUnknownStatReturns0() + { + $this->assertEquals(0, Resque_Stat::get('test_get_unknown')); + } +} \ No newline at end of file diff --git a/test/Resque/Tests/TestCase.php b/test/Resque/Tests/TestCase.php new file mode 100644 index 0000000..f4c00df --- /dev/null +++ b/test/Resque/Tests/TestCase.php @@ -0,0 +1,24 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_TestCase extends PHPUnit_Framework_TestCase +{ + protected $resque; + protected $redis; + + public function setUp() + { + $config = file_get_contents(REDIS_CONF); + preg_match('#^\s*port\s+([0-9]+)#m', $config, $matches); + $this->redis = new Redisent('localhost', $matches[1]); + + // Flush redis + $this->redis->flushAll(); + } +} \ No newline at end of file diff --git a/test/Resque/Tests/WorkerTest.php b/test/Resque/Tests/WorkerTest.php new file mode 100644 index 0000000..47b0208 --- /dev/null +++ b/test/Resque/Tests/WorkerTest.php @@ -0,0 +1,253 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_WorkerTest extends Resque_Tests_TestCase +{ + public function testWorkerRegistersInList() + { + $worker = new Resque_Worker('*'); + $worker->registerWorker(); + + // Make sure the worker is in the list + $this->assertTrue((bool)$this->redis->sismember('resque:workers', (string)$worker)); + } + + public function testGetAllWorkers() + { + $num = 3; + // Register a few workers + for($i = 0; $i < $num; ++$i) { + $worker = new Resque_Worker('queue_' . $i); + $worker->registerWorker(); + } + + // Now try to get them + $this->assertEquals($num, count(Resque_Worker::all())); + } + + public function testGetWorkerById() + { + $worker = new Resque_Worker('*'); + $worker->registerWorker(); + + $newWorker = Resque_Worker::find((string)$worker); + $this->assertEquals((string)$worker, (string)$newWorker); + } + + public function testInvalidWorkerDoesNotExist() + { + $this->assertFalse(Resque_Worker::exists('blah')); + } + + public function testWorkerCanUnregister() + { + $worker = new Resque_Worker('*'); + $worker->registerWorker(); + $worker->unregisterWorker(); + + $this->assertFalse(Resque_Worker::exists((string)$worker)); + $this->assertEquals(array(), Resque_Worker::all()); + $this->assertEquals(array(), $this->redis->smembers('resque:workers')); + } + + public function testPausedWorkerDoesNotPickUpJobs() + { + $worker = new Resque_Worker('*'); + $worker->pauseProcessing(); + Resque::enqueue('jobs', 'Test_Job'); + $worker->work(0); + $worker->work(0); + $this->assertEquals(0, Resque_Stat::get('processed')); + } + + public function testResumedWorkerPicksUpJobs() + { + $worker = new Resque_Worker('*'); + $worker->pauseProcessing(); + Resque::enqueue('jobs', 'Test_Job'); + $worker->work(0); + $this->assertEquals(0, Resque_Stat::get('processed')); + $worker->unPauseProcessing(); + $worker->work(0); + $this->assertEquals(1, Resque_Stat::get('processed')); + } + + public function testWorkerCanWorkOverMultipleQueues() + { + $worker = new Resque_Worker(array( + 'queue1', + 'queue2' + )); + $worker->registerWorker(); + Resque::enqueue('queue1', 'Test_Job_1'); + Resque::enqueue('queue2', 'Test_Job_2'); + + $job = $worker->reserve(); + $this->assertEquals('queue1', $job->queue); + + $job = $worker->reserve(); + $this->assertEquals('queue2', $job->queue); + } + + public function testWorkerWorksQueuesInSpecifiedOrder() + { + $worker = new Resque_Worker(array( + 'high', + 'medium', + 'low' + )); + $worker->registerWorker(); + + // Queue the jobs in a different order + Resque::enqueue('low', 'Test_Job_1'); + Resque::enqueue('high', 'Test_Job_2'); + Resque::enqueue('medium', 'Test_Job_3'); + + // Now check we get the jobs back in the right order + $job = $worker->reserve(); + $this->assertEquals('high', $job->queue); + + $job = $worker->reserve(); + $this->assertEquals('medium', $job->queue); + + $job = $worker->reserve(); + $this->assertEquals('low', $job->queue); + } + + public function testWildcardQueueWorkerWorksAllQueues() + { + $worker = new Resque_Worker('*'); + $worker->registerWorker(); + + Resque::enqueue('queue1', 'Test_Job_1'); + Resque::enqueue('queue2', 'Test_Job_2'); + + $job = $worker->reserve(); + $this->assertEquals('queue1', $job->queue); + + $job = $worker->reserve(); + $this->assertEquals('queue2', $job->queue); + } + + public function testWorkerDoesNotWorkOnUnknownQueues() + { + $worker = new Resque_Worker('queue1'); + $worker->registerWorker(); + Resque::enqueue('queue2', 'Test_Job'); + + $this->assertFalse($worker->reserve()); + } + + public function testWorkerClearsItsStatusWhenNotWorking() + { + Resque::enqueue('jobs', 'Test_Job'); + $worker = new Resque_Worker('jobs'); + $job = $worker->reserve(); + $worker->workingOn($job); + $worker->doneWorking(); + $this->assertEquals(array(), $worker->job()); + } + + public function testWorkerRecordsWhatItIsWorkingOn() + { + $worker = new Resque_Worker('jobs'); + $worker->registerWorker(); + + $payload = array( + 'class' => 'Test_Job' + ); + $job = new Resque_Job('jobs', $payload); + $worker->workingOn($job); + + $job = $worker->job(); + $this->assertEquals('jobs', $job['queue']); + if(!isset($job['run_at'])) { + $this->fail('Job does not have run_at time'); + } + $this->assertEquals($payload, $job['payload']); + } + + public function testWorkerErasesItsStatsWhenShutdown() + { + Resque::enqueue('jobs', 'Test_Job'); + Resque::enqueue('jobs', 'Invalid_Job'); + + $worker = new Resque_Worker('jobs'); + $worker->work(0); + $worker->work(0); + + $this->assertEquals(0, $worker->getStat('processed')); + $this->assertEquals(0, $worker->getStat('failed')); + } + + public function testWorkerCleansUpDeadWorkersOnStartup() + { + // Register a good worker + $goodWorker = new Resque_Worker('jobs'); + $goodWorker->registerWorker(); + $workerId = explode(':', $goodWorker); + + // Register some bad workers + $worker = new Resque_Worker('jobs'); + $worker->setId($workerId[0].':1:jobs'); + $worker->registerWorker(); + + $worker = new Resque_Worker(array('high', 'low')); + $worker->setId($workerId[0].':2:high,low'); + $worker->registerWorker(); + + $this->assertEquals(3, count(Resque_Worker::all())); + + $goodWorker->pruneDeadWorkers(); + + // There should only be $goodWorker left now + $this->assertEquals(1, count(Resque_Worker::all())); + } + + public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers() + { + // Register a bad worker on this machine + $worker = new Resque_Worker('jobs'); + $workerId = explode(':', $worker); + $worker->setId($workerId[0].':1:jobs'); + $worker->registerWorker(); + + // Register some other false workers + $worker = new Resque_Worker('jobs'); + $worker->setId('my.other.host:1:jobs'); + $worker->registerWorker(); + + $this->assertEquals(2, count(Resque_Worker::all())); + + $worker->pruneDeadWorkers(); + + // my.other.host should be left + $workers = Resque_Worker::all(); + $this->assertEquals(1, count($workers)); + $this->assertEquals((string)$worker, (string)$workers[0]); + } + + public function testWorkerFailsUncompletedJobsOnExit() + { + $worker = new Resque_Worker('jobs'); + $worker->registerWorker(); + + $payload = array( + 'class' => 'Test_Job' + ); + $job = new Resque_Job('jobs', $payload); + + $worker->workingOn($job); + $worker->unregisterWorker(); + + $this->assertEquals(1, Resque_Stat::get('failed')); + } +} \ No newline at end of file diff --git a/test/Resque/Tests/bootstrap.php b/test/Resque/Tests/bootstrap.php new file mode 100644 index 0000000..e21707c --- /dev/null +++ b/test/Resque/Tests/bootstrap.php @@ -0,0 +1,149 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +define('CWD', dirname(__FILE__)); +define('RESQUE_LIB', CWD . '/../../../lib/'); + +define('TEST_MISC', realpath(CWD . '/../../misc/')); +define('REDIS_CONF', TEST_MISC . '/redis.conf'); + +// Change to the directory this file lives in. This is important, due to +// how we'll be running redis. + +require_once CWD . '/TestCase.php'; + +// Include Resque +require_once RESQUE_LIB . 'Resque.php'; +require_once RESQUE_LIB . 'Resque/Worker.php'; + +// Attempt to start our own redis instance for tesitng. +exec('which redis-server', $output, $returnVar); +if($returnVar != 0) { + echo "Cannot find redis-server in path. Please make sure redis is installed.\n"; + exit(1); +} + +exec('cd ' . TEST_MISC . '; redis-server ' . REDIS_CONF, $output, $returnVar); +if($returnVar != 0) { + echo "Cannot start redis-server.\n"; + exit(1); + +} + +// Get redis port from conf +$config = file_get_contents(REDIS_CONF); +if(!preg_match('#^\s*port\s+([0-9]+)#m', $config, $matches)) { + echo "Could not determine redis port from redis.conf"; + exit(1); +} + +Resque::setBackend('localhost:' . $matches[1]); + +// Shutdown +function killRedis() +{ + $config = file_get_contents(REDIS_CONF); + if(!preg_match('#^\s*pidfile\s+([^\s]+)#m', $config, $matches)) { + return; + } + + $pidFile = TEST_MISC . '/' . $matches[1]; + $pid = trim(file_get_contents($pidFile)); + posix_kill($pid, 9); + + if(is_file($pidFile)) { + unlink($pidFile); + } + + // Remove the redis database + if(!preg_match('#^\s*dir\s+([^\s]+)#m', $config, $matches)) { + return; + } + $dir = $matches[1]; + + if(!preg_match('#^\s*dbfilename\s+([^\s]+)#m', $config, $matches)) { + return; + } + + $filename = TEST_MISC . '/' . $dir . '/' . $matches[1]; + if(is_file($filename)) { + unlink($filename); + } +} +register_shutdown_function('killRedis'); + +if(function_exists('pcntl_signal')) { + // Override INT and TERM signals, so they do a clean shutdown and also + // clean up redis-server as well. + function sigint() + { + exit; + } + pcntl_signal(SIGINT, 'sigint'); + pcntl_signal(SIGTERM, 'sigint'); +} + +class Test_Job +{ + public function perform() + { + + } +} + +class Failing_Job_Exception extends Exception +{ + +} + +class Failing_Job +{ + public function perform() + { + throw new Failing_Job_Exception('Message!'); + } +} + +class Test_Job_Without_Perform_Method +{ + +} + +class Test_Job_With_SetUp +{ + public static $called = false; + public $args = false; + + public function setUp() + { + self::$called = true; + } + + public function perform() + { + + } +} + + +class Test_Job_With_TearDown +{ + public static $called = false; + public $args = false; + + public function perform() + { + + } + + public function tearDown() + { + self::$called = true; + } +} \ No newline at end of file diff --git a/test/misc/redis.conf b/test/misc/redis.conf new file mode 100644 index 0000000..971f66e --- /dev/null +++ b/test/misc/redis.conf @@ -0,0 +1,8 @@ +daemonize yes +pidfile ./redis.pid +port 6379 +bind 127.0.0.1 +timeout 300 +dbfilename dump.rdb +dir ./ +loglevel debug \ No newline at end of file