diff --git a/docs/pages/subscription.md b/docs/pages/subscription.md index f74a24b91..adfe32de7 100644 --- a/docs/pages/subscription.md +++ b/docs/pages/subscription.md @@ -666,24 +666,41 @@ $subscriptionEngine = new DefaultSubscriptionEngine( $retryStrategy, ); ``` -### Catchup Subscription Engine +### Catch up Subscription Engine If aggregates are used in the processors and new events are generated there, then they are not part of the current subscription engine run and will only be processed during the next run or boot. This is usually not a problem in dev or prod environment because a worker is used and these events will be processed at some point. But in testing it is not so easy. -For this reason, we have the `CatchupSubscriptionEngine`. +For this reason, we have the `CatchUpSubscriptionEngine` decorator. ```php -use Patchlevel\EventSourcing\Subscription\Engine\CatchupSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; /** @var SubscriptionEngine $subscriptionStore */ -$catchupSubscriptionEngine = new CatchupSubscriptionEngine($subscriptionEngine); +$catchupSubscriptionEngine = new CatchUpSubscriptionEngine($subscriptionEngine); ``` !!! tip - You can use the `CatchupSubscriptionEngine` in your tests to process the events immediately. + You can use the `CatchUpSubscriptionEngine` in your tests to process the events immediately. + +### Throw by error Subscription Engine + +This is another decorator for the subscription engine. It throws an exception if a subscription is in error state. +This is useful for testing or development to get directly feedback if something is wrong. + +```php +use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\ThrowOnErrorSubscriptionEngine; + +/** @var SubscriptionEngine $subscriptionStore */ +$throwByErrorSubscriptionEngine = new ThrowOnErrorSubscriptionEngine($subscriptionEngine); +``` +!!! warning + + This is only for testing or development. Don't use it in production. + The subscription engine has an build in retry strategy to retry subscriptions that have failed. ## Usage diff --git a/src/Subscription/Engine/ErrorDetected.php b/src/Subscription/Engine/ErrorDetected.php new file mode 100644 index 000000000..589b5753d --- /dev/null +++ b/src/Subscription/Engine/ErrorDetected.php @@ -0,0 +1,30 @@ + $errors */ + public function __construct( + public readonly array $errors, + ) { + $sentences = array_map( + static fn (Error $error) => sprintf( + 'Subscription %s: %s', + $error->subscriptionId, + $error->message, + ), + $errors, + ); + + parent::__construct("Error in subscription engine detected.\n" . implode("\n", $sentences)); + } +} diff --git a/src/Subscription/Engine/ThrowOnErrorSubscriptionEngine.php b/src/Subscription/Engine/ThrowOnErrorSubscriptionEngine.php new file mode 100644 index 000000000..35f62dbb9 --- /dev/null +++ b/src/Subscription/Engine/ThrowOnErrorSubscriptionEngine.php @@ -0,0 +1,74 @@ +throwOnError($this->parent->setup($criteria, $skipBooting)); + } + + public function boot(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null): ProcessedResult + { + return $this->throwOnError($this->parent->boot($criteria, $limit)); + } + + public function run(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null): ProcessedResult + { + return $this->throwOnError($this->parent->run($criteria, $limit)); + } + + public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result + { + return $this->throwOnError($this->parent->teardown($criteria)); + } + + public function remove(SubscriptionEngineCriteria|null $criteria = null): Result + { + return $this->throwOnError($this->parent->remove($criteria)); + } + + public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Result + { + return $this->throwOnError($this->parent->reactivate($criteria)); + } + + public function pause(SubscriptionEngineCriteria|null $criteria = null): Result + { + return $this->throwOnError($this->parent->pause($criteria)); + } + + /** @return list */ + public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): array + { + return $this->parent->subscriptions($criteria); + } + + /** + * @param T $result + * + * @return T + * + * @template T of Result|ProcessedResult + */ + private function throwOnError(Result|ProcessedResult $result): Result|ProcessedResult + { + $errors = $result->errors; + + if ($errors !== []) { + throw new ErrorDetected($errors); + } + + return $result; + } +} diff --git a/tests/Unit/Subscription/Engine/ErrorDetectedTest.php b/tests/Unit/Subscription/Engine/ErrorDetectedTest.php new file mode 100644 index 000000000..bf878d9bd --- /dev/null +++ b/tests/Unit/Subscription/Engine/ErrorDetectedTest.php @@ -0,0 +1,30 @@ +errors); + self::assertSame( + "Error in subscription engine detected.\nSubscription id1: error1\nSubscription id2: error2", + $errorDetected->getMessage(), + ); + } +} diff --git a/tests/Unit/Subscription/Engine/ThrowOnErrorSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/ThrowOnErrorSubscriptionEngineTest.php new file mode 100644 index 000000000..4daca3e25 --- /dev/null +++ b/tests/Unit/Subscription/Engine/ThrowOnErrorSubscriptionEngineTest.php @@ -0,0 +1,266 @@ +prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new Result(); + + $parent->setup($criteria, true)->willReturn($expectedResult)->shouldBeCalledOnce(); + $result = $engine->setup($criteria, true); + + self::assertSame($expectedResult, $result); + } + + public function testSetupError(): void + { + $this->expectException(ErrorDetected::class); + + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new Result([ + new Error('id1', 'error1', new RuntimeException('error1')), + new Error('id2', 'error2', new RuntimeException('error2')), + ]); + + $parent->setup($criteria, false)->willReturn($expectedResult)->shouldBeCalledOnce(); + $engine->setup($criteria); + } + + public function testBootSuccess(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new ProcessedResult(5); + + $parent->boot($criteria, 10)->willReturn($expectedResult)->shouldBeCalledOnce(); + $result = $engine->boot($criteria, 10); + + self::assertSame($expectedResult, $result); + } + + public function testBootError(): void + { + $this->expectException(ErrorDetected::class); + + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new ProcessedResult(5, false, [ + new Error('id1', 'error1', new RuntimeException('error1')), + new Error('id2', 'error2', new RuntimeException('error2')), + ]); + + $parent->boot($criteria, 10)->willReturn($expectedResult)->shouldBeCalledOnce(); + $engine->boot($criteria, 10); + } + + public function testRunSuccess(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new ProcessedResult(5); + + $parent->run($criteria, 10)->willReturn($expectedResult)->shouldBeCalledOnce(); + $result = $engine->run($criteria, 10); + + self::assertSame($expectedResult, $result); + } + + public function testRunError(): void + { + $this->expectException(ErrorDetected::class); + + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new ProcessedResult(5, false, [ + new Error('id1', 'error1', new RuntimeException('error1')), + new Error('id2', 'error2', new RuntimeException('error2')), + ]); + + $parent->run($criteria, 10)->willReturn($expectedResult)->shouldBeCalledOnce(); + $engine->run($criteria, 10); + } + + public function testTeardownSuccess(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new Result(); + + $parent->teardown($criteria)->willReturn($expectedResult)->shouldBeCalledOnce(); + $result = $engine->teardown($criteria); + + self::assertSame($expectedResult, $result); + } + + public function testTeardownError(): void + { + $this->expectException(ErrorDetected::class); + + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new Result([ + new Error('id1', 'error1', new RuntimeException('error1')), + new Error('id2', 'error2', new RuntimeException('error2')), + ]); + + $parent->teardown($criteria)->willReturn($expectedResult)->shouldBeCalledOnce(); + $engine->teardown($criteria); + } + + public function testRemoveSuccess(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new Result(); + + $parent->remove($criteria)->willReturn($expectedResult)->shouldBeCalledOnce(); + $result = $engine->remove($criteria); + + self::assertSame($expectedResult, $result); + } + + public function testRemoveError(): void + { + $this->expectException(ErrorDetected::class); + + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new Result([ + new Error('id1', 'error1', new RuntimeException('error1')), + new Error('id2', 'error2', new RuntimeException('error2')), + ]); + + $parent->remove($criteria)->willReturn($expectedResult)->shouldBeCalledOnce(); + $engine->remove($criteria); + } + + public function testReactivateSuccess(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new Result(); + + $parent->reactivate($criteria)->willReturn($expectedResult)->shouldBeCalledOnce(); + $result = $engine->reactivate($criteria); + + self::assertSame($expectedResult, $result); + } + + public function testReactivateError(): void + { + $this->expectException(ErrorDetected::class); + + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new Result([ + new Error('id1', 'error1', new RuntimeException('error1')), + new Error('id2', 'error2', new RuntimeException('error2')), + ]); + + $parent->reactivate($criteria)->willReturn($expectedResult)->shouldBeCalledOnce(); + $engine->reactivate($criteria); + } + + public function testPauseSuccess(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new Result(); + + $parent->pause($criteria)->willReturn($expectedResult)->shouldBeCalledOnce(); + $result = $engine->pause($criteria); + + self::assertSame($expectedResult, $result); + } + + public function testPauseError(): void + { + $this->expectException(ErrorDetected::class); + + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $expectedResult = new Result([ + new Error('id1', 'error1', new RuntimeException('error1')), + new Error('id2', 'error2', new RuntimeException('error2')), + ]); + + $parent->pause($criteria)->willReturn($expectedResult)->shouldBeCalledOnce(); + $engine->pause($criteria); + } + + public function testSubscriptions(): void + { + $parent = $this->prophesize(SubscriptionEngine::class); + + $engine = new ThrowOnErrorSubscriptionEngine($parent->reveal()); + $criteria = new SubscriptionEngineCriteria(); + + $parent->subscriptions($criteria)->willReturn([])->shouldBeCalledOnce(); + $result = $engine->subscriptions($criteria); + + self::assertSame([], $result); + } +}