From 8218bf8954ebfd7e171f0b053f11729fbcc1b3dc Mon Sep 17 00:00:00 2001 From: Luke Waite Date: Thu, 13 Apr 2017 21:24:54 -0400 Subject: [PATCH] Fix #27, queue should run --- CHANGELOG.md | 6 ++++++ README.md | 1 - src/Console/QueueWorkBatchCommand.php | 27 +++++++++++++++++++++++++-- src/Queues/BatchQueue.php | 24 +++++++++++++++++++++--- tests/BatchQueueTest.php | 18 ++++++++++++++++-- 5 files changed, 68 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5968d8c..c06f157 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Release Notes for 2.x +## [Unreleased] + +### Fixed +* Fix `queue-work:batch` command to run without error +* Fix job failures and retries + ## v2.0.0 (2017-04-09) ### Supported Version diff --git a/README.md b/README.md index f2d7220..979c0e1 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,6 @@ will push your jobs into Batch. In this case, my queue name would be `first-run- "memory": 256, "command": [ "queue:work-batch", - "first-run-job-queue", "Ref::jobId", "--tries=3" ], diff --git a/src/Console/QueueWorkBatchCommand.php b/src/Console/QueueWorkBatchCommand.php index 0821dac..52f8194 100644 --- a/src/Console/QueueWorkBatchCommand.php +++ b/src/Console/QueueWorkBatchCommand.php @@ -16,6 +16,7 @@ use Illuminate\Queue\Console\WorkCommand; use Illuminate\Queue\QueueManager; use Illuminate\Queue\Worker; +use Illuminate\Queue\WorkerOptions; use LukeWaite\LaravelQueueAwsBatch\Exceptions\JobNotFoundException; use LukeWaite\LaravelQueueAwsBatch\Exceptions\UnsupportedException; use LukeWaite\LaravelQueueAwsBatch\Queues\BatchQueue; @@ -27,7 +28,13 @@ class QueueWorkBatchCommand extends WorkCommand protected $description = 'Run a Job for the AWS Batch queue'; - protected $signature = 'queue:work-batch {connection} {job_id} {--tries=}'; + protected $signature = 'queue:work-batch + {job_id : The job id in the database} + {connection? : The name of the queue connection to work} + {--memory=128 : The memory limit in megabytes} + {--timeout=60 : The number of seconds a child process can run} + {--tries=0 : Number of times to attempt a job before logging it failed}'; + protected $manager; protected $exceptions; @@ -41,6 +48,8 @@ public function __construct(QueueManager $manager, Worker $worker, Handler $exce public function fire() { + $this->listenForEvents(); + try { $this->runJob(); } catch (\Exception $e) { @@ -70,7 +79,7 @@ protected function runJob() // If we're able to pull a job off of the stack, we will process it and // then immediately return back out. if (!is_null($job)) { - $this->worker->process( + return $this->worker->process( $this->manager->getName($connectionName), $job, $this->gatherWorkerOptions() @@ -80,4 +89,18 @@ protected function runJob() // If we hit this point, we haven't processed our job throw new JobNotFoundException('No job was returned'); } + + /** + * Gather all of the queue worker options as a single object. + * + * @return \Illuminate\Queue\WorkerOptions + */ + protected function gatherWorkerOptions() + { + return new WorkerOptions( + 0, $this->option('memory'), + $this->option('timeout'), 0, + $this->option('tries'), false + ); + } } diff --git a/src/Queues/BatchQueue.php b/src/Queues/BatchQueue.php index 6f8c9e5..f8e450c 100644 --- a/src/Queues/BatchQueue.php +++ b/src/Queues/BatchQueue.php @@ -14,6 +14,7 @@ use Aws\Batch\BatchClient; use Illuminate\Database\Connection; use Illuminate\Queue\DatabaseQueue; +use Illuminate\Queue\Jobs\DatabaseJobRecord; use LukeWaite\LaravelQueueAwsBatch\Exceptions\JobNotFoundException; use LukeWaite\LaravelQueueAwsBatch\Exceptions\UnsupportedException; use LukeWaite\LaravelQueueAwsBatch\Jobs\BatchJob; @@ -94,13 +95,31 @@ protected function pushToBatch($queue, $payload, $jobName) return $jobId; } - public function getJobById($id, $queue) + public function getJobById($id) { - $job = $this->database->table($this->table)->where('id', $id)->first(); + $this->database->beginTransaction(); + + $job = $this->database->table($this->table) + ->lockForUpdate() + ->where('id', $id) + ->first(); + + if (!isset($job)) { throw new JobNotFoundException('Could not find the job'); } + $job = new DatabaseJobRecord($job); + + return $this->marshalJob($job->queue, $job); + } + + protected function marshalJob($queue, $job) + { + $job = $this->markJobAsReserved($job); + + $this->database->commit(); + return new BatchJob( $this->container, $this, @@ -128,7 +147,6 @@ public function release($queue, $job, $delay) return $this->database->table($this->table)->where('id', $job->id)->update([ 'attempts' => $job->attempts, - 'reserved' => 0, 'reserved_at' => null ]); } diff --git a/tests/BatchQueueTest.php b/tests/BatchQueueTest.php index 083051f..e41f3b1 100644 --- a/tests/BatchQueueTest.php +++ b/tests/BatchQueueTest.php @@ -2,6 +2,7 @@ namespace LukeWaite\LaravelQueueAwsBatch\Tests; +use Carbon\Carbon; use Mockery as m; use PHPUnit\Framework\TestCase; @@ -71,12 +72,26 @@ public function testPushProperlySanitizesJobName() public function testGetJobById() { - $this->database->shouldReceive('table')->once()->with('table')->andReturn($query = m::mock('StdClass')); + $testDate = Carbon::create(2016, 9, 4, 16); + Carbon::setTestNow($testDate); + + $this->database->shouldReceive('beginTransaction')->once(); + $this->database->shouldReceive('table')->with('table')->andReturn($table = m::mock('StdClass')); + $table->shouldReceive('lockForUpdate')->once()->andReturn($query = m::mock('StdClass')); $query->shouldReceive('where')->once()->with('id', 1)->andReturn($results = m::mock('StdClass')); $results->shouldReceive('first')->once()->andReturn($queryResult = m::mock('StdClass')); $queryResult->attempts = 0; + $queryResult->queue = 'default'; + $queryResult->id = 1; + + $table->shouldReceive('where')->once()->with('id', 1)->andReturn($reserved = m::mock('StdClass')); + $reserved->shouldReceive('update')->with(['reserved_at'=> 1473004800, 'attempts'=> 1])->once()->andReturn($job = m::mock('StdClass')); + + $this->database->shouldReceive('commit')->once(); $this->queue->getJobById(1, 'default'); + + Carbon::setTestNow(); } public function testRelease() @@ -85,7 +100,6 @@ public function testRelease() $table->shouldReceive('where')->once()->with('id', 4)->andReturn($query = m::mock('StdClass')); $query->shouldReceive('update')->once()->with([ 'attempts' => 1, - 'reserved' => 0, 'reserved_at' => null, ])->andReturn(4);