Skip to content

Commit

Permalink
Merge pull request #28 from lukewaite/fix_27_queue_work_batch
Browse files Browse the repository at this point in the history
Fix #27, queue should run
  • Loading branch information
lukewaite authored Apr 14, 2017
2 parents 73fbcfc + 8218bf8 commit 1ed8ca1
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 8 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release Notes for 2.x

## [Unreleased]

### Fixed
* Fix `queue-work:batch` command to run without error
* Fix job failures and retries

## v2.0.0 (2017-04-09)

### Supported Version
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ will push your jobs into Batch. In this case, my queue name would be `first-run-
"memory": 256,
"command": [
"queue:work-batch",
"first-run-job-queue",
"Ref::jobId",
"--tries=3"
],
Expand Down
27 changes: 25 additions & 2 deletions src/Console/QueueWorkBatchCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Illuminate\Queue\Console\WorkCommand;
use Illuminate\Queue\QueueManager;
use Illuminate\Queue\Worker;
use Illuminate\Queue\WorkerOptions;
use LukeWaite\LaravelQueueAwsBatch\Exceptions\JobNotFoundException;
use LukeWaite\LaravelQueueAwsBatch\Exceptions\UnsupportedException;
use LukeWaite\LaravelQueueAwsBatch\Queues\BatchQueue;
Expand All @@ -27,7 +28,13 @@ class QueueWorkBatchCommand extends WorkCommand

protected $description = 'Run a Job for the AWS Batch queue';

protected $signature = 'queue:work-batch {connection} {job_id} {--tries=}';
protected $signature = 'queue:work-batch
{job_id : The job id in the database}
{connection? : The name of the queue connection to work}
{--memory=128 : The memory limit in megabytes}
{--timeout=60 : The number of seconds a child process can run}
{--tries=0 : Number of times to attempt a job before logging it failed}';


protected $manager;
protected $exceptions;
Expand All @@ -41,6 +48,8 @@ public function __construct(QueueManager $manager, Worker $worker, Handler $exce

public function fire()
{
$this->listenForEvents();

try {
$this->runJob();
} catch (\Exception $e) {
Expand Down Expand Up @@ -70,7 +79,7 @@ protected function runJob()
// If we're able to pull a job off of the stack, we will process it and
// then immediately return back out.
if (!is_null($job)) {
$this->worker->process(
return $this->worker->process(
$this->manager->getName($connectionName),
$job,
$this->gatherWorkerOptions()
Expand All @@ -80,4 +89,18 @@ protected function runJob()
// If we hit this point, we haven't processed our job
throw new JobNotFoundException('No job was returned');
}

/**
* Gather all of the queue worker options as a single object.
*
* @return \Illuminate\Queue\WorkerOptions
*/
protected function gatherWorkerOptions()
{
return new WorkerOptions(
0, $this->option('memory'),
$this->option('timeout'), 0,
$this->option('tries'), false
);
}
}
24 changes: 21 additions & 3 deletions src/Queues/BatchQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Aws\Batch\BatchClient;
use Illuminate\Database\Connection;
use Illuminate\Queue\DatabaseQueue;
use Illuminate\Queue\Jobs\DatabaseJobRecord;
use LukeWaite\LaravelQueueAwsBatch\Exceptions\JobNotFoundException;
use LukeWaite\LaravelQueueAwsBatch\Exceptions\UnsupportedException;
use LukeWaite\LaravelQueueAwsBatch\Jobs\BatchJob;
Expand Down Expand Up @@ -94,13 +95,31 @@ protected function pushToBatch($queue, $payload, $jobName)
return $jobId;
}

public function getJobById($id, $queue)
public function getJobById($id)
{
$job = $this->database->table($this->table)->where('id', $id)->first();
$this->database->beginTransaction();

$job = $this->database->table($this->table)
->lockForUpdate()
->where('id', $id)
->first();


if (!isset($job)) {
throw new JobNotFoundException('Could not find the job');
}

$job = new DatabaseJobRecord($job);

return $this->marshalJob($job->queue, $job);
}

protected function marshalJob($queue, $job)
{
$job = $this->markJobAsReserved($job);

$this->database->commit();

return new BatchJob(
$this->container,
$this,
Expand Down Expand Up @@ -128,7 +147,6 @@ public function release($queue, $job, $delay)

return $this->database->table($this->table)->where('id', $job->id)->update([
'attempts' => $job->attempts,
'reserved' => 0,
'reserved_at' => null
]);
}
Expand Down
18 changes: 16 additions & 2 deletions tests/BatchQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace LukeWaite\LaravelQueueAwsBatch\Tests;

use Carbon\Carbon;
use Mockery as m;
use PHPUnit\Framework\TestCase;

Expand Down Expand Up @@ -71,12 +72,26 @@ public function testPushProperlySanitizesJobName()

public function testGetJobById()
{
$this->database->shouldReceive('table')->once()->with('table')->andReturn($query = m::mock('StdClass'));
$testDate = Carbon::create(2016, 9, 4, 16);
Carbon::setTestNow($testDate);

$this->database->shouldReceive('beginTransaction')->once();
$this->database->shouldReceive('table')->with('table')->andReturn($table = m::mock('StdClass'));
$table->shouldReceive('lockForUpdate')->once()->andReturn($query = m::mock('StdClass'));
$query->shouldReceive('where')->once()->with('id', 1)->andReturn($results = m::mock('StdClass'));
$results->shouldReceive('first')->once()->andReturn($queryResult = m::mock('StdClass'));
$queryResult->attempts = 0;
$queryResult->queue = 'default';
$queryResult->id = 1;

$table->shouldReceive('where')->once()->with('id', 1)->andReturn($reserved = m::mock('StdClass'));
$reserved->shouldReceive('update')->with(['reserved_at'=> 1473004800, 'attempts'=> 1])->once()->andReturn($job = m::mock('StdClass'));

$this->database->shouldReceive('commit')->once();

$this->queue->getJobById(1, 'default');

Carbon::setTestNow();
}

public function testRelease()
Expand All @@ -85,7 +100,6 @@ public function testRelease()
$table->shouldReceive('where')->once()->with('id', 4)->andReturn($query = m::mock('StdClass'));
$query->shouldReceive('update')->once()->with([
'attempts' => 1,
'reserved' => 0,
'reserved_at' => null,
])->andReturn(4);

Expand Down

0 comments on commit 1ed8ca1

Please sign in to comment.