mirror of
https://github.com/idanoo/php-resque.git
synced 2024-11-22 00:11:53 +00:00
updated redisent to make it redis 2.2 compatible
This commit is contained in:
parent
4bc96dd88c
commit
d39a5f57d6
@ -20,118 +20,123 @@ class RedisException extends Exception {
|
|||||||
*/
|
*/
|
||||||
class Redisent {
|
class Redisent {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Socket connection to the Redis server
|
* Socket connection to the Redis server
|
||||||
* @var resource
|
* @var resource
|
||||||
* @access private
|
* @access private
|
||||||
*/
|
*/
|
||||||
private $__sock;
|
private $__sock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Redis bulk commands, they are sent in a slightly different format to the server
|
* Host of the Redis server
|
||||||
* @var array
|
* @var string
|
||||||
* @access private
|
* @access public
|
||||||
*/
|
*/
|
||||||
private $bulk_cmds = array(
|
public $host;
|
||||||
'SET', 'GETSET', 'SETNX', 'ECHO',
|
|
||||||
'RPUSH', 'LPUSH', 'LSET', 'LREM',
|
|
||||||
'SADD', 'SREM', 'SMOVE', 'SISMEMBER'
|
|
||||||
);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a Redisent connection to the Redis server on host {@link $host} and port {@link $port}.
|
* Port on which the Redis server is running
|
||||||
* @param string $host The hostname of the Redis server
|
* @var integer
|
||||||
* @param integer $port The port number of the Redis server
|
* @access public
|
||||||
*/
|
*/
|
||||||
function __construct($host, $port = 6379) {
|
public $port;
|
||||||
$this->__sock = fsockopen($host, $port, $errno, $errstr);
|
|
||||||
if (!$this->__sock) {
|
|
||||||
throw new Exception("{$errno} - {$errstr}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function __destruct() {
|
/**
|
||||||
fclose($this->__sock);
|
* Creates a Redisent connection to the Redis server on host {@link $host} and port {@link $port}.
|
||||||
}
|
* @param string $host The hostname of the Redis server
|
||||||
|
* @param integer $port The port number of the Redis server
|
||||||
|
*/
|
||||||
|
function __construct($host, $port = 6379) {
|
||||||
|
$this->host = $host;
|
||||||
|
$this->port = $port;
|
||||||
|
$this->__sock = fsockopen($this->host, $this->port, $errno, $errstr);
|
||||||
|
if (!$this->__sock) {
|
||||||
|
throw new \Exception("{$errno} - {$errstr}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function __call($name, $args) {
|
function __destruct() {
|
||||||
|
fclose($this->__sock);
|
||||||
|
}
|
||||||
|
|
||||||
/* Build the Redis protocol command */
|
function __call($name, $args) {
|
||||||
$name = strtoupper($name);
|
|
||||||
if (in_array($name, $this->bulk_cmds)) {
|
|
||||||
$value = array_pop($args);
|
|
||||||
$command = sprintf("%s %s %d%s%s%s", $name, trim(implode(' ', $args)), strlen($value), CRLF, $value, CRLF);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
$command = sprintf("%s %s%s", $name, trim(implode(' ', $args)), CRLF);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Open a Redis connection and execute the command */
|
/* Build the Redis unified protocol command */
|
||||||
fwrite($this->__sock, $command);
|
array_unshift($args, strtoupper($name));
|
||||||
|
$command = sprintf('*%d%s%s%s', count($args), CRLF, implode(array_map(function($arg) {
|
||||||
|
return sprintf('$%d%s%s', strlen($arg), CRLF, $arg);
|
||||||
|
}, $args), CRLF), CRLF);
|
||||||
|
|
||||||
/* Parse the response based on the reply identifier */
|
/* Open a Redis connection and execute the command */
|
||||||
$reply = trim(fgets($this->__sock, 512));
|
for ($written = 0; $written < strlen($command); $written += $fwrite) {
|
||||||
switch (substr($reply, 0, 1)) {
|
$fwrite = fwrite($this->__sock, substr($command, $written));
|
||||||
/* Error reply */
|
if ($fwrite === FALSE) {
|
||||||
case '-':
|
throw new \Exception('Failed to write entire command to stream');
|
||||||
echo $command."\n";
|
}
|
||||||
throw new RedisException(substr(trim($reply), 4));
|
}
|
||||||
break;
|
|
||||||
/* Inline reply */
|
/* Parse the response based on the reply identifier */
|
||||||
case '+':
|
$reply = trim(fgets($this->__sock, 512));
|
||||||
$response = substr(trim($reply), 1);
|
switch (substr($reply, 0, 1)) {
|
||||||
break;
|
/* Error reply */
|
||||||
/* Bulk reply */
|
case '-':
|
||||||
case '$':
|
throw new RedisException(substr(trim($reply), 4));
|
||||||
$response = null;
|
break;
|
||||||
if ($reply == '$-1') {
|
/* Inline reply */
|
||||||
break;
|
case '+':
|
||||||
}
|
$response = substr(trim($reply), 1);
|
||||||
$read = 0;
|
break;
|
||||||
$size = substr($reply, 1);
|
/* Bulk reply */
|
||||||
do {
|
case '$':
|
||||||
$block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
|
$response = null;
|
||||||
$response .= fread($this->__sock, $block_size);
|
if ($reply == '$-1') {
|
||||||
$read += $block_size;
|
break;
|
||||||
} while ($read < $size);
|
}
|
||||||
fread($this->__sock, 2); /* discard crlf */
|
$read = 0;
|
||||||
break;
|
$size = substr($reply, 1);
|
||||||
/* Multi-bulk reply */
|
do {
|
||||||
case '*':
|
$block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
|
||||||
$count = substr($reply, 1);
|
$response .= fread($this->__sock, $block_size);
|
||||||
if ($count == '-1') {
|
$read += $block_size;
|
||||||
return null;
|
} while ($read < $size);
|
||||||
}
|
fread($this->__sock, 2); /* discard crlf */
|
||||||
$response = array();
|
break;
|
||||||
for ($i = 0; $i < $count; $i++) {
|
/* Multi-bulk reply */
|
||||||
$bulk_head = trim(fgets($this->__sock, 512));
|
case '*':
|
||||||
$size = substr($bulk_head, 1);
|
$count = substr($reply, 1);
|
||||||
if ($size == '-1') {
|
if ($count == '-1') {
|
||||||
$response[] = null;
|
return null;
|
||||||
}
|
}
|
||||||
else {
|
$response = array();
|
||||||
$read = 0;
|
for ($i = 0; $i < $count; $i++) {
|
||||||
$block = "";
|
$bulk_head = trim(fgets($this->__sock, 512));
|
||||||
do {
|
$size = substr($bulk_head, 1);
|
||||||
$block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
|
if ($size == '-1') {
|
||||||
$block .= fread($this->__sock, $block_size);
|
$response[] = null;
|
||||||
$read += $block_size;
|
}
|
||||||
} while ($read < $size);
|
else {
|
||||||
fread($this->__sock, 2); /* discard crlf */
|
$read = 0;
|
||||||
$response[] = $block;
|
$block = "";
|
||||||
}
|
do {
|
||||||
}
|
$block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
|
||||||
break;
|
$block .= fread($this->__sock, $block_size);
|
||||||
/* Integer reply */
|
$read += $block_size;
|
||||||
case ':':
|
} while ($read < $size);
|
||||||
$response = substr(trim($reply), 1);
|
fread($this->__sock, 2); /* discard crlf */
|
||||||
break;
|
$response[] = $block;
|
||||||
default:
|
}
|
||||||
throw new RedisException("invalid server response: {$reply}");
|
}
|
||||||
break;
|
break;
|
||||||
}
|
/* Integer reply */
|
||||||
/* Party on */
|
case ':':
|
||||||
return $response;
|
$response = intval(substr(trim($reply), 1));
|
||||||
}
|
break;
|
||||||
|
default:
|
||||||
|
throw new RedisException("invalid server response: {$reply}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
/* Party on */
|
||||||
|
return $response;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user