Skip to content

Commit

Permalink
add throw by error subscription engine
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Apr 20, 2024
1 parent cf3c330 commit 773ce32
Show file tree
Hide file tree
Showing 5 changed files with 422 additions and 5 deletions.
27 changes: 22 additions & 5 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -666,24 +666,41 @@ $subscriptionEngine = new DefaultSubscriptionEngine(
$retryStrategy,
);
```
### Catchup Subscription Engine
### Catch up Subscription Engine

If aggregates are used in the processors and new events are generated there,
then they are not part of the current subscription engine run and will only be processed during the next run or boot.
This is usually not a problem in dev or prod environment because a worker is used
and these events will be processed at some point. But in testing it is not so easy.
For this reason, we have the `CatchupSubscriptionEngine`.
For this reason, we have the `CatchUpSubscriptionEngine` decorator.

```php
use Patchlevel\EventSourcing\Subscription\Engine\CatchupSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;

/** @var SubscriptionEngine $subscriptionStore */
$catchupSubscriptionEngine = new CatchupSubscriptionEngine($subscriptionEngine);
$catchupSubscriptionEngine = new CatchUpSubscriptionEngine($subscriptionEngine);
```
!!! tip

You can use the `CatchupSubscriptionEngine` in your tests to process the events immediately.
You can use the `CatchUpSubscriptionEngine` in your tests to process the events immediately.

### Throw by error Subscription Engine

This is another decorator for the subscription engine. It throws an exception if a subscription is in error state.
This is useful for testing or development to get directly feedback if something is wrong.

```php
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\ThrowOnErrorSubscriptionEngine;

/** @var SubscriptionEngine $subscriptionStore */
$throwByErrorSubscriptionEngine = new ThrowOnErrorSubscriptionEngine($subscriptionEngine);
```
!!! warning

This is only for testing or development. Don't use it in production.
The subscription engine has an build in retry strategy to retry subscriptions that have failed.

## Usage

Expand Down
30 changes: 30 additions & 0 deletions src/Subscription/Engine/ErrorDetected.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use RuntimeException;

use function array_map;
use function implode;
use function sprintf;

final class ErrorDetected extends RuntimeException
{
/** @param list<Error> $errors */
public function __construct(
public readonly array $errors,
) {
$sentences = array_map(
static fn (Error $error) => sprintf(
'Subscription %s: %s',
$error->subscriptionId,
$error->message,
),
$errors,
);

parent::__construct("Error in subscription engine detected.\n" . implode("\n", $sentences));
}
}
74 changes: 74 additions & 0 deletions src/Subscription/Engine/ThrowOnErrorSubscriptionEngine.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Patchlevel\EventSourcing\Subscription\Subscription;

final class ThrowOnErrorSubscriptionEngine implements SubscriptionEngine
{
public function __construct(
private readonly SubscriptionEngine $parent,
) {
}

public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): Result
{
return $this->throwOnError($this->parent->setup($criteria, $skipBooting));
}

public function boot(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null): ProcessedResult
{
return $this->throwOnError($this->parent->boot($criteria, $limit));
}

public function run(SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null): ProcessedResult
{
return $this->throwOnError($this->parent->run($criteria, $limit));
}

public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result
{
return $this->throwOnError($this->parent->teardown($criteria));
}

public function remove(SubscriptionEngineCriteria|null $criteria = null): Result
{
return $this->throwOnError($this->parent->remove($criteria));
}

public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Result
{
return $this->throwOnError($this->parent->reactivate($criteria));
}

public function pause(SubscriptionEngineCriteria|null $criteria = null): Result
{
return $this->throwOnError($this->parent->pause($criteria));
}

/** @return list<Subscription> */
public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): array
{
return $this->parent->subscriptions($criteria);
}

/**
* @param T $result
*
* @return T
*
* @template T of Result|ProcessedResult
*/
private function throwOnError(Result|ProcessedResult $result): Result|ProcessedResult
{
$errors = $result->errors;

if ($errors !== []) {
throw new ErrorDetected($errors);
}

return $result;
}
}
30 changes: 30 additions & 0 deletions tests/Unit/Subscription/Engine/ErrorDetectedTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Tests\Unit\Subscription\Engine;

use Patchlevel\EventSourcing\Subscription\Engine\Error;
use Patchlevel\EventSourcing\Subscription\Engine\ErrorDetected;
use PHPUnit\Framework\TestCase;
use RuntimeException;

/** @covers \Patchlevel\EventSourcing\Subscription\Engine\ErrorDetected */
final class ErrorDetectedTest extends TestCase
{
public function testError(): void
{
$errors = [
new Error('id1', 'error1', new RuntimeException('error1')),
new Error('id2', 'error2', new RuntimeException('error2')),
];

$errorDetected = new ErrorDetected($errors);

self::assertSame($errors, $errorDetected->errors);
self::assertSame(
"Error in subscription engine detected.\nSubscription id1: error1\nSubscription id2: error2",
$errorDetected->getMessage(),
);
}
}
Loading

0 comments on commit 773ce32

Please sign in to comment.