diff --git a/baseline.xml b/baseline.xml index 18f6289ae..10aae5a50 100644 --- a/baseline.xml +++ b/baseline.xml @@ -240,4 +240,9 @@ + + + + + diff --git a/docs/pages/projection.md b/docs/pages/projection.md index 88f8e4a88..56023afe2 100644 --- a/docs/pages/projection.md +++ b/docs/pages/projection.md @@ -334,6 +334,7 @@ stateDiagram-v2 Error --> New Error --> Booting Error --> Active + Error --> [*] Outdated --> Active Outdated --> [*] ``` @@ -360,10 +361,11 @@ These projections have a projector, follow the event stream and should be up-to- A projection is finished if the projector has the mode `RunMode::Once`. This means that the projection is only run once and then set to finished if it reaches the end of the event stream. +You can also reactivate the projection if you want so that it continues. ### Outdated -If a projection exists in the projection store +If an active or finished projection exists in the projection store that does not have a projector in the source code with a corresponding projector ID, then this projection is marked as outdated. This happens when either the projector has been deleted @@ -381,10 +383,16 @@ There are two options to reactivate the projection: ### Error If an error occurs in a projector, then the target projection is set to Error. -This projection will then no longer run until the projection is activated again. +This can happen in the create process, in the boot process or in the run process. +This projection will then no longer boot/run until the projection is reactivate or retried. + +The projectionist has a retry strategy to retry projections that have failed. +It tries to reactivate the projection after a certain time and a certain number of attempts. +If this does not work, the projection is set to error and must be manually reactivated. + There are two options here: -* Reactivate the projection, so that the projection is active again. +* Reactivate the projection, so that the projection is in the previous state again. * Remove the projection and rebuild it from scratch. ## Setup @@ -424,11 +432,34 @@ $schemaDirector = new DoctrineSchemaDirector( You can find more about schema configurator [here](./store.md) +### Retry Strategy + +The projectionist uses a retry strategy to retry projections that have failed. +Our default strategy can be configured with the following parameters: + +* `baseDelay` - The base delay in seconds. +* `delayFactor` - The factor by which the delay is multiplied after each attempt. +* `maxAttempts` - The maximum number of attempts. + +```php +use Patchlevel\EventSourcing\Projection\RetryStrategy\DefaultRetryStrategy; + +$retryStrategy = new DefaultRetryStrategy( + baseDelay: 5, + delayFactor: 2, + maxAttempts: 5, +); +``` + +!!! tip + + You can reactivate the projection manually or remove it and rebuild it from scratch. + ### Projectionist Now we can create the projectionist and plug together the necessary services. The event store is needed to load the events, the Projection Store to store the projection state -and the respective projectors. +and the respective projectors. Optionally, we can also pass a retry strategy. ```php use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; @@ -436,7 +467,8 @@ use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; $projectionist = new DefaultProjectionist( $eventStore, $projectionStore, - [$projector1, $projector2, $projector3] + [$projector1, $projector2, $projector3], + $retryStrategy, ); ``` @@ -516,6 +548,9 @@ foreach ($projections as $projection) { } ``` -!!! note +## Learn more - There are also [cli commands](./cli.md) for all commands. +* [How to use CLI commands](./cli.md) +* [How to use Pipeline](./pipeline.md) +* [How to use Event Bus](./event_bus.md) +* [How to Test](./testing.md) \ No newline at end of file diff --git a/src/Projection/Projectionist/DefaultProjectionist.php b/src/Projection/Projectionist/DefaultProjectionist.php index 0098786de..75ec4f2e8 100644 --- a/src/Projection/Projectionist/DefaultProjectionist.php +++ b/src/Projection/Projectionist/DefaultProjectionist.php @@ -193,7 +193,7 @@ public function run( new ProjectionCriteria( ids: $criteria->ids, groups: $criteria->groups, - status: [ProjectionStatus::Active, ProjectionStatus::Error], + status: [ProjectionStatus::Active], ), function (array $projections) use ($limit): void { if (count($projections) === 0) { diff --git a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php index 1d382559b..67c0f6490 100644 --- a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php +++ b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php @@ -1477,6 +1477,109 @@ class { ], $projections); } + public function testRetry(): void + { + $projectionId = 'test'; + $projector = new #[ProjectionAttribute('test')] + class { + #[Subscribe(ProfileVisited::class)] + public function subscribe(): void + { + throw new RuntimeException('ERROR2'); + } + }; + + $retry1 = new Retry(1, new DateTimeImmutable()); + $retry2 = new Retry(2, new DateTimeImmutable()); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Error, + 0, + new ProjectionError('ERROR', ProjectionStatus::Active), + $retry1, + ), + ]); + + $retryStrategy = $this->prophesize(RetryStrategy::class); + $retryStrategy->shouldRetry($retry1)->willReturn(true); + $retryStrategy->nextAttempt($retry1)->willReturn($retry2); + + $projectionist = new DefaultProjectionist( + $streamableStore->reveal(), + $projectionStore, + [$projector], + $retryStrategy->reveal(), + ); + + $projectionist->run(); + + self::assertCount(2, $projectionStore->updatedProjections); + + [$update1, $update2] = $projectionStore->updatedProjections; + + self::assertEquals($update1, new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 0, + null, + $retry1, + )); + + self::assertEquals(ProjectionStatus::Error, $update2->status()); + self::assertEquals(ProjectionStatus::Active, $update2->projectionError()?->previousStatus); + self::assertEquals('ERROR2', $update2->projectionError()?->errorMessage); + self::assertEquals($retry2, $update2->retry()); + } + + public function testShouldNotRetry(): void + { + $projectionId = 'test'; + $projector = new #[ProjectionAttribute('test')] + class { + }; + + $retry = new Retry(1, new DateTimeImmutable()); + + $streamableStore = $this->prophesize(Store::class); + + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Error, + 0, + new ProjectionError('ERROR', ProjectionStatus::Active), + $retry, + ), + ]); + + $retryStrategy = $this->prophesize(RetryStrategy::class); + $retryStrategy->shouldRetry($retry)->willReturn(false); + + $projectionist = new DefaultProjectionist( + $streamableStore->reveal(), + $projectionStore, + [$projector], + $retryStrategy->reveal(), + ); + + $projectionist->run(); + + self::assertEquals([], $projectionStore->updatedProjections); + } + #[DataProvider('methodProvider')] public function testCriteria(string $method): void {