php-resque/lib/Resque/Job.php

298 lines
7.3 KiB
PHP
Raw Normal View History

2010-04-18 13:58:43 +00:00
<?php
/**
* Resque job.
*
* @package Resque/Job
* @author Chris Boulton <chris@bigcommerce.com>
2010-04-18 13:58:43 +00:00
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Job implements Resque_JobInterface
2010-04-18 13:58:43 +00:00
{
/**
* @var string The name of the queue that this job belongs to.
*/
public $queue;
/**
* @var Resque_Worker Instance of the Resque worker running this job.
*/
public $worker;
/**
* @var array Array containing details of the job.
2010-04-18 13:58:43 +00:00
*/
public $payload;
2011-03-27 07:42:46 +00:00
/**
* @var Resque_JobInterface Instance of the class performing work for this job.
2011-03-27 07:42:46 +00:00
*/
private $instance;
2010-04-18 13:58:43 +00:00
/**
* @var Resque_Job_FactoryInterface
*/
private $jobFactory;
2010-04-18 13:58:43 +00:00
/**
* Instantiate a new instance of a job.
*
* @param string $queue The queue that the job belongs to.
* @param array $payload array containing details of the job.
2010-04-18 13:58:43 +00:00
*/
public function __construct($queue, $payload)
{
$this->queue = $queue;
$this->payload = $payload;
}
/**
* Create a new job and save it to the specified queue.
*
* @param string $queue The name of the queue to place the job in.
* @param string $class The name of the class that contains the code to execute the job.
* @param array $args Any optional arguments that should be passed when the job is executed.
* @param boolean $monitor Set to true to be able to monitor the status of a job.
* @param string $id Unique identifier for tracking the job. Generated if not supplied.
*
* @return string
* @throws \InvalidArgumentException
*/
public static function create($queue, $class, $args = null, $monitor = false, $id = null)
2010-04-18 13:58:43 +00:00
{
if (is_null($id)) {
$id = Resque::generateJobId();
}
if($args !== null && !is_array($args)) {
2010-04-18 13:58:43 +00:00
throw new InvalidArgumentException(
'Supplied $args must be an array.'
2010-04-18 13:58:43 +00:00
);
}
Resque::push($queue, array(
'class' => $class,
'args' => array($args),
2010-04-18 13:58:43 +00:00
'id' => $id,
'queue_time' => microtime(true),
2010-04-18 13:58:43 +00:00
));
if($monitor) {
Resque_Job_Status::create($id);
}
return $id;
}
2013-03-12 10:18:37 +00:00
/**
* Find the next available job from the specified queue and return an
* instance of Resque_Job for it.
*
* @param string $queue The name of the queue to check for a job in.
* @return false|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
2013-03-12 10:18:37 +00:00
*/
public static function reserve($queue)
{
$payload = Resque::pop($queue);
if(!is_array($payload)) {
return false;
}
2010-04-18 13:58:43 +00:00
2013-03-12 10:18:37 +00:00
return new Resque_Job($queue, $payload);
}
/**
2013-03-13 11:41:32 +00:00
* Find the next available job from the specified queues using blocking list pop
* and return an instance of Resque_Job for it.
2013-03-12 10:18:37 +00:00
*
2013-03-13 11:41:32 +00:00
* @param array $queues
* @param int $timeout
* @return false|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
2013-03-12 10:18:37 +00:00
*/
2013-03-13 11:41:32 +00:00
public static function reserveBlocking(array $queues, $timeout = null)
2013-03-12 10:18:37 +00:00
{
2013-05-29 07:29:24 +00:00
$item = Resque::blpop($queues, $timeout);
if(!is_array($item)) {
2013-03-12 10:18:37 +00:00
return false;
}
2013-05-29 07:29:24 +00:00
return new Resque_Job($item['queue'], $item['payload']);
2013-03-12 10:18:37 +00:00
}
2010-04-18 13:58:43 +00:00
/**
* Update the status of the current job.
*
* @param int $status Status constant from Resque_Job_Status indicating the current status of a job.
*/
public function updateStatus($status)
{
2010-12-11 05:25:33 +00:00
if(empty($this->payload['id'])) {
2010-04-18 13:58:43 +00:00
return;
}
$statusInstance = new Resque_Job_Status($this->payload['id']);
2010-04-18 13:58:43 +00:00
$statusInstance->update($status);
}
/**
* Return the status of the current job.
*
* @return int The status of the job as one of the Resque_Job_Status constants.
*/
public function getStatus()
{
$status = new Resque_Job_Status($this->payload['id']);
2010-04-18 13:58:43 +00:00
return $status->get();
}
2010-04-18 13:58:43 +00:00
/**
2011-03-27 07:42:46 +00:00
* Get the arguments supplied to this job.
2010-04-18 13:58:43 +00:00
*
2011-03-27 07:42:46 +00:00
* @return array Array of arguments.
2010-04-18 13:58:43 +00:00
*/
2011-03-27 07:42:46 +00:00
public function getArguments()
2010-04-18 13:58:43 +00:00
{
2011-03-27 07:42:46 +00:00
if (!isset($this->payload['args'])) {
return array();
}
return $this->payload['args'][0];
2011-03-27 07:42:46 +00:00
}
/**
* Get the instantiated object for this job that will be performing work.
* @return Resque_JobInterface Instance of the object that this job belongs to.
* @throws Resque_Exception
*/
2011-03-27 07:42:46 +00:00
public function getInstance()
{
if (!is_null($this->instance)) {
return $this->instance;
}
if(!class_exists($this->payload['class'])) {
2010-04-18 13:58:43 +00:00
throw new Resque_Exception(
'Could not find job class ' . $this->payload['class'] . '.'
2010-04-18 13:58:43 +00:00
);
}
if(!method_exists($this->payload['class'], 'perform')) {
2010-04-18 13:58:43 +00:00
throw new Resque_Exception(
'Job class ' . $this->payload['class'] . ' does not contain a perform method.'
2010-04-18 13:58:43 +00:00
);
}
if ($this->jobFactory !== null) {
$this->instance = $this->jobFactory->create($this->payload['class'], $this->getArguments(), $this->queue);
return $this->instance;
}
$this->instance = new $this->payload['class'];
$this->instance->job = $this;
$this->instance->args = $this->getArguments();
$this->instance->queue = $this->queue;
2011-03-27 07:42:46 +00:00
return $this->instance;
}
2011-03-27 07:42:46 +00:00
/**
* Actually execute a job by calling the perform method on the class
* associated with the job with the supplied arguments.
*
2012-03-02 01:21:06 +00:00
* @return bool
2011-03-27 07:42:46 +00:00
* @throws Resque_Exception When the job's class could not be found or it does not contain a perform method.
*/
public function perform()
{
try {
Resque_Event::trigger('beforePerform', $this);
$instance = $this->getInstance();
2011-03-27 07:42:46 +00:00
if(method_exists($instance, 'setUp')) {
$instance->setUp();
}
$instance->perform();
2010-04-18 13:58:43 +00:00
2011-03-27 07:42:46 +00:00
if(method_exists($instance, 'tearDown')) {
$instance->tearDown();
}
2011-03-27 07:42:46 +00:00
Resque_Event::trigger('afterPerform', $this);
}
// beforePerform/setUp have said don't perform this job. Return.
catch(Resque_Job_DontPerform $e) {
return false;
}
2011-03-27 07:42:46 +00:00
return true;
2010-04-18 13:58:43 +00:00
}
/**
* Mark the current job as having failed.
2012-03-02 01:21:06 +00:00
*
* @param $exception
2010-04-18 13:58:43 +00:00
*/
public function fail($exception)
{
Resque_Event::trigger('onFailure', array(
'exception' => $exception,
'job' => $this,
));
2010-04-18 13:58:43 +00:00
$this->updateStatus(Resque_Job_Status::STATUS_FAILED);
Resque_Failure::create(
$this->payload,
$exception,
$this->worker,
$this->queue
);
Resque_Stat::incr('failed');
Resque_Stat::incr('failed:' . $this->worker);
}
/**
* Re-queue the current job.
2012-03-02 01:21:06 +00:00
* @return string
2010-04-18 13:58:43 +00:00
*/
public function recreate()
{
$status = new Resque_Job_Status($this->payload['id']);
2010-04-18 13:58:43 +00:00
$monitor = false;
if($status->isTracking()) {
$monitor = true;
}
return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor);
2010-04-18 13:58:43 +00:00
}
/**
* Generate a string representation used to describe the current job.
*
* @return string The string representation of the job.
*/
public function __toString()
{
$name = array(
'Job{' . $this->queue .'}'
);
if(!empty($this->payload['id'])) {
$name[] = 'ID: ' . $this->payload['id'];
}
$name[] = $this->payload['class'];
if(!empty($this->payload['args'])) {
$name[] = json_encode($this->payload['args']);
2010-04-18 13:58:43 +00:00
}
return '(' . implode(' | ', $name) . ')';
}
/**
* @param Resque_Job_FactoryInterface $jobFactory
* @return Resque_Job
*/
public function setJobFactory(Resque_Job_FactoryInterface $jobFactory)
{
$this->jobFactory = $jobFactory;
return $this;
}
2010-04-18 13:58:43 +00:00
}