Merge branch 'master' of git://github.com/chrisboulton/php-resque into blocking-list-pop

Conflicts:
	lib/Resque.php
	lib/Resque/RedisCluster.php
	lib/Resque/Worker.php
This commit is contained in:
Ruud Kamphuis 2013-03-11 22:38:30 +01:00
commit 5687c8fe82
41 changed files with 455 additions and 767 deletions

View file

@ -1,22 +0,0 @@
Copyright (c) 2009 Justin Poliey <jdp34@njit.edu>
Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without
restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.

View file

@ -1,67 +0,0 @@
# Redisent
Redisent is a simple, no-nonsense interface to the [Redis](http://code.google.com/p/redis/) key-value store for modest developers.
Due to the way it is implemented, it is flexible and tolerant of changes to the Redis protocol.
## Getting to work
If you're at all familiar with the Redis protocol and PHP objects, you've already mastered Redisent.
All Redisent does is map the Redis protocol to a PHP object, abstract away the nitty-gritty, and make the return values PHP compatible.
require 'redisent.php';
$redis = new Redisent('localhost');
$redis->set('awesome', 'absolutely');
echo sprintf('Is Redisent awesome? %s.\n', $redis->get('awesome'));
You use the exact same command names, and the exact same argument order. **How wonderful.** How about a more complex example?
require 'redisent.php';
$redis = new Redisent('localhost');
$redis->rpush('particles', 'proton');
$redis->rpush('particles', 'electron');
$redis->rpush('particles', 'neutron');
$particles = $redis->lrange('particles', 0, -1);
$particle_count = $redis->llen('particles');
echo "<p>The {$particle_count} particles that make up atoms are:</p>";
echo "<ul>";
foreach ($particles as $particle) {
echo "<li>{$particle}</li>";
}
echo "</ul>";
Be aware that Redis error responses will be wrapped in a RedisException class and thrown, so do be sure to use proper coding techniques.
## Clustering your servers
Redisent also includes a way for developers to fully utilize the scalability of Redis with multiple servers and [consistent hashing](http://en.wikipedia.org/wiki/Consistent_hashing).
Using the RedisentCluster class, you can use Redisent the same way, except that keys will be hashed across multiple servers.
Here is how to set up a cluster:
include 'redisent_cluster.php';
$cluster = new RedisentCluster(array(
array('host' => '127.0.0.1', 'port' => 6379),
array('host' => '127.0.0.1', 'port' => 6380)
));
You can then use Redisent the way you normally would, i.e., `$cluster->set('key', 'value')` or `$cluster->lrange('particles', 0, -1)`.
But what about when you need to use commands that are server specific and do not operate on keys? You can use routing, with the `RedisentCluster::to` method.
To use routing, you need to assign a server an alias in the constructor of the Redis cluster. Aliases are not required on all servers, just the ones you want to be able to access directly.
include 'redisent_cluster.php';
$cluster = new RedisentCluster(array(
'alpha' => array('host' => '127.0.0.1', 'port' => 6379),
array('host' => '127.0.0.1', 'port' => 6380)
));
Now there is an alias of the server running on 127.0.0.1:6379 called **alpha**, and can be interacted with like this:
// get server info
$cluster->to('alpha')->info();
Now you have complete programatic control over your Redis servers.
## About
&copy; 2009 [Justin Poliey](http://justinpoliey.com)

View file

@ -1,150 +0,0 @@
<?php
/**
* Redisent, a Redis interface for the modest
* @author Justin Poliey <jdp34@njit.edu>
* @copyright 2009 Justin Poliey <jdp34@njit.edu>
* @license http://www.opensource.org/licenses/mit-license.php The MIT License
* @package Redisent
*/
define('CRLF', sprintf('%s%s', chr(13), chr(10)));
/**
* Wraps native Redis errors in friendlier PHP exceptions
* Only declared if class doesn't already exist to ensure compatibility with php-redis
*/
if (! class_exists('RedisException')) {
class RedisException extends Exception {
}
}
/**
* Redisent, a Redis interface for the modest among us
*/
class Redisent {
/**
* Socket connection to the Redis server
* @var resource
* @access private
*/
private $__sock;
/**
* Host of the Redis server
* @var string
* @access public
*/
public $host;
/**
* Port on which the Redis server is running
* @var integer
* @access public
*/
public $port;
/**
* Creates a Redisent connection to the Redis server on host {@link $host} and port {@link $port}.
* @param string $host The hostname of the Redis server
* @param integer $port The port number of the Redis server
*/
function __construct($host, $port = 6379) {
$this->host = $host;
$this->port = $port;
$this->establishConnection();
}
function establishConnection() {
$this->__sock = fsockopen($this->host, $this->port, $errno, $errstr);
if (!$this->__sock) {
throw new Exception("{$errno} - {$errstr}");
}
}
function __destruct() {
fclose($this->__sock);
}
function __call($name, $args) {
/* Build the Redis unified protocol command */
array_unshift($args, strtoupper($name));
$command = sprintf('*%d%s%s%s', count($args), CRLF, implode(array_map(array($this, 'formatArgument'), $args), CRLF), CRLF);
/* Open a Redis connection and execute the command */
for ($written = 0; $written < strlen($command); $written += $fwrite) {
$fwrite = fwrite($this->__sock, substr($command, $written));
if ($fwrite === FALSE) {
throw new Exception('Failed to write entire command to stream');
}
}
/* Parse the response based on the reply identifier */
$reply = trim(fgets($this->__sock, 512));
switch (substr($reply, 0, 1)) {
/* Error reply */
case '-':
throw new RedisException(substr(trim($reply), 4));
break;
/* Inline reply */
case '+':
$response = substr(trim($reply), 1);
break;
/* Bulk reply */
case '$':
$response = null;
if ($reply == '$-1') {
break;
}
$read = 0;
$size = substr($reply, 1);
do {
$block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
$response .= fread($this->__sock, $block_size);
$read += $block_size;
} while ($read < $size);
fread($this->__sock, 2); /* discard crlf */
break;
/* Multi-bulk reply */
case '*':
$count = substr($reply, 1);
if ($count == '-1') {
return null;
}
$response = array();
for ($i = 0; $i < $count; $i++) {
$bulk_head = trim(fgets($this->__sock, 512));
$size = substr($bulk_head, 1);
if ($size == '-1') {
$response[] = null;
}
else {
$read = 0;
$block = "";
do {
$block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
$block .= fread($this->__sock, $block_size);
$read += $block_size;
} while ($read < $size);
fread($this->__sock, 2); /* discard crlf */
$response[] = $block;
}
}
break;
/* Integer reply */
case ':':
$response = intval(substr(trim($reply), 1));
break;
default:
throw new RedisException("invalid server response: {$reply}");
break;
}
/* Party on */
return $response;
}
private function formatArgument($arg) {
return sprintf('$%d%s%s', strlen($arg), CRLF, $arg);
}
}

View file

@ -1,138 +0,0 @@
<?php
/**
* Redisent, a Redis interface for the modest
* @author Justin Poliey <jdp34@njit.edu>
* @copyright 2009 Justin Poliey <jdp34@njit.edu>
* @license http://www.opensource.org/licenses/mit-license.php The MIT License
* @package Redisent
*/
require_once dirname(__FILE__) . '/Redisent.php';
/**
* A generalized Redisent interface for a cluster of Redis servers
*/
class RedisentCluster {
/**
* Collection of Redisent objects attached to Redis servers
* @var array
* @access private
*/
private $redisents;
/**
* Aliases of Redisent objects attached to Redis servers, used to route commands to specific servers
* @see RedisentCluster::to
* @var array
* @access private
*/
private $aliases;
/**
* Hash ring of Redis server nodes
* @var array
* @access private
*/
private $ring;
/**
* Individual nodes of pointers to Redis servers on the hash ring
* @var array
* @access private
*/
private $nodes;
/**
* Number of replicas of each node to make around the hash ring
* @var integer
* @access private
*/
private $replicas = 128;
/**
* The commands that are not subject to hashing
* @var array
* @access private
*/
private $dont_hash = array(
'RANDOMKEY', 'DBSIZE',
'SELECT', 'MOVE', 'FLUSHDB', 'FLUSHALL',
'SAVE', 'BGSAVE', 'LASTSAVE', 'SHUTDOWN',
'INFO', 'MONITOR', 'SLAVEOF'
);
/**
* Creates a Redisent interface to a cluster of Redis servers
* @param array $servers The Redis servers in the cluster. Each server should be in the format array('host' => hostname, 'port' => port)
*/
function __construct($servers) {
$this->ring = array();
$this->aliases = array();
foreach ($servers as $alias => $server) {
$this->redisents[] = new Redisent($server['host'], $server['port']);
if (is_string($alias)) {
$this->aliases[$alias] = $this->redisents[count($this->redisents)-1];
}
for ($replica = 1; $replica <= $this->replicas; $replica++) {
$this->ring[crc32($server['host'].':'.$server['port'].'-'.$replica)] = $this->redisents[count($this->redisents)-1];
}
}
ksort($this->ring, SORT_NUMERIC);
$this->nodes = array_keys($this->ring);
}
/**
* Routes a command to a specific Redis server aliased by {$alias}.
* @param string $alias The alias of the Redis server
* @return Redisent The Redisent object attached to the Redis server
*/
function to($alias) {
if (isset($this->aliases[$alias])) {
return $this->aliases[$alias];
}
else {
throw new Exception("That Redisent alias does not exist");
}
}
/* Execute a Redis command on the cluster */
function __call($name, $args) {
/* Pick a server node to send the command to */
$name = strtoupper($name);
if (!in_array($name, $this->dont_hash)) {
$node = $this->nextNode(crc32($args[0]));
$redisent = $this->ring[$node];
}
else {
$redisent = $this->redisents[0];
}
/* Execute the command on the server */
return call_user_func_array(array($redisent, $name), $args);
}
/**
* Routes to the proper server node
* @param integer $needle The hash value of the Redis command
* @return Redisent The Redisent object associated with the hash
*/
private function nextNode($needle) {
$haystack = $this->nodes;
while (count($haystack) > 2) {
$try = floor(count($haystack) / 2);
if ($haystack[$try] == $needle) {
return $needle;
}
if ($needle < $haystack[$try]) {
$haystack = array_slice($haystack, 0, $try + 1);
}
if ($needle > $haystack[$try]) {
$haystack = array_slice($haystack, $try + 1);
}
}
return $haystack[count($haystack)-1];
}
}

View file

@ -1,18 +1,14 @@
<?php
require_once dirname(__FILE__) . '/Resque/Event.php';
require_once dirname(__FILE__) . '/Resque/Exception.php';
/**
* Base Resque class.
*
* @package Resque
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque
{
const VERSION = '1.0';
const VERSION = '1.2';
const DEFAULT_INTERVAL = 5;
@ -32,12 +28,6 @@ class Resque
*/
protected static $redisDatabase = 0;
/**
* @var int PID of current process. Used to detect changes when forking
* and implement "thread" safety to avoid race conditions.
*/
protected static $pid = null;
/**
* Given a host/port combination separated by a colon, set it as
* the redis server that Resque will talk to.
@ -60,15 +50,7 @@ class Resque
*/
public static function redis()
{
// Detect when the PID of the current process has changed (from a fork, etc)
// and force a reconnect to redis.
$pid = getmypid();
if (self::$pid !== $pid) {
self::$redis = null;
self::$pid = $pid;
}
if(!is_null(self::$redis)) {
if (self::$redis !== null) {
return self::$redis;
}
@ -77,24 +59,35 @@ class Resque
$server = 'localhost:6379';
}
if(is_array($server)) {
require_once dirname(__FILE__) . '/Resque/RedisCluster.php';
self::$redis = new Resque_RedisCluster($server);
}
else {
if (strpos($server, 'unix:') === false) {
list($host, $port) = explode(':', $server);
}
else {
$host = $server;
$port = null;
}
require_once dirname(__FILE__) . '/Resque/Redis.php';
self::$redis = new Resque_Redis($host, $port);
self::$redis = new Resque_Redis($server, self::$redisDatabase);
return self::$redis;
}
/**
* fork() helper method for php-resque that handles issues PHP socket
* and phpredis have with passing around sockets between child/parent
* processes.
*
* Will close connection to Redis before forking.
*
* @return int Return vars as per pcntl_fork()
*/
public static function fork()
{
if(!function_exists('pcntl_fork')) {
return -1;
}
self::$redis->select(self::$redisDatabase);
return self::$redis;
// Close the connection to Redis before forking.
// This is a workaround for issues phpredis has.
self::$redis = null;
$pid = pcntl_fork();
if($pid === -1) {
throw new RuntimeException('Unable to fork child worker.');
}
return $pid;
}
/**
@ -156,7 +149,6 @@ class Resque
*/
public static function enqueue($queue, $class, $args = null, $trackStatus = false)
{
require_once dirname(__FILE__) . '/Resque/Job.php';
$result = Resque_Job::create($queue, $class, $args, $trackStatus);
if ($result) {
Resque_Event::trigger('afterEnqueue', array(
@ -177,7 +169,6 @@ class Resque
*/
public static function reserve($queue, $interval = null)
{
require_once dirname(__FILE__) . '/Resque/Job.php';
return Resque_Job::reserve($queue, $interval);
}

View file

@ -3,8 +3,7 @@
* Resque event/plugin system class
*
* @package Resque/Event
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Event

View file

@ -3,8 +3,7 @@
* Resque exception.
*
* @package Resque
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Exception extends Exception

View file

@ -1,12 +1,10 @@
<?php
require_once dirname(__FILE__) . '/Failure/Interface.php';
/**
* Failed Resque job.
*
* @package Resque/Failure
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Failure
@ -38,7 +36,6 @@ class Resque_Failure
public static function getBackend()
{
if(self::$backend === null) {
require dirname(__FILE__) . '/Failure/Redis.php';
self::$backend = 'Resque_Failure_Redis';
}

View file

@ -3,8 +3,7 @@
* Interface that all failure backends should implement.
*
* @package Resque/Failure
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
interface Resque_Failure_Interface

View file

@ -3,8 +3,7 @@
* Redis backend for storing failed Resque jobs.
*
* @package Resque/Failure
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/

View file

@ -1,14 +1,9 @@
<?php
require_once dirname(__FILE__) . '/Event.php';
require_once dirname(__FILE__) . '/Job/Status.php';
require_once dirname(__FILE__) . '/Job/DontPerform.php';
/**
* Resque job.
*
* @package Resque/Job
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Job
@ -209,7 +204,6 @@ class Resque_Job
));
$this->updateStatus(Resque_Job_Status::STATUS_FAILED);
require_once dirname(__FILE__) . '/Failure.php';
Resque_Failure::create(
$this->payload,
$exception,

View file

@ -3,8 +3,7 @@
* Runtime exception class for a job that does not exit cleanly.
*
* @package Resque/Job
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Job_DirtyExitException extends RuntimeException

View file

@ -3,8 +3,7 @@
* Exception to be thrown if a job should not be performed/run.
*
* @package Resque/Job
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Job_DontPerform extends Exception

View file

@ -3,8 +3,7 @@
* Status tracker/information for a job.
*
* @package Resque/Job
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Job_Status

View file

@ -1,26 +1,22 @@
<?php
// Third- party apps may have already loaded Resident from elsewhere
// so lets be careful.
if(!class_exists('Redisent', false)) {
require_once dirname(__FILE__) . '/../Redisent/Redisent.php';
}
/**
* Extended Redisent class used by Resque for all communication with
* redis. Essentially adds namespace support to Redisent.
* Wrap Credis to add namespace support and various helper methods.
*
* @package Resque/Redis
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Redis extends Redisent
class Resque_Redis
{
/**
* Redis namespace
* @var string
*/
private static $defaultNamespace = 'resque:';
private $server;
private $database;
/**
* @var array List of all commands in Redis that supply a key as their
* first argument. Used to prefix keys with the Resque namespace.
@ -34,6 +30,7 @@ class Resque_Redis extends Redisent
'ttl',
'move',
'set',
'setex',
'get',
'getset',
'setnx',
@ -82,7 +79,7 @@ class Resque_Redis extends Redisent
// msetnx
// mset
// renamenx
/**
* Set Redis namespace (prefix) default: resque
* @param string $namespace
@ -94,7 +91,48 @@ class Resque_Redis extends Redisent
}
self::$defaultNamespace = $namespace;
}
public function __construct($server, $database = null)
{
$this->server = $server;
$this->database = $database;
if (is_array($this->server)) {
$this->driver = new Credis_Cluster($server);
}
else {
$port = null;
$password = null;
$host = $server;
// If not a UNIX socket path or tcp:// formatted connections string
// assume host:port combination.
if (strpos($server, '/') === false) {
$parts = explode(':', $server);
if (isset($parts[1])) {
$port = $parts[1];
}
$host = $parts[0];
}else if (strpos($server, 'redis://') !== false){
// Redis format is:
// redis://[user]:[password]@[host]:[port]
list($userpwd,$hostport) = explode('@', $server);
$userpwd = substr($userpwd, strpos($userpwd, 'redis://')+8);
list($host, $port) = explode(':', $hostport);
list($user, $password) = explode(':', $userpwd);
}
$this->driver = new Credis_Client($host, $port);
if (isset($password)){
$this->driver->auth($password);
}
}
if ($this->database !== null) {
$this->driver->select($database);
}
}
/**
* Magic method to handle all function requests and prefix key based
* operations with the {self::$defaultNamespace} key prefix.
@ -104,16 +142,30 @@ class Resque_Redis extends Redisent
* @return mixed Return value from Resident::call() based on the command.
*/
public function __call($name, $args) {
$args = func_get_args();
if(in_array($name, $this->keyCommands)) {
$args[1][0] = self::$defaultNamespace . $args[1][0];
$args[0] = self::$defaultNamespace . $args[0];
}
try {
return parent::__call($name, $args[1]);
return $this->driver->__call($name, $args);
}
catch(RedisException $e) {
catch(CredisException $e) {
return false;
}
}
public static function getPrefix()
{
return self::$defaultNamespace;
}
public static function removePrefix($string)
{
$prefix=self::getPrefix();
if (substr($string, 0, strlen($prefix)) == $prefix) {
$string = substr($string, strlen($prefix), strlen($string) );
}
return $string;
}
}
?>

View file

@ -1,119 +0,0 @@
<?php
// Third- party apps may have already loaded Resident from elsewhere
// so lets be careful.
if(!class_exists('RedisentCluster', false)) {
require_once dirname(__FILE__) . '/../Redisent/RedisentCluster.php';
}
/**
* Extended Redisent class used by Resque for all communication with
* redis. Essentially adds namespace support to Redisent.
*
* @package Resque/Redis
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_RedisCluster extends RedisentCluster
{
/**
* Redis namespace
* @var string
*/
private static $defaultNamespace = 'resque:';
/**
* @var array List of all commands in Redis that supply a key as their
* first argument. Used to prefix keys with the Resque namespace.
*/
private $keyCommands = array(
'exists',
'del',
'type',
'keys',
'expire',
'ttl',
'move',
'set',
'get',
'getset',
'setnx',
'incr',
'incrby',
'decrby',
'decrby',
'rpush',
'lpush',
'llen',
'lrange',
'ltrim',
'lindex',
'lset',
'lrem',
'lpop',
'blpop',
'rpop',
'sadd',
'srem',
'spop',
'scard',
'sismember',
'smembers',
'srandmember',
'zadd',
'zrem',
'zrange',
'zrevrange',
'zrangebyscore',
'zcard',
'zscore',
'zremrangebyscore',
'sort'
);
// sinterstore
// sunion
// sunionstore
// sdiff
// sdiffstore
// sinter
// smove
// rename
// rpoplpush
// mget
// msetnx
// mset
// renamenx
/**
* Set Redis namespace (prefix) default: resque
* @param string $namespace
*/
public static function prefix($namespace)
{
if (strpos($namespace, ':') === false) {
$namespace .= ':';
}
self::$defaultNamespace = $namespace;
}
/**
* Magic method to handle all function requests and prefix key based
* operations with the '{self::$defaultNamespace}' key prefix.
*
* @param string $name The name of the method called.
* @param array $args Array of supplied arguments to the method.
* @return mixed Return value from Resident::call() based on the command.
*/
public function __call($name, $args) {
$args = func_get_args();
if(in_array($name, $this->keyCommands)) {
$args[1][0] = self::$defaultNamespace . $args[1][0];
}
try {
return parent::__call($name, $args[1]);
}
catch(RedisException $e) {
return false;
}
}
}
?>

View file

@ -3,8 +3,7 @@
* Resque statistic management (jobs processed, failed, etc)
*
* @package Resque/Stat
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Stat

View file

@ -1,16 +1,10 @@
<?php
require_once dirname(__FILE__) . '/Stat.php';
require_once dirname(__FILE__) . '/Event.php';
require_once dirname(__FILE__) . '/Job.php';
require_once dirname(__FILE__) . '/Job/DirtyExitException.php';
/**
* Resque worker that handles checking queues for jobs, fetching them
* off the queues, running them and handling the result.
*
* @package Resque/Worker
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2010 Chris Boulton
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Worker
@ -190,7 +184,7 @@ class Resque_Worker
Resque_Event::trigger('beforeFork', $job);
$this->workingOn($job);
$this->child = $this->fork();
$this->child = Resque::fork();
// Forked and we're the child. Run the job.
if ($this->child === 0 || $this->child === false) {
@ -292,27 +286,6 @@ class Resque_Worker
return $queues;
}
/**
* Attempt to fork a child process from the parent to run a job in.
*
* Return values are those of pcntl_fork().
*
* @return int -1 if the fork failed, 0 for the forked child, the PID of the child for the parent.
*/
private function fork()
{
if(!function_exists('pcntl_fork')) {
return false;
}
$pid = pcntl_fork();
if($pid === -1) {
throw new RuntimeException('Unable to fork child worker.');
}
return $pid;
}
/**
* Perform necessary actions to start a worker.
*/
@ -480,7 +453,7 @@ class Resque_Worker
*/
public function registerWorker()
{
Resque::redis()->sadd('workers', $this);
Resque::redis()->sadd('workers', (string)$this);
Resque::redis()->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y'));
}
@ -544,16 +517,21 @@ class Resque_Worker
/**
* Output a given log message to STDOUT.
*
* @param string $message Message to output.
* @param string $message Message to output.
* @param int $logLevel The logging level to capture
*/
public function log($message)
public function log($message, $logLevel = self::LOG_NORMAL)
{
if($this->logLevel == self::LOG_NORMAL) {
if ($logLevel > $this->logLevel) {
return;
}
if ($this->logLevel == self::LOG_NORMAL) {
fwrite(STDOUT, "*** " . $message . "\n");
return;
}
else if($this->logLevel == self::LOG_VERBOSE) {
fwrite(STDOUT, "** [" . strftime('%T %Y-%m-%d') . "] " . $message . "\n");
}
fwrite(STDOUT, "** [" . strftime('%T %Y-%m-%d') . "] " . $message . "\n");
}
/**