From cb4205d508c4f1c876e03e5b536ff8002ddb0fd5 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Sun, 18 Apr 2010 23:58:43 +1000 Subject: [PATCH 01/15] Initial commit --- CHANGELOG.markdown | 3 + LICENSE | 20 + README.markdown | 198 +++++++++ TODO.markdown | 6 + bin/resque | 1 + build.xml | 17 + demo/bad_job.php | 9 + demo/check_status.php | 20 + demo/job.php | 10 + demo/long_job.php | 9 + demo/php_error_job.php | 9 + demo/queue.php | 19 + demo/resque.php | 8 + lib/Redisent/LICENSE | 22 + lib/Redisent/README.markdown | 67 +++ lib/Redisent/Redisent.php | 137 +++++++ lib/Redisent/RedisentCluster.php | 138 +++++++ lib/Resque.php | 128 ++++++ lib/Resque/Exception.php | 13 + lib/Resque/Failure.php | 59 +++ lib/Resque/Failure/Interface.php | 22 + lib/Resque/Failure/Redis.php | 35 ++ lib/Resque/Job.php | 195 +++++++++ lib/Resque/Job/DirtyExitException.php | 13 + lib/Resque/Job/Status.php | 144 +++++++ lib/Resque/Redis.php | 101 +++++ lib/Resque/Stat.php | 57 +++ lib/Resque/Worker.php | 564 ++++++++++++++++++++++++++ phpunit.xml | 23 ++ resque.php | 68 ++++ test/Resque/Tests/JobStatusTest.php | 102 +++++ test/Resque/Tests/JobTest.php | 140 +++++++ test/Resque/Tests/StatTest.php | 52 +++ test/Resque/Tests/TestCase.php | 24 ++ test/Resque/Tests/WorkerTest.php | 251 ++++++++++++ test/Resque/Tests/bootstrap.php | 116 ++++++ test/misc/redis.conf | 8 + 37 files changed, 2808 insertions(+) create mode 100644 CHANGELOG.markdown create mode 100644 LICENSE create mode 100644 README.markdown create mode 100644 TODO.markdown create mode 100644 bin/resque create mode 100644 build.xml create mode 100644 demo/bad_job.php create mode 100644 demo/check_status.php create mode 100644 demo/job.php create mode 100644 demo/long_job.php create mode 100644 demo/php_error_job.php create mode 100644 demo/queue.php create mode 100644 demo/resque.php create mode 100644 lib/Redisent/LICENSE create mode 100644 lib/Redisent/README.markdown create mode 100644 lib/Redisent/Redisent.php create mode 100644 lib/Redisent/RedisentCluster.php create mode 100644 lib/Resque.php create mode 100644 lib/Resque/Exception.php create mode 100644 lib/Resque/Failure.php create mode 100644 lib/Resque/Failure/Interface.php create mode 100644 lib/Resque/Failure/Redis.php create mode 100644 lib/Resque/Job.php create mode 100644 lib/Resque/Job/DirtyExitException.php create mode 100644 lib/Resque/Job/Status.php create mode 100644 lib/Resque/Redis.php create mode 100644 lib/Resque/Stat.php create mode 100644 lib/Resque/Worker.php create mode 100644 phpunit.xml create mode 100644 resque.php create mode 100644 test/Resque/Tests/JobStatusTest.php create mode 100644 test/Resque/Tests/JobTest.php create mode 100644 test/Resque/Tests/StatTest.php create mode 100644 test/Resque/Tests/TestCase.php create mode 100644 test/Resque/Tests/WorkerTest.php create mode 100644 test/Resque/Tests/bootstrap.php create mode 100644 test/misc/redis.conf diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown new file mode 100644 index 0000000..9944cd8 --- /dev/null +++ b/CHANGELOG.markdown @@ -0,0 +1,3 @@ +## 1.0 (2010-04-18) ## + +* Initial release \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..6513591 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +(c) 2010 Chris Boulton + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/README.markdown b/README.markdown new file mode 100644 index 0000000..a00c3e3 --- /dev/null +++ b/README.markdown @@ -0,0 +1,198 @@ +php-resque: PHP Resque Worker (and Enqueue) +=========================================== + +Resque is a Redis-backed library for creating background jobs, placing +those jobs on multiple queues, and processing them later. + +Resque was pioneered and is developed by the fine folks at GitHub (yes, +I am a kiss-ass), and written in Ruby. + +What you're seeing here is an almost direct port of the Resque worker +and enqueue system to PHP, which I've thrown together because I'm sure +my PHP developers would have a fit if they had to write a line of Ruby. + +For more information on Resque, visit the official GitHub project: + + +And for background information, the launch post on the GitHub blog: + + +The PHP port does NOT include its own web interface for viewing queue +stats, as the data is stored in the exact same expected format as the +Ruby version of Resque. + +The PHP port allows for much the same as the Ruby version of Rescue: + +* Workers can be distributed between multiple machines +* Includes support for priorities (queues) +* Resilient to memory leaks (fork) +* Expects failure + +In addition, it also: + +* Has the ability to track the status of jobs +* Will mark a job as failed, if a forked child running a job does +not exit with a status code as 0 + +## Jobs ## + +### Queueing Jobs ### + +Jobs are queued as follows: + + require_once 'Resque.php'; + + // Required if redis is located elsewhere + Resque::setBackend('localhost', 6379); + + $args = array( + 'name' => 'Chris' + ); + Resque::enqueue('default', 'My_Job', $args); + +### Defining Jobs ### + +Each job should be in it's own class, and include a `perform` method. +It's important to note that classes are called statically. + + class My_Job + { + public static function perform($args) + { + // Work work work + } + } + +Any exception thrown by a job will result in the job failing - be +careful here and make sure you handle the exceptions that shouldn't +result in a job failing. + +### Tracking Job Statuses ### + +php-resque has the ability to perform basic status tracking of a queued +job. The status information will allow you to check if a job is in the +queue, currently being run, has finished, or failed. + +To track the status of a job, pass `true` as the fourth argument to +`Resque::enqueue`. A token used for tracking the job status will be +returned: + + $token = Resque::enqueue('default', 'My_Job', $args); + echo $token; + +To fetch the status of a job: + + $status = new Resque_Job_Status($token); + echo $status->get(); // Outputs the status + +Job statuses are defined as constants in the `Resque_Job_Status` class. +Valid statuses include: + +* `Resque_Job_Status::STATUS_WAITING` - Job is still queued +* `Resque_Job_Status::STATUS_RUNNING` - Job is currently running +* `Resque_Job_Status::STATUS_FAILED` - Job has failed +* `Resque_Job_Status::STATUS_COMPLETE` - Job is complete +* `false` - Failed to fetch the status - is the token valid? + +Statuses are available for up to 24 hours after a job has completed +or failed, and are then automatically expired. A status can also +forcefully be expired by calling the `stop()` method on a status +class. + +## Workers ## + +Workers work in the exact same way as the Ruby workers. For complete +documentation on workers, see the original documentation. + +A basic "up-and-running" resque.php file is included that sets up a +running worker environment is included in the root directory. + +The exception to the similarities with the Ruby version of resque is +how a worker is initially setup. To work under all environments, +not having a single environment such as with Ruby, the PHP port makes +*no* assumptions about your setup. + +To start a worker, it's very similar to the Ruby version: + + $ QUEUE=file_serve php resque.php + +It's your responsibility to tell the worker which file to include to get +your application underway. You do so by setting the `APP_INCLUDE` environment +variable: + + $ QUEUE=file_serve APP_INCLUDE=../application/init.php php resque.php + +Getting your application underway also includes telling the worker your job +classes, by means of either an autoloader or including them. + +### Logging ### + +The port supports the same environment variables for logging to STDOUT. +Setting `VERBOSE` will print basic debugging information and `VVERBOSE` +will print detailed information. + + $ VERBOSE QUEUE=file_serve php resque.php + $ VVERBOSE QUEUE=file_serve php resque.php + +### Priorities and Queue Lists ### + +Similarly, priority and queue list functionality works exactly +the same as the Ruby workers. Multiple queues should be separated with +a comma, and the order that they're supplied in is the order that they're +checked in. + +As per the original example: + + $ QUEUES=file_serve,warm_cache php resque.php + +The `file_serve` queue will always be checked for new jobs on each +iteration before the `warm_cache` queue is checked. + +### Running All Queues ### + +All queues are supported in the same manner and processed in alphabetical +order: + + $ QUEUES=* php resque.php + +### Running Multiple Workers ### + +Multiple workers ca be launched and automatically worked by supplying +the `COUNT` environment variable: + + $ COUNT=5 php resque.php + +### Forking ### + +Similarly to the Ruby versions, supported platforms will immediately +fork after picking up a job. The forked child will exit as soon as +the job finishes. + +The difference with php-resque is that if a forked child does not +exit nicely (PHP error or such), php-resque will automatically fail +the job. + +### Signals ### + +Signals also work on supported platforms exactly as in the Ruby +version of Resque: + +* `QUIT` - Wait for child to finish processing then exit +* `TERM` / `INT` - Immediately kill child then exit +* `USR1` - Immediately kill child but don't exit +* `USR2` - Pause worker, no new jobs will be processed +* `CONT` - Resume worker. + +### Process Titles/Statuses ### + +The Ruby version of Resque has a nifty feature whereby the process +title of the worker is updated to indicate what the worker is doing, +and any forked children also set their process title with the job +being run. This helps identify running processes on the server and +their resque status. + +**PHP does not have this functionality by default.** + +A PECL module () exists that +adds this funcitonality to PHP, so if you'd like process titles updated, +install the PECL module as well. php-resque will detect and use it. \ No newline at end of file diff --git a/TODO.markdown b/TODO.markdown new file mode 100644 index 0000000..db6b6d6 --- /dev/null +++ b/TODO.markdown @@ -0,0 +1,6 @@ +* Write tests for: + * `Resque_Failure` + * `Resque_Failure_Redis` +* Plugin/hook type system similar to Ruby version +* Change to preforking worker model +* Clean up /bin and /demo \ No newline at end of file diff --git a/bin/resque b/bin/resque new file mode 100644 index 0000000..1a24852 --- /dev/null +++ b/bin/resque @@ -0,0 +1 @@ +#!/bin/sh diff --git a/build.xml b/build.xml new file mode 100644 index 0000000..7289999 --- /dev/null +++ b/build.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/demo/bad_job.php b/demo/bad_job.php new file mode 100644 index 0000000..bc12620 --- /dev/null +++ b/demo/bad_job.php @@ -0,0 +1,9 @@ + \ No newline at end of file diff --git a/demo/check_status.php b/demo/check_status.php new file mode 100644 index 0000000..c5c194c --- /dev/null +++ b/demo/check_status.php @@ -0,0 +1,20 @@ +isTracking()) { + die("Resque is not tracking the status of this job.\n"); +} + +echo "Tracking status of ".$argv[1].". Press [break] to stop.\n\n"; +while(true) { + fwrite(STDOUT, "Status of ".$argv[1]." is: ".$status->get()."\n"); + sleep(1); +} +?> \ No newline at end of file diff --git a/demo/job.php b/demo/job.php new file mode 100644 index 0000000..5b72c5c --- /dev/null +++ b/demo/job.php @@ -0,0 +1,10 @@ + \ No newline at end of file diff --git a/demo/long_job.php b/demo/long_job.php new file mode 100644 index 0000000..8c9f0f9 --- /dev/null +++ b/demo/long_job.php @@ -0,0 +1,9 @@ + \ No newline at end of file diff --git a/demo/php_error_job.php b/demo/php_error_job.php new file mode 100644 index 0000000..93bf2bc --- /dev/null +++ b/demo/php_error_job.php @@ -0,0 +1,9 @@ + \ No newline at end of file diff --git a/demo/queue.php b/demo/queue.php new file mode 100644 index 0000000..6ae4962 --- /dev/null +++ b/demo/queue.php @@ -0,0 +1,19 @@ +test = 'test'; + +$args = array( + time(), + $class +); +$jobId = Resque::enqueue('default', $argv[1], $args, true); +echo "Queued job ".$jobId."\n\n"; +?> \ No newline at end of file diff --git a/demo/resque.php b/demo/resque.php new file mode 100644 index 0000000..5af0cf1 --- /dev/null +++ b/demo/resque.php @@ -0,0 +1,8 @@ + \ No newline at end of file diff --git a/lib/Redisent/LICENSE b/lib/Redisent/LICENSE new file mode 100644 index 0000000..385910f --- /dev/null +++ b/lib/Redisent/LICENSE @@ -0,0 +1,22 @@ +Copyright (c) 2009 Justin Poliey + +Permission is hereby granted, free of charge, to any person +obtaining a copy of this software and associated documentation +files (the "Software"), to deal in the Software without +restriction, including without limitation the rights to use, +copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/lib/Redisent/README.markdown b/lib/Redisent/README.markdown new file mode 100644 index 0000000..3edb843 --- /dev/null +++ b/lib/Redisent/README.markdown @@ -0,0 +1,67 @@ +# Redisent + +Redisent is a simple, no-nonsense interface to the [Redis](http://code.google.com/p/redis/) key-value store for modest developers. +Due to the way it is implemented, it is flexible and tolerant of changes to the Redis protocol. + +## Getting to work + +If you're at all familiar with the Redis protocol and PHP objects, you've already mastered Redisent. +All Redisent does is map the Redis protocol to a PHP object, abstract away the nitty-gritty, and make the return values PHP compatible. + + require 'redisent.php'; + $redis = new Redisent('localhost'); + $redis->set('awesome', 'absolutely'); + echo sprintf('Is Redisent awesome? %s.\n', $redis->get('awesome')); + +You use the exact same command names, and the exact same argument order. **How wonderful.** How about a more complex example? + + require 'redisent.php'; + $redis = new Redisent('localhost'); + $redis->rpush('particles', 'proton'); + $redis->rpush('particles', 'electron'); + $redis->rpush('particles', 'neutron'); + $particles = $redis->lrange('particles', 0, -1); + $particle_count = $redis->llen('particles'); + echo "

The {$particle_count} particles that make up atoms are:

"; + echo "
    "; + foreach ($particles as $particle) { + echo "
  • {$particle}
  • "; + } + echo "
"; + +Be aware that Redis error responses will be wrapped in a RedisException class and thrown, so do be sure to use proper coding techniques. + +## Clustering your servers + +Redisent also includes a way for developers to fully utilize the scalability of Redis with multiple servers and [consistent hashing](http://en.wikipedia.org/wiki/Consistent_hashing). +Using the RedisentCluster class, you can use Redisent the same way, except that keys will be hashed across multiple servers. +Here is how to set up a cluster: + + include 'redisent_cluster.php'; + + $cluster = new RedisentCluster(array( + array('host' => '127.0.0.1', 'port' => 6379), + array('host' => '127.0.0.1', 'port' => 6380) + )); + +You can then use Redisent the way you normally would, i.e., `$cluster->set('key', 'value')` or `$cluster->lrange('particles', 0, -1)`. +But what about when you need to use commands that are server specific and do not operate on keys? You can use routing, with the `RedisentCluster::to` method. +To use routing, you need to assign a server an alias in the constructor of the Redis cluster. Aliases are not required on all servers, just the ones you want to be able to access directly. + + include 'redisent_cluster.php'; + + $cluster = new RedisentCluster(array( + 'alpha' => array('host' => '127.0.0.1', 'port' => 6379), + array('host' => '127.0.0.1', 'port' => 6380) + )); + +Now there is an alias of the server running on 127.0.0.1:6379 called **alpha**, and can be interacted with like this: + + // get server info + $cluster->to('alpha')->info(); + +Now you have complete programatic control over your Redis servers. + +## About + +© 2009 [Justin Poliey](http://justinpoliey.com) \ No newline at end of file diff --git a/lib/Redisent/Redisent.php b/lib/Redisent/Redisent.php new file mode 100644 index 0000000..0b9ea6e --- /dev/null +++ b/lib/Redisent/Redisent.php @@ -0,0 +1,137 @@ + + * @copyright 2009 Justin Poliey + * @license http://www.opensource.org/licenses/mit-license.php The MIT License + * @package Redisent + */ + +define('CRLF', sprintf('%s%s', chr(13), chr(10))); + +/** + * Wraps native Redis errors in friendlier PHP exceptions + */ +class RedisException extends Exception { +} + +/** + * Redisent, a Redis interface for the modest among us + */ +class Redisent { + + /** + * Socket connection to the Redis server + * @var resource + * @access private + */ + private $__sock; + + /** + * Redis bulk commands, they are sent in a slightly different format to the server + * @var array + * @access private + */ + private $bulk_cmds = array( + 'SET', 'GETSET', 'SETNX', 'ECHO', + 'RPUSH', 'LPUSH', 'LSET', 'LREM', + 'SADD', 'SREM', 'SMOVE', 'SISMEMBER' + ); + + /** + * Creates a Redisent connection to the Redis server on host {@link $host} and port {@link $port}. + * @param string $host The hostname of the Redis server + * @param integer $port The port number of the Redis server + */ + function __construct($host, $port = 6379) { + $this->__sock = fsockopen($host, $port, $errno, $errstr); + if (!$this->__sock) { + throw new Exception("{$errno} - {$errstr}"); + } + } + + function __destruct() { + fclose($this->__sock); + } + + function __call($name, $args) { + + /* Build the Redis protocol command */ + $name = strtoupper($name); + if (in_array($name, $this->bulk_cmds)) { + $value = array_pop($args); + $command = sprintf("%s %s %d%s%s%s", $name, trim(implode(' ', $args)), strlen($value), CRLF, $value, CRLF); + } + else { + $command = sprintf("%s %s%s", $name, trim(implode(' ', $args)), CRLF); + } + + /* Open a Redis connection and execute the command */ + fwrite($this->__sock, $command); + + /* Parse the response based on the reply identifier */ + $reply = trim(fgets($this->__sock, 512)); + switch (substr($reply, 0, 1)) { + /* Error reply */ + case '-': + echo $command."\n"; + throw new RedisException(substr(trim($reply), 4)); + break; + /* Inline reply */ + case '+': + $response = substr(trim($reply), 1); + break; + /* Bulk reply */ + case '$': + if ($reply == '$-1') { + $response = null; + break; + } + $read = 0; + $size = substr($reply, 1); + do { + $block_size = ($size - $read) > 1024 ? 1024 : ($size - $read); + $response = fread($this->__sock, $block_size); + $read += $block_size; + } while ($read < $size); + fread($this->__sock, 2); /* discard crlf */ + break; + /* Multi-bulk reply */ + case '*': + $count = substr($reply, 1); + if ($count == '-1') { + return null; + } + $response = array(); + for ($i = 0; $i < $count; $i++) { + $bulk_head = trim(fgets($this->__sock, 512)); + $size = substr($bulk_head, 1); + if ($size == '-1') { + $response[] = null; + } + else { + $read = 0; + $block = ""; + do { + $block_size = ($size - $read) > 1024 ? 1024 : ($size - $read); + $block .= fread($this->__sock, $block_size); + $read += $block_size; + } while ($read < $size); + fread($this->__sock, 2); /* discard crlf */ + $response[] = $block; + } + } + break; + /* Integer reply */ + case ':': + $response = substr(trim($reply), 1); + break; + default: + throw new RedisException("invalid server response: {$reply}"); + break; + } + /* Party on */ + return $response; + } + +} \ No newline at end of file diff --git a/lib/Redisent/RedisentCluster.php b/lib/Redisent/RedisentCluster.php new file mode 100644 index 0000000..55645f8 --- /dev/null +++ b/lib/Redisent/RedisentCluster.php @@ -0,0 +1,138 @@ + + * @copyright 2009 Justin Poliey + * @license http://www.opensource.org/licenses/mit-license.php The MIT License + * @package Redisent + */ + +require 'redisent.php'; + +/** + * A generalized Redisent interface for a cluster of Redis servers + */ +class RedisentCluster { + + /** + * Collection of Redisent objects attached to Redis servers + * @var array + * @access private + */ + private $redisents; + + /** + * Aliases of Redisent objects attached to Redis servers, used to route commands to specific servers + * @see RedisentCluster::to + * @var array + * @access private + */ + private $aliases; + + /** + * Hash ring of Redis server nodes + * @var array + * @access private + */ + private $ring; + + /** + * Individual nodes of pointers to Redis servers on the hash ring + * @var array + * @access private + */ + private $nodes; + + /** + * Number of replicas of each node to make around the hash ring + * @var integer + * @access private + */ + private $replicas = 128; + + /** + * The commands that are not subject to hashing + * @var array + * @access private + */ + private $dont_hash = array( + 'RANDOMKEY', 'DBSIZE', + 'SELECT', 'MOVE', 'FLUSHDB', 'FLUSHALL', + 'SAVE', 'BGSAVE', 'LASTSAVE', 'SHUTDOWN', + 'INFO', 'MONITOR', 'SLAVEOF' + ); + + /** + * Creates a Redisent interface to a cluster of Redis servers + * @param array $servers The Redis servers in the cluster. Each server should be in the format array('host' => hostname, 'port' => port) + */ + function __construct($servers) { + $this->ring = array(); + $this->aliases = array(); + foreach ($servers as $alias => $server) { + $this->redisents[] = new Redisent($server['host'], $server['port']); + if (is_string($alias)) { + $this->aliases[$alias] = $this->redisents[count($this->redisents)-1]; + } + for ($replica = 1; $replica <= $this->replicas; $replica++) { + $this->ring[crc32($server['host'].':'.$server['port'].'-'.$replica)] = $this->redisents[count($this->redisents)-1]; + } + } + ksort($this->ring, SORT_NUMERIC); + $this->nodes = array_keys($this->ring); + } + + /** + * Routes a command to a specific Redis server aliased by {$alias}. + * @param string $alias The alias of the Redis server + * @return Redisent The Redisent object attached to the Redis server + */ + function to($alias) { + if (isset($this->aliases[$alias])) { + return $this->aliases[$alias]; + } + else { + throw new Exception("That Redisent alias does not exist"); + } + } + + /* Execute a Redis command on the cluster */ + function __call($name, $args) { + + /* Pick a server node to send the command to */ + $name = strtoupper($name); + if (!in_array($name, $this->dont_hash)) { + $node = $this->nextNode(crc32($args[0])); + $redisent = $this->ring[$node]; + } + else { + $redisent = $this->redisents[0]; + } + + /* Execute the command on the server */ + return call_user_func_array(array($redisent, $name), $args); + } + + /** + * Routes to the proper server node + * @param integer $needle The hash value of the Redis command + * @return Redisent The Redisent object associated with the hash + */ + private function nextNode($needle) { + $haystack = $this->nodes; + while (count($haystack) > 2) { + $try = floor(count($haystack) / 2); + if ($haystack[$try] == $needle) { + return $needle; + } + if ($needle < $haystack[$try]) { + $haystack = array_slice($haystack, 0, $try + 1); + } + if ($needle > $haystack[$try]) { + $haystack = array_slice($haystack, $try + 1); + } + } + return $haystack[count($haystack)-1]; + } + +} \ No newline at end of file diff --git a/lib/Resque.php b/lib/Resque.php new file mode 100644 index 0000000..ebb1c8a --- /dev/null +++ b/lib/Resque.php @@ -0,0 +1,128 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque +{ + const VERSION = '1.0'; + + /** + * @var Resque_Redis Instance of Resque_Redis that talks to redis. + */ + public static $redis = null; + + /** + * Given a host/port combination separated by a colon, set it as + * the redis server that Resque will talk to. + * + * @param string $server Host/port combination separated by a colon. + */ + public static function setBackend($server) + { + list($host, $port) = explode(':', $server); + + require_once 'Resque/Redis.php'; + self::$redis = new Resque_Redis($host, $port); + } + + /** + * Return an instance of the Resque_Redis class instantiated for Resque. + * + * @return Resque_Redis Instance of Resque_Redis. + */ + public static function redis() + { + if(is_null(self::$redis)) { + self::setBackend('localhost:6379'); + } + + return self::$redis; + } + + /** + * Push a job to the end of a specific queue. If the queue does not + * exist, then create it as well. + * + * @param string $queue The name of the queue to add the job to. + * @param object $item Job description as an object to be JSON encoded. + */ + public static function push($queue, $item) + { + self::redis()->sadd('queues', $queue); + self::redis()->rpush('queue:' . $queue, json_encode($item)); + } + + /** + * Pop an item off the end of the specified queue, decode it and + * return it. + * + * @param string $queue The name of the queue to fetch an item from. + * @return object Decoded item from the queue. + */ + public static function pop($queue) + { + $item = self::redis()->lpop('queue:' . $queue); + if(!$item) { + return; + } + + return json_decode($item); + } + + /** + * Return the size (number of pending jobs) of the specified queue. + * + * @return int The size of the queue. + */ + public static function size($queue) + { + return self::redis()->llen('queue:' . $queue); + } + + /** + * Create a new job and save it to the specified queue. + * + * @param string $queue The name of the queue to place the job in. + * @param string $class The name of the class that contains the code to execute the job. + * @param array $args Any optional arguments that should be passed when the job is executed. + * @param boolean $monitor Set to true to be able to monitor the status of a job. + */ + public static function enqueue($queue, $class, $args = null, $trackStatus = false) + { + require_once 'Resque/Job.php'; + return Resque_Job::create($queue, $class, $args, $trackStatus); + } + + /** + * Reserve and return the next available job in the specified queue. + * + * @param string $queue Queue to fetch next available job from. + * @return Resque_Job Instance of Resque_Job to be processed, false if none or error. + */ + public static function reserve($queue) + { + require_once 'Resque/Job.php'; + return Resque_Job::reserve($queue); + } + + /** + * Get an array of all known queues. + * + * @return array Array of queues. + */ + public static function queues() + { + $queues = self::redis()->smembers('queues'); + if(!is_array($queues)) { + $queues = array(); + } + return $queues; + } +} \ No newline at end of file diff --git a/lib/Resque/Exception.php b/lib/Resque/Exception.php new file mode 100644 index 0000000..b288bf4 --- /dev/null +++ b/lib/Resque/Exception.php @@ -0,0 +1,13 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Exception extends Exception +{ +} +?> \ No newline at end of file diff --git a/lib/Resque/Failure.php b/lib/Resque/Failure.php new file mode 100644 index 0000000..cf10d49 --- /dev/null +++ b/lib/Resque/Failure.php @@ -0,0 +1,59 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Failure +{ + /** + * @var string Class name representing the backend to pass failed jobs off to. + */ + private static $backend; + + /** + * Create a new failed job on the backend. + * + * @param object $payload The contents of the job that has just failed. + * @param object $exception The exception generated when the job failed to run. + * @param object $worker Instance of Resque_Worker that was running this job when it failed. + * @param string $queue The name of the queue that this job was fetched from. + */ + public static function create($payload, Exception $exception, Resque_Worker $worker, $queue) + { + $backend = self::getBackend(); + new $backend($payload, $exception, $worker, $queue); + } + + /** + * Return an instance of the backend for saving job failures. + * + * @return object Instance of backend object. + */ + public function getBackend() + { + if(self::$backend === null) { + require 'Failure/Redis.php'; + self::$backend = 'Resque_Failure_Redis'; + } + + return self::$backend; + } + + /** + * Set the backend to use for raised job failures. The supplied backend + * should be the name of a class to be instantiated when a job fails. + * It is your responsibility to have the backend class loaded (or autoloaded) + * + * @param string $backend The class name of the backend to pipe failures to. + */ + public function setBackend($backend) + { + self::$backend = $backend; + } +} \ No newline at end of file diff --git a/lib/Resque/Failure/Interface.php b/lib/Resque/Failure/Interface.php new file mode 100644 index 0000000..863cd0b --- /dev/null +++ b/lib/Resque/Failure/Interface.php @@ -0,0 +1,22 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +interface Resque_Failure_Interface +{ + /** + * Initialize a failed job class and save it (where appropriate). + * + * @param object $payload Object containing details of the failed job. + * @param object $exception Instance of the exception that was thrown by the failed job. + * @param object $worker Instance of Resque_Worker that received the job. + * @param string $queue The name of the queue the job was fetched from. + */ + public function __construct($payload, $exception, $worker, $queue); +} +?> \ No newline at end of file diff --git a/lib/Resque/Failure/Redis.php b/lib/Resque/Failure/Redis.php new file mode 100644 index 0000000..c81bfc2 --- /dev/null +++ b/lib/Resque/Failure/Redis.php @@ -0,0 +1,35 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ + +class Resque_Failure_Redis implements Resque_Failure_Interface +{ + /** + * Initialize a failed job class and save it (where appropriate). + * + * @param object $payload Object containing details of the failed job. + * @param object $exception Instance of the exception that was thrown by the failed job. + * @param object $worker Instance of Resque_Worker that received the job. + * @param string $queue The name of the queue the job was fetched from. + */ + public function __construct($payload, $exception, $worker, $queue) + { + $data = new stdClass; + $data->failed_at = strftime('%a %b %d %H:%M:%S %Z %Y'); + $data->payload = $payload; + $data->exception = get_class($exception); + $data->error = $exception->getMessage(); + $data->backtrace = explode("\n", $exception->getTraceAsString()); + $data->worker = (string)$worker; + $data->queue = $queue; + $data = json_encode($data); + Resque::redis()->rpush('failed', $data); + } +} +?> \ No newline at end of file diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php new file mode 100644 index 0000000..9553e57 --- /dev/null +++ b/lib/Resque/Job.php @@ -0,0 +1,195 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Job +{ + /** + * @var string The name of the queue that this job belongs to. + */ + public $queue; + + /** + * @var Resque_Worker Instance of the Resque worker running this job. + */ + public $worker; + + /** + * @var object Object containing details of the job. + */ + public $payload; + + /** + * Instantiate a new instance of a job. + * + * @param string $queue The queue that the job belongs to. + * @param object $payload Object containing details of the job. + */ + public function __construct($queue, $payload) + { + $this->queue = $queue; + $this->payload = $payload; + } + + /** + * Create a new job and save it to the specified queue. + * + * @param string $queue The name of the queue to place the job in. + * @param string $class The name of the class that contains the code to execute the job. + * @param object $args Any optional arguments that should be passed when the job is executed. Pass as a class. + * @param boolean $monitor Set to true to be able to monitor the status of a job. + */ + public static function create($queue, $class, $args = null, $monitor = false) + { + if($args !== null && !is_object($args)) { + throw new InvalidArgumentException( + 'Supplied $args must be an object and an instance of stdClass.' + ); + } + $id = md5(uniqid('', true)); + Resque::push($queue, array( + 'class' => $class, + 'args' => $args, + 'id' => $id, + )); + + if($monitor) { + Resque_Job_Status::create($id); + } + + return $id; + } + + /** + * Find the next available job from the specified queue and return an + * instance of Resque_Job for it. + * + * @param string $queue The name of the queue to check for a job in. + * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. + */ + public static function reserve($queue) + { + $payload = Resque::pop($queue); + if(!$payload) { + return false; + } + + return new Resque_Job($queue, $payload); + } + + /** + * Update the status of the current job. + * + * @param int $status Status constant from Resque_Job_Status indicating the current status of a job. + */ + public function updateStatus($status) + { + if(empty($this->payload->id)) { + return; + } + + $statusInstance = new Resque_Job_Status($this->payload->id); + $statusInstance->update($status); + } + + /** + * Return the status of the current job. + * + * @return int The status of the job as one of the Resque_Job_Status constants. + */ + public function getStatus() + { + $status = new Resque_Job_Status($this->payload->id); + return $status->get(); + } + + /** + * Actually execute a job by calling the perform method on the class + * associated with the job with the supplied arguments. + * + * @throws Resque_Exception When the job's class could not be found or it does not contain a perform method. + */ + public function perform() + { + if(!class_exists($this->payload->class)) { + throw new Resque_Exception( + 'Could not find job class ' . $this->payload->class . '.' + ); + } + + if(!method_exists($this->payload->class, 'perform')) { + throw new Resque_Exception( + 'Job class ' . $this->payload->class . ' does not contain a perform method.' + ); + } + + call_user_func(array($this->payload->class, 'perform'), $this->payload->args); + } + + /** + * Mark the current job as having failed. + */ + public function fail($exception) + { + $this->updateStatus(Resque_Job_Status::STATUS_FAILED); + require_once 'Failure.php'; + Resque_Failure::create( + $this->payload, + $exception, + $this->worker, + $this->queue + ); + Resque_Stat::incr('failed'); + Resque_Stat::incr('failed:' . $this->worker); + } + + /** + * Re-queue the current job. + */ + public function recreate() + { + $status = new Resque_Job_Status($this->payload->id); + $monitor = false; + if($status->isTracking()) { + $monitor = true; + } + + return self::create($this->queue, $this->payload->class, $this->payload->args, $monitor); + } + + /** + * Generate a string representation used to describe the current job. + * + * @return string The string representation of the job. + */ + public function __toString() + { + $args = array(); + if(isset($this->payload->args)) { + $args = $this->payload->args; + foreach($args as $k => $v) { + if(is_object($v)) { + $args[$k] = '{' . get_class($v) . ' - '.implode(',', get_object_vars($v)) . '}'; + } + } + } + + $name = array( + 'Job{' . $this->queue .'}' + ); + if(!empty($this->payload->id)) { + $name[] = 'ID: ' . $this->payload->id; + } + $name[] = $this->payload->class; + $name[] = implode(',', $args); + return '(' . implode(' | ', $name) . ')'; + } +} +?> \ No newline at end of file diff --git a/lib/Resque/Job/DirtyExitException.php b/lib/Resque/Job/DirtyExitException.php new file mode 100644 index 0000000..b69413a --- /dev/null +++ b/lib/Resque/Job/DirtyExitException.php @@ -0,0 +1,13 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Job_DirtyExitException extends RuntimeException +{ + +} \ No newline at end of file diff --git a/lib/Resque/Job/Status.php b/lib/Resque/Job/Status.php new file mode 100644 index 0000000..8b45e98 --- /dev/null +++ b/lib/Resque/Job/Status.php @@ -0,0 +1,144 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Job_Status +{ + const STATUS_WAITING = 1; + const STATUS_RUNNING = 2; + const STATUS_FAILED = 3; + const STATUS_COMPLETE = 4; + + /** + * @var string The ID of the job this status class refers back to. + */ + private $id; + + /** + * @var mixed Cache variable if the status of this job is being monitored or not. + * True/false when checked at least once or null if not checked yet. + */ + private $isTracking = null; + + /** + * @var array Array of statuses that are considered final/complete. + */ + private static $completeStatuses = array( + self::STATUS_FAILED, + self::STATUS_COMPLETE + ); + + /** + * Setup a new instance of the job monitor class for the supplied job ID. + * + * @param string $id The ID of the job to manage the status for. + */ + public function __construct($id) + { + $this->id = $id; + } + + /** + * Create a new status monitor item for the supplied job ID. Will create + * all necessary keys in Redis to monitor the status of a job. + * + * @param string $id The ID of the job to monitor the status of. + */ + public static function create($id) + { + $statusPacket = array( + 'status' => self::STATUS_WAITING, + 'updated' => time(), + 'started' => time(), + ); + Resque::redis()->set('job:' . $id . ':status', json_encode($statusPacket)); + } + + /** + * Check if we're actually checking the status of the loaded job status + * instance. + * + * @return boolean True if the status is being monitored, false if not. + */ + public function isTracking() + { + if($this->isTracking === false) { + return false; + } + + if(!Resque::redis()->exists((string)$this)) { + $this->isTracking = false; + return false; + } + + $this->isTracking = true; + return true; + } + + /** + * Update the status indicator for the current job with a new status. + * + * @param int The status of the job (see constants in Resque_Job_Status) + */ + public function update($status) + { + if(!$this->isTracking()) { + return; + } + + $statusPacket = array( + 'status' => $status, + 'updated' => time(), + ); + Resque::redis()->set((string)$this, json_encode($statusPacket)); + + // Expire the status for completed jobs after 24 hours + if(in_array($status, self::$completeStatuses)) { + Resque::redis()->expire((string)$this, 86400); + } + } + + /** + * Fetch the status for the job being monitored. + * + * @return mixed False if the status is not being monitored, otherwise the status as + * as an integer, based on the Resque_Job_Status constants. + */ + public function get() + { + if(!$this->isTracking()) { + return false; + } + + $statusPacket = json_decode(Resque::redis()->get((string)$this)); + if(!$statusPacket) { + return false; + } + + return $statusPacket->status; + } + + /** + * Stop tracking the status of a job. + */ + public function stop() + { + Resque::redis()->del((string)$this); + } + + /** + * Generate a string representation of this object. + * + * @return string String representation of the current job status class. + */ + public function __toString() + { + return 'job:' . $this->id . ':status'; + } +} +?> \ No newline at end of file diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php new file mode 100644 index 0000000..874bf69 --- /dev/null +++ b/lib/Resque/Redis.php @@ -0,0 +1,101 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Redis extends Redisent +{ + /** + * @var array List of all commands in Redis that supply a key as their + * first argument. Used to prefix keys with the Resque namespace. + */ + private $keyCommands = array( + 'exists', + 'del', + 'type', + 'keys', + 'expire', + 'ttl', + 'move', + 'set', + 'get', + 'getset', + 'setnx', + 'incr', + 'incrby', + 'decrby', + 'decrby', + 'rpush', + 'lpush', + 'llen', + 'lrange', + 'ltrim', + 'lindex', + 'lset', + 'lrem', + 'lpop', + 'rpop', + 'sadd', + 'srem', + 'spop', + 'scard', + 'sismember', + 'smembers', + 'srandmember', + 'zadd', + 'zrem', + 'zrange', + 'zrevrange', + 'zrangebyscore', + 'zcard', + 'zscore', + 'zremrangebyscore', + 'sort' + ); + // sinterstore + // sunion + // sunionstore + // sdiff + // sdiffstore + // sinter + // smove + // rename + // rpoplpush + // mget + // msetnx + // mset + // renamenx + + /** + * Magic method to handle all function requests and prefix key based + * operations with the 'resque:' key prefix. + * + * @param string $name The name of the method called. + * @param array $args Array of supplied arguments to the method. + * @return mixed Return value from Resident::call() based on the command. + */ + public function __call($name, $args) { + $args = func_get_args(); + if(in_array($name, $this->keyCommands)) { + $args[1][0] = 'resque:' . $args[1][0]; + } + try { + return parent::__call($name, $args[1]); + } + catch(RedisException $e) { + return false; + } + } +} +?> \ No newline at end of file diff --git a/lib/Resque/Stat.php b/lib/Resque/Stat.php new file mode 100644 index 0000000..7956a4c --- /dev/null +++ b/lib/Resque/Stat.php @@ -0,0 +1,57 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Stat +{ + /** + * Get the value of the supplied statistic counter for the specified statistic. + * + * @param string $stat The name of the statistic to get the stats for. + * @return mixed Value of the statistic. + */ + public function get($stat) + { + return (int)Resque::redis()->get('stat:' . $stat); + } + + /** + * Increment the value of the specified statistic by a certain amount (default is 1) + * + * @param string $stat The name of the statistic to increment. + * @param int $by The amount to increment the statistic by. + * @return boolean True if successful, false if not. + */ + public function incr($stat, $by = 1) + { + return (bool)Resque::redis()->incrby('stat:' . $stat, $by); + } + + /** + * Decrement the value of the specified statistic by a certain amount (default is 1) + * + * @param string $stat The name of the statistic to decrement. + * @param int $by The amount to decrement the statistic by. + * @return boolean True if successful, false if not. + */ + public function decr($stat, $by = 1) + { + return (bool)Resque::redis()->decrby('stat:' . $stat, $by); + } + + /** + * Delete a statistic with the given name. + * + * @param string $stat The name of the statistic to delete. + * @return boolean True if successful, false if not. + */ + public function clear($stat) + { + return (bool)Resque::redis()->del('stat:' . $stat); + } +} \ No newline at end of file diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php new file mode 100644 index 0000000..e2516d3 --- /dev/null +++ b/lib/Resque/Worker.php @@ -0,0 +1,564 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Worker +{ + const LOG_NONE = 0; + const LOG_NORMAL = 1; + const LOG_VERBOSE = 2; + + /** + * @var int Current log level of this worker. + */ + public $logLevel = 0; + + /** + * @var array Array of all associated queues for this worker. + */ + private $queues = array(); + + /** + * @var string The hostname of this worker. + */ + private $hostname; + + /** + * @var boolean True if on the next iteration, the worker should shutdown. + */ + private $shutdown = false; + + /** + * @var boolean True if this worker is paused. + */ + private $paused = false; + + /** + * @var string String identifying this worker. + */ + private $id; + + /** + * @var Resque_Job Current job, if any, being processed by this worker. + */ + private $currentJob = null; + + /** + * Return all workers known to Resque as instantiated instances. + */ + public static function all() + { + $workers = Resque::redis()->smembers('workers'); + if(!is_array($workers)) { + $workers = array(); + } + + $instances = array(); + foreach($workers as $workerId) { + $instances[] = self::find($workerId); + } + return $instances; + } + + /** + * Given a worker ID, check if it is registered/valid. + * + * @param string $workerId ID of the worker. + * @return boolean True if the worker exists, false if not. + */ + public static function exists($workerId) + { + return (bool)Resque::redis()->sismember('workers', $workerId); + } + + /** + * Given a worker ID, find it and return an instantiated worker class for it. + * + * @param string $workerId The ID of the worker. + * @return Resque_Worker Instance of the worker. False if the worker does not exist. + */ + public static function find($workerId) + { + if(!self::exists($workerId)) { + return false; + } + + list($hostname, $pid, $queues) = explode(':', $workerId, 3); + $queues = explode(',', $queues); + $worker = new self($queues); + $worker->setId($workerId); + return $worker; + } + + /** + * Set the ID of this worker to a given ID string. + * + * @param string $workerId ID for the worker. + */ + public function setId($workerId) + { + $this->id = $workerId; + } + + /** + * Instantiate a new worker, given a list of queues that it should be working + * on. The list of queues should be supplied in the priority that they should + * be checked for jobs (first come, first served) + * + * Passing a single '*' allows the worker to work on all queues in alphabetical + * order. You can easily add new queues dynamically and have them worked on using + * this method. + * + * @param string|array $queues String with a single queue name, array with multiple. + */ + public function __construct($queues) + { + if(!is_array($queues)) { + $queues = array($queues); + } + + $this->queues = $queues; + if(function_exists('gethostname')) { + $hostname = gethostname(); + } + else { + $hostname = php_uname('n'); + } + $this->hostname = $hostname; + $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues); + } + + /** + * The primary loop for a worker which when called on an instance starts + * the worker's life cycle. + * + * Queues are checked every $interval (seconds) for new jobs. + * + * @param int $interval How often to check for new jobs across the queues. + */ + public function work($interval = 5) + { + $this->updateProcLine('Starting'); + $this->startup(); + + while(true) { + if($this->shutdown) { + break; + } + + // Attempt to find and reserve a job + $job = false; + if(!$this->paused) { + $job = $this->reserve(); + } + + if(!$job) { + // For an interval of 0, break now - helps with unit testing etc + if($interval == 0) { + break; + } + // If no job was found, we sleep for $interval before continuing and checking again + $this->log('Sleeping for ' . $interval, true); + if($this->paused) { + $this->updateProcLine('Paused'); + } + else { + $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); + } + sleep($interval); + continue; + } + + $this->log('got ' . $job); + $this->workingOn($job); + + $this->child = $this->fork(); + + // Forked and we're the child. Run the job. + if($this->child === 0 || $this->child === false) { + $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); + $this->updateProcLine($status); + $this->log($status, self::LOG_VERBOSE); + $this->perform($job); + if($this->child === 0) { + exit(0); + } + } + + if($this->child > 0) { + // Parent process, sit and wait + $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); + $this->updateProcLine($status); + $this->log($status, self::LOG_VERBOSE); + + // Wait until the child process finishes before continuing + pcntl_wait($status); + $exitStatus = pcntl_wexitstatus($status); + if($exitStatus !== 0) { + $job->fail(new Resque_Job_DirtyExitException( + 'Job exited with exit code ' . $exitStatus + )); + } + } + + $this->child = null; + $this->doneWorking(); + } + + $this->unregisterWorker(); + } + + /** + * Process a single job. + * + * @param object|null $job The job to be processed. + */ + public function perform(Resque_Job $job) + { + try { + $job->perform(); + } + catch(Exception $e) { + $this->log($job . ' failed: ' . $e->getMessage()); + $job->fail($e); + return; + } + + $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE); + $this->log('done ' . $job); + } + + /** + * Attempt to find a job from the top of one of the queues for this worker. + * + * @return object|boolean Instance of Resque_Job if a job is found, false if not. + */ + public function reserve() + { + $queues = $this->queues(); + if(!is_array($queues)) { + return; + } + foreach($queues as $queue) { + $this->log('Checking ' . $queue, self::LOG_VERBOSE); + $job = Resque_Job::reserve($queue); + if($job) { + $this->log('Found job on ' . $queue, self::LOG_VERBOSE); + return $job; + } + } + + return false; + } + + /** + * Return an array containing all of the queues that this worker should use + * when searching for jobs. + * + * If * is found in the list of queues, every queue will be searched in + * alphabetic order. + * + * @return array Array of associated queues. + */ + public function queues() + { + if(!in_array('*', $this->queues)) { + return $this->queues; + } + + $queues = Resque::queues(); + sort($queues); + return $queues; + } + + /** + * Attempt to fork a child process from the parent to run a job in. + * + * Return values are those of pcntl_fork(). + * + * @return int -1 if the fork failed, 0 for the forked child, the PID of the child for the parent. + */ + private function fork() + { + if(!function_exists('pcntl_fork')) { + return false; + } + + $pid = pcntl_fork(); + if($pid === -1) { + throw new RuntimeException('Unable to fork child worker.'); + } + + return $pid; + } + + /** + * Perform necessary actions to start a worker. + */ + private function startup() + { + $this->registerSigHandlers(); + $this->pruneDeadWorkers(); + $this->registerWorker(); + } + + /** + * On supported systems (with the PECL proctitle module installed), update + * the name of the currently running process to indicate the current state + * of a worker. + * + * @param string $status The updated process title. + */ + private function updateProcLine($status) + { + if(function_exists('setproctitle')) { + setproctitle('resque-' . Resque::VERSION . ': ' . $status); + } + } + + /** + * Register signal handlers that a worker should respond to. + * + * TERM: Shutdown immediately and stop processing jobs. + * INT: Shutdown immediately and stop processing jobs. + * QUIT: Shutdown after the current job finishes processing. + * USR1: Kill the forked child immediately and continue processing jobs. + */ + private function registerSigHandlers() + { + if(!function_exists('pcntl_signal')) { + return; + } + + declare(ticks = 1); + pcntl_signal(SIGTERM, array($this, 'shutDownNow')); + pcntl_signal(SIGINT, array($this, 'shutDownNow')); + pcntl_signal(SIGQUIT, array($this, 'shutdown')); + pcntl_signal(SIGUSR1, array($this, 'killChild')); + pcntl_signal(SIGUSR2, array($this, 'pauseProcessing')); + pcntl_signal(SIGCONT, array($this, 'unPauseProcessing')); + $this->log('Registered signals', self::LOG_VERBOSE); + } + + /** + * Signal handler callback for USR2, pauses processing of new jobs. + */ + public function pauseProcessing() + { + $this->log('USR2 received; pausing job processing'); + $this->paused = true; + } + + /** + * Signal handler callback for CONT, resumes worker allowing it to pick + * up new jobs. + */ + public function unPauseProcessing() + { + $this->log('CONT received; resuming job processing'); + $this->paused = false; + } + + /** + * Schedule a worker for shutdown. Will finish processing the current job + * and when the timeout interval is reached, the worker will shut down. + */ + public function shutdown() + { + $this->shutdown = true; + $this->log('Exiting...'); + } + + /** + * Force an immediate shutdown of the worker, killing any child jobs + * currently running. + */ + public function shutdownNow() + { + $this->shutdown(); + $this->killChild(); + } + + /** + * Kill a forked child job immediately. The job it is processing will not + * be completed. + */ + public function killChild() + { + if(!$this->child) { + $this->log('No child to kill.', self::LOG_VERBOSE); + return; + } + + $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE); + if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { + $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE); + posix_kill($this->child, SIGKILL); + $this->child = null; + } + else { + $this->log('Child ' . $this->child . ' not found, restarting.', self::LOG_VERBOSE); + $this->shutdown(); + } + } + + /** + * Look for any workers which should be running on this server and if + * they're not, remove them from Redis. + * + * This is a form of garbage collection to handle cases where the + * server may have been killed and the Resque workers did not die gracefully + * and therefore leave state information in Redis. + */ + public function pruneDeadWorkers() + { + $workerPids = $this->workerPids(); + if(empty($workerPids)) { + continue; + } + $workers = self::all(); + foreach($workers as $worker) { + list($host, $pid, $queues) = explode(':', (string)$worker, 3); + if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { + continue; + } + $this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE); + $worker->unregisterWorker(); + } + } + + /** + * Return an array of process IDs for all of the Resque workers currently + * running on this machine. + * + * @return array Array of Resque worker process IDs. + */ + public function workerPids() + { + $pids = array(); + exec('ps -A -o pid,command | grep [r]esque', $cmdOutput); + foreach($cmdOutput as $line) { + list($pids[],) = explode(' ', $line, 2); + } + return $pids; + } + + /** + * Register this worker in Redis. + */ + public function registerWorker() + { + Resque::redis()->sadd('workers', $this); + Resque::redis()->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y')); + } + + /** + * Unregister this worker in Redis. (shutdown etc) + */ + public function unregisterWorker() + { + if(is_object($this->currentJob)) { + $this->currentJob->fail(new Resque_Job_DirtyExitException); + } + + $id = (string)$this; + Resque::redis()->srem('workers', $id); + Resque::redis()->del('worker:' . $id); + Resque::redis()->del('worker:' . $id . ':started'); + Resque_Stat::clear('processed:' . $id); + Resque_Stat::clear('failed:' . $id); + } + + /** + * Tell Redis which job we're currently working on. + * + * @param object $job Resque_Job instance containing the job we're working on. + */ + public function workingOn(Resque_Job $job) + { + $job->worker = $this; + $this->currentJob = $job; + $job->updateStatus(Resque_Job_Status::STATUS_RUNNING); + $data = json_encode(array( + 'queue' => $job->queue, + 'run_at' => strftime('%a %b %d %H:%M:%S %Z %Y'), + 'payload' => $job->payload + )); + Resque::redis()->set('worker:' . $job->worker, $data); + } + + /** + * Notify Redis that we've finished working on a job, clearing the working + * state and incrementing the job stats. + */ + public function doneWorking() + { + $this->currentJob = null; + Resque_Stat::incr('processed'); + Resque_Stat::incr('processed:' . (string)$this); + Resque::redis()->del('worker:' . (string)$this); + } + + /** + * Generate a string representation of this worker. + * + * @return string String identifier for this worker instance. + */ + public function __toString() + { + return $this->id; + } + + /** + * Output a given log message to STDOUT. + * + * @param string $message Message to output. + */ + public function log($message) + { + if($this->logLevel == self::LOG_NORMAL) { + fwrite(STDOUT, "*** " . $message . "\n"); + } + else if($this->logLevel == self::LOG_VERBOSE) { + fwrite(STDOUT, "** [" . strftime('%T %Y-%m-%d') . "] " . $message . "\n"); + } + } + + /** + * Return an object describing the job this worker is currently working on. + * + * @return object Object with details of current job. + */ + public function job() + { + $job = Resque::redis()->get('worker:' . $this); + if(!$job) { + return new stdClass; + } + else { + return json_decode($job); + } + } + + /** + * Get a statistic belonging to this worker. + * + * @param string $stat Statistic to fetch. + * @return int Statistic value. + */ + public function getStat($stat) + { + return Resque_Stat::get($stat . ':' . $this); + } +} +?> \ No newline at end of file diff --git a/phpunit.xml b/phpunit.xml new file mode 100644 index 0000000..efbc7f2 --- /dev/null +++ b/phpunit.xml @@ -0,0 +1,23 @@ + + + + + ./test/Resque/ + + + + + + ./lib/Resque/ + + + \ No newline at end of file diff --git a/resque.php b/resque.php new file mode 100644 index 0000000..9643346 --- /dev/null +++ b/resque.php @@ -0,0 +1,68 @@ + 1) { + $count = $_ENV['COUNT']; +} + +if($count > 1) { + for($i = 0; $i < $count; ++$i) { + $pid = pcntl_fork(); + if($pid == -1) { + die("Could not fork worker ".$i."\n"); + } + // Child, start the worker + else if(!$pid) { + $queues = explode(',', $_ENV['QUEUE']); + $worker = new Resque_Worker($queues); + $worker->logLevel = $logLevel; + fwrite(STDOUT, '*** Starting worker '.$worker."\n"); + $worker->work($interval); + break; + } + } +} +// Start a single worker +else { + $queues = explode(',', $_ENV['QUEUE']); + $worker = new Resque_Worker($queues); + $worker->logLevel = $logLevel; + fwrite(STDOUT, '*** Starting worker '.$worker."\n"); + $worker->work($interval); +} +?> \ No newline at end of file diff --git a/test/Resque/Tests/JobStatusTest.php b/test/Resque/Tests/JobStatusTest.php new file mode 100644 index 0000000..5f0fd0f --- /dev/null +++ b/test/Resque/Tests/JobStatusTest.php @@ -0,0 +1,102 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase +{ + public function setUp() + { + parent::setUp(); + + // Register a worker to test with + $this->worker = new Resque_Worker('jobs'); + } + + public function testJobStatusCanBeTracked() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $status = new Resque_Job_Status($token); + $this->assertTrue($status->isTracking()); + } + + public function testJobStatusIsReturnedViaJobInstance() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $job = Resque_Job::reserve('jobs'); + $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $job->getStatus()); + } + + public function testQueuedJobReturnsQueuedStatus() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $status = new Resque_Job_Status($token); + $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $status->get()); + } + public function testRunningJobReturnsRunningStatus() + { + $token = Resque::enqueue('jobs', 'Failing_Job', null, true); + $job = $this->worker->reserve(); + $this->worker->workingOn($job); + $status = new Resque_Job_Status($token); + $this->assertEquals(Resque_Job_Status::STATUS_RUNNING, $status->get()); + } + + public function testFailedJobReturnsFailedStatus() + { + $token = Resque::enqueue('jobs', 'Failing_Job', null, true); + $this->worker->work(0); + $status = new Resque_Job_Status($token); + $this->assertEquals(Resque_Job_Status::STATUS_FAILED, $status->get()); + } + + public function testCompletedJobReturnsCompletedStatus() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $this->worker->work(0); + $status = new Resque_Job_Status($token); + $this->assertEquals(Resque_Job_Status::STATUS_COMPLETE, $status->get()); + } + + public function testStatusIsNotTrackedWhenToldNotTo() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, false); + $status = new Resque_Job_Status($token); + $this->assertFalse($status->isTracking()); + } + + public function testStatusTrackingCanBeStopped() + { + Resque_Job_Status::create('test'); + $status = new Resque_Job_Status('test'); + $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $status->get()); + $status->stop(); + $this->assertFalse($status->get()); + } + + public function testRecreatedJobWithTrackingStillTracksStatus() + { + $originalToken = Resque::enqueue('jobs', 'Test_Job', null, true); + $job = $this->worker->reserve(); + + // Mark this job as being worked on to ensure that the new status is still + // waiting. + $this->worker->workingOn($job); + + // Now recreate it + $newToken = $job->recreate(); + + // Make sure we've got a new job returned + $this->assertNotEquals($originalToken, $newToken); + + // Now check the status of the new job + $newJob = Resque_Job::reserve('jobs'); + $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $newJob->getStatus()); + } +} \ No newline at end of file diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php new file mode 100644 index 0000000..061dda3 --- /dev/null +++ b/test/Resque/Tests/JobTest.php @@ -0,0 +1,140 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_JobTest extends Resque_Tests_TestCase +{ + protected $worker; + + public function setUp() + { + parent::setUp(); + + // Register a worker to test with + $this->worker = new Resque_Worker('jobs'); + $this->worker->registerWorker(); + } + + public function tearDown() + { + $this->worker->unregisterWorker(); + } + + public function testJobCanBeQueued() + { + $this->assertTrue((bool)Resque::enqueue('jobs', 'Test_Job')); + } + + public function testQeueuedJobCanBeReserved() + { + Resque::enqueue('jobs', 'Test_Job'); + + $job = Resque_Job::reserve('jobs'); + if($job == false) { + $this->fail('Job could not be reserved.'); + } + $this->assertEquals('jobs', $job->queue); + $this->assertEquals('Test_Job', $job->payload->class); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testArrayArgumentsCannotBePassedToJob() + { + Resque::enqueue('jobs', 'Test_Job', array( + 'test' + )); + } + + public function testQueuedJobReturnsExactSamePassedInArguments() + { + $args = new stdClass; + $args->int = 123; + $args->numArray = array( + 1, + 2, + ); + $args->assocArray = new stdClass; + $args->assocArray->key1 = 'value1'; + $args->assocArray->key2 = 'value2'; + + Resque::enqueue('jobs', 'Test_Job', $args); + $job = Resque_Job::reserve('jobs'); + + $this->assertEquals($args, $job->payload->args); + } + + public function testAfterJobIsReservedItIsRemoved() + { + Resque::enqueue('jobs', 'Test_Job'); + Resque_Job::reserve('jobs'); + $this->assertFalse(Resque_Job::reserve('jobs')); + } + + public function testRecreatedJobMatchesExistingJob() + { + $args = new stdClass; + $args->int = 123; + $args->numArray = array( + 1, + 2, + ); + $args->assocArray = new stdClass; + $args->assocArray->key1 = 'value1'; + $args->assocArray->key2 = 'value2'; + + Resque::enqueue('jobs', 'Test_Job', $args); + $job = Resque_Job::reserve('jobs'); + + // Now recreate it + $job->recreate(); + + $newJob = Resque_Job::reserve('jobs'); + $this->assertEquals($job->payload->class, $newJob->payload->class); + $this->assertEquals($job->payload->args, $newJob->payload->args); + } + + public function testFailedJobExceptionsAreCaught() + { + $payload = new stdClass; + $payload->class = 'Failing_Job'; + $payload->args = null; + $job = new Resque_Job('jobs', $payload); + $job->worker = $this->worker; + + $this->worker->perform($job); + + $this->assertEquals(1, Resque_Stat::get('failed')); + $this->assertEquals(1, Resque_Stat::get('failed:'.$this->worker)); + } + + /** + * @expectedException Resque_Exception + */ + public function testJobWithoutPerformMethodThrowsException() + { + Resque::enqueue('jobs', 'Test_Job_Without_Perform_Method'); + $job = $this->worker->reserve(); + $job->worker = $this->worker; + $job->perform(); + } + + /** + * @expectedException Resque_Exception + */ + public function testInvalidJobThrowsException() + { + Resque::enqueue('jobs', 'Invalid_Job'); + $job = $this->worker->reserve(); + $job->worker = $this->worker; + $job->perform(); + } +} \ No newline at end of file diff --git a/test/Resque/Tests/StatTest.php b/test/Resque/Tests/StatTest.php new file mode 100644 index 0000000..6404794 --- /dev/null +++ b/test/Resque/Tests/StatTest.php @@ -0,0 +1,52 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_StatTest extends Resque_Tests_TestCase +{ + public function testStatCanBeIncremented() + { + Resque_Stat::incr('test_incr'); + Resque_Stat::incr('test_incr'); + $this->assertEquals(2, $this->redis->get('resque:stat:test_incr')); + } + + public function testStatCanBeIncrementedByX() + { + Resque_Stat::incr('test_incrX', 10); + Resque_Stat::incr('test_incrX', 11); + $this->assertEquals(21, $this->redis->get('resque:stat:test_incrX')); + } + + public function testStatCanBeDecremented() + { + Resque_Stat::incr('test_decr', 22); + Resque_Stat::decr('test_decr'); + $this->assertEquals(21, $this->redis->get('resque:stat:test_decr')); + } + + public function testStatCanBeDecrementedByX() + { + Resque_Stat::incr('test_decrX', 22); + Resque_Stat::decr('test_decrX', 11); + $this->assertEquals(11, $this->redis->get('resque:stat:test_decrX')); + } + + public function testGetStatByName() + { + Resque_Stat::incr('test_get', 100); + $this->assertEquals(100, Resque_Stat::get('test_get')); + } + + public function testGetUnknownStatReturns0() + { + $this->assertEquals(0, Resque_Stat::get('test_get_unknown')); + } +} \ No newline at end of file diff --git a/test/Resque/Tests/TestCase.php b/test/Resque/Tests/TestCase.php new file mode 100644 index 0000000..f4c00df --- /dev/null +++ b/test/Resque/Tests/TestCase.php @@ -0,0 +1,24 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_TestCase extends PHPUnit_Framework_TestCase +{ + protected $resque; + protected $redis; + + public function setUp() + { + $config = file_get_contents(REDIS_CONF); + preg_match('#^\s*port\s+([0-9]+)#m', $config, $matches); + $this->redis = new Redisent('localhost', $matches[1]); + + // Flush redis + $this->redis->flushAll(); + } +} \ No newline at end of file diff --git a/test/Resque/Tests/WorkerTest.php b/test/Resque/Tests/WorkerTest.php new file mode 100644 index 0000000..04f7c23 --- /dev/null +++ b/test/Resque/Tests/WorkerTest.php @@ -0,0 +1,251 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_WorkerTest extends Resque_Tests_TestCase +{ + public function testWorkerRegistersInList() + { + $worker = new Resque_Worker('*'); + $worker->registerWorker(); + + // Make sure the worker is in the list + $this->assertTrue((bool)$this->redis->sismember('resque:workers', (string)$worker)); + } + + public function testGetAllWorkers() + { + $num = 3; + // Register a few workers + for($i = 0; $i < $num; ++$i) { + $worker = new Resque_Worker('queue_' . $i); + $worker->registerWorker(); + } + + // Now try to get them + $this->assertEquals($num, count(Resque_Worker::all())); + } + + public function testGetWorkerById() + { + $worker = new Resque_Worker('*'); + $worker->registerWorker(); + + $newWorker = Resque_Worker::find((string)$worker); + $this->assertEquals((string)$worker, (string)$newWorker); + } + + public function testInvalidWorkerDoesNotExist() + { + $this->assertFalse(Resque_Worker::exists('blah')); + } + + public function testWorkerCanUnregister() + { + $worker = new Resque_Worker('*'); + $worker->registerWorker(); + $worker->unregisterWorker(); + + $this->assertFalse(Resque_Worker::exists((string)$worker)); + $this->assertEquals(array(), Resque_Worker::all()); + $this->assertNull($this->redis->smembers('redis:workers')); + } + + public function testPausedWorkerDoesNotPickUpJobs() + { + $worker = new Resque_Worker('*'); + $worker->pauseProcessing(); + Resque::enqueue('jobs', 'Test_Job'); + $worker->work(0); + $worker->work(0); + $this->assertEquals(0, Resque_Stat::get('processed')); + } + + public function testResumedWorkerPicksUpJobs() + { + $worker = new Resque_Worker('*'); + $worker->pauseProcessing(); + Resque::enqueue('jobs', 'Test_Job'); + $worker->work(0); + $this->assertEquals(0, Resque_Stat::get('processed')); + $worker->unPauseProcessing(); + $worker->work(0); + $this->assertEquals(1, Resque_Stat::get('processed')); + } + + public function testWorkerCanWorkOverMultipleQueues() + { + $worker = new Resque_Worker(array( + 'queue1', + 'queue2' + )); + $worker->registerWorker(); + Resque::enqueue('queue1', 'Test_Job_1'); + Resque::enqueue('queue2', 'Test_Job_2'); + + $job = $worker->reserve(); + $this->assertEquals('queue1', $job->queue); + + $job = $worker->reserve(); + $this->assertEquals('queue2', $job->queue); + } + + public function testWorkerWorksQueuesInSpecifiedOrder() + { + $worker = new Resque_Worker(array( + 'high', + 'medium', + 'low' + )); + $worker->registerWorker(); + + // Queue the jobs in a different order + Resque::enqueue('low', 'Test_Job_1'); + Resque::enqueue('high', 'Test_Job_2'); + Resque::enqueue('medium', 'Test_Job_3'); + + // Now check we get the jobs back in the right order + $job = $worker->reserve(); + $this->assertEquals('high', $job->queue); + + $job = $worker->reserve(); + $this->assertEquals('medium', $job->queue); + + $job = $worker->reserve(); + $this->assertEquals('low', $job->queue); + } + + public function testWildcardQueueWorkerWorksAllQueues() + { + $worker = new Resque_Worker('*'); + $worker->registerWorker(); + + Resque::enqueue('queue1', 'Test_Job_1'); + Resque::enqueue('queue2', 'Test_Job_2'); + + $job = $worker->reserve(); + $this->assertEquals('queue1', $job->queue); + + $job = $worker->reserve(); + $this->assertEquals('queue2', $job->queue); + } + + public function testWorkerDoesNotWorkOnUnknownQueues() + { + $worker = new Resque_Worker('queue1'); + $worker->registerWorker(); + Resque::enqueue('queue2', 'Test_Job'); + + $this->assertFalse($worker->reserve()); + } + + public function testWorkerClearsItsStatusWhenNotWorking() + { + Resque::enqueue('jobs', 'Test_Job'); + $worker = new Resque_Worker('jobs'); + $job = $worker->reserve(); + $worker->workingOn($job); + $worker->doneWorking(); + $this->assertEquals(new stdClass, $worker->job()); + } + + public function testWorkerRecordsWhatItIsWorkingOn() + { + $worker = new Resque_Worker('jobs'); + $worker->registerWorker(); + + $payload = new stdClass; + $payload->class = 'Test_Job'; + $job = new Resque_Job('jobs', $payload); + $worker->workingOn($job); + + $job = $worker->job(); + $this->assertEquals('jobs', $job->queue); + if(!isset($job->run_at)) { + $this->fail('Job does not have run_at time'); + } + $this->assertEquals($payload, $job->payload); + } + + public function testWorkerErasesItsStatsWhenShutdown() + { + Resque::enqueue('jobs', 'Test_Job'); + Resque::enqueue('jobs', 'Invalid_Job'); + + $worker = new Resque_Worker('jobs'); + $worker->work(0); + $worker->work(0); + + $this->assertEquals(0, $worker->getStat('processed')); + $this->assertEquals(0, $worker->getStat('failed')); + } + + public function testWorkerCleansUpDeadWorkersOnStartup() + { + // Register a good worker + $goodWorker = new Resque_Worker('jobs'); + $goodWorker->registerWorker(); + $workerId = explode(':', $goodWorker); + + // Register some bad workers + $worker = new Resque_Worker('jobs'); + $worker->setId($workerId[0].':1:jobs'); + $worker->registerWorker(); + + $worker = new Resque_Worker(array('high', 'low')); + $worker->setId($workerId[0].':2:high,low'); + $worker->registerWorker(); + + $this->assertEquals(3, count(Resque_Worker::all())); + + $goodWorker->pruneDeadWorkers(); + + // There should only be $goodWorker left now + $this->assertEquals(1, count(Resque_Worker::all())); + } + + public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers() + { + // Register a bad worker on this machine + $worker = new Resque_Worker('jobs'); + $workerId = explode(':', $worker); + $worker->setId($workerId[0].':1:jobs'); + $worker->registerWorker(); + + // Register some other false workers + $worker = new Resque_Worker('jobs'); + $worker->setId('my.other.host:1:jobs'); + $worker->registerWorker(); + + $this->assertEquals(2, count(Resque_Worker::all())); + + $worker->pruneDeadWorkers(); + + // my.other.host should be left + $workers = Resque_Worker::all(); + $this->assertEquals(1, count($workers)); + $this->assertEquals((string)$worker, (string)$workers[0]); + } + + public function testWorkerFailsUncompletedJobsOnExit() + { + $worker = new Resque_Worker('jobs'); + $worker->registerWorker(); + + $payload = new stdClass; + $payload->class = 'Test_Job'; + $job = new Resque_Job('jobs', $payload); + + $worker->workingOn($job); + $worker->unregisterWorker(); + + $this->assertEquals(1, Resque_Stat::get('failed')); + } +} \ No newline at end of file diff --git a/test/Resque/Tests/bootstrap.php b/test/Resque/Tests/bootstrap.php new file mode 100644 index 0000000..502ade6 --- /dev/null +++ b/test/Resque/Tests/bootstrap.php @@ -0,0 +1,116 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +define('CWD', dirname(__FILE__)); +define('RESQUE_LIB', CWD . '/../../../lib/'); + +define('TEST_MISC', realpath(CWD . '/../../misc/')); +define('REDIS_CONF', TEST_MISC . '/redis.conf'); + +// Change to the directory this file lives in. This is important, due to +// how we'll be running redis. + +require_once CWD . '/TestCase.php'; + +// Include Resque +require_once RESQUE_LIB . 'Resque.php'; +require_once RESQUE_LIB . 'Resque/Worker.php'; + +// Attempt to start our own redis instance for tesitng. +exec('which redis-server', $output, $returnVar); +if($returnVar != 0) { + echo "Cannot find redis-server in path. Please make sure redis is installed.\n"; + exit(1); +} + +exec('cd ' . TEST_MISC . '; redis-server ' . REDIS_CONF, $output, $returnVar); +if($returnVar != 0) { + echo "Cannot start redis-server.\n"; + exit(1); + +} + +// Get redis port from conf +$config = file_get_contents(REDIS_CONF); +if(!preg_match('#^\s*port\s+([0-9]+)#m', $config, $matches)) { + echo "Could not determine redis port from redis.conf"; + exit(1); +} + +Resque::setBackend('localhost:' . $matches[1]); + +// Shutdown +function killRedis() +{ + $config = file_get_contents(REDIS_CONF); + if(!preg_match('#^\s*pidfile\s+([^\s]+)#m', $config, $matches)) { + return; + } + + $pidFile = TEST_MISC . '/' . $matches[1]; + $pid = trim(file_get_contents($pidFile)); + posix_kill($pid, 9); + + if(is_file($pidFile)) { + unlink($pidFile); + } + + // Remove the redis database + if(!preg_match('#^\s*dir\s+([^\s]+)#m', $config, $matches)) { + return; + } + $dir = $matches[1]; + + if(!preg_match('#^\s*dbfilename\s+([^\s]+)#m', $config, $matches)) { + return; + } + + $filename = TEST_MISC . '/' . $dir . '/' . $matches[1]; + if(is_file($filename)) { + unlink($filename); + } +} +register_shutdown_function('killRedis'); + +if(function_exists('pcntl_signal')) { + // Override INT and TERM signals, so they do a clean shutdown and also + // clean up redis-server as well. + function sigint() + { + exit; + } + pcntl_signal(SIGINT, 'sigint'); + pcntl_signal(SIGTERM, 'sigint'); +} + +class Test_Job +{ + public function perform() + { + + } +} + +class Failing_Job_Exception extends Exception +{ + +} + +class Failing_Job +{ + public function perform() + { + throw new Failing_Job_Exception('Message!'); + } +} + +class Test_Job_Without_Perform_Method +{ + +} \ No newline at end of file diff --git a/test/misc/redis.conf b/test/misc/redis.conf new file mode 100644 index 0000000..971f66e --- /dev/null +++ b/test/misc/redis.conf @@ -0,0 +1,8 @@ +daemonize yes +pidfile ./redis.pid +port 6379 +bind 127.0.0.1 +timeout 300 +dbfilename dump.rdb +dir ./ +loglevel debug \ No newline at end of file From e3c953baf1e9811eb35bf41c44fba2529d2c916f Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Mon, 19 Apr 2010 00:12:14 +1000 Subject: [PATCH 02/15] Fix up enqueue example arguments - they're now an object. Add example of how to access arguments in the example job --- README.markdown | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.markdown b/README.markdown index a00c3e3..838102b 100644 --- a/README.markdown +++ b/README.markdown @@ -45,9 +45,8 @@ Jobs are queued as follows: // Required if redis is located elsewhere Resque::setBackend('localhost', 6379); - $args = array( - 'name' => 'Chris' - ); + $args = new stdClass; + $args->name = 'Chris'; Resque::enqueue('default', 'My_Job', $args); ### Defining Jobs ### @@ -60,6 +59,7 @@ It's important to note that classes are called statically. public static function perform($args) { // Work work work + echo $args->name; } } From 7ef1ebbd97d6feb876e0a5a7ae9c3c13d2a08ffc Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Mon, 19 Apr 2010 00:12:35 +1000 Subject: [PATCH 03/15] Fix up demo/queue.php job arguments --- demo/queue.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/demo/queue.php b/demo/queue.php index 6ae4962..29ac3ad 100644 --- a/demo/queue.php +++ b/demo/queue.php @@ -10,10 +10,10 @@ Resque::setBackend('127.0.0.1:6379'); $class = new stdClass; $class->test = 'test'; -$args = array( - time(), - $class -); +$args = new stdClass; +$args->time = time(); +$args->class = $class; + $jobId = Resque::enqueue('default', $argv[1], $args, true); echo "Queued job ".$jobId."\n\n"; ?> \ No newline at end of file From c5396f4e8626d99db64aaddef4c95be835c00bb2 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Mon, 19 Apr 2010 10:35:50 +1000 Subject: [PATCH 04/15] Change arguments for jobs to an array instead of an object. Also change other json encoded items to decode to an array rather than objects --- CHANGELOG.markdown | 4 +++ README.markdown | 9 ++--- demo/queue.php | 12 +++---- lib/Resque.php | 2 +- lib/Resque/Job.php | 44 ++++++++++------------- lib/Resque/Job/Status.php | 4 +-- lib/Resque/Worker.php | 4 +-- resque.php | 2 +- test/Resque/Tests/JobTest.php | 60 +++++++++++++++++--------------- test/Resque/Tests/WorkerTest.php | 18 +++++----- 10 files changed, 81 insertions(+), 78 deletions(-) diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown index 9944cd8..59b9d10 100644 --- a/CHANGELOG.markdown +++ b/CHANGELOG.markdown @@ -1,3 +1,7 @@ +## 1.1 (????-??-??) ## +* Change arguments for jobs to be an array as they're easier to work with in +PHP + ## 1.0 (2010-04-18) ## * Initial release \ No newline at end of file diff --git a/README.markdown b/README.markdown index 838102b..b8aa207 100644 --- a/README.markdown +++ b/README.markdown @@ -45,8 +45,9 @@ Jobs are queued as follows: // Required if redis is located elsewhere Resque::setBackend('localhost', 6379); - $args = new stdClass; - $args->name = 'Chris'; + $args = array( + 'name' => 'Chris' + ); Resque::enqueue('default', 'My_Job', $args); ### Defining Jobs ### @@ -59,7 +60,7 @@ It's important to note that classes are called statically. public static function perform($args) { // Work work work - echo $args->name; + echo $args['name']; } } @@ -77,7 +78,7 @@ To track the status of a job, pass `true` as the fourth argument to `Resque::enqueue`. A token used for tracking the job status will be returned: - $token = Resque::enqueue('default', 'My_Job', $args); + $token = Resque::enqueue('default', 'My_Job', $args, true); echo $token; To fetch the status of a job: diff --git a/demo/queue.php b/demo/queue.php index 29ac3ad..6a94e44 100644 --- a/demo/queue.php +++ b/demo/queue.php @@ -7,12 +7,12 @@ require '../lib/Resque.php'; date_default_timezone_set('GMT'); Resque::setBackend('127.0.0.1:6379'); -$class = new stdClass; -$class->test = 'test'; - -$args = new stdClass; -$args->time = time(); -$args->class = $class; +$args = array( + 'time' => time(), + 'array' => array( + 'test' => 'test', + ), +); $jobId = Resque::enqueue('default', $argv[1], $args, true); echo "Queued job ".$jobId."\n\n"; diff --git a/lib/Resque.php b/lib/Resque.php index ebb1c8a..705f1eb 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -73,7 +73,7 @@ class Resque return; } - return json_decode($item); + return json_decode($item, true); } /** diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 9553e57..f165f2f 100644 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -43,14 +43,14 @@ class Resque_Job * * @param string $queue The name of the queue to place the job in. * @param string $class The name of the class that contains the code to execute the job. - * @param object $args Any optional arguments that should be passed when the job is executed. Pass as a class. + * @param array $args Any optional arguments that should be passed when the job is executed. * @param boolean $monitor Set to true to be able to monitor the status of a job. */ public static function create($queue, $class, $args = null, $monitor = false) { - if($args !== null && !is_object($args)) { + if($args !== null && !is_array($args)) { throw new InvalidArgumentException( - 'Supplied $args must be an object and an instance of stdClass.' + 'Supplied $args must be an array.' ); } $id = md5(uniqid('', true)); @@ -95,7 +95,7 @@ class Resque_Job return; } - $statusInstance = new Resque_Job_Status($this->payload->id); + $statusInstance = new Resque_Job_Status($this->payload['id']); $statusInstance->update($status); } @@ -106,7 +106,7 @@ class Resque_Job */ public function getStatus() { - $status = new Resque_Job_Status($this->payload->id); + $status = new Resque_Job_Status($this->payload['id']); return $status->get(); } @@ -118,19 +118,19 @@ class Resque_Job */ public function perform() { - if(!class_exists($this->payload->class)) { + if(!class_exists($this->payload['class'])) { throw new Resque_Exception( - 'Could not find job class ' . $this->payload->class . '.' + 'Could not find job class ' . $this->payload['class'] . '.' ); } - if(!method_exists($this->payload->class, 'perform')) { + if(!method_exists($this->payload['class'], 'perform')) { throw new Resque_Exception( - 'Job class ' . $this->payload->class . ' does not contain a perform method.' + 'Job class ' . $this->payload['class'] . ' does not contain a perform method.' ); } - call_user_func(array($this->payload->class, 'perform'), $this->payload->args); + call_user_func(array($this->payload['class'], 'perform'), $this->payload['args']); } /** @@ -155,13 +155,13 @@ class Resque_Job */ public function recreate() { - $status = new Resque_Job_Status($this->payload->id); + $status = new Resque_Job_Status($this->payload['id']); $monitor = false; if($status->isTracking()) { $monitor = true; } - return self::create($this->queue, $this->payload->class, $this->payload->args, $monitor); + return self::create($this->queue, $this->payload['class'], $this->payload['args'], $monitor); } /** @@ -171,24 +171,16 @@ class Resque_Job */ public function __toString() { - $args = array(); - if(isset($this->payload->args)) { - $args = $this->payload->args; - foreach($args as $k => $v) { - if(is_object($v)) { - $args[$k] = '{' . get_class($v) . ' - '.implode(',', get_object_vars($v)) . '}'; - } - } - } - $name = array( 'Job{' . $this->queue .'}' ); - if(!empty($this->payload->id)) { - $name[] = 'ID: ' . $this->payload->id; + if(!empty($this->payload['id'])) { + $name[] = 'ID: ' . $this->payload['id']; + } + $name[] = $this->payload['class']; + if(!empty($this->payload['args'])) { + $name[] = json_encode($this->payload['args']); } - $name[] = $this->payload->class; - $name[] = implode(',', $args); return '(' . implode(' | ', $name) . ')'; } } diff --git a/lib/Resque/Job/Status.php b/lib/Resque/Job/Status.php index 8b45e98..e1554b0 100644 --- a/lib/Resque/Job/Status.php +++ b/lib/Resque/Job/Status.php @@ -115,12 +115,12 @@ class Resque_Job_Status return false; } - $statusPacket = json_decode(Resque::redis()->get((string)$this)); + $statusPacket = json_decode(Resque::redis()->get((string)$this), true); if(!$statusPacket) { return false; } - return $statusPacket->status; + return $statusPacket['status']; } /** diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index e2516d3..1ee24a9 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -543,10 +543,10 @@ class Resque_Worker { $job = Resque::redis()->get('worker:' . $this); if(!$job) { - return new stdClass; + return array(); } else { - return json_decode($job); + return json_decode($job, true); } } diff --git a/resque.php b/resque.php index 9643346..eb6aec9 100644 --- a/resque.php +++ b/resque.php @@ -16,7 +16,7 @@ if(!empty($_ENV['APP_INCLUDE'])) { } require 'lib/Resque.php'; -require 'Resque/Worker.php'; +require 'lib/Resque/Worker.php'; if(!empty($_ENV['REDIS_BACKEND'])) { Resque::setBackend($_ENV['REDIS_BACKEND']); diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index 061dda3..4505edf 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -41,35 +41,36 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase $this->fail('Job could not be reserved.'); } $this->assertEquals('jobs', $job->queue); - $this->assertEquals('Test_Job', $job->payload->class); + $this->assertEquals('Test_Job', $job->payload['class']); } /** * @expectedException InvalidArgumentException */ - public function testArrayArgumentsCannotBePassedToJob() + public function testObjectArgumentsCannotBePassedToJob() { - Resque::enqueue('jobs', 'Test_Job', array( - 'test' - )); + $args = new stdClass; + $args->test = 'somevalue'; + Resque::enqueue('jobs', 'Test_Job', $args); } public function testQueuedJobReturnsExactSamePassedInArguments() { - $args = new stdClass; - $args->int = 123; - $args->numArray = array( - 1, - 2, + $args = array( + 'int' => 123, + 'numArray' => array( + 1, + 2, + ), + 'assocArray' => array( + 'key1' => 'value1', + 'key2' => 'value2' + ), ); - $args->assocArray = new stdClass; - $args->assocArray->key1 = 'value1'; - $args->assocArray->key2 = 'value2'; - Resque::enqueue('jobs', 'Test_Job', $args); $job = Resque_Job::reserve('jobs'); - $this->assertEquals($args, $job->payload->args); + $this->assertEquals($args, $job->payload['args']); } public function testAfterJobIsReservedItIsRemoved() @@ -81,15 +82,17 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase public function testRecreatedJobMatchesExistingJob() { - $args = new stdClass; - $args->int = 123; - $args->numArray = array( - 1, - 2, + $args = array( + 'int' => 123, + 'numArray' => array( + 1, + 2, + ), + 'assocArray' => array( + 'key1' => 'value1', + 'key2' => 'value2' + ), ); - $args->assocArray = new stdClass; - $args->assocArray->key1 = 'value1'; - $args->assocArray->key2 = 'value2'; Resque::enqueue('jobs', 'Test_Job', $args); $job = Resque_Job::reserve('jobs'); @@ -98,15 +101,16 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase $job->recreate(); $newJob = Resque_Job::reserve('jobs'); - $this->assertEquals($job->payload->class, $newJob->payload->class); - $this->assertEquals($job->payload->args, $newJob->payload->args); + $this->assertEquals($job->payload['class'], $newJob->payload['class']); + $this->assertEquals($job->payload['args'], $newJob->payload['args']); } public function testFailedJobExceptionsAreCaught() { - $payload = new stdClass; - $payload->class = 'Failing_Job'; - $payload->args = null; + $payload = array( + 'class' => 'Failing_Job', + 'args' => null + ); $job = new Resque_Job('jobs', $payload); $job->worker = $this->worker; diff --git a/test/Resque/Tests/WorkerTest.php b/test/Resque/Tests/WorkerTest.php index 04f7c23..e9ab120 100644 --- a/test/Resque/Tests/WorkerTest.php +++ b/test/Resque/Tests/WorkerTest.php @@ -153,7 +153,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase $job = $worker->reserve(); $worker->workingOn($job); $worker->doneWorking(); - $this->assertEquals(new stdClass, $worker->job()); + $this->assertEquals(array(), $worker->job()); } public function testWorkerRecordsWhatItIsWorkingOn() @@ -161,17 +161,18 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase $worker = new Resque_Worker('jobs'); $worker->registerWorker(); - $payload = new stdClass; - $payload->class = 'Test_Job'; + $payload = array( + 'class' => 'Test_Job' + ); $job = new Resque_Job('jobs', $payload); $worker->workingOn($job); $job = $worker->job(); - $this->assertEquals('jobs', $job->queue); - if(!isset($job->run_at)) { + $this->assertEquals('jobs', $job['queue']); + if(!isset($job['run_at'])) { $this->fail('Job does not have run_at time'); } - $this->assertEquals($payload, $job->payload); + $this->assertEquals($payload, $job['payload']); } public function testWorkerErasesItsStatsWhenShutdown() @@ -239,8 +240,9 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase $worker = new Resque_Worker('jobs'); $worker->registerWorker(); - $payload = new stdClass; - $payload->class = 'Test_Job'; + $payload = array( + 'class' => 'Test_Job' + ); $job = new Resque_Job('jobs', $payload); $worker->workingOn($job); From 86a5d8830992092dccf2ca4c1b32754b3ae67fb7 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Mon, 19 Apr 2010 10:36:01 +1000 Subject: [PATCH 05/15] Update todo list --- TODO.markdown | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/TODO.markdown b/TODO.markdown index db6b6d6..2aaca82 100644 --- a/TODO.markdown +++ b/TODO.markdown @@ -3,4 +3,6 @@ * `Resque_Failure_Redis` * Plugin/hook type system similar to Ruby version * Change to preforking worker model -* Clean up /bin and /demo \ No newline at end of file +* Clean up /bin and /demo +* Add a way to store arbitrary text in job statuses (for things like progress +indicators) \ No newline at end of file From 5ac7e90374c86f99956890f4a28d98a39feb5f03 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Mon, 19 Apr 2010 14:57:14 +1000 Subject: [PATCH 06/15] Fix cannot break/continue error when pruning dead workers and none are found. continue should have been return --- lib/Resque/Worker.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 1ee24a9..13a281f 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -424,7 +424,7 @@ class Resque_Worker { $workerPids = $this->workerPids(); if(empty($workerPids)) { - continue; + return; } $workers = self::all(); foreach($workers as $worker) { From 3f04a0a971598d3c016908984d56eb52c25d0839 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Mon, 19 Apr 2010 15:01:47 +1000 Subject: [PATCH 07/15] Remove unnecessary call to worker->unregisterWorker in the JobTest teardown method --- test/Resque/Tests/JobTest.php | 5 ----- 1 file changed, 5 deletions(-) diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index 4505edf..46311e5 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -22,11 +22,6 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase $this->worker->registerWorker(); } - public function tearDown() - { - $this->worker->unregisterWorker(); - } - public function testJobCanBeQueued() { $this->assertTrue((bool)Resque::enqueue('jobs', 'Test_Job')); From b71031908d17c1887398270d6af8905882dd3a74 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Mon, 19 Apr 2010 15:07:11 +1000 Subject: [PATCH 08/15] Fix typo in testWorkerCanUnregister test - redis should have been resque --- test/Resque/Tests/WorkerTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Resque/Tests/WorkerTest.php b/test/Resque/Tests/WorkerTest.php index e9ab120..47b0208 100644 --- a/test/Resque/Tests/WorkerTest.php +++ b/test/Resque/Tests/WorkerTest.php @@ -55,7 +55,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase $this->assertFalse(Resque_Worker::exists((string)$worker)); $this->assertEquals(array(), Resque_Worker::all()); - $this->assertNull($this->redis->smembers('redis:workers')); + $this->assertEquals(array(), $this->redis->smembers('resque:workers')); } public function testPausedWorkerDoesNotPickUpJobs() From 94fed1cfb41a8ecbd54a80299371604b80d51f51 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Mon, 19 Apr 2010 15:44:46 +1000 Subject: [PATCH 09/15] Don't return from pruneDeadWorkers if there are no worker pids --- lib/Resque/Worker.php | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 13a281f..6779470 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -423,9 +423,6 @@ class Resque_Worker public function pruneDeadWorkers() { $workerPids = $this->workerPids(); - if(empty($workerPids)) { - return; - } $workers = self::all(); foreach($workers as $worker) { list($host, $pid, $queues) = explode(':', (string)$worker, 3); From 6e6d7ad85938fe0001de3621b19731439dd07664 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Tue, 20 Apr 2010 10:02:34 +1000 Subject: [PATCH 10/15] Add setUp and tearDown callbacks for jobs --- CHANGELOG.markdown | 4 +++- README.markdown | 32 ++++++++++++++++++++++++++++++ TODO.markdown | 6 ++++-- lib/Resque/Job.php | 8 ++++++++ test/Resque/Tests/JobTest.php | 32 ++++++++++++++++++++++++++++++ test/Resque/Tests/bootstrap.php | 35 +++++++++++++++++++++++++++++++++ 6 files changed, 114 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown index 59b9d10..8c77002 100644 --- a/CHANGELOG.markdown +++ b/CHANGELOG.markdown @@ -1,6 +1,8 @@ ## 1.1 (????-??-??) ## * Change arguments for jobs to be an array as they're easier to work with in -PHP +PHP. +* Implement ability to have setUp and tearDown methods for jobs, called before +and after every single run. ## 1.0 (2010-04-18) ## diff --git a/README.markdown b/README.markdown index b8aa207..1daf4a2 100644 --- a/README.markdown +++ b/README.markdown @@ -33,6 +33,8 @@ In addition, it also: * Has the ability to track the status of jobs * Will mark a job as failed, if a forked child running a job does not exit with a status code as 0 +* Has built in support for `setUp` and `tearDown` methods, called +pre and post jobs ## Jobs ## @@ -68,6 +70,36 @@ Any exception thrown by a job will result in the job failing - be careful here and make sure you handle the exceptions that shouldn't result in a job failing. +Jobs can also have `setUp` and `tearDown` methods. If a `setUp` method +is defined, it will be called along with `$args` before the `perform` +method is run. The `tearDown` method if defined, will be called with +`$args` also, after the job finishes. + + class My_Job + { + public static function setUp($args) + { + // ... Set up environment for this job + } + + public static function perform($args) + { + // .. Run job + } + + public static function tearDown($args) + { + // ... Remove environment for this job + } + } + +It is **IMPORTANT** to note, that on operating systems where Resque +cannot fork to run a job (Mac OS X, or other platforms where the PHP +process control functions are unavailable), that because job classes +are static, their state will be retained between job calls. **ALWAYS** +reset the environment back to how you got it if you're using a `setUp` +method, by resetting changes in a `tearDown` method. + ### Tracking Job Statuses ### php-resque has the ability to perform basic status tracking of a queued diff --git a/TODO.markdown b/TODO.markdown index 2aaca82..d8ee07a 100644 --- a/TODO.markdown +++ b/TODO.markdown @@ -1,8 +1,10 @@ * Write tests for: * `Resque_Failure` * `Resque_Failure_Redis` -* Plugin/hook type system similar to Ruby version +* Plugin/hook type system similar to Ruby version (when done, implement the +setUp and tearDown methods as a plugin) * Change to preforking worker model * Clean up /bin and /demo * Add a way to store arbitrary text in job statuses (for things like progress -indicators) \ No newline at end of file +indicators) +* Write plugin for Ruby resque that calls setUp and tearDown methods \ No newline at end of file diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index f165f2f..375240a 100644 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -129,8 +129,16 @@ class Resque_Job 'Job class ' . $this->payload['class'] . ' does not contain a perform method.' ); } + + if(method_exists($this->payload['class'], 'setUp')) { + call_user_func(array($this->payload['class'], 'setUp'), $this->payload['args']); + } call_user_func(array($this->payload['class'], 'perform'), $this->payload['args']); + + if(method_exists($this->payload['class'], 'tearDown')) { + call_user_func(array($this->payload['class'], 'tearDown'), $this->payload['args']); + } } /** diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index 46311e5..9e3b461 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -136,4 +136,36 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase $job->worker = $this->worker; $job->perform(); } + + public function testJobWithSetUpCallbackFiresSetUp() + { + $payload = array( + 'class' => 'Test_Job_With_SetUp', + 'args' => array( + 'somevar', + 'somevar2', + ), + ); + $job = new Resque_Job('jobs', $payload); + $job->perform(); + + $this->assertTrue(Test_Job_With_SetUp::$called); + $this->assertEquals($payload['args'], Test_Job_With_SetUp::$data); + } + + public function testJobWithTearDownCallbackFiresSetUp() + { + $payload = array( + 'class' => 'Test_Job_With_TearDown', + 'args' => array( + 'somevar', + 'somevar2', + ), + ); + $job = new Resque_Job('jobs', $payload); + $job->perform(); + + $this->assertTrue(Test_Job_With_TearDown::$called); + $this->assertEquals($payload['args'], Test_Job_With_TearDown::$data); + } } \ No newline at end of file diff --git a/test/Resque/Tests/bootstrap.php b/test/Resque/Tests/bootstrap.php index 502ade6..d2efc27 100644 --- a/test/Resque/Tests/bootstrap.php +++ b/test/Resque/Tests/bootstrap.php @@ -113,4 +113,39 @@ class Failing_Job class Test_Job_Without_Perform_Method { +} + +class Test_Job_With_SetUp +{ + public static $called = false; + public static $data = false; + + public function setUp($data) + { + self::$called = true; + self::$data = $data; + } + + public function perform($data) + { + + } +} + + +class Test_Job_With_TearDown +{ + public static $called = false; + public static $data = false; + + public function perform($data) + { + + } + + public function tearDown($data) + { + self::$called = true; + self::$data = $data; + } } \ No newline at end of file From 9ce7cfb370a23e9ffd9e5b2b86a0ca95abab2b06 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Tue, 20 Apr 2010 10:59:06 +1000 Subject: [PATCH 11/15] Change all require statements to include the full path. --- README.markdown | 2 +- lib/Resque.php | 8 ++++---- lib/Resque/Failure.php | 4 ++-- lib/Resque/Job.php | 4 ++-- lib/Resque/Worker.php | 6 +++--- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/README.markdown b/README.markdown index 1daf4a2..0f2289c 100644 --- a/README.markdown +++ b/README.markdown @@ -42,7 +42,7 @@ pre and post jobs Jobs are queued as follows: - require_once 'Resque.php'; + require_once 'lib/Resque.php'; // Required if redis is located elsewhere Resque::setBackend('localhost', 6379); diff --git a/lib/Resque.php b/lib/Resque.php index 705f1eb..3a04f18 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -1,5 +1,5 @@ updateStatus(Resque_Job_Status::STATUS_FAILED); - require_once 'Failure.php'; + require_once dirname(__FILE__) . '/Failure.php'; Resque_Failure::create( $this->payload, $exception, diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 6779470..2f4a1e0 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -1,7 +1,7 @@ Date: Fri, 28 May 2010 04:26:37 +0800 Subject: [PATCH 12/15] adding redis cluster support --- lib/Resque.php | 15 ++++-- lib/Resque/RedisCluster.php | 101 ++++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 4 deletions(-) create mode 100644 lib/Resque/RedisCluster.php diff --git a/lib/Resque.php b/lib/Resque.php index 3a04f18..df91fc3 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -26,10 +26,17 @@ class Resque */ public static function setBackend($server) { - list($host, $port) = explode(':', $server); + if(is_array($server)) { - require_once dirname(__FILE__) . '/Resque/Redis.php'; - self::$redis = new Resque_Redis($host, $port); + require_once dirname(__FILE__) . '/Resque/RedisCluster.php'; + self::$redis = new Resque_RedisCluster($server); + + }else{ + list($host, $port) = explode(':', $server); + + require_once dirname(__FILE__) . '/Resque/Redis.php'; + self::$redis = new Resque_Redis($host, $port); + } } /** @@ -125,4 +132,4 @@ class Resque } return $queues; } -} \ No newline at end of file +} diff --git a/lib/Resque/RedisCluster.php b/lib/Resque/RedisCluster.php new file mode 100644 index 0000000..6ac15e3 --- /dev/null +++ b/lib/Resque/RedisCluster.php @@ -0,0 +1,101 @@ + + * @copyright (c) 2010 Chris Boulton + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_RedisCluster extends RedisentCluster +{ + /** + * @var array List of all commands in Redis that supply a key as their + * first argument. Used to prefix keys with the Resque namespace. + */ + private $keyCommands = array( + 'exists', + 'del', + 'type', + 'keys', + 'expire', + 'ttl', + 'move', + 'set', + 'get', + 'getset', + 'setnx', + 'incr', + 'incrby', + 'decrby', + 'decrby', + 'rpush', + 'lpush', + 'llen', + 'lrange', + 'ltrim', + 'lindex', + 'lset', + 'lrem', + 'lpop', + 'rpop', + 'sadd', + 'srem', + 'spop', + 'scard', + 'sismember', + 'smembers', + 'srandmember', + 'zadd', + 'zrem', + 'zrange', + 'zrevrange', + 'zrangebyscore', + 'zcard', + 'zscore', + 'zremrangebyscore', + 'sort' + ); + // sinterstore + // sunion + // sunionstore + // sdiff + // sdiffstore + // sinter + // smove + // rename + // rpoplpush + // mget + // msetnx + // mset + // renamenx + + /** + * Magic method to handle all function requests and prefix key based + * operations with the 'resque:' key prefix. + * + * @param string $name The name of the method called. + * @param array $args Array of supplied arguments to the method. + * @return mixed Return value from Resident::call() based on the command. + */ + public function __call($name, $args) { + $args = func_get_args(); + if(in_array($name, $this->keyCommands)) { + $args[1][0] = 'resque:' . $args[1][0]; + } + try { + return parent::__call($name, $args[1]); + } + catch(RedisException $e) { + return false; + } + } +} +?> From ae89f290577d5d71238637b2f210306e28043d95 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Sun, 1 Aug 2010 15:03:28 +1000 Subject: [PATCH 13/15] Update changelog. Formatting fixes --- CHANGELOG.markdown | 2 ++ lib/Resque.php | 23 +++++++++++------------ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown index 8c77002..a773751 100644 --- a/CHANGELOG.markdown +++ b/CHANGELOG.markdown @@ -3,6 +3,8 @@ PHP. * Implement ability to have setUp and tearDown methods for jobs, called before and after every single run. +* Ability to specify a cluster/multiple redis servers and consistent hash +between them (Thanks dceballos) ## 1.0 (2010-04-18) ## diff --git a/lib/Resque.php b/lib/Resque.php index df91fc3..aa48f8d 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -22,21 +22,20 @@ class Resque * Given a host/port combination separated by a colon, set it as * the redis server that Resque will talk to. * - * @param string $server Host/port combination separated by a colon. + * @param mixed $server Host/port combination separated by a colon, or + * a nested array of servers with host/port pairs. */ public static function setBackend($server) { - if(is_array($server)) { - - require_once dirname(__FILE__) . '/Resque/RedisCluster.php'; - self::$redis = new Resque_RedisCluster($server); - - }else{ - list($host, $port) = explode(':', $server); - - require_once dirname(__FILE__) . '/Resque/Redis.php'; - self::$redis = new Resque_Redis($host, $port); - } + if(is_array($server)) { + require_once dirname(__FILE__) . '/Resque/RedisCluster.php'; + self::$redis = new Resque_RedisCluster($server); + } + else { + list($host, $port) = explode(':', $server); + require_once dirname(__FILE__) . '/Resque/Redis.php'; + self::$redis = new Resque_Redis($host, $port); + } } /** From 5dc24ebbe4955dad346d85d9f30b2be3b8230d36 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Sun, 1 Aug 2010 15:07:14 +1000 Subject: [PATCH 14/15] Fix APP_INCLUDE typo --- CHANGELOG.markdown | 1 + resque.php | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown index a773751..98de83f 100644 --- a/CHANGELOG.markdown +++ b/CHANGELOG.markdown @@ -5,6 +5,7 @@ PHP. and after every single run. * Ability to specify a cluster/multiple redis servers and consistent hash between them (Thanks dceballos) +* Fix `APP_INCLUDE` environment variable not loading correctly. ## 1.0 (2010-04-18) ## diff --git a/resque.php b/resque.php index eb6aec9..4b7ab44 100644 --- a/resque.php +++ b/resque.php @@ -12,7 +12,7 @@ if(!empty($_ENV['APP_INCLUDE'])) { die('APP_INCLUDE ('.$_ENV['APP_INCLUDE'].") does not exist.\n"); } - require_once APP_INCLUDE; + require_once $_ENV['APP_INCLUDE']; } require 'lib/Resque.php'; From 5f64653149c17f6e09d54e66b362d37ebf8e2e26 Mon Sep 17 00:00:00 2001 From: Chris Boulton Date: Sun, 1 Aug 2010 15:23:41 +1000 Subject: [PATCH 15/15] Change job classes to be instantiated rather than calling methods statically. This obviously makes it easier for state information to be destroyed after a job runs. This change is NOT backwards compatible and requests job classes be rewritten. Jobs also no longer receive arguments in the perform/setUp/tearDown methods but instead are passed as a $args variable to the instantiated job --- CHANGELOG.markdown | 6 ++++++ README.markdown | 27 +++++++++++---------------- lib/Resque/Job.php | 17 ++++++++++------- test/Resque/Tests/JobTest.php | 2 -- test/Resque/Tests/bootstrap.php | 14 ++++++-------- 5 files changed, 33 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown index 98de83f..de4182d 100644 --- a/CHANGELOG.markdown +++ b/CHANGELOG.markdown @@ -6,6 +6,12 @@ and after every single run. * Ability to specify a cluster/multiple redis servers and consistent hash between them (Thanks dceballos) * Fix `APP_INCLUDE` environment variable not loading correctly. +* Jobs are no longer defined as static methods, and classes are instantiated +first. This change is NOT backwards compatible and requires job classes are +updated. +* Job arguments are passed to the job class when it is instantiated, and +are accessible by $this->args. This change will break existing job classes +that rely on arguments that have not been updated. ## 1.0 (2010-04-18) ## diff --git a/README.markdown b/README.markdown index 0f2289c..7a19252 100644 --- a/README.markdown +++ b/README.markdown @@ -55,50 +55,45 @@ Jobs are queued as follows: ### Defining Jobs ### Each job should be in it's own class, and include a `perform` method. -It's important to note that classes are called statically. class My_Job { - public static function perform($args) + public function perform() { // Work work work - echo $args['name']; + echo $this->args['name']; } } +When the job is run, the class will be instantiated and any arguments +will be set as an array on the instantiated object, and are accessible +via `$this->args`. + Any exception thrown by a job will result in the job failing - be careful here and make sure you handle the exceptions that shouldn't result in a job failing. Jobs can also have `setUp` and `tearDown` methods. If a `setUp` method -is defined, it will be called along with `$args` before the `perform` -method is run. The `tearDown` method if defined, will be called with -`$args` also, after the job finishes. +is defined, it will be called before the `perform` method is run. +The `tearDown` method if defined, will be called after the job finishes. class My_Job { - public static function setUp($args) + public function setUp() { // ... Set up environment for this job } - public static function perform($args) + public function perform() { // .. Run job } - public static function tearDown($args) + public function tearDown() { // ... Remove environment for this job } } - -It is **IMPORTANT** to note, that on operating systems where Resque -cannot fork to run a job (Mac OS X, or other platforms where the PHP -process control functions are unavailable), that because job classes -are static, their state will be retained between job calls. **ALWAYS** -reset the environment back to how you got it if you're using a `setUp` -method, by resetting changes in a `tearDown` method. ### Tracking Job Statuses ### diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index afcea17..2831bf4 100644 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -129,15 +129,18 @@ class Resque_Job 'Job class ' . $this->payload['class'] . ' does not contain a perform method.' ); } - - if(method_exists($this->payload['class'], 'setUp')) { - call_user_func(array($this->payload['class'], 'setUp'), $this->payload['args']); + + $instance = new $this->payload['class']; + $isntance->args = $this->payload['args']; + + if(method_exists($instance, 'setUp')) { + $instance->setUp(); } - call_user_func(array($this->payload['class'], 'perform'), $this->payload['args']); - - if(method_exists($this->payload['class'], 'tearDown')) { - call_user_func(array($this->payload['class'], 'tearDown'), $this->payload['args']); + $instance->perform(); + + if(method_exists($instance, 'tearDown')) { + $instance->tearDown(); } } diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index 9e3b461..1d1db56 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -150,7 +150,6 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase $job->perform(); $this->assertTrue(Test_Job_With_SetUp::$called); - $this->assertEquals($payload['args'], Test_Job_With_SetUp::$data); } public function testJobWithTearDownCallbackFiresSetUp() @@ -166,6 +165,5 @@ class Resque_Tests_JobTest extends Resque_Tests_TestCase $job->perform(); $this->assertTrue(Test_Job_With_TearDown::$called); - $this->assertEquals($payload['args'], Test_Job_With_TearDown::$data); } } \ No newline at end of file diff --git a/test/Resque/Tests/bootstrap.php b/test/Resque/Tests/bootstrap.php index d2efc27..e21707c 100644 --- a/test/Resque/Tests/bootstrap.php +++ b/test/Resque/Tests/bootstrap.php @@ -118,15 +118,14 @@ class Test_Job_Without_Perform_Method class Test_Job_With_SetUp { public static $called = false; - public static $data = false; + public $args = false; - public function setUp($data) + public function setUp() { self::$called = true; - self::$data = $data; } - public function perform($data) + public function perform() { } @@ -136,16 +135,15 @@ class Test_Job_With_SetUp class Test_Job_With_TearDown { public static $called = false; - public static $data = false; + public $args = false; - public function perform($data) + public function perform() { } - public function tearDown($data) + public function tearDown() { self::$called = true; - self::$data = $data; } } \ No newline at end of file