From 4ebeacc4a3e2cf33a20e93064e18ee3aebcb336e Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 3 Mar 2024 11:19:28 +0100 Subject: [PATCH] improve batch processing in projectionist --- .../Projectionist/DefaultProjectionist.php | 28 +++++++++++++++---- .../DefaultProjectionistTest.php | 28 +++++++++---------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/Projection/Projectionist/DefaultProjectionist.php b/src/Projection/Projectionist/DefaultProjectionist.php index 850539c1b..6e566fda7 100644 --- a/src/Projection/Projectionist/DefaultProjectionist.php +++ b/src/Projection/Projectionist/DefaultProjectionist.php @@ -84,14 +84,13 @@ function ($projections) use ($limit): void { ); $stream = null; + $messageCounter = 0; try { $stream = $this->messageStore->load( new Criteria(fromIndex: $startIndex), ); - $messageCounter = 0; - foreach ($stream as $message) { $index = $stream->index(); @@ -141,6 +140,16 @@ function ($projections) use ($limit): void { } } } finally { + if ($messageCounter > 0) { + foreach ($projections as $projection) { + if (!$projection->isBooting()) { + continue; + } + + $this->projectionStore->update($projection); + } + } + $stream?->close(); } @@ -212,13 +221,12 @@ function (array $projections) use ($limit): void { ); $stream = null; + $messageCounter = 0; try { $criteria = new Criteria(fromIndex: $startIndex); $stream = $this->messageStore->load($criteria); - $messageCounter = 0; - foreach ($stream as $message) { $index = $stream->index(); @@ -263,6 +271,16 @@ function (array $projections) use ($limit): void { } } } finally { + if ($messageCounter > 0) { + foreach ($projections as $projection) { + if (!$projection->isActive()) { + continue; + } + + $this->projectionStore->update($projection); + } + } + $stream?->close(); } @@ -547,7 +565,6 @@ private function handleMessage(int $index, Message $message, Projection $project if ($subscribeMethods === []) { $projection->changePosition($index); - $this->projectionStore->update($projection); $this->logger?->debug( sprintf( @@ -583,7 +600,6 @@ private function handleMessage(int $index, Message $message, Projection $project $projection->changePosition($index); $projection->resetRetry(); - $this->projectionStore->update($projection); $this->logger?->debug( sprintf( diff --git a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php index 1ab1830d9..335065027 100644 --- a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php +++ b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php @@ -351,6 +351,13 @@ public function handle(Message $message): void ProjectionStatus::Booting, 1, ), + new Projection( + $projectionId2, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + 1, + ), new Projection( $projectionId1, Projection::DEFAULT_GROUP, @@ -461,13 +468,6 @@ public function handle(Message $message): void $projectionist->boot(); self::assertEquals([ - new Projection( - $projectionId, - Projection::DEFAULT_GROUP, - RunMode::FromBeginning, - ProjectionStatus::Booting, - 1, - ), new Projection( $projectionId, Projection::DEFAULT_GROUP, @@ -834,6 +834,13 @@ public function handle(Message $message): void ProjectionStatus::Active, 1, ), + new Projection( + $projectionId2, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 1, + ), ], $projectionStore->updatedProjections); self::assertSame($message, $projector1->message); @@ -999,13 +1006,6 @@ public function handle(Message $message): void $projectionist->run(); self::assertEquals([ - new Projection( - $projectionId, - Projection::DEFAULT_GROUP, - RunMode::FromBeginning, - ProjectionStatus::Active, - 1, - ), new Projection( $projectionId, Projection::DEFAULT_GROUP,