From d592037f691d536ca96c9e9957510585c7918d38 Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 20 Feb 2024 12:59:47 +0100 Subject: [PATCH 1/5] overhaul error handling in projectionist --- docs/pages/projection.md | 8 +- phpstan-baseline.neon | 2 +- src/Console/Command/ProjectionBootCommand.php | 9 +- .../Command/ProjectionRebuildCommand.php | 19 +- src/Console/Command/ProjectionRunCommand.php | 13 +- .../Command/ProjectionStatusCommand.php | 4 +- src/Projection/Projection/Projection.php | 45 ++-- src/Projection/Projection/ProjectionError.php | 11 +- .../Projection/Store/DoctrineStore.php | 50 ++++- ...=> ThrowableToErrorContextTransformer.php} | 10 +- .../Projectionist/DefaultProjectionist.php | 119 +++++----- .../Projectionist/Projectionist.php | 6 +- .../RetryStrategy/DefaultRetryStrategy.php | 82 +++++++ .../RetryStrategy/NoRetryStrategy.php | 18 ++ src/Projection/RetryStrategy/Retry.php | 31 +++ .../RetryStrategy/RetryStrategy.php | 12 + .../IntegrationTest.php | 2 +- .../BasicIntegrationTest.php | 4 +- .../Projection/ErrorProducerProjector.php | 44 ++++ .../Projectionist/ProjectionistTest.php | 205 +++++++++++++++++- .../Projection/ErrorContextTest.php | 4 +- .../Projection/ProjectionErrorTest.php | 7 +- .../Projection/Projection/ProjectionTest.php | 24 +- .../Projection/Store/ErrorContextTest.php | 8 +- .../DefaultProjectionistTest.php | 109 +++++++++- .../DefaultRetryStrategyTest.php | 87 ++++++++ .../RetryStrategy/NoRetryStrategyTest.php | 54 +++++ 27 files changed, 830 insertions(+), 157 deletions(-) rename src/Projection/Projection/{Store/ErrorContext.php => ThrowableToErrorContextTransformer.php} (79%) create mode 100644 src/Projection/RetryStrategy/DefaultRetryStrategy.php create mode 100644 src/Projection/RetryStrategy/NoRetryStrategy.php create mode 100644 src/Projection/RetryStrategy/Retry.php create mode 100644 src/Projection/RetryStrategy/RetryStrategy.php create mode 100644 tests/Integration/Projectionist/Projection/ErrorProducerProjector.php create mode 100644 tests/Unit/Projection/RetryStrategy/DefaultRetryStrategyTest.php create mode 100644 tests/Unit/Projection/RetryStrategy/NoRetryStrategyTest.php diff --git a/docs/pages/projection.md b/docs/pages/projection.md index c88e54b60..88f8e4a88 100644 --- a/docs/pages/projection.md +++ b/docs/pages/projection.md @@ -322,14 +322,18 @@ stateDiagram-v2 direction LR [*] --> New New --> Booting + New --> Error Booting --> Active + Booting --> Finished Booting --> Error + Active --> Finished Active --> Outdated Active --> Error - Active --> Finished + Finished --> Active Finished --> Outdated + Error --> New + Error --> Booting Error --> Active - Error --> [*] Outdated --> Active Outdated --> [*] ``` diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 8d4101dc7..a17df2de0 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -11,7 +11,7 @@ parameters: path: src/EventBus/Message.php - - message: "#^Parameter \\#2 \\$errorContext of class Patchlevel\\\\EventSourcing\\\\Projection\\\\Projection\\\\ProjectionError constructor expects array\\\\}\\>\\|null, mixed given\\.$#" + message: "#^Parameter \\#3 \\$errorContext of class Patchlevel\\\\EventSourcing\\\\Projection\\\\Projection\\\\ProjectionError constructor expects array\\\\}\\>\\|null, mixed given\\.$#" count: 1 path: src/Projection/Projection/Store/DoctrineStore.php diff --git a/src/Console/Command/ProjectionBootCommand.php b/src/Console/Command/ProjectionBootCommand.php index d52d0b339..387224dba 100644 --- a/src/Console/Command/ProjectionBootCommand.php +++ b/src/Console/Command/ProjectionBootCommand.php @@ -26,22 +26,15 @@ public function configure(): void null, InputOption::VALUE_REQUIRED, 'How many messages should be consumed in one run', - ) - ->addOption( - 'throw-by-error', - null, - InputOption::VALUE_NONE, - 'throw exception by error', ); } protected function execute(InputInterface $input, OutputInterface $output): int { $limit = InputHelper::nullablePositiveInt($input->getOption('limit')); - $throwByError = InputHelper::bool($input->getOption('throw-by-error')); $criteria = $this->projectionCriteria($input); - $this->projectionist->boot($criteria, $limit, $throwByError); + $this->projectionist->boot($criteria, $limit); return 0; } diff --git a/src/Console/Command/ProjectionRebuildCommand.php b/src/Console/Command/ProjectionRebuildCommand.php index e9b4a1b74..137333d31 100644 --- a/src/Console/Command/ProjectionRebuildCommand.php +++ b/src/Console/Command/ProjectionRebuildCommand.php @@ -4,11 +4,9 @@ namespace Patchlevel\EventSourcing\Console\Command; -use Patchlevel\EventSourcing\Console\InputHelper; use Patchlevel\EventSourcing\Console\OutputStyle; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; #[AsCommand( @@ -17,19 +15,6 @@ )] final class ProjectionRebuildCommand extends ProjectionCommand { - public function configure(): void - { - parent::configure(); - - $this - ->addOption( - 'throw-by-error', - null, - InputOption::VALUE_NONE, - 'throw exception by error', - ); - } - protected function execute(InputInterface $input, OutputInterface $output): int { $io = new OutputStyle($input, $output); @@ -40,10 +25,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int return 1; } - $throwByError = InputHelper::bool($input->getOption('throw-by-error')); - $this->projectionist->remove($criteria); - $this->projectionist->boot($criteria, null, $throwByError); + $this->projectionist->boot($criteria, null); return 0; } diff --git a/src/Console/Command/ProjectionRunCommand.php b/src/Console/Command/ProjectionRunCommand.php index a58c7fafd..1a6d79381 100644 --- a/src/Console/Command/ProjectionRunCommand.php +++ b/src/Console/Command/ProjectionRunCommand.php @@ -55,12 +55,6 @@ protected function configure(): void 'How much time should elapse before the next job is executed in milliseconds', 1000, ) - ->addOption( - 'throw-by-error', - null, - InputOption::VALUE_NONE, - 'throw exception by error', - ) ->addOption( 'rebuild', null, @@ -76,7 +70,6 @@ protected function execute(InputInterface $input, OutputInterface $output): int $memoryLimit = InputHelper::nullableString($input->getOption('memory-limit')); $timeLimit = InputHelper::nullablePositiveInt($input->getOption('time-limit')); $sleep = InputHelper::positiveIntOrZero($input->getOption('sleep')); - $throwByError = InputHelper::bool($input->getOption('throw-by-error')); $rebuild = InputHelper::bool($input->getOption('rebuild')); $criteria = $this->projectionCriteria($input); @@ -84,8 +77,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int $logger = new ConsoleLogger($output); $worker = DefaultWorker::create( - function () use ($criteria, $messageLimit, $throwByError): void { - $this->projectionist->run($criteria, $messageLimit, $throwByError); + function () use ($criteria, $messageLimit): void { + $this->projectionist->run($criteria, $messageLimit); }, [ 'runLimit' => $runLimit, @@ -97,7 +90,7 @@ function () use ($criteria, $messageLimit, $throwByError): void { if ($rebuild) { $this->projectionist->remove($criteria); - $this->projectionist->boot($criteria, null, $throwByError); + $this->projectionist->boot($criteria); } $worker->run($sleep); diff --git a/src/Console/Command/ProjectionStatusCommand.php b/src/Console/Command/ProjectionStatusCommand.php index 391eca805..52224ef7a 100644 --- a/src/Console/Command/ProjectionStatusCommand.php +++ b/src/Console/Command/ProjectionStatusCommand.php @@ -7,8 +7,8 @@ use Patchlevel\EventSourcing\Console\InputHelper; use Patchlevel\EventSourcing\Console\OutputStyle; use Patchlevel\EventSourcing\Projection\Projection\Projection; +use Patchlevel\EventSourcing\Projection\Projection\ProjectionError; use Patchlevel\EventSourcing\Projection\Projection\ProjectionNotFound; -use Patchlevel\EventSourcing\Projection\Projection\Store\ErrorContext; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; @@ -18,7 +18,7 @@ use function is_array; use function sprintf; -/** @psalm-import-type Context from ErrorContext */ +/** @psalm-import-type Context from ProjectionError */ #[AsCommand( 'event-sourcing:projection:status', 'View the current status of the projections', diff --git a/src/Projection/Projection/Projection.php b/src/Projection/Projection/Projection.php index 1d3078ce0..5024d5a2d 100644 --- a/src/Projection/Projection/Projection.php +++ b/src/Projection/Projection/Projection.php @@ -4,6 +4,9 @@ namespace Patchlevel\EventSourcing\Projection\Projection; +use Patchlevel\EventSourcing\Projection\RetryStrategy\Retry; +use Throwable; + final class Projection { public const DEFAULT_GROUP = 'default'; @@ -15,7 +18,7 @@ public function __construct( private ProjectionStatus $status = ProjectionStatus::New, private int $position = 0, private ProjectionError|null $error = null, - private int $retry = 0, + private Retry|null $retry = null, ) { } @@ -54,6 +57,12 @@ public function changePosition(int $position): void $this->position = $position; } + public function new(): void + { + $this->status = ProjectionStatus::New; + $this->error = null; + } + public function isNew(): bool { return $this->status === ProjectionStatus::New; @@ -102,10 +111,18 @@ public function isOutdated(): bool return $this->status === ProjectionStatus::Outdated; } - public function error(ProjectionError|null $error = null): void + public function error(Throwable|string $error): void { + $previousStatus = $this->status; $this->status = ProjectionStatus::Error; - $this->error = $error; + + if ($error instanceof Throwable) { + $this->error = ProjectionError::fromThrowable($previousStatus, $error); + + return; + } + + $this->error = new ProjectionError($error, $previousStatus); } public function isError(): bool @@ -113,28 +130,28 @@ public function isError(): bool return $this->status === ProjectionStatus::Error; } - public function incrementRetry(): void + public function updateRetry(Retry|null $retry): void { - $this->retry++; + $this->retry = $retry; } - public function retry(): int + public function retry(): Retry|null { return $this->retry; } - public function resetRetry(): void + public function doRetry(): void { - $this->retry = 0; - } + if ($this->error === null) { + return; + } - public function disallowRetry(): void - { - $this->retry = -1; + $this->status = $this->error->previousStatus; + $this->error = null; } - public function isRetryDisallowed(): bool + public function resetRetry(): void { - return $this->retry === -1; + $this->retry = null; } } diff --git a/src/Projection/Projection/ProjectionError.php b/src/Projection/Projection/ProjectionError.php index 1500b768d..dc221bb37 100644 --- a/src/Projection/Projection/ProjectionError.php +++ b/src/Projection/Projection/ProjectionError.php @@ -4,21 +4,24 @@ namespace Patchlevel\EventSourcing\Projection\Projection; -use Patchlevel\EventSourcing\Projection\Projection\Store\ErrorContext; use Throwable; -/** @psalm-import-type Context from ErrorContext */ +/** + * @psalm-type Trace = array{file?: string, line?: int, function?: string, class?: string, type?: string, args?: array} + * @psalm-type Context = array{class: class-string, message: string, code: int|string, file: string, line: int, trace: list} + */ final class ProjectionError { /** @param list|null $errorContext */ public function __construct( public readonly string $errorMessage, + public readonly ProjectionStatus $previousStatus, public readonly array|null $errorContext = null, ) { } - public static function fromThrowable(Throwable $error): self + public static function fromThrowable(ProjectionStatus $projectionStatus, Throwable $error): self { - return new self($error->getMessage(), ErrorContext::fromThrowable($error)); + return new self($error->getMessage(), $projectionStatus, ThrowableToErrorContextTransformer::transform($error)); } } diff --git a/src/Projection/Projection/Store/DoctrineStore.php b/src/Projection/Projection/Store/DoctrineStore.php index 16995838f..d303b88fa 100644 --- a/src/Projection/Projection/Store/DoctrineStore.php +++ b/src/Projection/Projection/Store/DoctrineStore.php @@ -5,11 +5,14 @@ namespace Patchlevel\EventSourcing\Projection\Projection\Store; use Closure; +use DateTimeImmutable; use Doctrine\DBAL\ArrayParameterType; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Exception\DriverException; +use Doctrine\DBAL\Platforms\AbstractPlatform; use Doctrine\DBAL\Platforms\SQLitePlatform; use Doctrine\DBAL\Schema\Schema; +use Doctrine\DBAL\Types\Type; use Doctrine\DBAL\Types\Types; use Patchlevel\EventSourcing\Projection\Projection\Projection; use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria; @@ -17,9 +20,11 @@ use Patchlevel\EventSourcing\Projection\Projection\ProjectionNotFound; use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus; use Patchlevel\EventSourcing\Projection\Projection\RunMode; +use Patchlevel\EventSourcing\Projection\RetryStrategy\Retry; use Patchlevel\EventSourcing\Schema\SchemaConfigurator; use function array_map; +use function assert; use function json_decode; use function json_encode; @@ -32,8 +37,10 @@ * position: int, * status: string, * error_message: string|null, + * error_previous_status: string|null, * error_context: string|null, - * retry: int, + * retry_attempt: int|null, + * retry_next: string|null, * } */ final class DoctrineStore implements LockableProjectionStore, SchemaConfigurator @@ -114,6 +121,7 @@ public function find(ProjectionCriteria|null $criteria = null): array public function add(Projection $projection): void { $projectionError = $projection->projectionError(); + $projectionRetry = $projection->retry(); $this->connection->insert( $this->projectionTable, @@ -124,8 +132,13 @@ public function add(Projection $projection): void 'status' => $projection->status()->value, 'position' => $projection->position(), 'error_message' => $projectionError?->errorMessage, + 'error_previous_status' => $projectionError?->previousStatus?->value, 'error_context' => $projectionError?->errorContext !== null ? json_encode($projectionError->errorContext, JSON_THROW_ON_ERROR) : null, - 'retry' => $projection->retry(), + 'retry_attempt' => $projectionRetry?->attempt, + 'retry_next' => $projectionRetry?->nextRetry, + ], + [ + 'retry_next' => Types::DATETIME_IMMUTABLE, ], ); } @@ -133,6 +146,7 @@ public function add(Projection $projection): void public function update(Projection $projection): void { $projectionError = $projection->projectionError(); + $projectionRetry = $projection->retry(); $effectedRows = $this->connection->update( $this->projectionTable, @@ -142,12 +156,17 @@ public function update(Projection $projection): void 'status' => $projection->status()->value, 'position' => $projection->position(), 'error_message' => $projectionError?->errorMessage, + 'error_previous_status' => $projectionError?->previousStatus?->value, 'error_context' => $projectionError?->errorContext !== null ? json_encode($projectionError->errorContext, JSON_THROW_ON_ERROR) : null, - 'retry' => $projection->retry(), + 'retry_attempt' => $projectionRetry?->attempt, + 'retry_next' => $projectionRetry?->nextRetry, ], [ 'id' => $projection->id(), ], + [ + 'retry_next' => Types::DATETIME_IMMUTABLE, + ], ); if ($effectedRows === 0) { @@ -196,11 +215,15 @@ public function configureSchema(Schema $schema, Connection $connection): void $table->addColumn('error_message', Types::STRING) ->setLength(255) ->setNotnull(false); + $table->addColumn('error_previous_status', Types::STRING) + ->setLength(32) + ->setNotnull(false); $table->addColumn('error_context', Types::JSON) ->setNotnull(false); - $table->addColumn('retry', Types::INTEGER) - ->setNotnull(true) - ->setDefault(0); + $table->addColumn('retry_attempt', Types::INTEGER) + ->setNotnull(false); + $table->addColumn('retry_next', Types::DATETIME_IMMUTABLE) + ->setNotnull(false); $table->setPrimaryKey(['id']); $table->addIndex(['group_name']); @@ -221,9 +244,22 @@ private function createProjection(array $row): Projection $row['position'], $row['error_message'] !== null ? new ProjectionError( $row['error_message'], + $row['error_previous_status'] !== null ? ProjectionStatus::from($row['error_previous_status']) : ProjectionStatus::New, $context, ) : null, - $row['retry'], + $row['retry_attempt'] !== null ? new Retry( + $row['retry_attempt'], + self::normalizeDateTime($row['retry_next'], $this->connection->getDatabasePlatform()), + ) : null, ); } + + private static function normalizeDateTime(mixed $value, AbstractPlatform $platform): DateTimeImmutable + { + $normalizedValue = Type::getType(Types::DATETIMETZ_IMMUTABLE)->convertToPHPValue($value, $platform); + + assert($normalizedValue instanceof DateTimeImmutable); + + return $normalizedValue; + } } diff --git a/src/Projection/Projection/Store/ErrorContext.php b/src/Projection/Projection/ThrowableToErrorContextTransformer.php similarity index 79% rename from src/Projection/Projection/Store/ErrorContext.php rename to src/Projection/Projection/ThrowableToErrorContextTransformer.php index 7fa462978..c2b2fd84d 100644 --- a/src/Projection/Projection/Store/ErrorContext.php +++ b/src/Projection/Projection/ThrowableToErrorContextTransformer.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Patchlevel\EventSourcing\Projection\Projection\Store; +namespace Patchlevel\EventSourcing\Projection\Projection; use Throwable; @@ -15,13 +15,13 @@ use function sprintf; /** - * @psalm-type Trace = array{file?: string, line?: int, function?: string, class?: string, type?: string, args?: array} - * @psalm-type Context = array{class: class-string, message: string, code: int|string, file: string, line: int, trace: list} + * @psalm-import-type Context from ProjectionError + * @psalm-import-type Trace from ProjectionError */ -final class ErrorContext +final class ThrowableToErrorContextTransformer { /** @return list */ - public static function fromThrowable(Throwable $error): array + public static function transform(Throwable $error): array { $errors = []; diff --git a/src/Projection/Projectionist/DefaultProjectionist.php b/src/Projection/Projectionist/DefaultProjectionist.php index ffd454246..0098786de 100644 --- a/src/Projection/Projectionist/DefaultProjectionist.php +++ b/src/Projection/Projectionist/DefaultProjectionist.php @@ -12,11 +12,12 @@ use Patchlevel\EventSourcing\Metadata\Projector\ProjectorMetadataFactory; use Patchlevel\EventSourcing\Projection\Projection\Projection; use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria; -use Patchlevel\EventSourcing\Projection\Projection\ProjectionError; use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus; use Patchlevel\EventSourcing\Projection\Projection\RunMode; use Patchlevel\EventSourcing\Projection\Projection\Store\LockableProjectionStore; use Patchlevel\EventSourcing\Projection\Projection\Store\ProjectionStore; +use Patchlevel\EventSourcing\Projection\RetryStrategy\DefaultRetryStrategy; +use Patchlevel\EventSourcing\Projection\RetryStrategy\RetryStrategy; use Patchlevel\EventSourcing\Store\Criteria; use Patchlevel\EventSourcing\Store\Store; use Psr\Log\LoggerInterface; @@ -25,12 +26,11 @@ use function array_map; use function array_merge; use function count; +use function in_array; use function sprintf; final class DefaultProjectionist implements Projectionist { - private const RETRY_LIMIT = 5; - /** @var array|null */ private array|null $projectorIndex = null; @@ -39,6 +39,7 @@ public function __construct( private readonly Store $streamableMessageStore, private readonly ProjectionStore $projectionStore, private readonly iterable $projectors, + private readonly RetryStrategy $retryStrategy = new DefaultRetryStrategy(), private readonly ProjectorMetadataFactory $metadataFactory = new AttributeProjectorMetadataFactory(), private readonly LoggerInterface|null $logger = null, ) { @@ -47,17 +48,16 @@ public function __construct( public function boot( ProjectionistCriteria|null $criteria = null, int|null $limit = null, - bool $throwByError = false, ): void { $criteria ??= new ProjectionistCriteria(); - $this->discoverNewProjections(); - $this->logger?->info( 'Projectionist: Start booting.', ); - $this->handleNewProjections($criteria, $throwByError); + $this->discoverNewProjections(); + $this->handleRetryProjections($criteria); + $this->handleNewProjections($criteria); $this->findForUpdate( new ProjectionCriteria( @@ -65,7 +65,7 @@ public function boot( groups: $criteria->groups, status: [ProjectionStatus::Booting], ), - function ($projections) use ($limit, $throwByError): void { + function ($projections) use ($limit): void { $projections = $this->fastForwardFromNowProjections($projections); if (count($projections) === 0) { @@ -117,7 +117,7 @@ function ($projections) use ($limit, $throwByError): void { continue; } - $this->handleMessage($index, $message, $projection, $throwByError); + $this->handleMessage($index, $message, $projection); } $messageCounter++; @@ -180,14 +180,12 @@ function ($projections) use ($limit, $throwByError): void { public function run( ProjectionistCriteria|null $criteria = null, int|null $limit = null, - bool $throwByError = false, ): void { $criteria ??= new ProjectionistCriteria(); - $this->discoverNewProjections(); - $this->logger?->info('Projectionist: Start processing.'); + $this->discoverNewProjections(); $this->handleOutdatedProjections($criteria); $this->handleRetryProjections($criteria); @@ -197,7 +195,7 @@ public function run( groups: $criteria->groups, status: [ProjectionStatus::Active, ProjectionStatus::Error], ), - function (array $projections) use ($limit, $throwByError): void { + function (array $projections) use ($limit): void { if (count($projections) === 0) { $this->logger?->info('Projectionist: No projections to process, finish processing.'); @@ -246,7 +244,7 @@ function (array $projections) use ($limit, $throwByError): void { continue; } - $this->handleMessage($index, $message, $projection, $throwByError); + $this->handleMessage($index, $message, $projection); } $messageCounter++; @@ -430,9 +428,10 @@ public function reactivate(ProjectionistCriteria|null $criteria = null): void new ProjectionCriteria( ids: $criteria->ids, groups: $criteria->groups, - status: [ProjectionStatus::Error], + status: [ProjectionStatus::Error, ProjectionStatus::Outdated, ProjectionStatus::Finished], ), function (array $projections): void { + /** @var Projection $projection */ foreach ($projections as $projection) { $projector = $this->projector($projection->id()); @@ -444,6 +443,23 @@ function (array $projections): void { continue; } + $error = $projection->projectionError(); + + if ($error) { + $projection->doRetry(); + $projection->resetRetry(); + + $this->projectionStore->update($projection); + + $this->logger?->info(sprintf( + 'Projectionist: Projector "%s" for "%s" is reactivated.', + $projector::class, + $projection->id(), + )); + + continue; + } + $projection->active(); $this->projectionStore->update($projection); @@ -472,7 +488,7 @@ public function projections(ProjectionistCriteria|null $criteria = null): array ); } - private function handleMessage(int $index, Message $message, Projection $projection, bool $throwByError): void + private function handleMessage(int $index, Message $message, Projection $projection): void { $projector = $this->projector($projection->id()); @@ -513,17 +529,7 @@ private function handleMessage(int $index, Message $message, Projection $project ), ); - $projection->error(ProjectionError::fromThrowable($e)); - $projection->incrementRetry(); - $this->projectionStore->update($projection); - - if ($throwByError) { - throw new ProjectionistError( - $projector::class, - $projection->id(), - $e, - ); - } + $this->handleError($projection, $e); return; } @@ -563,14 +569,10 @@ private function handleOutdatedProjections(ProjectionistCriteria $criteria): voi new ProjectionCriteria( ids: $criteria->ids, groups: $criteria->groups, - status: [ProjectionStatus::Active, ProjectionStatus::Error], + status: [ProjectionStatus::Active, ProjectionStatus::Finished], ), function (array $projections): void { foreach ($projections as $projection) { - if ($projection->isRetryDisallowed()) { - continue; - } - $projector = $this->projector($projection->id()); if ($projector) { @@ -600,20 +602,28 @@ private function handleRetryProjections(ProjectionistCriteria $criteria): void status: [ProjectionStatus::Error], ), function (array $projections): void { + /** @var Projection $projection */ foreach ($projections as $projection) { - if ($projection->retry() >= self::RETRY_LIMIT) { + $retry = $projection->retry(); + + if (!$retry) { continue; } - $projection->active(); + if (!$this->retryStrategy->shouldRetry($retry)) { + continue; + } + + $projection->doRetry(); + $this->projectionStore->update($projection); $this->logger?->info( sprintf( - 'Projectionist: Retry projection "%s" (%d/%d) and set to active.', + 'Projectionist: Retry projection "%s" (%d) and set back to %s.', $projection->id(), - $projection->retry(), - self::RETRY_LIMIT, + $retry->attempt, + $projection->status()->value, ), ); } @@ -666,7 +676,7 @@ private function fastForwardFromNowProjections(array $projections): array return $forwardedProjections; } - private function handleNewProjections(ProjectionistCriteria $criteria, bool $throwByError): void + private function handleNewProjections(ProjectionistCriteria $criteria): void { $this->findForUpdate( new ProjectionCriteria( @@ -674,7 +684,7 @@ private function handleNewProjections(ProjectionistCriteria $criteria, bool $thr groups: $criteria->groups, status: [ProjectionStatus::New], ), - function (array $projections) use ($throwByError): void { + function (array $projections): void { foreach ($projections as $projection) { $projector = $this->projector($projection->id()); @@ -716,17 +726,7 @@ function (array $projections) use ($throwByError): void { $e->getMessage(), )); - $projection->error(ProjectionError::fromThrowable($e)); - $projection->disallowRetry(); - $this->projectionStore->update($projection); - - if ($throwByError) { - throw new ProjectionistError( - $projector::class, - $projection->id(), - $e, - ); - } + $this->handleError($projection, $e); } } }, @@ -852,4 +852,23 @@ private function findForUpdate(ProjectionCriteria $criteria, Closure $closure): $closure($projections); }); } + + private function handleError(Projection $projection, Throwable $throwable): void + { + $retryable = in_array( + $projection->status(), + [ProjectionStatus::New, ProjectionStatus::Booting, ProjectionStatus::Active], + true, + ); + + $projection->error($throwable); + + if ($retryable) { + $projection->updateRetry( + $this->retryStrategy->nextAttempt($projection->retry()), + ); + } + + $this->projectionStore->update($projection); + } } diff --git a/src/Projection/Projectionist/Projectionist.php b/src/Projection/Projectionist/Projectionist.php index 63e0dc774..3fe85e5e9 100644 --- a/src/Projection/Projectionist/Projectionist.php +++ b/src/Projection/Projectionist/Projectionist.php @@ -9,17 +9,18 @@ interface Projectionist { /** + * @param positive-int|null $limit + * * @throws ProjectionistError * @throws ProjectorNotFound */ public function boot( ProjectionistCriteria|null $criteria = null, int|null $limit = null, - bool $throwByError = false, ): void; /** - * @param positive-int $limit + * @param positive-int|null $limit * * @throws ProjectionistError * @throws ProjectorNotFound @@ -27,7 +28,6 @@ public function boot( public function run( ProjectionistCriteria|null $criteria = null, int|null $limit = null, - bool $throwByError = false, ): void; public function teardown(ProjectionistCriteria|null $criteria = null): void; diff --git a/src/Projection/RetryStrategy/DefaultRetryStrategy.php b/src/Projection/RetryStrategy/DefaultRetryStrategy.php new file mode 100644 index 000000000..28fe62f4c --- /dev/null +++ b/src/Projection/RetryStrategy/DefaultRetryStrategy.php @@ -0,0 +1,82 @@ +calculateNextDate(1), + ); + } + + if ($retry->attempt() >= $this->maxAttempts) { + return null; + } + + return new Retry( + $retry->attempt() + 1, + $this->calculateNextDate($retry->attempt()), + ); + } + + public function shouldRetry(Retry|null $retry): bool + { + if ($retry === null) { + return false; + } + + $nextRetry = $retry->nextRetry(); + + if ($nextRetry === null) { + return false; + } + + return $nextRetry <= $this->clock->now(); + } + + private function calculateNextDate(int $attempt): DateTimeImmutable + { + $nextDate = $this->clock->now()->modify(sprintf('+%d seconds', $this->calculateDelay($attempt))); + + if ($nextDate === false) { + throw new RuntimeException('Could not calculate next date'); + } + + return $nextDate; + } + + private function calculateDelay(int $attempt): int + { + return (int)round($this->baseDelay * ($this->delayFactor ** $attempt)); + } +} diff --git a/src/Projection/RetryStrategy/NoRetryStrategy.php b/src/Projection/RetryStrategy/NoRetryStrategy.php new file mode 100644 index 000000000..0f617c166 --- /dev/null +++ b/src/Projection/RetryStrategy/NoRetryStrategy.php @@ -0,0 +1,18 @@ +attempt; + } + + public function nextRetry(): DateTimeImmutable|null + { + return $this->nextRetry; + } + + public function canRetry(): bool + { + return $this->nextRetry !== null; + } +} diff --git a/src/Projection/RetryStrategy/RetryStrategy.php b/src/Projection/RetryStrategy/RetryStrategy.php new file mode 100644 index 000000000..5f21f29d4 --- /dev/null +++ b/src/Projection/RetryStrategy/RetryStrategy.php @@ -0,0 +1,12 @@ +create(); - $projectionist->boot(new ProjectionistCriteria(), null, true); + $projectionist->boot(new ProjectionistCriteria()); $bankAccountId = AccountId::fromString('1'); $bankAccount = BankAccount::create($bankAccountId, 'John'); diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php index b7aa74616..c78bac310 100644 --- a/tests/Integration/BasicImplementation/BasicIntegrationTest.php +++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php @@ -75,7 +75,7 @@ public function testSuccessful(): void ); $schemaDirector->create(); - $projectionist->boot(new ProjectionistCriteria(), null, true); + $projectionist->boot(new ProjectionistCriteria()); $profileId = ProfileId::fromString('1'); $profile = Profile::create($profileId, 'John'); @@ -139,7 +139,7 @@ public function testSnapshot(): void ); $schemaDirector->create(); - $projectionist->boot(new ProjectionistCriteria(), null, true); + $projectionist->boot(new ProjectionistCriteria()); $profileId = ProfileId::fromString('1'); $profile = Profile::create($profileId, 'John'); diff --git a/tests/Integration/Projectionist/Projection/ErrorProducerProjector.php b/tests/Integration/Projectionist/Projection/ErrorProducerProjector.php new file mode 100644 index 000000000..78fd766a0 --- /dev/null +++ b/tests/Integration/Projectionist/Projection/ErrorProducerProjector.php @@ -0,0 +1,44 @@ +setupError) { + throw new RuntimeException('setup error'); + } + } + + #[Teardown] + public function teardown(): void + { + if ($this->teardownError) { + throw new RuntimeException('teardown error'); + } + } + + #[Subscribe('*')] + public function subscribe(Message $message): void + { + if ($this->subscribeError) { + throw new RuntimeException('subscribe error'); + } + } +} diff --git a/tests/Integration/Projectionist/ProjectionistTest.php b/tests/Integration/Projectionist/ProjectionistTest.php index 9f9ced908..2e95c18cc 100644 --- a/tests/Integration/Projectionist/ProjectionistTest.php +++ b/tests/Integration/Projectionist/ProjectionistTest.php @@ -4,11 +4,18 @@ namespace Patchlevel\EventSourcing\Tests\Integration\Projectionist; +use DateTimeImmutable; use Doctrine\DBAL\Connection; +use Patchlevel\EventSourcing\Clock\FrozenClock; use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; +use Patchlevel\EventSourcing\Projection\Projection\Projection; +use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus; +use Patchlevel\EventSourcing\Projection\Projection\RunMode; use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore; use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; +use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistCriteria; +use Patchlevel\EventSourcing\Projection\RetryStrategy\DefaultRetryStrategy; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Schema\ChainSchemaConfigurator; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; @@ -16,6 +23,7 @@ use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Tests\Integration\DbalManager; use Patchlevel\EventSourcing\Tests\Integration\Projectionist\Aggregate\Profile; +use Patchlevel\EventSourcing\Tests\Integration\Projectionist\Projection\ErrorProducerProjector; use Patchlevel\EventSourcing\Tests\Integration\Projectionist\Projection\ProfileProjector; use PHPUnit\Framework\TestCase; @@ -37,7 +45,7 @@ public function tearDown(): void $this->projectionConnection->close(); } - public function testRun(): void + public function testHappyPath(): void { $store = new DoctrineDbalStore( $this->connection, @@ -65,18 +73,49 @@ public function testRun(): void $schemaDirector->create(); - $profile = Profile::create(ProfileId::fromString('1'), 'John'); - $repository->save($profile); - $projectionist = new DefaultProjectionist( $store, $projectionStore, [new ProfileProjector($this->projectionConnection)], ); + self::assertEquals( + [new Projection('profile_1')], + $projectionist->projections(), + ); + $projectionist->boot(); + + self::assertEquals( + [ + new Projection( + 'profile_1', + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + ), + ], + $projectionist->projections(), + ); + + $profile = Profile::create(ProfileId::fromString('1'), 'John'); + $repository->save($profile); + $projectionist->run(); + self::assertEquals( + [ + new Projection( + 'profile_1', + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 1, + ), + ], + $projectionist->projections(), + ); + $result = $this->projectionConnection->fetchAssociative('SELECT * FROM projection_profile_1 WHERE id = ?', ['1']); self::assertIsArray($result); @@ -85,5 +124,163 @@ public function testRun(): void self::assertSame('John', $result['name']); $projectionist->remove(); + + self::assertEquals( + [ + new Projection( + 'profile_1', + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::New, + ), + ], + $projectionist->projections(), + ); + + self::assertFalse( + $this->projectionConnection->createSchemaManager()->tableExists('projection_profile_1'), + ); + } + + public function testErrorHandling(): void + { + $store = new DoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + 'eventstore', + ); + + $projectionStore = new DoctrineStore($this->connection); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + new ChainSchemaConfigurator([ + $store, + $projectionStore, + ]), + ); + + $schemaDirector->create(); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + DefaultEventBus::create(), + ); + + $projector = new ErrorProducerProjector(); + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $projectionist = new DefaultProjectionist( + $store, + $projectionStore, + [$projector], + new DefaultRetryStrategy( + $clock, + DefaultRetryStrategy::DEFAULT_BASE_DELAY, + DefaultRetryStrategy::DEFAULT_DELAY_FACTOR, + 2, + ), + ); + + $projectionist->boot(); + + $projection = self::findProjection($projectionist->projections(), 'error_producer'); + + self::assertEquals(ProjectionStatus::Active, $projection->status()); + self::assertEquals(null, $projection->projectionError()); + self::assertEquals(null, $projection->retry()); + + $repository = $manager->get(Profile::class); + + $profile = Profile::create(ProfileId::fromString('1'), 'John'); + $repository->save($profile); + + $projector->subscribeError = true; + $projectionist->run(); + + $projection = self::findProjection($projectionist->projections(), 'error_producer'); + + self::assertEquals(ProjectionStatus::Error, $projection->status()); + self::assertEquals('subscribe error', $projection->projectionError()?->errorMessage); + self::assertEquals(ProjectionStatus::Active, $projection->projectionError()?->previousStatus); + self::assertEquals(1, $projection->retry()?->attempt); + self::assertEquals(new DateTimeImmutable('2021-01-01T00:00:10'), $projection->retry()?->nextRetry); + + $projectionist->run(); + + $projection = self::findProjection($projectionist->projections(), 'error_producer'); + + self::assertEquals(ProjectionStatus::Error, $projection->status()); + self::assertEquals('subscribe error', $projection->projectionError()?->errorMessage); + self::assertEquals(ProjectionStatus::Active, $projection->projectionError()?->previousStatus); + self::assertEquals(1, $projection->retry()?->attempt); + self::assertEquals(new DateTimeImmutable('2021-01-01T00:00:10'), $projection->retry()?->nextRetry); + + $clock->sleep(10); + + $projectionist->run(); + + $projection = self::findProjection($projectionist->projections(), 'error_producer'); + + self::assertEquals(ProjectionStatus::Error, $projection->status()); + self::assertEquals('subscribe error', $projection->projectionError()?->errorMessage); + self::assertEquals(ProjectionStatus::Active, $projection->projectionError()?->previousStatus); + self::assertEquals(2, $projection->retry()?->attempt); + self::assertEquals(new DateTimeImmutable('2021-01-01T00:00:20'), $projection->retry()?->nextRetry); + + $clock->sleep(20); + + $projectionist->run(); + + $projection = self::findProjection($projectionist->projections(), 'error_producer'); + + self::assertEquals(ProjectionStatus::Error, $projection->status()); + self::assertEquals('subscribe error', $projection->projectionError()?->errorMessage); + self::assertEquals(ProjectionStatus::Active, $projection->projectionError()?->previousStatus); + self::assertEquals(null, $projection->retry()); + + $projectionist->reactivate(new ProjectionistCriteria( + ids: ['error_producer'], + )); + + $projection = self::findProjection($projectionist->projections(), 'error_producer'); + + self::assertEquals(ProjectionStatus::Active, $projection->status()); + self::assertEquals(null, $projection->projectionError()); + self::assertEquals(null, $projection->retry()); + + $projectionist->run(); + + $projection = self::findProjection($projectionist->projections(), 'error_producer'); + + self::assertEquals(ProjectionStatus::Error, $projection->status()); + self::assertEquals('subscribe error', $projection->projectionError()?->errorMessage); + self::assertEquals(ProjectionStatus::Active, $projection->projectionError()?->previousStatus); + self::assertEquals(1, $projection->retry()?->attempt); + self::assertEquals(new DateTimeImmutable('2021-01-01T00:00:40'), $projection->retry()?->nextRetry); + + $clock->sleep(10); + $projector->subscribeError = false; + + $projectionist->run(); + + $projection = self::findProjection($projectionist->projections(), 'error_producer'); + + self::assertEquals(ProjectionStatus::Active, $projection->status()); + self::assertEquals(null, $projection->projectionError()); + self::assertEquals(null, $projection->retry()); + } + + /** @param list $projections */ + private static function findProjection(array $projections, string $id): Projection + { + foreach ($projections as $projection) { + if ($projection->id() === $id) { + return $projection; + } + } + + self::fail('projection not found'); } } diff --git a/tests/Unit/Projection/Projection/ErrorContextTest.php b/tests/Unit/Projection/Projection/ErrorContextTest.php index b81d58bec..eb6329ca8 100644 --- a/tests/Unit/Projection/Projection/ErrorContextTest.php +++ b/tests/Unit/Projection/Projection/ErrorContextTest.php @@ -5,7 +5,7 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Projection\Projection; use Patchlevel\EventSourcing\Aggregate\CustomId; -use Patchlevel\EventSourcing\Projection\Projection\Store\ErrorContext; +use Patchlevel\EventSourcing\Projection\Projection\ThrowableToErrorContextTransformer; use PHPUnit\Framework\TestCase; use RuntimeException; @@ -18,7 +18,7 @@ final class ErrorContextTest extends TestCase public function testErrorContext(): void { $resource = fopen('php://memory', 'r'); - $result = ErrorContext::fromThrowable( + $result = ThrowableToErrorContextTransformer::transform( $this->createException( 'test', new CustomId('test'), diff --git a/tests/Unit/Projection/Projection/ProjectionErrorTest.php b/tests/Unit/Projection/Projection/ProjectionErrorTest.php index 24b615924..633e0b204 100644 --- a/tests/Unit/Projection/Projection/ProjectionErrorTest.php +++ b/tests/Unit/Projection/Projection/ProjectionErrorTest.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Projection\Projection; use Patchlevel\EventSourcing\Projection\Projection\ProjectionError; +use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus; use PHPUnit\Framework\TestCase; use RuntimeException; @@ -13,9 +14,13 @@ final class ProjectionErrorTest extends TestCase { public function testCreate(): void { - $error = ProjectionError::fromThrowable(new RuntimeException('foo bar')); + $error = ProjectionError::fromThrowable( + ProjectionStatus::Active, + new RuntimeException('foo bar'), + ); self::assertSame('foo bar', $error->errorMessage); + self::assertSame(ProjectionStatus::Active, $error->previousStatus); self::assertIsArray($error->errorContext); } } diff --git a/tests/Unit/Projection/Projection/ProjectionTest.php b/tests/Unit/Projection/Projection/ProjectionTest.php index 1017e6132..c8be5f2c5 100644 --- a/tests/Unit/Projection/Projection/ProjectionTest.php +++ b/tests/Unit/Projection/Projection/ProjectionTest.php @@ -7,7 +7,8 @@ use Patchlevel\EventSourcing\Projection\Projection\Projection; use Patchlevel\EventSourcing\Projection\Projection\ProjectionError; use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus; -use Patchlevel\EventSourcing\Projection\Projection\Store\ErrorContext; +use Patchlevel\EventSourcing\Projection\Projection\ThrowableToErrorContextTransformer; +use Patchlevel\EventSourcing\Projection\RetryStrategy\Retry; use PHPUnit\Framework\TestCase; use RuntimeException; @@ -69,7 +70,7 @@ public function testError(): void $exception = new RuntimeException('test'); - $projection->error(ProjectionError::fromThrowable($exception)); + $projection->error($exception); self::assertEquals(ProjectionStatus::Error, $projection->status()); self::assertFalse($projection->isNew()); @@ -80,7 +81,8 @@ public function testError(): void self::assertEquals( new ProjectionError( 'test', - ErrorContext::fromThrowable($exception), + ProjectionStatus::New, + ThrowableToErrorContextTransformer::transform($exception), ), $projection->projectionError(), ); @@ -119,18 +121,16 @@ public function testRetry(): void 'test', ); - self::assertEquals(0, $projection->retry()); - self::assertFalse($projection->isRetryDisallowed()); + self::assertEquals(null, $projection->retry()); - $projection->incrementRetry(); - self::assertEquals(1, $projection->retry()); + $retry = new Retry(1, null); - $projection->disallowRetry(); - self::assertEquals(-1, $projection->retry()); - self::assertTrue($projection->isRetryDisallowed()); + $projection->updateRetry($retry); + + self::assertEquals($retry, $projection->retry()); $projection->resetRetry(); - self::assertEquals(0, $projection->retry()); - self::assertFalse($projection->isRetryDisallowed()); + + self::assertEquals(null, $projection->retry()); } } diff --git a/tests/Unit/Projection/Projection/Store/ErrorContextTest.php b/tests/Unit/Projection/Projection/Store/ErrorContextTest.php index 2006e8962..fe3d29692 100644 --- a/tests/Unit/Projection/Projection/Store/ErrorContextTest.php +++ b/tests/Unit/Projection/Projection/Store/ErrorContextTest.php @@ -4,18 +4,18 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Projection\Projection\Store; -use Patchlevel\EventSourcing\Projection\Projection\Store\ErrorContext; +use Patchlevel\EventSourcing\Projection\Projection\ThrowableToErrorContextTransformer; use PHPUnit\Framework\TestCase; use RuntimeException; -/** @covers \Patchlevel\EventSourcing\Projection\Projection\Store\ErrorContext */ +/** @covers \Patchlevel\EventSourcing\Projection\Projection\ThrowableToErrorContextTransformer */ final class ErrorContextTest extends TestCase { public function testWithoutPrevious(): void { $error = new RuntimeException('foo'); - $result = ErrorContext::fromThrowable($error); + $result = ThrowableToErrorContextTransformer::transform($error); self::assertCount(1, $result); self::assertSame('foo', $result[0]['message']); @@ -27,7 +27,7 @@ public function testWithPrevious(): void { $error = new RuntimeException('foo', 0, new RuntimeException('bar')); - $result = ErrorContext::fromThrowable($error); + $result = ThrowableToErrorContextTransformer::transform($error); self::assertCount(2, $result); self::assertSame('foo', $result[0]['message']); diff --git a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php index baa663ad3..1d382559b 100644 --- a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php +++ b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Projection\Projectionist; use Closure; +use DateTimeImmutable; use Generator; use Patchlevel\EventSourcing\Attribute\Projector as ProjectionAttribute; use Patchlevel\EventSourcing\Attribute\Setup; @@ -16,11 +17,13 @@ use Patchlevel\EventSourcing\Projection\Projection\ProjectionError; use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus; use Patchlevel\EventSourcing\Projection\Projection\RunMode; -use Patchlevel\EventSourcing\Projection\Projection\Store\ErrorContext; use Patchlevel\EventSourcing\Projection\Projection\Store\LockableProjectionStore; use Patchlevel\EventSourcing\Projection\Projection\Store\ProjectionStore; +use Patchlevel\EventSourcing\Projection\Projection\ThrowableToErrorContextTransformer; use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistCriteria; +use Patchlevel\EventSourcing\Projection\RetryStrategy\Retry; +use Patchlevel\EventSourcing\Projection\RetryStrategy\RetryStrategy; use Patchlevel\EventSourcing\Store\ArrayStream; use Patchlevel\EventSourcing\Store\Criteria; use Patchlevel\EventSourcing\Store\Store; @@ -394,10 +397,15 @@ public function create(): void $streamableStore = $this->prophesize(Store::class); $streamableStore->load($this->criteria())->shouldNotBeCalled(); + $retry = new Retry(1, new DateTimeImmutable()); + $retryStrategy = $this->prophesize(RetryStrategy::class); + $retryStrategy->nextAttempt(null)->willReturn($retry)->shouldBeCalledOnce(); + $projectionist = new DefaultProjectionist( $streamableStore->reveal(), $projectionStore, [$projector], + $retryStrategy->reveal(), ); $projectionist->boot(); @@ -410,8 +418,12 @@ public function create(): void RunMode::FromBeginning, ProjectionStatus::Error, 0, - new ProjectionError('ERROR', ErrorContext::fromThrowable($projector->exception)), - -1, + new ProjectionError( + 'ERROR', + ProjectionStatus::New, + ThrowableToErrorContextTransformer::transform($projector->exception), + ), + $retry, ), ], $projectionStore->updatedProjections, @@ -818,10 +830,15 @@ public function handle(Message $message): void $streamableStore = $this->prophesize(Store::class); $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + $retry = new Retry(1, new DateTimeImmutable()); + $retryStrategy = $this->prophesize(RetryStrategy::class); + $retryStrategy->nextAttempt(null)->willReturn($retry)->shouldBeCalledOnce(); + $projectionist = new DefaultProjectionist( $streamableStore->reveal(), $projectionStore, [$projector], + $retryStrategy->reveal(), ); $projectionist->run(); @@ -834,8 +851,12 @@ public function handle(Message $message): void RunMode::FromBeginning, ProjectionStatus::Error, 0, - new ProjectionError('ERROR', ErrorContext::fromThrowable($projector->exception)), - 1, + new ProjectionError( + 'ERROR', + ProjectionStatus::Active, + ThrowableToErrorContextTransformer::transform($projector->exception), + ), + $retry, ), ], $projectionStore->updatedProjections, @@ -1308,7 +1329,7 @@ class { ], $projectionStore->addedProjections); } - public function testReactivate(): void + public function testReactivateError(): void { $projectionId = 'test'; $projector = new #[ProjectionAttribute('test')] @@ -1321,6 +1342,8 @@ class { Projection::DEFAULT_GROUP, RunMode::FromBeginning, ProjectionStatus::Error, + 0, + new ProjectionError('ERROR', ProjectionStatus::New), ), ]); @@ -1339,12 +1362,84 @@ class { $projectionId, Projection::DEFAULT_GROUP, RunMode::FromBeginning, - ProjectionStatus::Active, + ProjectionStatus::New, 0, ), ], $projectionStore->updatedProjections); } + public function testReactivateOutdated(): void + { + $projectionId = 'test'; + $projector = new #[ProjectionAttribute('test')] + class { + }; + + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Outdated, + ), + ]); + + $streamableStore = $this->prophesize(Store::class); + + $projectionist = new DefaultProjectionist( + $streamableStore->reveal(), + $projectionStore, + [$projector], + ); + + $projectionist->reactivate(); + + self::assertEquals([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + ), + ], $projectionStore->updatedProjections); + } + + public function testReactivateFinished(): void + { + $projectionId = 'test'; + $projector = new #[ProjectionAttribute('test')] + class { + }; + + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Finished, + ), + ]); + + $streamableStore = $this->prophesize(Store::class); + + $projectionist = new DefaultProjectionist( + $streamableStore->reveal(), + $projectionStore, + [$projector], + ); + + $projectionist->reactivate(); + + self::assertEquals([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + ), + ], $projectionStore->updatedProjections); + } + public function testGetProjectionAndDiscoverNewProjectors(): void { $projectionId = 'test'; diff --git a/tests/Unit/Projection/RetryStrategy/DefaultRetryStrategyTest.php b/tests/Unit/Projection/RetryStrategy/DefaultRetryStrategyTest.php new file mode 100644 index 000000000..fb6564166 --- /dev/null +++ b/tests/Unit/Projection/RetryStrategy/DefaultRetryStrategyTest.php @@ -0,0 +1,87 @@ +clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00+00:00')); + $this->strategy = new DefaultRetryStrategy($this->clock); + } + + public function testShouldRetryWithNull(): void + { + self::assertFalse($this->strategy->shouldRetry(null)); + } + + public function testShouldRetryWithoutTime(): void + { + self::assertFalse($this->strategy->shouldRetry(new Retry(1, null))); + } + + public function testShouldRetryWithTime(): void + { + self::assertFalse($this->strategy->shouldRetry(new Retry(1, new DateTimeImmutable()))); + } + + public function testNextAttemptWithNull(): void + { + $expected = new Retry(1, new DateTimeImmutable('2021-01-01T00:00:10+00:00')); + + self::assertEquals($expected, $this->strategy->nextAttempt(null)); + } + + #[DataProvider('attemptProvider')] + public function testNextAttempt(int $attempt, string $dateString): void + { + $expected = new Retry($attempt, new DateTimeImmutable($dateString)); + + self::assertEquals( + $expected, + $this->strategy->nextAttempt( + new Retry( + $attempt - 1, + null, + ), + ), + ); + } + + public static function attemptProvider(): Generator + { + yield 'first attempt' => [1, '2021-01-01T00:00:5+00:00']; + yield 'second attempt' => [2, '2021-01-01T00:00:10+00:00']; + yield 'third attempt' => [3, '2021-01-01T00:00:20+00:00']; + yield 'fourth attempt' => [4, '2021-01-01T00:00:40+00:00']; + yield 'fifth attempt' => [5, '2021-01-01T00:01:20+00:00']; + } + + public function testMaxAttempt(): void + { + self::assertNull( + $this->strategy->nextAttempt( + new Retry( + 6, + null, + ), + ), + ); + } +} diff --git a/tests/Unit/Projection/RetryStrategy/NoRetryStrategyTest.php b/tests/Unit/Projection/RetryStrategy/NoRetryStrategyTest.php new file mode 100644 index 000000000..c669cbdfa --- /dev/null +++ b/tests/Unit/Projection/RetryStrategy/NoRetryStrategyTest.php @@ -0,0 +1,54 @@ +shouldRetry(null)); + } + + public function testShouldRetryWithoutTime(): void + { + $strategy = new NoRetryStrategy(); + self::assertFalse($strategy->shouldRetry(new Retry(1, null))); + } + + public function testShouldRetryWithTime(): void + { + $strategy = new NoRetryStrategy(); + self::assertFalse($strategy->shouldRetry(new Retry(1, new DateTimeImmutable()))); + } + + public function testNextAttemptWithNull(): void + { + $strategy = new NoRetryStrategy(); + self::assertNull($strategy->nextAttempt(null)); + } + + public function testNextAttemptWithoutTime(): void + { + $strategy = new NoRetryStrategy(); + self::assertNull($strategy->nextAttempt( + new Retry(1, null), + )); + } + + public function testNextAttemptWithTime(): void + { + $strategy = new NoRetryStrategy(); + self::assertNull($strategy->nextAttempt( + new Retry(1, new DateTimeImmutable()), + )); + } +} From 1bfe2c2a9bfdedb3f94a006616d5261b19094a6b Mon Sep 17 00:00:00 2001 From: David Badura Date: Wed, 21 Feb 2024 13:06:31 +0100 Subject: [PATCH 2/5] add timezone --- src/Projection/Projection/Store/DoctrineStore.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Projection/Projection/Store/DoctrineStore.php b/src/Projection/Projection/Store/DoctrineStore.php index d303b88fa..a20538133 100644 --- a/src/Projection/Projection/Store/DoctrineStore.php +++ b/src/Projection/Projection/Store/DoctrineStore.php @@ -222,7 +222,7 @@ public function configureSchema(Schema $schema, Connection $connection): void ->setNotnull(false); $table->addColumn('retry_attempt', Types::INTEGER) ->setNotnull(false); - $table->addColumn('retry_next', Types::DATETIME_IMMUTABLE) + $table->addColumn('retry_next', Types::DATETIMETZ_IMMUTABLE) ->setNotnull(false); $table->setPrimaryKey(['id']); From 56091825a52a436e4901151348a3fa768c8ff9b3 Mon Sep 17 00:00:00 2001 From: David Badura Date: Wed, 21 Feb 2024 13:49:02 +0100 Subject: [PATCH 3/5] add more tests & update docs --- baseline.xml | 5 + docs/pages/projection.md | 49 +++++++-- .../Projectionist/DefaultProjectionist.php | 2 +- .../DefaultProjectionistTest.php | 103 ++++++++++++++++++ 4 files changed, 151 insertions(+), 8 deletions(-) diff --git a/baseline.xml b/baseline.xml index 18f6289ae..10aae5a50 100644 --- a/baseline.xml +++ b/baseline.xml @@ -240,4 +240,9 @@ + + + + + diff --git a/docs/pages/projection.md b/docs/pages/projection.md index 88f8e4a88..56023afe2 100644 --- a/docs/pages/projection.md +++ b/docs/pages/projection.md @@ -334,6 +334,7 @@ stateDiagram-v2 Error --> New Error --> Booting Error --> Active + Error --> [*] Outdated --> Active Outdated --> [*] ``` @@ -360,10 +361,11 @@ These projections have a projector, follow the event stream and should be up-to- A projection is finished if the projector has the mode `RunMode::Once`. This means that the projection is only run once and then set to finished if it reaches the end of the event stream. +You can also reactivate the projection if you want so that it continues. ### Outdated -If a projection exists in the projection store +If an active or finished projection exists in the projection store that does not have a projector in the source code with a corresponding projector ID, then this projection is marked as outdated. This happens when either the projector has been deleted @@ -381,10 +383,16 @@ There are two options to reactivate the projection: ### Error If an error occurs in a projector, then the target projection is set to Error. -This projection will then no longer run until the projection is activated again. +This can happen in the create process, in the boot process or in the run process. +This projection will then no longer boot/run until the projection is reactivate or retried. + +The projectionist has a retry strategy to retry projections that have failed. +It tries to reactivate the projection after a certain time and a certain number of attempts. +If this does not work, the projection is set to error and must be manually reactivated. + There are two options here: -* Reactivate the projection, so that the projection is active again. +* Reactivate the projection, so that the projection is in the previous state again. * Remove the projection and rebuild it from scratch. ## Setup @@ -424,11 +432,34 @@ $schemaDirector = new DoctrineSchemaDirector( You can find more about schema configurator [here](./store.md) +### Retry Strategy + +The projectionist uses a retry strategy to retry projections that have failed. +Our default strategy can be configured with the following parameters: + +* `baseDelay` - The base delay in seconds. +* `delayFactor` - The factor by which the delay is multiplied after each attempt. +* `maxAttempts` - The maximum number of attempts. + +```php +use Patchlevel\EventSourcing\Projection\RetryStrategy\DefaultRetryStrategy; + +$retryStrategy = new DefaultRetryStrategy( + baseDelay: 5, + delayFactor: 2, + maxAttempts: 5, +); +``` + +!!! tip + + You can reactivate the projection manually or remove it and rebuild it from scratch. + ### Projectionist Now we can create the projectionist and plug together the necessary services. The event store is needed to load the events, the Projection Store to store the projection state -and the respective projectors. +and the respective projectors. Optionally, we can also pass a retry strategy. ```php use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; @@ -436,7 +467,8 @@ use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; $projectionist = new DefaultProjectionist( $eventStore, $projectionStore, - [$projector1, $projector2, $projector3] + [$projector1, $projector2, $projector3], + $retryStrategy, ); ``` @@ -516,6 +548,9 @@ foreach ($projections as $projection) { } ``` -!!! note +## Learn more - There are also [cli commands](./cli.md) for all commands. +* [How to use CLI commands](./cli.md) +* [How to use Pipeline](./pipeline.md) +* [How to use Event Bus](./event_bus.md) +* [How to Test](./testing.md) \ No newline at end of file diff --git a/src/Projection/Projectionist/DefaultProjectionist.php b/src/Projection/Projectionist/DefaultProjectionist.php index 0098786de..75ec4f2e8 100644 --- a/src/Projection/Projectionist/DefaultProjectionist.php +++ b/src/Projection/Projectionist/DefaultProjectionist.php @@ -193,7 +193,7 @@ public function run( new ProjectionCriteria( ids: $criteria->ids, groups: $criteria->groups, - status: [ProjectionStatus::Active, ProjectionStatus::Error], + status: [ProjectionStatus::Active], ), function (array $projections) use ($limit): void { if (count($projections) === 0) { diff --git a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php index 1d382559b..67c0f6490 100644 --- a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php +++ b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php @@ -1477,6 +1477,109 @@ class { ], $projections); } + public function testRetry(): void + { + $projectionId = 'test'; + $projector = new #[ProjectionAttribute('test')] + class { + #[Subscribe(ProfileVisited::class)] + public function subscribe(): void + { + throw new RuntimeException('ERROR2'); + } + }; + + $retry1 = new Retry(1, new DateTimeImmutable()); + $retry2 = new Retry(2, new DateTimeImmutable()); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Error, + 0, + new ProjectionError('ERROR', ProjectionStatus::Active), + $retry1, + ), + ]); + + $retryStrategy = $this->prophesize(RetryStrategy::class); + $retryStrategy->shouldRetry($retry1)->willReturn(true); + $retryStrategy->nextAttempt($retry1)->willReturn($retry2); + + $projectionist = new DefaultProjectionist( + $streamableStore->reveal(), + $projectionStore, + [$projector], + $retryStrategy->reveal(), + ); + + $projectionist->run(); + + self::assertCount(2, $projectionStore->updatedProjections); + + [$update1, $update2] = $projectionStore->updatedProjections; + + self::assertEquals($update1, new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 0, + null, + $retry1, + )); + + self::assertEquals(ProjectionStatus::Error, $update2->status()); + self::assertEquals(ProjectionStatus::Active, $update2->projectionError()?->previousStatus); + self::assertEquals('ERROR2', $update2->projectionError()?->errorMessage); + self::assertEquals($retry2, $update2->retry()); + } + + public function testShouldNotRetry(): void + { + $projectionId = 'test'; + $projector = new #[ProjectionAttribute('test')] + class { + }; + + $retry = new Retry(1, new DateTimeImmutable()); + + $streamableStore = $this->prophesize(Store::class); + + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Error, + 0, + new ProjectionError('ERROR', ProjectionStatus::Active), + $retry, + ), + ]); + + $retryStrategy = $this->prophesize(RetryStrategy::class); + $retryStrategy->shouldRetry($retry)->willReturn(false); + + $projectionist = new DefaultProjectionist( + $streamableStore->reveal(), + $projectionStore, + [$projector], + $retryStrategy->reveal(), + ); + + $projectionist->run(); + + self::assertEquals([], $projectionStore->updatedProjections); + } + #[DataProvider('methodProvider')] public function testCriteria(string $method): void { From c582ede003cffdac2f6db1535887df4ab58929c1 Mon Sep 17 00:00:00 2001 From: David Badura Date: Thu, 22 Feb 2024 16:53:19 +0100 Subject: [PATCH 4/5] improve api --- docs/pages/projection.md | 4 +- src/Projection/Projection/NoErrorToRetry.php | 15 ++++ src/Projection/Projection/Projection.php | 29 ++++--- .../Projection/Store/DoctrineStore.php | 38 ++++---- .../Projectionist/DefaultProjectionist.php | 36 ++++---- ...rategy.php => ClockBasedRetryStrategy.php} | 41 +++------ .../RetryStrategy/NoRetryStrategy.php | 9 +- src/Projection/RetryStrategy/Retry.php | 31 ------- .../RetryStrategy/RetryStrategy.php | 6 +- .../RetryStrategy/UnexpectedError.php | 11 +++ .../Projectionist/ProjectionistTest.php | 54 +++++++----- .../Projection/Projection/ProjectionTest.php | 29 +++++-- .../DefaultProjectionistTest.php | 70 +++++---------- .../ClockBasedRetryStrategyTest.php | 69 +++++++++++++++ .../DefaultRetryStrategyTest.php | 87 ------------------- .../RetryStrategy/NoRetryStrategyTest.php | 40 +-------- 16 files changed, 249 insertions(+), 320 deletions(-) create mode 100644 src/Projection/Projection/NoErrorToRetry.php rename src/Projection/RetryStrategy/{DefaultRetryStrategy.php => ClockBasedRetryStrategy.php} (53%) delete mode 100644 src/Projection/RetryStrategy/Retry.php create mode 100644 src/Projection/RetryStrategy/UnexpectedError.php create mode 100644 tests/Unit/Projection/RetryStrategy/ClockBasedRetryStrategyTest.php delete mode 100644 tests/Unit/Projection/RetryStrategy/DefaultRetryStrategyTest.php diff --git a/docs/pages/projection.md b/docs/pages/projection.md index 56023afe2..10adde9b9 100644 --- a/docs/pages/projection.md +++ b/docs/pages/projection.md @@ -442,9 +442,9 @@ Our default strategy can be configured with the following parameters: * `maxAttempts` - The maximum number of attempts. ```php -use Patchlevel\EventSourcing\Projection\RetryStrategy\DefaultRetryStrategy; +use Patchlevel\EventSourcing\Projection\RetryStrategy\ClockBasedRetryStrategy; -$retryStrategy = new DefaultRetryStrategy( +$retryStrategy = new ClockBasedRetryStrategy( baseDelay: 5, delayFactor: 2, maxAttempts: 5, diff --git a/src/Projection/Projection/NoErrorToRetry.php b/src/Projection/Projection/NoErrorToRetry.php new file mode 100644 index 000000000..2ff10b192 --- /dev/null +++ b/src/Projection/Projection/NoErrorToRetry.php @@ -0,0 +1,15 @@ +status === ProjectionStatus::Error; } - public function updateRetry(Retry|null $retry): void + public function retryAttempt(): int { - $this->retry = $retry; - } - - public function retry(): Retry|null - { - return $this->retry; + return $this->retryAttempt; } public function doRetry(): void { if ($this->error === null) { - return; + throw new NoErrorToRetry(); } + $this->retryAttempt++; $this->status = $this->error->previousStatus; $this->error = null; } public function resetRetry(): void { - $this->retry = null; + $this->retryAttempt = 0; + } + + public function lastSavedAt(): DateTimeImmutable|null + { + return $this->lastSavedAt; + } + + public function updateLastSavedAt(DateTimeImmutable $lastSavedAt): void + { + $this->lastSavedAt = $lastSavedAt; } } diff --git a/src/Projection/Projection/Store/DoctrineStore.php b/src/Projection/Projection/Store/DoctrineStore.php index a20538133..7e437c474 100644 --- a/src/Projection/Projection/Store/DoctrineStore.php +++ b/src/Projection/Projection/Store/DoctrineStore.php @@ -14,14 +14,15 @@ use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Types\Type; use Doctrine\DBAL\Types\Types; +use Patchlevel\EventSourcing\Clock\SystemClock; use Patchlevel\EventSourcing\Projection\Projection\Projection; use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria; use Patchlevel\EventSourcing\Projection\Projection\ProjectionError; use Patchlevel\EventSourcing\Projection\Projection\ProjectionNotFound; use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus; use Patchlevel\EventSourcing\Projection\Projection\RunMode; -use Patchlevel\EventSourcing\Projection\RetryStrategy\Retry; use Patchlevel\EventSourcing\Schema\SchemaConfigurator; +use Psr\Clock\ClockInterface; use function array_map; use function assert; @@ -39,14 +40,15 @@ * error_message: string|null, * error_previous_status: string|null, * error_context: string|null, - * retry_attempt: int|null, - * retry_next: string|null, + * retry_attempt: int, + * last_saved_at: string, * } */ final class DoctrineStore implements LockableProjectionStore, SchemaConfigurator { public function __construct( private readonly Connection $connection, + private readonly ClockInterface $clock = new SystemClock(), private readonly string $projectionTable = 'projections', ) { } @@ -121,7 +123,8 @@ public function find(ProjectionCriteria|null $criteria = null): array public function add(Projection $projection): void { $projectionError = $projection->projectionError(); - $projectionRetry = $projection->retry(); + + $projection->updateLastSavedAt($this->clock->now()); $this->connection->insert( $this->projectionTable, @@ -134,11 +137,11 @@ public function add(Projection $projection): void 'error_message' => $projectionError?->errorMessage, 'error_previous_status' => $projectionError?->previousStatus?->value, 'error_context' => $projectionError?->errorContext !== null ? json_encode($projectionError->errorContext, JSON_THROW_ON_ERROR) : null, - 'retry_attempt' => $projectionRetry?->attempt, - 'retry_next' => $projectionRetry?->nextRetry, + 'retry_attempt' => $projection->retryAttempt(), + 'last_saved_at' => $projection->lastSavedAt(), ], [ - 'retry_next' => Types::DATETIME_IMMUTABLE, + 'last_saved_at' => Types::DATETIME_IMMUTABLE, ], ); } @@ -146,7 +149,8 @@ public function add(Projection $projection): void public function update(Projection $projection): void { $projectionError = $projection->projectionError(); - $projectionRetry = $projection->retry(); + + $projection->updateLastSavedAt($this->clock->now()); $effectedRows = $this->connection->update( $this->projectionTable, @@ -158,14 +162,14 @@ public function update(Projection $projection): void 'error_message' => $projectionError?->errorMessage, 'error_previous_status' => $projectionError?->previousStatus?->value, 'error_context' => $projectionError?->errorContext !== null ? json_encode($projectionError->errorContext, JSON_THROW_ON_ERROR) : null, - 'retry_attempt' => $projectionRetry?->attempt, - 'retry_next' => $projectionRetry?->nextRetry, + 'retry_attempt' => $projection->retryAttempt(), + 'last_saved_at' => $projection->lastSavedAt(), ], [ 'id' => $projection->id(), ], [ - 'retry_next' => Types::DATETIME_IMMUTABLE, + 'last_saved_at' => Types::DATETIME_IMMUTABLE, ], ); @@ -221,9 +225,9 @@ public function configureSchema(Schema $schema, Connection $connection): void $table->addColumn('error_context', Types::JSON) ->setNotnull(false); $table->addColumn('retry_attempt', Types::INTEGER) - ->setNotnull(false); - $table->addColumn('retry_next', Types::DATETIMETZ_IMMUTABLE) - ->setNotnull(false); + ->setNotnull(true); + $table->addColumn('last_saved_at', Types::DATETIMETZ_IMMUTABLE) + ->setNotnull(true); $table->setPrimaryKey(['id']); $table->addIndex(['group_name']); @@ -247,10 +251,8 @@ private function createProjection(array $row): Projection $row['error_previous_status'] !== null ? ProjectionStatus::from($row['error_previous_status']) : ProjectionStatus::New, $context, ) : null, - $row['retry_attempt'] !== null ? new Retry( - $row['retry_attempt'], - self::normalizeDateTime($row['retry_next'], $this->connection->getDatabasePlatform()), - ) : null, + $row['retry_attempt'], + self::normalizeDateTime($row['last_saved_at'], $this->connection->getDatabasePlatform()), ); } diff --git a/src/Projection/Projectionist/DefaultProjectionist.php b/src/Projection/Projectionist/DefaultProjectionist.php index 75ec4f2e8..c12c412e5 100644 --- a/src/Projection/Projectionist/DefaultProjectionist.php +++ b/src/Projection/Projectionist/DefaultProjectionist.php @@ -16,7 +16,7 @@ use Patchlevel\EventSourcing\Projection\Projection\RunMode; use Patchlevel\EventSourcing\Projection\Projection\Store\LockableProjectionStore; use Patchlevel\EventSourcing\Projection\Projection\Store\ProjectionStore; -use Patchlevel\EventSourcing\Projection\RetryStrategy\DefaultRetryStrategy; +use Patchlevel\EventSourcing\Projection\RetryStrategy\ClockBasedRetryStrategy; use Patchlevel\EventSourcing\Projection\RetryStrategy\RetryStrategy; use Patchlevel\EventSourcing\Store\Criteria; use Patchlevel\EventSourcing\Store\Store; @@ -39,7 +39,7 @@ public function __construct( private readonly Store $streamableMessageStore, private readonly ProjectionStore $projectionStore, private readonly iterable $projectors, - private readonly RetryStrategy $retryStrategy = new DefaultRetryStrategy(), + private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(), private readonly ProjectorMetadataFactory $metadataFactory = new AttributeProjectorMetadataFactory(), private readonly LoggerInterface|null $logger = null, ) { @@ -604,25 +604,34 @@ private function handleRetryProjections(ProjectionistCriteria $criteria): void function (array $projections): void { /** @var Projection $projection */ foreach ($projections as $projection) { - $retry = $projection->retry(); + $error = $projection->projectionError(); - if (!$retry) { + if ($error === null) { continue; } - if (!$this->retryStrategy->shouldRetry($retry)) { + $retryable = in_array( + $error->previousStatus, + [ProjectionStatus::New, ProjectionStatus::Booting, ProjectionStatus::Active], + true, + ); + + if (!$retryable) { continue; } - $projection->doRetry(); + if (!$this->retryStrategy->shouldRetry($projection)) { + continue; + } + $projection->doRetry(); $this->projectionStore->update($projection); $this->logger?->info( sprintf( 'Projectionist: Retry projection "%s" (%d) and set back to %s.', $projection->id(), - $retry->attempt, + $projection->retryAttempt(), $projection->status()->value, ), ); @@ -855,20 +864,7 @@ private function findForUpdate(ProjectionCriteria $criteria, Closure $closure): private function handleError(Projection $projection, Throwable $throwable): void { - $retryable = in_array( - $projection->status(), - [ProjectionStatus::New, ProjectionStatus::Booting, ProjectionStatus::Active], - true, - ); - $projection->error($throwable); - - if ($retryable) { - $projection->updateRetry( - $this->retryStrategy->nextAttempt($projection->retry()), - ); - } - $this->projectionStore->update($projection); } } diff --git a/src/Projection/RetryStrategy/DefaultRetryStrategy.php b/src/Projection/RetryStrategy/ClockBasedRetryStrategy.php similarity index 53% rename from src/Projection/RetryStrategy/DefaultRetryStrategy.php rename to src/Projection/RetryStrategy/ClockBasedRetryStrategy.php index 28fe62f4c..cd4ea1ac1 100644 --- a/src/Projection/RetryStrategy/DefaultRetryStrategy.php +++ b/src/Projection/RetryStrategy/ClockBasedRetryStrategy.php @@ -6,13 +6,13 @@ use DateTimeImmutable; use Patchlevel\EventSourcing\Clock\SystemClock; +use Patchlevel\EventSourcing\Projection\Projection\Projection; use Psr\Clock\ClockInterface; -use RuntimeException; use function round; use function sprintf; -final class DefaultRetryStrategy implements RetryStrategy +final class ClockBasedRetryStrategy implements RetryStrategy { public const DEFAULT_BASE_DELAY = 5; public const DEFAULT_DELAY_FACTOR = 2; @@ -30,46 +30,29 @@ public function __construct( ) { } - public function nextAttempt(Retry|null $retry): Retry|null + public function shouldRetry(Projection $projection): bool { - if ($retry === null) { - return new Retry( - 1, - $this->calculateNextDate(1), - ); - } - - if ($retry->attempt() >= $this->maxAttempts) { - return null; - } - - return new Retry( - $retry->attempt() + 1, - $this->calculateNextDate($retry->attempt()), - ); - } - - public function shouldRetry(Retry|null $retry): bool - { - if ($retry === null) { + if ($projection->retryAttempt() >= $this->maxAttempts) { return false; } - $nextRetry = $retry->nextRetry(); + $lastSavedAt = $projection->lastSavedAt(); - if ($nextRetry === null) { + if ($lastSavedAt === null) { return false; } - return $nextRetry <= $this->clock->now(); + $nextRetryDate = $this->calculateNextRetryDate($lastSavedAt, $projection->retryAttempt()); + + return $nextRetryDate <= $this->clock->now(); } - private function calculateNextDate(int $attempt): DateTimeImmutable + private function calculateNextRetryDate(DateTimeImmutable $lastDate, int $attempt): DateTimeImmutable { - $nextDate = $this->clock->now()->modify(sprintf('+%d seconds', $this->calculateDelay($attempt))); + $nextDate = $lastDate->modify(sprintf('+%d seconds', $this->calculateDelay($attempt))); if ($nextDate === false) { - throw new RuntimeException('Could not calculate next date'); + throw new UnexpectedError('Could not calculate next date'); } return $nextDate; diff --git a/src/Projection/RetryStrategy/NoRetryStrategy.php b/src/Projection/RetryStrategy/NoRetryStrategy.php index 0f617c166..a9baea3ac 100644 --- a/src/Projection/RetryStrategy/NoRetryStrategy.php +++ b/src/Projection/RetryStrategy/NoRetryStrategy.php @@ -4,14 +4,11 @@ namespace Patchlevel\EventSourcing\Projection\RetryStrategy; +use Patchlevel\EventSourcing\Projection\Projection\Projection; + final class NoRetryStrategy implements RetryStrategy { - public function nextAttempt(Retry|null $retry): Retry|null - { - return null; - } - - public function shouldRetry(Retry|null $retry): bool + public function shouldRetry(Projection $projection): bool { return false; } diff --git a/src/Projection/RetryStrategy/Retry.php b/src/Projection/RetryStrategy/Retry.php deleted file mode 100644 index 1b2de99e2..000000000 --- a/src/Projection/RetryStrategy/Retry.php +++ /dev/null @@ -1,31 +0,0 @@ -attempt; - } - - public function nextRetry(): DateTimeImmutable|null - { - return $this->nextRetry; - } - - public function canRetry(): bool - { - return $this->nextRetry !== null; - } -} diff --git a/src/Projection/RetryStrategy/RetryStrategy.php b/src/Projection/RetryStrategy/RetryStrategy.php index 5f21f29d4..ff4a66f22 100644 --- a/src/Projection/RetryStrategy/RetryStrategy.php +++ b/src/Projection/RetryStrategy/RetryStrategy.php @@ -4,9 +4,9 @@ namespace Patchlevel\EventSourcing\Projection\RetryStrategy; +use Patchlevel\EventSourcing\Projection\Projection\Projection; + interface RetryStrategy { - public function nextAttempt(Retry|null $retry): Retry|null; - - public function shouldRetry(Retry|null $retry): bool; + public function shouldRetry(Projection $projection): bool; } diff --git a/src/Projection/RetryStrategy/UnexpectedError.php b/src/Projection/RetryStrategy/UnexpectedError.php new file mode 100644 index 000000000..da8d1a371 --- /dev/null +++ b/src/Projection/RetryStrategy/UnexpectedError.php @@ -0,0 +1,11 @@ +connection); + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $projectionStore = new DoctrineStore( + $this->connection, + $clock, + ); $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), @@ -80,7 +85,7 @@ public function testHappyPath(): void ); self::assertEquals( - [new Projection('profile_1')], + [new Projection('profile_1', lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'))], $projectionist->projections(), ); @@ -93,6 +98,7 @@ public function testHappyPath(): void Projection::DEFAULT_GROUP, RunMode::FromBeginning, ProjectionStatus::Active, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), ), ], $projectionist->projections(), @@ -111,6 +117,7 @@ public function testHappyPath(): void RunMode::FromBeginning, ProjectionStatus::Active, 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), ), ], $projectionist->projections(), @@ -132,6 +139,7 @@ public function testHappyPath(): void Projection::DEFAULT_GROUP, RunMode::FromBeginning, ProjectionStatus::New, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), ), ], $projectionist->projections(), @@ -144,13 +152,18 @@ public function testHappyPath(): void public function testErrorHandling(): void { + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + $store = new DoctrineDbalStore( $this->connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), 'eventstore', ); - $projectionStore = new DoctrineStore($this->connection); + $projectionStore = new DoctrineStore( + $this->connection, + $clock, + ); $schemaDirector = new DoctrineSchemaDirector( $this->connection, @@ -169,16 +182,15 @@ public function testErrorHandling(): void ); $projector = new ErrorProducerProjector(); - $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); $projectionist = new DefaultProjectionist( $store, $projectionStore, [$projector], - new DefaultRetryStrategy( + new ClockBasedRetryStrategy( $clock, - DefaultRetryStrategy::DEFAULT_BASE_DELAY, - DefaultRetryStrategy::DEFAULT_DELAY_FACTOR, + ClockBasedRetryStrategy::DEFAULT_BASE_DELAY, + ClockBasedRetryStrategy::DEFAULT_DELAY_FACTOR, 2, ), ); @@ -189,7 +201,7 @@ public function testErrorHandling(): void self::assertEquals(ProjectionStatus::Active, $projection->status()); self::assertEquals(null, $projection->projectionError()); - self::assertEquals(null, $projection->retry()); + self::assertEquals(0, $projection->retryAttempt()); $repository = $manager->get(Profile::class); @@ -204,8 +216,7 @@ public function testErrorHandling(): void self::assertEquals(ProjectionStatus::Error, $projection->status()); self::assertEquals('subscribe error', $projection->projectionError()?->errorMessage); self::assertEquals(ProjectionStatus::Active, $projection->projectionError()?->previousStatus); - self::assertEquals(1, $projection->retry()?->attempt); - self::assertEquals(new DateTimeImmutable('2021-01-01T00:00:10'), $projection->retry()?->nextRetry); + self::assertEquals(0, $projection->retryAttempt()); $projectionist->run(); @@ -214,10 +225,9 @@ public function testErrorHandling(): void self::assertEquals(ProjectionStatus::Error, $projection->status()); self::assertEquals('subscribe error', $projection->projectionError()?->errorMessage); self::assertEquals(ProjectionStatus::Active, $projection->projectionError()?->previousStatus); - self::assertEquals(1, $projection->retry()?->attempt); - self::assertEquals(new DateTimeImmutable('2021-01-01T00:00:10'), $projection->retry()?->nextRetry); + self::assertEquals(0, $projection->retryAttempt()); - $clock->sleep(10); + $clock->sleep(5); $projectionist->run(); @@ -226,10 +236,9 @@ public function testErrorHandling(): void self::assertEquals(ProjectionStatus::Error, $projection->status()); self::assertEquals('subscribe error', $projection->projectionError()?->errorMessage); self::assertEquals(ProjectionStatus::Active, $projection->projectionError()?->previousStatus); - self::assertEquals(2, $projection->retry()?->attempt); - self::assertEquals(new DateTimeImmutable('2021-01-01T00:00:20'), $projection->retry()?->nextRetry); + self::assertEquals(1, $projection->retryAttempt()); - $clock->sleep(20); + $clock->sleep(10); $projectionist->run(); @@ -238,7 +247,7 @@ public function testErrorHandling(): void self::assertEquals(ProjectionStatus::Error, $projection->status()); self::assertEquals('subscribe error', $projection->projectionError()?->errorMessage); self::assertEquals(ProjectionStatus::Active, $projection->projectionError()?->previousStatus); - self::assertEquals(null, $projection->retry()); + self::assertEquals(2, $projection->retryAttempt()); $projectionist->reactivate(new ProjectionistCriteria( ids: ['error_producer'], @@ -248,7 +257,7 @@ public function testErrorHandling(): void self::assertEquals(ProjectionStatus::Active, $projection->status()); self::assertEquals(null, $projection->projectionError()); - self::assertEquals(null, $projection->retry()); + self::assertEquals(0, $projection->retryAttempt()); $projectionist->run(); @@ -257,10 +266,9 @@ public function testErrorHandling(): void self::assertEquals(ProjectionStatus::Error, $projection->status()); self::assertEquals('subscribe error', $projection->projectionError()?->errorMessage); self::assertEquals(ProjectionStatus::Active, $projection->projectionError()?->previousStatus); - self::assertEquals(1, $projection->retry()?->attempt); - self::assertEquals(new DateTimeImmutable('2021-01-01T00:00:40'), $projection->retry()?->nextRetry); + self::assertEquals(0, $projection->retryAttempt()); - $clock->sleep(10); + $clock->sleep(5); $projector->subscribeError = false; $projectionist->run(); @@ -269,7 +277,7 @@ public function testErrorHandling(): void self::assertEquals(ProjectionStatus::Active, $projection->status()); self::assertEquals(null, $projection->projectionError()); - self::assertEquals(null, $projection->retry()); + self::assertEquals(0, $projection->retryAttempt()); } /** @param list $projections */ diff --git a/tests/Unit/Projection/Projection/ProjectionTest.php b/tests/Unit/Projection/Projection/ProjectionTest.php index c8be5f2c5..6c23af7b6 100644 --- a/tests/Unit/Projection/Projection/ProjectionTest.php +++ b/tests/Unit/Projection/Projection/ProjectionTest.php @@ -4,11 +4,12 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Projection\Projection; +use Patchlevel\EventSourcing\Projection\Projection\NoErrorToRetry; use Patchlevel\EventSourcing\Projection\Projection\Projection; use Patchlevel\EventSourcing\Projection\Projection\ProjectionError; use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus; +use Patchlevel\EventSourcing\Projection\Projection\RunMode; use Patchlevel\EventSourcing\Projection\Projection\ThrowableToErrorContextTransformer; -use Patchlevel\EventSourcing\Projection\RetryStrategy\Retry; use PHPUnit\Framework\TestCase; use RuntimeException; @@ -115,22 +116,34 @@ public function testChangePosition(): void self::assertEquals(10, $projection->position()); } - public function testRetry(): void + public function testCanNotRetry(): void { + $this->expectException(NoErrorToRetry::class); + $projection = new Projection( 'test', ); - self::assertEquals(null, $projection->retry()); - - $retry = new Retry(1, null); + $projection->doRetry(); + } - $projection->updateRetry($retry); + public function testDoRetry(): void + { + $projection = new Projection( + 'test', + 'default', + RunMode::FromBeginning, + ProjectionStatus::Error, + 0, + new ProjectionError('test', ProjectionStatus::New, []), + ); - self::assertEquals($retry, $projection->retry()); + self::assertEquals(null, $projection->retryAttempt()); + $projection->doRetry(); + self::assertEquals(1, $projection->retryAttempt()); $projection->resetRetry(); - self::assertEquals(null, $projection->retry()); + self::assertEquals(null, $projection->retryAttempt()); } } diff --git a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php index 67c0f6490..e51a63b4c 100644 --- a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php +++ b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php @@ -5,7 +5,6 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Projection\Projectionist; use Closure; -use DateTimeImmutable; use Generator; use Patchlevel\EventSourcing\Attribute\Projector as ProjectionAttribute; use Patchlevel\EventSourcing\Attribute\Setup; @@ -22,7 +21,6 @@ use Patchlevel\EventSourcing\Projection\Projection\ThrowableToErrorContextTransformer; use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistCriteria; -use Patchlevel\EventSourcing\Projection\RetryStrategy\Retry; use Patchlevel\EventSourcing\Projection\RetryStrategy\RetryStrategy; use Patchlevel\EventSourcing\Store\ArrayStream; use Patchlevel\EventSourcing\Store\Criteria; @@ -397,15 +395,10 @@ public function create(): void $streamableStore = $this->prophesize(Store::class); $streamableStore->load($this->criteria())->shouldNotBeCalled(); - $retry = new Retry(1, new DateTimeImmutable()); - $retryStrategy = $this->prophesize(RetryStrategy::class); - $retryStrategy->nextAttempt(null)->willReturn($retry)->shouldBeCalledOnce(); - $projectionist = new DefaultProjectionist( $streamableStore->reveal(), $projectionStore, [$projector], - $retryStrategy->reveal(), ); $projectionist->boot(); @@ -423,7 +416,6 @@ public function create(): void ProjectionStatus::New, ThrowableToErrorContextTransformer::transform($projector->exception), ), - $retry, ), ], $projectionStore->updatedProjections, @@ -830,15 +822,10 @@ public function handle(Message $message): void $streamableStore = $this->prophesize(Store::class); $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); - $retry = new Retry(1, new DateTimeImmutable()); - $retryStrategy = $this->prophesize(RetryStrategy::class); - $retryStrategy->nextAttempt(null)->willReturn($retry)->shouldBeCalledOnce(); - $projectionist = new DefaultProjectionist( $streamableStore->reveal(), $projectionStore, [$projector], - $retryStrategy->reveal(), ); $projectionist->run(); @@ -856,7 +843,6 @@ public function handle(Message $message): void ProjectionStatus::Active, ThrowableToErrorContextTransformer::transform($projector->exception), ), - $retry, ), ], $projectionStore->updatedProjections, @@ -1489,29 +1475,24 @@ public function subscribe(): void } }; - $retry1 = new Retry(1, new DateTimeImmutable()); - $retry2 = new Retry(2, new DateTimeImmutable()); - $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->prophesize(Store::class); $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); - $projectionStore = new DummyStore([ - new Projection( - $projectionId, - Projection::DEFAULT_GROUP, - RunMode::FromBeginning, - ProjectionStatus::Error, - 0, - new ProjectionError('ERROR', ProjectionStatus::Active), - $retry1, - ), - ]); + $projection = new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Error, + 0, + new ProjectionError('ERROR', ProjectionStatus::Active), + ); + + $projectionStore = new DummyStore([$projection]); $retryStrategy = $this->prophesize(RetryStrategy::class); - $retryStrategy->shouldRetry($retry1)->willReturn(true); - $retryStrategy->nextAttempt($retry1)->willReturn($retry2); + $retryStrategy->shouldRetry($projection)->willReturn(true); $projectionist = new DefaultProjectionist( $streamableStore->reveal(), @@ -1533,13 +1514,13 @@ public function subscribe(): void ProjectionStatus::Active, 0, null, - $retry1, + 1, )); self::assertEquals(ProjectionStatus::Error, $update2->status()); self::assertEquals(ProjectionStatus::Active, $update2->projectionError()?->previousStatus); self::assertEquals('ERROR2', $update2->projectionError()?->errorMessage); - self::assertEquals($retry2, $update2->retry()); + self::assertEquals(1, $update2->retryAttempt()); } public function testShouldNotRetry(): void @@ -1549,24 +1530,21 @@ public function testShouldNotRetry(): void class { }; - $retry = new Retry(1, new DateTimeImmutable()); - $streamableStore = $this->prophesize(Store::class); - $projectionStore = new DummyStore([ - new Projection( - $projectionId, - Projection::DEFAULT_GROUP, - RunMode::FromBeginning, - ProjectionStatus::Error, - 0, - new ProjectionError('ERROR', ProjectionStatus::Active), - $retry, - ), - ]); + $projection = new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Error, + 0, + new ProjectionError('ERROR', ProjectionStatus::Active), + ); + + $projectionStore = new DummyStore([$projection]); $retryStrategy = $this->prophesize(RetryStrategy::class); - $retryStrategy->shouldRetry($retry)->willReturn(false); + $retryStrategy->shouldRetry($projection)->willReturn(false); $projectionist = new DefaultProjectionist( $streamableStore->reveal(), diff --git a/tests/Unit/Projection/RetryStrategy/ClockBasedRetryStrategyTest.php b/tests/Unit/Projection/RetryStrategy/ClockBasedRetryStrategyTest.php new file mode 100644 index 000000000..420486dc4 --- /dev/null +++ b/tests/Unit/Projection/RetryStrategy/ClockBasedRetryStrategyTest.php @@ -0,0 +1,69 @@ +clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00+00:00')); + $this->strategy = new ClockBasedRetryStrategy($this->clock); + } + + /** @param positive-int $seconds */ + #[DataProvider('attemptProvider')] + public function testShouldRetry(int $attempt, int $seconds, bool $expected): void + { + $projection = new Projection( + 'test', + 'default', + RunMode::FromBeginning, + ProjectionStatus::Error, + 0, + null, + $attempt, + $this->clock->now(), + ); + + $this->clock->sleep($seconds); + + self::assertEquals( + $expected, + $this->strategy->shouldRetry($projection), + ); + } + + public static function attemptProvider(): Generator + { + yield [0, 0, false]; + yield [0, 5, true]; + yield [1, 5, false]; + yield [1, 10, true]; + yield [2, 10, false]; + yield [2, 20, true]; + yield [3, 20, false]; + yield [3, 40, true]; + yield [4, 40, false]; + yield [4, 80, true]; + yield [5, 80, false]; + yield [5, 160, false]; + yield [5, 320, false]; + } +} diff --git a/tests/Unit/Projection/RetryStrategy/DefaultRetryStrategyTest.php b/tests/Unit/Projection/RetryStrategy/DefaultRetryStrategyTest.php deleted file mode 100644 index fb6564166..000000000 --- a/tests/Unit/Projection/RetryStrategy/DefaultRetryStrategyTest.php +++ /dev/null @@ -1,87 +0,0 @@ -clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00+00:00')); - $this->strategy = new DefaultRetryStrategy($this->clock); - } - - public function testShouldRetryWithNull(): void - { - self::assertFalse($this->strategy->shouldRetry(null)); - } - - public function testShouldRetryWithoutTime(): void - { - self::assertFalse($this->strategy->shouldRetry(new Retry(1, null))); - } - - public function testShouldRetryWithTime(): void - { - self::assertFalse($this->strategy->shouldRetry(new Retry(1, new DateTimeImmutable()))); - } - - public function testNextAttemptWithNull(): void - { - $expected = new Retry(1, new DateTimeImmutable('2021-01-01T00:00:10+00:00')); - - self::assertEquals($expected, $this->strategy->nextAttempt(null)); - } - - #[DataProvider('attemptProvider')] - public function testNextAttempt(int $attempt, string $dateString): void - { - $expected = new Retry($attempt, new DateTimeImmutable($dateString)); - - self::assertEquals( - $expected, - $this->strategy->nextAttempt( - new Retry( - $attempt - 1, - null, - ), - ), - ); - } - - public static function attemptProvider(): Generator - { - yield 'first attempt' => [1, '2021-01-01T00:00:5+00:00']; - yield 'second attempt' => [2, '2021-01-01T00:00:10+00:00']; - yield 'third attempt' => [3, '2021-01-01T00:00:20+00:00']; - yield 'fourth attempt' => [4, '2021-01-01T00:00:40+00:00']; - yield 'fifth attempt' => [5, '2021-01-01T00:01:20+00:00']; - } - - public function testMaxAttempt(): void - { - self::assertNull( - $this->strategy->nextAttempt( - new Retry( - 6, - null, - ), - ), - ); - } -} diff --git a/tests/Unit/Projection/RetryStrategy/NoRetryStrategyTest.php b/tests/Unit/Projection/RetryStrategy/NoRetryStrategyTest.php index c669cbdfa..dc76fabe8 100644 --- a/tests/Unit/Projection/RetryStrategy/NoRetryStrategyTest.php +++ b/tests/Unit/Projection/RetryStrategy/NoRetryStrategyTest.php @@ -4,51 +4,19 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Projection\RetryStrategy; -use DateTimeImmutable; +use Patchlevel\EventSourcing\Projection\Projection\Projection; use Patchlevel\EventSourcing\Projection\RetryStrategy\NoRetryStrategy; -use Patchlevel\EventSourcing\Projection\RetryStrategy\Retry; use PHPUnit\Framework\TestCase; /** @covers \Patchlevel\EventSourcing\Projection\RetryStrategy\NoRetryStrategy */ final class NoRetryStrategyTest extends TestCase { - public function testShouldRetryWithNull(): void + public function testNull(): void { $strategy = new NoRetryStrategy(); - self::assertFalse($strategy->shouldRetry(null)); - } - - public function testShouldRetryWithoutTime(): void - { - $strategy = new NoRetryStrategy(); - self::assertFalse($strategy->shouldRetry(new Retry(1, null))); - } - - public function testShouldRetryWithTime(): void - { - $strategy = new NoRetryStrategy(); - self::assertFalse($strategy->shouldRetry(new Retry(1, new DateTimeImmutable()))); - } - - public function testNextAttemptWithNull(): void - { - $strategy = new NoRetryStrategy(); - self::assertNull($strategy->nextAttempt(null)); - } - - public function testNextAttemptWithoutTime(): void - { - $strategy = new NoRetryStrategy(); - self::assertNull($strategy->nextAttempt( - new Retry(1, null), - )); - } - public function testNextAttemptWithTime(): void - { - $strategy = new NoRetryStrategy(); - self::assertNull($strategy->nextAttempt( - new Retry(1, new DateTimeImmutable()), + self::assertFalse($strategy->shouldRetry( + new Projection('test'), )); } } From 7a519353c162404eed44af49d8f9fc5288268f7b Mon Sep 17 00:00:00 2001 From: David Badura Date: Fri, 23 Feb 2024 10:13:15 +0100 Subject: [PATCH 5/5] removed unused exception --- .../Projectionist/Projectionist.php | 2 -- .../Projectionist/ProjectionistError.php | 30 ------------------- 2 files changed, 32 deletions(-) delete mode 100644 src/Projection/Projectionist/ProjectionistError.php diff --git a/src/Projection/Projectionist/Projectionist.php b/src/Projection/Projectionist/Projectionist.php index 3fe85e5e9..c959d860a 100644 --- a/src/Projection/Projectionist/Projectionist.php +++ b/src/Projection/Projectionist/Projectionist.php @@ -11,7 +11,6 @@ interface Projectionist /** * @param positive-int|null $limit * - * @throws ProjectionistError * @throws ProjectorNotFound */ public function boot( @@ -22,7 +21,6 @@ public function boot( /** * @param positive-int|null $limit * - * @throws ProjectionistError * @throws ProjectorNotFound */ public function run( diff --git a/src/Projection/Projectionist/ProjectionistError.php b/src/Projection/Projectionist/ProjectionistError.php deleted file mode 100644 index 0ab37e96a..000000000 --- a/src/Projection/Projectionist/ProjectionistError.php +++ /dev/null @@ -1,30 +0,0 @@ -getMessage(), - ), - 0, - $error, - ); - } -}