From 256b66c478ff0d4d7122ca9830abe7da8438fa87 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 24 Dec 2023 13:40:57 +0100 Subject: [PATCH] add retry logic --- src/Projection/Projection/Projection.php | 26 ++++ .../Projection/Store/DoctrineStore.php | 8 +- .../Projectionist/DefaultProjectionist.php | 132 +++++++++++------- .../DefaultProjectionistTest.php | 2 + 4 files changed, 119 insertions(+), 49 deletions(-) diff --git a/src/Projection/Projection/Projection.php b/src/Projection/Projection/Projection.php index 2180b32b1..16a181eb3 100644 --- a/src/Projection/Projection/Projection.php +++ b/src/Projection/Projection/Projection.php @@ -11,6 +11,7 @@ public function __construct( private ProjectionStatus $status = ProjectionStatus::New, private int $position = 0, private ProjectionError|null $error = null, + private int $retry = 0, ) { } @@ -86,4 +87,29 @@ public function isError(): bool { return $this->status === ProjectionStatus::Error; } + + public function incrementRetry(): void + { + $this->retry++; + } + + public function retry(): int + { + return $this->retry; + } + + public function resetRetry(): void + { + $this->retry = 0; + } + + public function disallowRetry(): void + { + $this->retry = -1; + } + + public function isRetryDisallowed(): bool + { + return $this->retry === -1; + } } diff --git a/src/Projection/Projection/Store/DoctrineStore.php b/src/Projection/Projection/Store/DoctrineStore.php index 55ee43a49..2b03090f7 100644 --- a/src/Projection/Projection/Store/DoctrineStore.php +++ b/src/Projection/Projection/Store/DoctrineStore.php @@ -23,7 +23,8 @@ * position: int, * status: string, * error_message: string|null, - * error_object: string|null + * error_object: string|null, + * retry: int, * } */ final class DoctrineStore implements ProjectionStore, SchemaConfigurator @@ -84,6 +85,7 @@ private function createProjection(array $row): Projection $row['error_message'], ErrorSerializer::unserialize($row['error_object']), ) : null, + $row['retry'], ); } @@ -102,6 +104,7 @@ function (Connection $connection) use ($projections): void { 'status' => $projection->status()->value, 'error_message' => $projection->projectionError()?->errorMessage, 'error_object' => $errorObject, + 'retry' => $projection->retry(), ], [ 'name' => $projection->id()->name(), @@ -122,6 +125,7 @@ function (Connection $connection) use ($projections): void { 'status' => $projection->status()->value, 'error_message' => $projection->projectionError()?->errorMessage, 'error_object' => $errorObject, + 'retry' => $projection->retry(), ], ); } @@ -160,6 +164,8 @@ public function configureSchema(Schema $schema, Connection $connection): void ->setNotnull(false); $table->addColumn('error_object', Types::BLOB) ->setNotnull(false); + $table->addColumn('retry', Types::INTEGER) + ->setNotnull(true); $table->setPrimaryKey(['name', 'version']); } diff --git a/src/Projection/Projectionist/DefaultProjectionist.php b/src/Projection/Projectionist/DefaultProjectionist.php index 2bb5e87ef..a4ad25a31 100644 --- a/src/Projection/Projectionist/DefaultProjectionist.php +++ b/src/Projection/Projectionist/DefaultProjectionist.php @@ -25,6 +25,8 @@ final class DefaultProjectionist implements Projectionist { + private const RETRY_LIMIT = 5; + /** @var array|null */ private array|null $projectors = null; @@ -89,9 +91,6 @@ public function boot( $e->getMessage(), )); - $projection->error(ProjectionError::fromThrowable($e)); - $this->projectionStore->save($projection); - if ($throwByError) { throw new ProjectionistError( $projector::class, @@ -99,6 +98,10 @@ public function boot( $e, ); } + + $projection->error(ProjectionError::fromThrowable($e)); + $projection->disallowRetry(); + $this->projectionStore->save($projection); } } @@ -162,22 +165,10 @@ public function run( int|null $limit = null, bool $throwByError = false, ): void { - $projections = $this->projections() - ->filterByProjectionStatus(ProjectionStatus::Active) - ->filterByCriteria($criteria); - - foreach ($projections as $projection) { - $projector = $this->projector($projection->id()); - - if ($projector) { - continue; - } - - $projection->outdated(); - $this->projectionStore->save($projection); + $projections = $this->projections()->filterByCriteria($criteria); - $this->logger?->info(sprintf('projection "%s" has been marked as outdated', $projection->id()->toString())); - } + $this->handleOutdatedProjections($projections); + $this->handleRetryProjections($projections); $projections = $projections->filterByProjectionStatus(ProjectionStatus::Active); @@ -379,44 +370,51 @@ private function handleMessage(Message $message, Projection $projection, bool $t $subscribeMethod = $this->projectorResolver->resolveSubscribeMethod($projector, $message); - if ($subscribeMethod) { - try { - $subscribeMethod($message); + if (!$subscribeMethod) { + $projection->incrementPosition(); + $this->projectionStore->save($projection); - $this->logger?->debug( - sprintf( - 'projector "%s" for "%s" processed the event "%s"', - $projector::class, - $projection->id()->toString(), - $message->event()::class, - ), - ); - } catch (Throwable $e) { - $this->logger?->error( - sprintf( - 'projector "%s" for "%s" could not process the event: %s', - $projector::class, - $projection->id()->toString(), - $e->getMessage(), - ), - ); + return; + } - $projection->error(ProjectionError::fromThrowable($e)); - $this->projectionStore->save($projection); + try { + $subscribeMethod($message); - if ($throwByError) { - throw new ProjectionistError( - $projector::class, - $projection->id(), - $e, - ); - } + $this->logger?->debug( + sprintf( + 'projector "%s" for "%s" processed the event "%s"', + $projector::class, + $projection->id()->toString(), + $message->event()::class, + ), + ); + } catch (Throwable $e) { + $this->logger?->error( + sprintf( + 'projector "%s" for "%s" could not process the event: %s', + $projector::class, + $projection->id()->toString(), + $e->getMessage(), + ), + ); - return; + if ($throwByError) { + throw new ProjectionistError( + $projector::class, + $projection->id(), + $e, + ); } + + $projection->error(ProjectionError::fromThrowable($e)); + $projection->incrementRetry(); + $this->projectionStore->save($projection); + + return; } $projection->incrementPosition(); + $projection->resetRetry(); $this->projectionStore->save($projection); } @@ -442,4 +440,42 @@ private function projectors(): array return $this->projectors; } + + private function handleOutdatedProjections(ProjectionCollection $projections): void + { + foreach ($projections as $projection) { + if ($projection->isRetryDisallowed()) { + continue; + } + + if (!$projection->isActive() && !$projection->isError()) { + continue; + } + + $projector = $this->projector($projection->id()); + + if ($projector) { + continue; + } + + $projection->outdated(); + $this->projectionStore->save($projection); + + $this->logger?->info(sprintf('projection "%s" has been marked as outdated', $projection->id()->toString())); + } + } + + private function handleRetryProjections(ProjectionCollection $projections): void + { + foreach ($projections->filterByProjectionStatus(ProjectionStatus::Error) as $projection) { + if ($projection->retry() >= self::RETRY_LIMIT) { + continue; + } + + $projection->active(); + $this->projectionStore->save($projection); + + $this->logger?->info(sprintf('retry projection "%s"', $projection->id()->toString())); + } + } } diff --git a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php index 6588ff3c7..05f749259 100644 --- a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php +++ b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php @@ -257,6 +257,7 @@ public function create(): void ProjectionStatus::Error, 0, new ProjectionError('ERROR', $projector->exception), + -1, ), ], $projectionStore->savedProjections, @@ -468,6 +469,7 @@ public function handle(Message $message): void ProjectionStatus::Error, 0, new ProjectionError('ERROR', $projector->exception), + 1, ), ], $projectionStore->savedProjections,