mirror of
https://github.com/idanoo/php-resque
synced 2025-07-02 06:02:21 +00:00
Compare commits
No commits in common. "main" and "2.1.1" have entirely different histories.
14 changed files with 291 additions and 454 deletions
36
.github/workflows/ci.yml
vendored
36
.github/workflows/ci.yml
vendored
|
@ -1,36 +0,0 @@
|
||||||
name: CI
|
|
||||||
on: [push]
|
|
||||||
jobs:
|
|
||||||
Linter:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
container: php:8.2
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v3
|
|
||||||
- name: Install composer
|
|
||||||
run: apt-get update -yq && apt-get install git wget procps unzip -y && pecl install -o -f redis && rm -rf /tmp/pear && docker-php-ext-enable redis && wget https://getcomposer.org/composer.phar && php composer.phar install --dev
|
|
||||||
- name: Validate composer.json and composer.lock
|
|
||||||
run: php composer.phar validate --strict
|
|
||||||
- name: Install dependencies
|
|
||||||
run: php composer.phar install --dev --prefer-dist --no-progress
|
|
||||||
- name: Run PHPCS Linter
|
|
||||||
run: php -d memory_limit=256M vendor/bin/phpcs -s --standard=ruleset.xml
|
|
||||||
PHPTest:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
container: php:${{ matrix.php_version }}
|
|
||||||
strategy:
|
|
||||||
matrix:
|
|
||||||
php_version: [8.1, 8.2,8.3,8.4]
|
|
||||||
services:
|
|
||||||
redis:
|
|
||||||
image: redis:7.0
|
|
||||||
options: >-
|
|
||||||
--health-cmd "redis-cli ping"
|
|
||||||
--health-interval 10s
|
|
||||||
--health-timeout 5s
|
|
||||||
--health-retries 5
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v3
|
|
||||||
- name: Install composer
|
|
||||||
run: apt-get update -yq && apt-get install git wget procps unzip -y && pecl install -o -f redis && rm -rf /tmp/pear && docker-php-ext-enable redis && wget https://getcomposer.org/composer.phar && php composer.phar install --dev
|
|
||||||
- name: Run PHP ${{ matrix.php_version }}} Unit Tests
|
|
||||||
run: php vendor/bin/phpunit --verbose --configuration phpunit.xml
|
|
29
CHANGELOG.md
29
CHANGELOG.md
|
@ -1,32 +1,3 @@
|
||||||
# 2.5.3 (2025-06-08)
|
|
||||||
- Update typing of Log() to support all psr\log versions
|
|
||||||
|
|
||||||
- # 2.5.2 (2025-06-08)
|
|
||||||
- Update typing of Log() to support all psr\log versions
|
|
||||||
|
|
||||||
# 2.5.1 (2025-06-08)
|
|
||||||
- Update psr/log version requirements
|
|
||||||
|
|
||||||
# 2.5.0 (2025-06-08)
|
|
||||||
- Update packages
|
|
||||||
|
|
||||||
# 2.4.0 (2024-12-11)
|
|
||||||
- Update packages (psr/log ^3.0.2)
|
|
||||||
|
|
||||||
# 2.3.0 (2024-09-04)
|
|
||||||
- Update packages
|
|
||||||
|
|
||||||
# 2.2.0 (2023-03-20)
|
|
||||||
- Update pacakges
|
|
||||||
- Bump requirements to PHP >= 8.1
|
|
||||||
|
|
||||||
# 2.1.3 (2023-11-15)
|
|
||||||
- Resolved issue with SET EX TTL's using unix-timestamps
|
|
||||||
|
|
||||||
# 2.1.2 (2023-03-22)
|
|
||||||
- Update composer packages
|
|
||||||
- Update git information (GitHub)
|
|
||||||
|
|
||||||
# 2.1.1 (2023-03-20)
|
# 2.1.1 (2023-03-20)
|
||||||
- Changed setex to set with EX values
|
- Changed setex to set with EX values
|
||||||
- Added TTLs to missing keys
|
- Added TTLs to missing keys
|
||||||
|
|
44
README.md
44
README.md
|
@ -1,4 +1,4 @@
|
||||||
php-resque: PHP Background (Resque) Worker
|
php-resque: PHP Resque Worker (and Enqueue)
|
||||||
===========================================
|
===========================================
|
||||||
|
|
||||||
Resque is a Redis-backed library for creating background jobs, placing
|
Resque is a Redis-backed library for creating background jobs, placing
|
||||||
|
@ -42,7 +42,7 @@ On top of the original fork (chrisboulton/php-resque) I have added:
|
||||||
|
|
||||||
## Requirements ##
|
## Requirements ##
|
||||||
|
|
||||||
* PHP 8.1+
|
* PHP 7.0+
|
||||||
* phpredis
|
* phpredis
|
||||||
* Redis 2.2+
|
* Redis 2.2+
|
||||||
|
|
||||||
|
@ -53,9 +53,19 @@ Composer package inside your project.
|
||||||
|
|
||||||
If you're not familiar with Composer, please see <http://getcomposer.org/>.
|
If you're not familiar with Composer, please see <http://getcomposer.org/>.
|
||||||
|
|
||||||
1. Run `composer require idanoo/php-resque`.
|
1. Add php-resque to your application's composer.json.
|
||||||
|
|
||||||
2. If you haven't already, add the Composer autoload to your project's
|
```json
|
||||||
|
{
|
||||||
|
"require": {
|
||||||
|
"idanoo/php-resque": "^2.0"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
2. Run `composer install`.
|
||||||
|
|
||||||
|
3. If you haven't already, add the Composer autoload to your project's
|
||||||
initialization file. (example)
|
initialization file. (example)
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
|
@ -74,7 +84,7 @@ Resque::setBackend('redis:6379');
|
||||||
|
|
||||||
$args = ['name' => 'TestName'];
|
$args = ['name' => 'TestName'];
|
||||||
|
|
||||||
Resque::enqueue('default', '\App\MyJobClass', $args);
|
Resque::enqueue('default', 'My_Job', $args);
|
||||||
```
|
```
|
||||||
|
|
||||||
### Defining Jobs ###
|
### Defining Jobs ###
|
||||||
|
@ -82,9 +92,7 @@ Resque::enqueue('default', '\App\MyJobClass', $args);
|
||||||
Each job should be in its own class, and include a `perform` method.
|
Each job should be in its own class, and include a `perform` method.
|
||||||
|
|
||||||
```php
|
```php
|
||||||
namespace \App;
|
class My_Job
|
||||||
|
|
||||||
class MyJobClass
|
|
||||||
{
|
{
|
||||||
public function perform()
|
public function perform()
|
||||||
{
|
{
|
||||||
|
@ -107,9 +115,7 @@ The `tearDown` method, if defined, will be called after the job finishes.
|
||||||
|
|
||||||
|
|
||||||
```php
|
```php
|
||||||
namespace App;
|
class My_Job
|
||||||
|
|
||||||
class MyJobClass
|
|
||||||
{
|
{
|
||||||
public function setUp()
|
public function setUp()
|
||||||
{
|
{
|
||||||
|
@ -133,17 +139,17 @@ class MyJobClass
|
||||||
This method can be used to conveniently remove a job from a queue.
|
This method can be used to conveniently remove a job from a queue.
|
||||||
|
|
||||||
```php
|
```php
|
||||||
// Removes job class '\App\MyJobClass' of queue 'default'
|
// Removes job class 'My_Job' of queue 'default'
|
||||||
Resque::dequeue('default', ['\App\MyJobClass']);
|
Resque::dequeue('default', ['My_Job']);
|
||||||
|
|
||||||
// Removes job class '\App\MyJobClass' with Job ID '087df5819a790ac666c9608e2234b21e' of queue 'default'
|
// Removes job class 'My_Job' with Job ID '087df5819a790ac666c9608e2234b21e' of queue 'default'
|
||||||
Resque::dequeue('default', ['\App\MyJobClass' => '087df5819a790ac666c9608e2234b21e']);
|
Resque::dequeue('default', ['My_Job' => '087df5819a790ac666c9608e2234b21e']);
|
||||||
|
|
||||||
// Removes job class '\App\MyJobClass' with arguments of queue 'default'
|
// Removes job class 'My_Job' with arguments of queue 'default'
|
||||||
Resque::dequeue('default', ['\App\MyJobClass' => ['foo' => 1, 'bar' => 2]]);
|
Resque::dequeue('default', ['My_Job' => ['foo' => 1, 'bar' => 2]]);
|
||||||
|
|
||||||
// Removes multiple jobs
|
// Removes multiple jobs
|
||||||
Resque::dequeue('default', ['\App\MyJobClass', '\App\MyJobClass2']);
|
Resque::dequeue('default', ['My_Job', 'My_Job2']);
|
||||||
```
|
```
|
||||||
|
|
||||||
If no jobs are given, this method will dequeue all jobs matching the provided queue.
|
If no jobs are given, this method will dequeue all jobs matching the provided queue.
|
||||||
|
@ -164,7 +170,7 @@ To track the status of a job, pass `true` as the fourth argument to
|
||||||
returned:
|
returned:
|
||||||
|
|
||||||
```php
|
```php
|
||||||
$token = Resque::enqueue('default', '\App\MyJobClass', $args, true);
|
$token = Resque::enqueue('default', 'My_Job', $args, true);
|
||||||
echo $token;
|
echo $token;
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
},
|
},
|
||||||
"description": "Redis backed library for creating background jobs and processing them later. Based on resque for Ruby. Originally forked from chrisboulton/php-resque.",
|
"description": "Redis backed library for creating background jobs and processing them later. Based on resque for Ruby. Originally forked from chrisboulton/php-resque.",
|
||||||
"keywords": ["job", "background", "redis", "resque", "php"],
|
"keywords": ["job", "background", "redis", "resque", "php"],
|
||||||
"homepage": "https://github.com/idanoo/php-resque",
|
"homepage": "https://gitlab.com/idanoo/php-resque/",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"authors": [
|
"authors": [
|
||||||
{
|
{
|
||||||
|
@ -16,9 +16,9 @@
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"require": {
|
"require": {
|
||||||
"php": ">=8.1",
|
"php": ">7.4",
|
||||||
"psr/log": "^1.1 || ^2.0 || ^3.0",
|
"psr/log": "^1.1.0",
|
||||||
"colinmollenhour/credis": "^1.14.0"
|
"colinmollenhour/credis": "^1.13.0"
|
||||||
},
|
},
|
||||||
"require-dev": {
|
"require-dev": {
|
||||||
"phpunit/phpunit": "^9",
|
"phpunit/phpunit": "^9",
|
||||||
|
@ -40,8 +40,8 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"support": {
|
"support": {
|
||||||
"issues": "https://github.com/idanoo/php-resque/issues",
|
"issues": "https://gitlab.com/idanoo/php-resque/issues",
|
||||||
"source": "https://github.com/idanoo/php-resque"
|
"source": "https://gitlab.com/idanoo/php-resque"
|
||||||
},
|
},
|
||||||
"config": {
|
"config": {
|
||||||
"allow-plugins": {
|
"allow-plugins": {
|
||||||
|
|
441
composer.lock
generated
441
composer.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -22,7 +22,6 @@ class Event
|
||||||
*
|
*
|
||||||
* @param string $event Name of event to be raised.
|
* @param string $event Name of event to be raised.
|
||||||
* @param mixed $data Optional, any data that should be passed to each callback.
|
* @param mixed $data Optional, any data that should be passed to each callback.
|
||||||
*
|
|
||||||
* @return true
|
* @return true
|
||||||
*/
|
*/
|
||||||
public static function trigger($event, $data = null)
|
public static function trigger($event, $data = null)
|
||||||
|
@ -50,8 +49,7 @@ class Event
|
||||||
* Listen in on a given event to have a specified callback fired.
|
* Listen in on a given event to have a specified callback fired.
|
||||||
*
|
*
|
||||||
* @param string $event Name of event to listen on.
|
* @param string $event Name of event to listen on.
|
||||||
* @param mixed $callback Any callback callable by call_user_func_array
|
* @param mixed $callback Any callback callable by call_user_func_array.
|
||||||
*
|
|
||||||
* @return true
|
* @return true
|
||||||
*/
|
*/
|
||||||
public static function listen($event, $callback)
|
public static function listen($event, $callback)
|
||||||
|
@ -69,7 +67,6 @@ class Event
|
||||||
*
|
*
|
||||||
* @param string $event Name of event.
|
* @param string $event Name of event.
|
||||||
* @param mixed $callback The callback as defined when listen() was called.
|
* @param mixed $callback The callback as defined when listen() was called.
|
||||||
*
|
|
||||||
* @return true
|
* @return true
|
||||||
*/
|
*/
|
||||||
public static function stopListening($event, $callback)
|
public static function stopListening($event, $callback)
|
||||||
|
@ -88,10 +85,8 @@ class Event
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Call all registered listeners.
|
* Call all registered listeners.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public static function clearListeners(): void
|
public static function clearListeners()
|
||||||
{
|
{
|
||||||
self::$events = [];
|
self::$events = [];
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,6 @@ namespace Resque\Job;
|
||||||
* @author Daniel Mason <daniel@m2.nz>
|
* @author Daniel Mason <daniel@m2.nz>
|
||||||
* @license http://www.opensource.org/licenses/mit-license.php
|
* @license http://www.opensource.org/licenses/mit-license.php
|
||||||
*/
|
*/
|
||||||
|
|
||||||
class Factory implements FactoryInterface
|
class Factory implements FactoryInterface
|
||||||
{
|
{
|
||||||
public ?Job $job;
|
public ?Job $job;
|
||||||
|
@ -17,8 +16,6 @@ class Factory implements FactoryInterface
|
||||||
public array $args;
|
public array $args;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create job factory
|
|
||||||
*
|
|
||||||
* @param $className
|
* @param $className
|
||||||
* @param array $args
|
* @param array $args
|
||||||
* @param $queue
|
* @param $queue
|
||||||
|
|
|
@ -62,7 +62,7 @@ class Status
|
||||||
\Resque\Resque::redis()->set(
|
\Resque\Resque::redis()->set(
|
||||||
'job:' . $id . ':status',
|
'job:' . $id . ':status',
|
||||||
json_encode($statusPacket),
|
json_encode($statusPacket),
|
||||||
['ex' => \Resque\Redis::DEFAULT_REDIS_TTL],
|
['ex' => time() + 86400],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,7 +106,7 @@ class Status
|
||||||
\Resque\Resque::redis()->set(
|
\Resque\Resque::redis()->set(
|
||||||
(string)$this,
|
(string)$this,
|
||||||
json_encode($statusPacket),
|
json_encode($statusPacket),
|
||||||
['ex' => \Resque\Redis::DEFAULT_REDIS_TTL],
|
['ex' => time() + 86400],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,10 +25,9 @@ class Log extends \Psr\Log\AbstractLogger
|
||||||
* @param mixed $level PSR-3 log level constant, or equivalent string
|
* @param mixed $level PSR-3 log level constant, or equivalent string
|
||||||
* @param string $message Message to log, may contain a { placeholder }
|
* @param string $message Message to log, may contain a { placeholder }
|
||||||
* @param array $context Variables to replace { placeholder }
|
* @param array $context Variables to replace { placeholder }
|
||||||
*
|
|
||||||
* @return null
|
* @return null
|
||||||
*/
|
*/
|
||||||
public function log($level, $message, array $context = []): void
|
public function log($level, $message, array $context = [])
|
||||||
{
|
{
|
||||||
$logLevels = [
|
$logLevels = [
|
||||||
'emergency',
|
'emergency',
|
||||||
|
|
|
@ -14,14 +14,12 @@ class Redis
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Redis Client
|
* Redis Client
|
||||||
*
|
|
||||||
* @var \Credis_Client
|
* @var \Credis_Client
|
||||||
*/
|
*/
|
||||||
private $driver;
|
private $driver;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Redis namespace
|
* Redis namespace
|
||||||
*
|
|
||||||
* @var string
|
* @var string
|
||||||
*/
|
*/
|
||||||
private static $defaultNamespace = 'resque:';
|
private static $defaultNamespace = 'resque:';
|
||||||
|
@ -41,11 +39,6 @@ class Redis
|
||||||
*/
|
*/
|
||||||
public const DEFAULT_DATABASE = 0;
|
public const DEFAULT_DATABASE = 0;
|
||||||
|
|
||||||
/**
|
|
||||||
* Default Redis TTL (2 days)
|
|
||||||
*/
|
|
||||||
public const DEFAULT_REDIS_TTL = 172800;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @var array List of all commands in Redis that supply a key as their
|
* @var array List of all commands in Redis that supply a key as their
|
||||||
* first argument. Used to prefix keys with the Resque namespace.
|
* first argument. Used to prefix keys with the Resque namespace.
|
||||||
|
@ -97,6 +90,17 @@ class Redis
|
||||||
'rename',
|
'rename',
|
||||||
'rpoplpush'
|
'rpoplpush'
|
||||||
];
|
];
|
||||||
|
// sinterstore
|
||||||
|
// sunion
|
||||||
|
// sunionstore
|
||||||
|
// sdiff
|
||||||
|
// sdiffstore
|
||||||
|
// sinter
|
||||||
|
// smove
|
||||||
|
// mget
|
||||||
|
// msetnx
|
||||||
|
// mset
|
||||||
|
// renamenx
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set Redis namespace (prefix) default: resque
|
* Set Redis namespace (prefix) default: resque
|
||||||
|
@ -110,7 +114,6 @@ class Redis
|
||||||
if (substr($namespace, -1) !== ':' && $namespace != '') {
|
if (substr($namespace, -1) !== ':' && $namespace != '') {
|
||||||
$namespace .= ':';
|
$namespace .= ':';
|
||||||
}
|
}
|
||||||
|
|
||||||
self::$defaultNamespace = $namespace;
|
self::$defaultNamespace = $namespace;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,7 @@ namespace Resque;
|
||||||
|
|
||||||
class Resque
|
class Resque
|
||||||
{
|
{
|
||||||
public const VERSION = '2.5.3';
|
public const VERSION = '2.1.0';
|
||||||
|
|
||||||
public const DEFAULT_INTERVAL = 5;
|
public const DEFAULT_INTERVAL = 5;
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,7 @@ class Stat
|
||||||
$set = Resque::redis()->set(
|
$set = Resque::redis()->set(
|
||||||
'stat:' . $stat,
|
'stat:' . $stat,
|
||||||
$by,
|
$by,
|
||||||
['ex' => Redis::DEFAULT_REDIS_TTL, 'nx'],
|
['ex' => time() + 86400, 'nx'],
|
||||||
);
|
);
|
||||||
|
|
||||||
// If it already exists, return the incrby value
|
// If it already exists, return the incrby value
|
||||||
|
|
|
@ -83,7 +83,6 @@ class Worker
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return all workers known to Resque as instantiated instances.
|
* Return all workers known to Resque as instantiated instances.
|
||||||
*
|
|
||||||
* @return array
|
* @return array
|
||||||
*/
|
*/
|
||||||
public static function all(): array
|
public static function all(): array
|
||||||
|
@ -97,17 +96,14 @@ class Worker
|
||||||
foreach ($workers as $workerId) {
|
foreach ($workers as $workerId) {
|
||||||
$instances[] = self::find($workerId);
|
$instances[] = self::find($workerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return $instances;
|
return $instances;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a worker ID, check if it is registered/valid.
|
* Given a worker ID, check if it is registered/valid.
|
||||||
*
|
*
|
||||||
* @param string $workerId ID of the worker
|
* @param string $workerId ID of the worker.
|
||||||
*
|
* @return boolean True if the worker exists, false if not.
|
||||||
* @return boolean True if the worker exists, false if not
|
|
||||||
*
|
|
||||||
* @throws Resque_RedisException
|
* @throws Resque_RedisException
|
||||||
*/
|
*/
|
||||||
public static function exists($workerId): bool
|
public static function exists($workerId): bool
|
||||||
|
@ -118,10 +114,8 @@ class Worker
|
||||||
/**
|
/**
|
||||||
* Given a worker ID, find it and return an instantiated worker class for it.
|
* Given a worker ID, find it and return an instantiated worker class for it.
|
||||||
*
|
*
|
||||||
* @param string $workerId The ID of the worker
|
* @param string $workerId The ID of the worker.
|
||||||
*
|
* @return bool|Resque_Worker
|
||||||
* @return Resque_Worker|bool
|
|
||||||
*
|
|
||||||
* @throws Resque_RedisException
|
* @throws Resque_RedisException
|
||||||
*/
|
*/
|
||||||
public static function find($workerId)
|
public static function find($workerId)
|
||||||
|
@ -129,13 +123,11 @@ class Worker
|
||||||
if (false === strpos($workerId, ":") || !self::exists($workerId)) {
|
if (false === strpos($workerId, ":") || !self::exists($workerId)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @noinspection PhpUnusedLocalVariableInspection */
|
/** @noinspection PhpUnusedLocalVariableInspection */
|
||||||
list($hostname, $pid, $queues) = explode(':', $workerId, 3);
|
list($hostname, $pid, $queues) = explode(':', $workerId, 3);
|
||||||
$queues = explode(',', $queues);
|
$queues = explode(',', $queues);
|
||||||
$worker = new self($queues);
|
$worker = new self($queues);
|
||||||
$worker->setId($workerId);
|
$worker->setId($workerId);
|
||||||
|
|
||||||
return $worker;
|
return $worker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,10 +135,8 @@ class Worker
|
||||||
* Set the ID of this worker to a given ID string.
|
* Set the ID of this worker to a given ID string.
|
||||||
*
|
*
|
||||||
* @param string $workerId ID for the worker.
|
* @param string $workerId ID for the worker.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public function setId($workerId): void
|
public function setId($workerId)
|
||||||
{
|
{
|
||||||
$this->id = $workerId;
|
$this->id = $workerId;
|
||||||
}
|
}
|
||||||
|
@ -160,11 +150,9 @@ class Worker
|
||||||
* @param int $interval How often to check for new jobs across the queues.
|
* @param int $interval How often to check for new jobs across the queues.
|
||||||
* @param bool $blocking
|
* @param bool $blocking
|
||||||
*
|
*
|
||||||
* @return void
|
|
||||||
*
|
|
||||||
* @throws Resque_RedisException
|
* @throws Resque_RedisException
|
||||||
*/
|
*/
|
||||||
public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false): void
|
public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
|
||||||
{
|
{
|
||||||
$this->updateProcLine('Starting');
|
$this->updateProcLine('Starting');
|
||||||
$this->startup();
|
$this->startup();
|
||||||
|
@ -263,13 +251,11 @@ class Worker
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process a single job
|
* Process a single job.
|
||||||
*
|
*
|
||||||
* @param \Resque\Job\Job $job The job to be processed
|
* @param \Resque\Job\Job $job The job to be processed.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public function perform(\Resque\Job\Job $job): void
|
public function perform(\Resque\Job\Job $job)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
Event::trigger('afterFork', $job);
|
Event::trigger('afterFork', $job);
|
||||||
|
@ -287,8 +273,7 @@ class Worker
|
||||||
/**
|
/**
|
||||||
* @param bool $blocking
|
* @param bool $blocking
|
||||||
* @param int $timeout
|
* @param int $timeout
|
||||||
*
|
* @return object|boolean Instance of \Resque\Job\Job if a job is found, false if not.
|
||||||
* @return object|boolean - Instance of \Resque\Job\Job if a job is found, false if not
|
|
||||||
*/
|
*/
|
||||||
public function reserve($blocking = false, $timeout = null)
|
public function reserve($blocking = false, $timeout = null)
|
||||||
{
|
{
|
||||||
|
@ -319,17 +304,16 @@ class Worker
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return an array containing all of the queues that this worker should use
|
* Return an array containing all of the queues that this worker should use
|
||||||
* when searching for jobs
|
* when searching for jobs.
|
||||||
*
|
*
|
||||||
* If * is found in the list of queues, every queue will be searched in
|
* If * is found in the list of queues, every queue will be searched in
|
||||||
* alphabetic order. (@param boolean $fetch If true, and the queue is set to *, will fetch
|
* alphabetic order. (@param boolean $fetch If true, and the queue is set to *, will fetch
|
||||||
* all queue names from redis
|
* all queue names from redis.
|
||||||
|
* @return array Array of associated queues.
|
||||||
|
* @see $fetch)
|
||||||
*
|
*
|
||||||
* @param boolean $fetch
|
|
||||||
*
|
|
||||||
* @return array Array of associated queues
|
|
||||||
*/
|
*/
|
||||||
public function queues(bool $fetch = true): array
|
public function queues($fetch = true)
|
||||||
{
|
{
|
||||||
if (!in_array('*', $this->queues) || $fetch == false) {
|
if (!in_array('*', $this->queues) || $fetch == false) {
|
||||||
return $this->queues;
|
return $this->queues;
|
||||||
|
@ -337,16 +321,13 @@ class Worker
|
||||||
|
|
||||||
$queues = Resque::queues();
|
$queues = Resque::queues();
|
||||||
sort($queues);
|
sort($queues);
|
||||||
|
|
||||||
return $queues;
|
return $queues;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Perform necessary actions to start a worker
|
* Perform necessary actions to start a worker.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
private function startup(): void
|
private function startup()
|
||||||
{
|
{
|
||||||
$this->registerSigHandlers();
|
$this->registerSigHandlers();
|
||||||
$this->pruneDeadWorkers();
|
$this->pruneDeadWorkers();
|
||||||
|
@ -359,11 +340,9 @@ class Worker
|
||||||
* the name of the currently running process to indicate the current state
|
* the name of the currently running process to indicate the current state
|
||||||
* of a worker.
|
* of a worker.
|
||||||
*
|
*
|
||||||
* @param string $status The updated process title
|
* @param string $status The updated process title.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
private function updateProcLine($status): void
|
private function updateProcLine($status)
|
||||||
{
|
{
|
||||||
$processTitle = 'resque-' . Resque::VERSION . ': ' . $status;
|
$processTitle = 'resque-' . Resque::VERSION . ': ' . $status;
|
||||||
if (function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') {
|
if (function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') {
|
||||||
|
@ -380,10 +359,8 @@ class Worker
|
||||||
* INT: Shutdown immediately and stop processing jobs.
|
* INT: Shutdown immediately and stop processing jobs.
|
||||||
* QUIT: Shutdown after the current job finishes processing.
|
* QUIT: Shutdown after the current job finishes processing.
|
||||||
* USR1: Kill the forked child immediately and continue processing jobs.
|
* USR1: Kill the forked child immediately and continue processing jobs.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
private function registerSigHandlers(): void
|
private function registerSigHandlers()
|
||||||
{
|
{
|
||||||
if (!function_exists('pcntl_signal')) {
|
if (!function_exists('pcntl_signal')) {
|
||||||
return;
|
return;
|
||||||
|
@ -399,11 +376,9 @@ class Worker
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Signal handler callback for USR2, pauses processing of new jobs
|
* Signal handler callback for USR2, pauses processing of new jobs.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public function pauseProcessing(): void
|
public function pauseProcessing()
|
||||||
{
|
{
|
||||||
$this->logger->log(\Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing');
|
$this->logger->log(\Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing');
|
||||||
$this->paused = true;
|
$this->paused = true;
|
||||||
|
@ -412,10 +387,8 @@ class Worker
|
||||||
/**
|
/**
|
||||||
* Signal handler callback for CONT, resumes worker allowing it to pick
|
* Signal handler callback for CONT, resumes worker allowing it to pick
|
||||||
* up new jobs.
|
* up new jobs.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public function unPauseProcessing(): void
|
public function unPauseProcessing()
|
||||||
{
|
{
|
||||||
$this->logger->log(\Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing');
|
$this->logger->log(\Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing');
|
||||||
$this->paused = false;
|
$this->paused = false;
|
||||||
|
@ -424,10 +397,8 @@ class Worker
|
||||||
/**
|
/**
|
||||||
* Schedule a worker for shutdown. Will finish processing the current job
|
* Schedule a worker for shutdown. Will finish processing the current job
|
||||||
* and when the timeout interval is reached, the worker will shut down.
|
* and when the timeout interval is reached, the worker will shut down.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public function shutdown(): void
|
public function shutdown()
|
||||||
{
|
{
|
||||||
$this->shutdown = true;
|
$this->shutdown = true;
|
||||||
$this->logger->log(\Psr\Log\LogLevel::NOTICE, 'Shutting down');
|
$this->logger->log(\Psr\Log\LogLevel::NOTICE, 'Shutting down');
|
||||||
|
@ -436,10 +407,8 @@ class Worker
|
||||||
/**
|
/**
|
||||||
* Force an immediate shutdown of the worker, killing any child jobs
|
* Force an immediate shutdown of the worker, killing any child jobs
|
||||||
* currently running.
|
* currently running.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public function shutdownNow(): void
|
public function shutdownNow()
|
||||||
{
|
{
|
||||||
$this->shutdown();
|
$this->shutdown();
|
||||||
$this->killChild();
|
$this->killChild();
|
||||||
|
@ -448,10 +417,8 @@ class Worker
|
||||||
/**
|
/**
|
||||||
* Kill a forked child job immediately. The job it is processing will not
|
* Kill a forked child job immediately. The job it is processing will not
|
||||||
* be completed.
|
* be completed.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public function killChild(): void
|
public function killChild()
|
||||||
{
|
{
|
||||||
if (!$this->child) {
|
if (!$this->child) {
|
||||||
$this->logger->log(\Psr\Log\LogLevel::DEBUG, 'No child to kill.');
|
$this->logger->log(\Psr\Log\LogLevel::DEBUG, 'No child to kill.');
|
||||||
|
@ -480,10 +447,8 @@ class Worker
|
||||||
* This is a form of garbage collection to handle cases where the
|
* This is a form of garbage collection to handle cases where the
|
||||||
* server may have been killed and the Resque workers did not die gracefully
|
* server may have been killed and the Resque workers did not die gracefully
|
||||||
* and therefore leave state information in Redis.
|
* and therefore leave state information in Redis.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public function pruneDeadWorkers(): void
|
public function pruneDeadWorkers()
|
||||||
{
|
{
|
||||||
$workerPids = $this->workerPids();
|
$workerPids = $this->workerPids();
|
||||||
$workers = self::all();
|
$workers = self::all();
|
||||||
|
@ -509,7 +474,7 @@ class Worker
|
||||||
*
|
*
|
||||||
* @return array Array of Resque worker process IDs.
|
* @return array Array of Resque worker process IDs.
|
||||||
*/
|
*/
|
||||||
public function workerPids(): array
|
public function workerPids()
|
||||||
{
|
{
|
||||||
$pids = [];
|
$pids = [];
|
||||||
exec('ps -A -o pid,command | grep [r]esque', $cmdOutput);
|
exec('ps -A -o pid,command | grep [r]esque', $cmdOutput);
|
||||||
|
@ -521,26 +486,22 @@ class Worker
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register this worker in Redis.
|
* Register this worker in Redis.
|
||||||
* 48 hour TTL so we don't pollute the redis db on server termination.
|
* 48 hour TTL so we don't pollute the db on server termination.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public function registerWorker(): void
|
public function registerWorker()
|
||||||
{
|
{
|
||||||
Resque::redis()->sadd('workers', (string)$this);
|
Resque::redis()->sadd('workers', (string)$this);
|
||||||
Resque::redis()->set(
|
Resque::redis()->set(
|
||||||
'worker:' . (string)$this . ':started',
|
'worker:' . (string)$this . ':started',
|
||||||
date('D M d H:i:s T Y'),
|
date('D M d H:i:s T Y'),
|
||||||
['ex' => Redis::DEFAULT_REDIS_TTL],
|
['ex' => time() + 86400],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unregister this worker in Redis. (shutdown etc)
|
* Unregister this worker in Redis. (shutdown etc)
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public function unregisterWorker(): void
|
public function unregisterWorker()
|
||||||
{
|
{
|
||||||
if (is_object($this->currentJob)) {
|
if (is_object($this->currentJob)) {
|
||||||
$this->currentJob->fail(new \Resque\Job\DirtyExitException());
|
$this->currentJob->fail(new \Resque\Job\DirtyExitException());
|
||||||
|
@ -555,15 +516,12 @@ class Worker
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tell Redis which job we're currently working on
|
* Tell Redis which job we're currently working on.
|
||||||
*
|
|
||||||
* @param \Resque\Job\Job $job \Resque\Job\Job instance containing the job we're working on
|
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*
|
*
|
||||||
|
* @param \Resque\Job\Job $job \Resque\Job\Job instance containing the job we're working on.
|
||||||
* @throws Resque_RedisException
|
* @throws Resque_RedisException
|
||||||
*/
|
*/
|
||||||
public function workingOn(\Resque\Job\Job $job): void
|
public function workingOn(\Resque\Job\Job $job)
|
||||||
{
|
{
|
||||||
$job->worker = $this;
|
$job->worker = $this;
|
||||||
$this->currentJob = $job;
|
$this->currentJob = $job;
|
||||||
|
@ -577,17 +535,15 @@ class Worker
|
||||||
Resque::redis()->set(
|
Resque::redis()->set(
|
||||||
'worker:' . $job->worker,
|
'worker:' . $job->worker,
|
||||||
$data,
|
$data,
|
||||||
['ex' => Redis::DEFAULT_REDIS_TTL],
|
['ex' => time() + 86400],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify Redis that we've finished working on a job, clearing the working
|
* Notify Redis that we've finished working on a job, clearing the working
|
||||||
* state and incrementing the job stats
|
* state and incrementing the job stats.
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public function doneWorking(): void
|
public function doneWorking()
|
||||||
{
|
{
|
||||||
$this->currentJob = null;
|
$this->currentJob = null;
|
||||||
Stat::incr('processed');
|
Stat::incr('processed');
|
||||||
|
@ -596,29 +552,28 @@ class Worker
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generate a string representation of this worker
|
* Generate a string representation of this worker.
|
||||||
*
|
*
|
||||||
* @return string String identifier for this worker instance
|
* @return string String identifier for this worker instance.
|
||||||
*/
|
*/
|
||||||
public function __toString(): string
|
public function __toString()
|
||||||
{
|
{
|
||||||
return (string) $this->id;
|
return $this->id;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return an object describing the job this worker is currently working on
|
* Return an object describing the job this worker is currently working on.
|
||||||
*
|
*
|
||||||
* @return array Array with details of current job
|
* @return array Array with details of current job.
|
||||||
*/
|
*/
|
||||||
public function job(): array
|
public function job(): array
|
||||||
{
|
{
|
||||||
$job = Resque::redis()->get('worker:' . $this);
|
$job = Resque::redis()->get('worker:' . $this);
|
||||||
|
|
||||||
return $job ? json_decode($job, true) : [];
|
return $job ? json_decode($job, true) : [];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a statistic belonging to this worker
|
* Get a statistic belonging to this worker.
|
||||||
*
|
*
|
||||||
* @param string $stat Statistic to fetch.
|
* @param string $stat Statistic to fetch.
|
||||||
*
|
*
|
||||||
|
@ -633,10 +588,8 @@ class Worker
|
||||||
* Inject the logging object into the worker
|
* Inject the logging object into the worker
|
||||||
*
|
*
|
||||||
* @param \Psr\Log\LoggerInterface $logger
|
* @param \Psr\Log\LoggerInterface $logger
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
*/
|
||||||
public function setLogger(\Psr\Log\LoggerInterface $logger): void
|
public function setLogger(\Psr\Log\LoggerInterface $logger)
|
||||||
{
|
{
|
||||||
$this->logger = $logger;
|
$this->logger = $logger;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ class RedisTest extends TestCase
|
||||||
$this->redis->set(
|
$this->redis->set(
|
||||||
'testKey',
|
'testKey',
|
||||||
24,
|
24,
|
||||||
['ex' => \Resque\Redis::DEFAULT_REDIS_TTL],
|
['ex' => time() + 3600],
|
||||||
);
|
);
|
||||||
|
|
||||||
$val = $this->redis->get("testKey");
|
$val = $this->redis->get("testKey");
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue