diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 790f9a724..4850d4dac 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -106,6 +106,7 @@ nav: - Upcasting: upcasting.md - Message Decorator: message_decorator.md - Split Stream: split_stream.md + - Pipeline / ACL: pipeline.md - Time / Clock: clock.md - Testing: testing.md - CLI: cli.md diff --git a/docs/pages/message.md b/docs/pages/message.md index 398ddb170..bf0e4faff 100644 --- a/docs/pages/message.md +++ b/docs/pages/message.md @@ -97,169 +97,7 @@ use Patchlevel\EventSourcing\Message\Message; /** @var Message $message */ $message->header(ApplicationHeader::class); ``` -## Translator -Translator can be used to manipulate, filter or expand messages or events. -This can be used for anti-corruption layers, data migration, or to fix errors in the event stream. - -### Exclude - -With this translator you can exclude certain events. - -```php -use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator; - -$translator = new ExcludeEventTranslator([EmailChanged::class]); -``` -### Include - -With this translator you can only allow certain events. - -```php -use Patchlevel\EventSourcing\Message\Translator\IncludeEventTranslator; - -$translator = new IncludeEventTranslator([ProfileCreated::class]); -``` -### Filter - -If the translator `ExcludeEventTranslator` and `IncludeEventTranslator` are not sufficient, -you can also write your own filter. -This translator expects a callback that returns either true to allow events or false to not allow them. - -```php -use Patchlevel\EventSourcing\Message\Translator\FilterEventTranslator; - -$translator = new FilterEventTranslator(static function (object $event) { - if (!$event instanceof ProfileCreated) { - return true; - } - - return $event->allowNewsletter(); -}); -``` -### Exclude Events with Header - -With this translator you can exclude event with specific header. - -```php -use Patchlevel\EventSourcing\Message\Translator\ExcludeEventWithHeaderTranslator; -use Patchlevel\EventSourcing\Store\ArchivedHeader; - -$translator = new ExcludeEventWithHeaderTranslator(ArchivedHeader::class); -``` -### Only Events with Header - -With this translator you can only allow events with a specific header. - -```php -use Patchlevel\EventSourcing\Message\Translator\IncludeEventWithHeaderTranslator; - -$translator = new IncludeEventWithHeaderTranslator(ArchivedHeader::class); -``` -### Replace - -If you want to replace an event, you can use the `ReplaceEventTranslator`. -The first parameter you have to define is the event class that you want to replace. -And as a second parameter a callback, that the old event awaits and a new event returns. - -```php -use Patchlevel\EventSourcing\Message\Translator\ReplaceEventTranslator; - -$translator = new ReplaceEventTranslator(OldVisited::class, static function (OldVisited $oldVisited) { - return new NewVisited($oldVisited->profileId()); -}); -``` -### Until - -A use case could also be that you want to look at the projection from a previous point in time. -You can use the `UntilEventTranslator` to only allow events that were `recorded` before this point in time. - -```php -use Patchlevel\EventSourcing\Message\Translator\UntilEventTranslator; - -$translator = new UntilEventTranslator(new DateTimeImmutable('2020-01-01 12:00:00')); -``` -### Recalculate playhead - -This translator can be used to recalculate the playhead. -The playhead must always be in ascending order so that the data is valid. -Some translator can break this order and the translator `RecalculatePlayheadTranslator` can fix this problem. - -```php -use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator; - -$translator = new RecalculatePlayheadTranslator(); -``` -!!! tip - - If you migrate your event stream, you can use the `RecalculatePlayheadTranslator` to fix the playhead. - -### Chain - -If you want to group your translator, you can use one or more `ChainTranslator`. - -```php -use Patchlevel\EventSourcing\Message\Translator\ChainTranslator; -use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator; -use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator; - -$translator = new ChainTranslator([ - new ExcludeEventTranslator([EmailChanged::class]), - new RecalculatePlayheadTranslator(), -]); -``` -### Custom Translator - -You can also write a custom translator. The translator gets a message and can return `n` messages. -There are the following possibilities: - -* Return only the message to an array to leave it unchanged. -* Put another message in the array to swap the message. -* Return an empty array to remove the message. -* Or return multiple messages to enrich the stream. - -In our case, the domain has changed a bit. -In the beginning we had a `ProfileCreated` event that just created a profile. -Now we have a `ProfileRegistered` and a `ProfileActivated` event, -which should replace the `ProfileCreated` event. - -```php -use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Message\Translator\Translator; - -final class SplitProfileCreatedTranslator implements Translator -{ - public function __invoke(Message $message): array - { - $event = $message->event(); - - if (!$event instanceof ProfileCreated) { - return [$message]; - } - - $profileRegisteredMessage = Message::createWithHeaders( - new ProfileRegistered($event->id(), $event->name()), - $message->headers(), - ); - - $profileActivatedMessage = Message::createWithHeaders( - new ProfileActivated($event->id()), - $message->headers(), - ); - - return [$profileRegisteredMessage, $profileActivatedMessage]; - } -} -``` -!!! warning - - Since we changed the number of messages, we have to recalculate the playhead. - -!!! tip - - You don't have to migrate the store directly for every change, - but you can also use the [upcasting](upcasting.md) feature. - ## Learn more * [How to decorate messages](message_decorator.md) @@ -267,3 +105,4 @@ final class SplitProfileCreatedTranslator implements Translator * [How to store messages](store.md) * [How to use subscriptions](subscription.md) * [How to use the event bus](event_bus.md) +* [How to migrate messages](pipeline.md) diff --git a/docs/pages/pipeline.md b/docs/pages/pipeline.md new file mode 100644 index 000000000..8fe975d67 --- /dev/null +++ b/docs/pages/pipeline.md @@ -0,0 +1,409 @@ +# Pipeline / Anti Corruption Layer + +A store is immutable, i.e. it cannot be changed afterwards. +This includes both manipulating events and deleting them. + +Instead, you can duplicate the store and manipulate the events in the process. +Thus the old store remains untouched and you can test the new store beforehand, +whether the migration worked. + +In this example the event `PrivacyAdded` is removed and the event `OldVisited` is replaced by `NewVisited`: + +```php +use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware; +use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware; +use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware; +use Patchlevel\EventSourcing\Pipeline\Pipeline; +use Patchlevel\EventSourcing\Pipeline\Source\StoreSource; +use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget; + +$pipeline = new Pipeline( + new StoreSource($oldStore), + new StoreTarget($newStore), + [ + new ExcludeEventMiddleware([PrivacyAdded::class]), + new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) { + return new NewVisited($oldVisited->profileId()); + }), + new RecalculatePlayheadMiddleware(), + ] +); +``` + +!!! danger + + Under no circumstances may the same store be used that is used for the source. + Otherwise the store will be broken afterwards! + +The pipeline can also be used to create or rebuild a projection: + +```php +use Patchlevel\EventSourcing\Pipeline\Pipeline; +use Patchlevel\EventSourcing\Pipeline\Source\StoreSource; +use Patchlevel\EventSourcing\Pipeline\Target\ProjectionTarget; + +$pipeline = new Pipeline( + new StoreSource($store), + new ProjectionTarget($projection) +); +``` + +The principle remains the same. +There is a source where the data comes from. +A target where the data should flow. +And any number of middlewares to do something with the data beforehand. + +## Source + +The first thing you need is a source of where the data should come from. + +### Store + +The `StoreSource` is the standard source to load all events from the database. + +```php +use Patchlevel\EventSourcing\Pipeline\Source\StoreSource; + +$source = new StoreSource($store); +``` + +### In Memory + +There is an `InMemorySource` that receives the messages in an array. This source can be used to write pipeline tests. + +```php +use Patchlevel\EventSourcing\EventBus\Message; +use Patchlevel\EventSourcing\Pipeline\Source\InMemorySource; + +$source = new InMemorySource([ + new Message( + Profile::class, + '1', + 1, + new ProfileCreated(Email::fromString('david.badura@patchlevel.de')), + ), + // ... +]); +``` + +### Custom Source + +You can also create your own source class. It has to inherit from `Source`. +Here you can, for example, create a migration from another event sourcing system or similar system. + +```php +use Patchlevel\EventSourcing\EventBus\Message; +use Patchlevel\EventSourcing\Pipeline\Source\Source; + +$source = new class implements Source { + /** + * @return Generator + */ + public function load(): Generator + { + yield new Message( + Profile::class, + '1', + 0, + new ProfileCreated('1', ['name' => 'David']) + ); + } + + public function count(): int + { + reutrn 1; + } +} +``` + +## Target + +After you have a source, you still need the destination of the pipeline. + +### Store + +You can use a store to save the final result. + +```php +use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget; + +$target = new StoreTarget($store); +``` + +!!! danger + + Under no circumstances may the same store be used that is used for the source. + Otherwise the store will be broken afterwards! + +!!! note + + It does not matter whether the previous store was a SingleTable or a MultiTable. + You can switch back and forth between both store types using the pipeline. + +### Projection + +A projection can also be used as a target. +For example, to set up a new projection or to build a new projection. + +```php +use Patchlevel\EventSourcing\Pipeline\Target\ProjectionTarget; + +$target = new ProjectionTarget($projection); +``` + +### Projection Handler + +If you want to build or create all projections from scratch, +then you can also use the ProjectionRepositoryTarget. +In this, the individual projections are iterated and the events are then passed on. + +```php +use Patchlevel\EventSourcing\Pipeline\Target\ProjectionHandlerTarget; + +$target = new ProjectionHandlerTarget($projectionHandler); +``` + +### In Memory + +There is also an in-memory variant for the target. This target can also be used for tests. +With the `messages` method you get all `Messages` that have reached the target. + +```php +use Patchlevel\EventSourcing\Pipeline\Target\InMemoryTarget; + +$target = new InMemoryTarget(); + +// run pipeline + +$messages = $target->messages(); +``` + +### Custom Target + +You can also define your own target. To do this, you need to implement the `Target` interface. + +```php +use Patchlevel\EventSourcing\EventBus\Message; + +final class OtherStoreTarget implements Target +{ + private OtherStore $store; + + public function __construct(OtherStore $store) + { + $this->store = $store; + } + + public function save(Message $message): void + { + $this->store->save($message); + } +} +``` + +## Middlewares + +Middelwares can be used to manipulate, delete or expand messages or events during the process. + +!!! warning + + It is important to know that some middlewares require recalculation from the playhead, + if the target is a store. This is a numbering of the events that must be in ascending order. + A corresponding note is supplied with every middleware. + +### Exclude + +With this middleware you can exclude certain events. + +```php +use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware; + +$middleware = new ExcludeEventMiddleware([EmailChanged::class]); +``` + +!!! warning + + After this middleware, the playhead must be recalculated! + +### Include + + +With this middleware you can only allow certain events. + +```php +use Patchlevel\EventSourcing\Pipeline\Middleware\IncludeEventMiddleware; + +$middleware = new IncludeEventMiddleware([ProfileCreated::class]); +``` + +!!! warning + + After this middleware, the playhead must be recalculated! + +### Filter + +If the middlewares `ExcludeEventMiddleware` and `IncludeEventMiddleware` are not sufficient, +you can also write your own filter. +This middleware expects a callback that returns either true to allow events or false to not allow them. + +```php +use Patchlevel\EventSourcing\Aggregate\AggregateChanged; +use Patchlevel\EventSourcing\Pipeline\Middleware\FilterEventMiddleware; + +$middleware = new FilterEventMiddleware(function (AggregateChanged $event) { + if (!$event instanceof ProfileCreated) { + return true; + } + + return $event->allowNewsletter(); +}); +``` + +!!! warning + + After this middleware, the playhead must be recalculated! + +### Exclude Archived Events + +With this middleware you can exclude archived events. + +```php +use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeArchivedEventMiddleware; + +$middleware = new ExcludeArchivedEventMiddleware(); +``` + +!!! warning + + After this middleware, the playhead must be recalculated! + +### Only Archived Events + + +With this middleware you can only allow archived events. + +```php +use Patchlevel\EventSourcing\Pipeline\Middleware\OnlyArchivedEventMiddleware; + +$middleware = new OnlyArchivedEventMiddleware(); +``` + +!!! warning + + After this middleware, the playhead must be recalculated! + +### Replace + +If you want to replace an event, you can use the `ReplaceEventMiddleware`. +The first parameter you have to define is the event class that you want to replace. +And as a second parameter a callback, that the old event awaits and a new event returns. + +```php +use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware; + +$middleware = new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) { + return new NewVisited($oldVisited->profileId()); +}); +``` + +!!! note + + The middleware takes over the playhead and recordedAt information. + +### Until + +A use case could also be that you want to look at the projection from a previous point in time. +You can use the `UntilEventMiddleware` to only allow events that were `recorded` before this point in time. + +```php +use Patchlevel\EventSourcing\Pipeline\Middleware\ClassRenameMiddleware; + +$middleware = new UntilEventMiddleware(new DateTimeImmutable('2020-01-01 12:00:00')); +``` + +!!! warning + + After this middleware, the playhead must be recalculated! + +### Recalculate playhead + +This middleware can be used to recalculate the playhead. +The playhead must always be in ascending order so that the data is valid. +Some middleware can break this order and the middleware `RecalculatePlayheadMiddleware` can fix this problem. + +```php +use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware; + +$middleware = new RecalculatePlayheadMiddleware(); +``` + +!!! note + + You only need to add this middleware once at the end of the pipeline. + +### Chain + +If you want to group your middleware, you can use one or more `ChainMiddleware`. + +```php +use Patchlevel\EventSourcing\Pipeline\Middleware\ChainMiddleware; +use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware; +use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware; + +$middleware = new ChainMiddleware([ + new ExcludeEventMiddleware([EmailChanged::class]), + new RecalculatePlayheadMiddleware() +]); +``` + +### Custom middleware + +You can also write a custom middleware. The middleware gets a message and can return `N` messages. +There are the following possibilities: + +* Return only the message to an array to leave it unchanged. +* Put another message in the array to swap the message. +* Return an empty array to remove the message. +* Or return multiple messages to enrich the stream. + +In our case, the domain has changed a bit. +In the beginning we had a `ProfileCreated` event that just created a profile. +Now we have a `ProfileRegistered` and a `ProfileActivated` event, +which should replace the `ProfileCreated` event. + +```php +use Patchlevel\EventSourcing\EventBus\Message; +use Patchlevel\EventSourcing\Pipeline\Middleware\Middleware; + +final class SplitProfileCreatedMiddleware implements Middleware +{ + public function __invoke(Message $message): array + { + $event = $message->event(); + + if (!$event instanceof ProfileCreated) { + return [$message]; + } + + $profileRegisteredMessage = Message::createWithHeaders( + new ProfileRegistered($event->id(), $event->name()), + $message->headers() + ); + + $profileActivatedMessage = Message::createWithHeaders( + new ProfileActivated($event->id()), + $message->headers() + ); + + return [$profileRegisteredMessage, $profileActivatedMessage]; + } +} +``` + +!!! warning + + Since we changed the number of messages, we have to recalculate the playhead. + +!!! note + + You can find more about messages [here](event_bus.md). \ No newline at end of file diff --git a/src/Message/Message.php b/src/Message/Message.php index 40635c267..0085354c4 100644 --- a/src/Message/Message.php +++ b/src/Message/Message.php @@ -84,6 +84,15 @@ public function withHeader(object $header): self return $message; } + /** @param class-string $name */ + public function removeHeader(string $name): self + { + $message = clone $this; + unset($message->headers[$name]); + + return $message; + } + /** @return list */ public function headers(): array { diff --git a/src/Message/Translator/ChainTranslator.php b/src/Message/Translator/ChainTranslator.php index 708bdb3b4..03931c81b 100644 --- a/src/Message/Translator/ChainTranslator.php +++ b/src/Message/Translator/ChainTranslator.php @@ -8,6 +8,9 @@ use function array_values; +/** + * @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\ChainMiddleware instead + */ final class ChainTranslator implements Translator { /** @param iterable $translators */ diff --git a/src/Message/Translator/ExcludeEventTranslator.php b/src/Message/Translator/ExcludeEventTranslator.php index f4a26f632..aeaaadc9d 100644 --- a/src/Message/Translator/ExcludeEventTranslator.php +++ b/src/Message/Translator/ExcludeEventTranslator.php @@ -6,6 +6,9 @@ use Patchlevel\EventSourcing\Message\Message; +/** + * @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware instead + */ final class ExcludeEventTranslator implements Translator { /** @param list $classes */ diff --git a/src/Message/Translator/ExcludeEventWithHeaderTranslator.php b/src/Message/Translator/ExcludeEventWithHeaderTranslator.php index 4fa4b7854..fcd928956 100644 --- a/src/Message/Translator/ExcludeEventWithHeaderTranslator.php +++ b/src/Message/Translator/ExcludeEventWithHeaderTranslator.php @@ -6,6 +6,9 @@ use Patchlevel\EventSourcing\Message\Message; +/** + * @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventWithHeaderMiddleware instead + */ final class ExcludeEventWithHeaderTranslator implements Translator { /** @param class-string $header */ diff --git a/src/Message/Translator/FilterEventTranslator.php b/src/Message/Translator/FilterEventTranslator.php index b8f3c5b78..b93261cb9 100644 --- a/src/Message/Translator/FilterEventTranslator.php +++ b/src/Message/Translator/FilterEventTranslator.php @@ -6,6 +6,9 @@ use Patchlevel\EventSourcing\Message\Message; +/** + * @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\FilterEventMiddleware instead + */ final class FilterEventTranslator implements Translator { /** @var callable(object $event):bool */ diff --git a/src/Message/Translator/IncludeEventTranslator.php b/src/Message/Translator/IncludeEventTranslator.php index ae2a412f5..4bd1dc230 100644 --- a/src/Message/Translator/IncludeEventTranslator.php +++ b/src/Message/Translator/IncludeEventTranslator.php @@ -6,6 +6,9 @@ use Patchlevel\EventSourcing\Message\Message; +/** + * @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\IncludeEventWithHeaderMiddleware instead + */ final class IncludeEventTranslator implements Translator { /** @param list $classes */ diff --git a/src/Message/Translator/IncludeEventWithHeaderTranslator.php b/src/Message/Translator/IncludeEventWithHeaderTranslator.php index 8c88514ae..6dcd58b5b 100644 --- a/src/Message/Translator/IncludeEventWithHeaderTranslator.php +++ b/src/Message/Translator/IncludeEventWithHeaderTranslator.php @@ -6,6 +6,9 @@ use Patchlevel\EventSourcing\Message\Message; +/** + * @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\IncludeEventWithHeaderMiddleware instead + */ final class IncludeEventWithHeaderTranslator implements Translator { /** @param class-string $header */ diff --git a/src/Message/Translator/RecalculatePlayheadTranslator.php b/src/Message/Translator/RecalculatePlayheadTranslator.php index 27185baaa..de3011650 100644 --- a/src/Message/Translator/RecalculatePlayheadTranslator.php +++ b/src/Message/Translator/RecalculatePlayheadTranslator.php @@ -11,6 +11,9 @@ use function array_key_exists; +/** + * @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware instead + */ final class RecalculatePlayheadTranslator implements Translator { /** @var array */ diff --git a/src/Message/Translator/ReplaceEventTranslator.php b/src/Message/Translator/ReplaceEventTranslator.php index 3e13fa416..d31076175 100644 --- a/src/Message/Translator/ReplaceEventTranslator.php +++ b/src/Message/Translator/ReplaceEventTranslator.php @@ -6,7 +6,11 @@ use Patchlevel\EventSourcing\Message\Message; -/** @template T of object */ +/** + * @template T of object + * + * @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware instead + */ final class ReplaceEventTranslator implements Translator { /** @var callable(T $event):object */ diff --git a/src/Message/Translator/Translator.php b/src/Message/Translator/Translator.php index 66fe20057..7450c944f 100644 --- a/src/Message/Translator/Translator.php +++ b/src/Message/Translator/Translator.php @@ -6,6 +6,9 @@ use Patchlevel\EventSourcing\Message\Message; +/** + * @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\Middleware instead + */ interface Translator { /** @return list */ diff --git a/src/Message/Translator/UntilEventTranslator.php b/src/Message/Translator/UntilEventTranslator.php index 3cdf739ad..1d14ca6a3 100644 --- a/src/Message/Translator/UntilEventTranslator.php +++ b/src/Message/Translator/UntilEventTranslator.php @@ -10,6 +10,9 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Store\StreamHeader; +/** + * @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\UntilEventMiddleware instead + */ final class UntilEventTranslator implements Translator { public function __construct( diff --git a/src/Pipeline/Middleware/AggregateToStreamHeaderMiddleware.php b/src/Pipeline/Middleware/AggregateToStreamHeaderMiddleware.php new file mode 100644 index 000000000..9620a041f --- /dev/null +++ b/src/Pipeline/Middleware/AggregateToStreamHeaderMiddleware.php @@ -0,0 +1,35 @@ + */ + public function __invoke(Message $message): array + { + if (!$message->hasHeader(AggregateHeader::class)) { + return [$message]; + } + + $aggregateHeader = $message->header(AggregateHeader::class); + + return [ + $message + ->removeHeader(AggregateHeader::class) + ->withHeader(new StreamHeader( + $aggregateHeader->streamName(), + $aggregateHeader->playhead, + $aggregateHeader->recordedOn, + )) + ]; + } +} diff --git a/src/Pipeline/Middleware/ChainMiddleware.php b/src/Pipeline/Middleware/ChainMiddleware.php new file mode 100644 index 000000000..50a06d515 --- /dev/null +++ b/src/Pipeline/Middleware/ChainMiddleware.php @@ -0,0 +1,46 @@ + $translators */ + public function __construct( + private readonly iterable $translators, + ) { + } + + /** @return list */ + public function __invoke(Message $message): array + { + $messages = [$message]; + + foreach ($this->translators as $middleware) { + $messages = $this->process($middleware, $messages); + } + + return $messages; + } + + /** + * @param list $messages + * + * @return list + */ + private function process(Middleware $translator, array $messages): array + { + $result = []; + + foreach ($messages as $message) { + $result += $translator($message); + } + + return array_values($result); + } +} diff --git a/src/Pipeline/Middleware/ExcludeEventMiddleware.php b/src/Pipeline/Middleware/ExcludeEventMiddleware.php new file mode 100644 index 000000000..0fcb96376 --- /dev/null +++ b/src/Pipeline/Middleware/ExcludeEventMiddleware.php @@ -0,0 +1,28 @@ + $classes */ + public function __construct( + private readonly array $classes, + ) { + } + + /** @return list */ + public function __invoke(Message $message): array + { + foreach ($this->classes as $class) { + if ($message->event() instanceof $class) { + return []; + } + } + + return [$message]; + } +} diff --git a/src/Pipeline/Middleware/ExcludeEventWithHeaderMiddleware.php b/src/Pipeline/Middleware/ExcludeEventWithHeaderMiddleware.php new file mode 100644 index 000000000..608612285 --- /dev/null +++ b/src/Pipeline/Middleware/ExcludeEventWithHeaderMiddleware.php @@ -0,0 +1,26 @@ + */ + public function __invoke(Message $message): array + { + if ($message->hasHeader($this->header)) { + return []; + } + + return [$message]; + } +} diff --git a/src/Pipeline/Middleware/FilterEventMiddleware.php b/src/Pipeline/Middleware/FilterEventMiddleware.php new file mode 100644 index 000000000..9aeaefb55 --- /dev/null +++ b/src/Pipeline/Middleware/FilterEventMiddleware.php @@ -0,0 +1,31 @@ +callable = $callable; + } + + /** @return list */ + public function __invoke(Message $message): array + { + $result = ($this->callable)($message->event()); + + if ($result) { + return [$message]; + } + + return []; + } +} diff --git a/src/Pipeline/Middleware/IncludeEventMiddleware.php b/src/Pipeline/Middleware/IncludeEventMiddleware.php new file mode 100644 index 000000000..7b3262423 --- /dev/null +++ b/src/Pipeline/Middleware/IncludeEventMiddleware.php @@ -0,0 +1,28 @@ + $classes */ + public function __construct( + private readonly array $classes, + ) { + } + + /** @return list */ + public function __invoke(Message $message): array + { + foreach ($this->classes as $class) { + if ($message->event() instanceof $class) { + return [$message]; + } + } + + return []; + } +} diff --git a/src/Pipeline/Middleware/IncludeEventWithHeaderMiddleware.php b/src/Pipeline/Middleware/IncludeEventWithHeaderMiddleware.php new file mode 100644 index 000000000..3ad46446b --- /dev/null +++ b/src/Pipeline/Middleware/IncludeEventWithHeaderMiddleware.php @@ -0,0 +1,26 @@ + */ + public function __invoke(Message $message): array + { + if ($message->hasHeader($this->header)) { + return [$message]; + } + + return []; + } +} diff --git a/src/Pipeline/Middleware/Middleware.php b/src/Pipeline/Middleware/Middleware.php new file mode 100644 index 000000000..490f19514 --- /dev/null +++ b/src/Pipeline/Middleware/Middleware.php @@ -0,0 +1,13 @@ + */ + public function __invoke(Message $message): array; +} diff --git a/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php b/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php new file mode 100644 index 000000000..cabb99f93 --- /dev/null +++ b/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php @@ -0,0 +1,76 @@ + */ + private array $index = []; + + /** @return list */ + public function __invoke(Message $message): array + { + try { + $header = $message->header(AggregateHeader::class); + } catch (HeaderNotFound) { + try { + $header = $message->header(StreamHeader::class); + } catch (HeaderNotFound) { + return [$message]; + } + } + + $stream = $header instanceof StreamHeader ? $header->streamName : $header->streamName(); + + $playhead = $this->nextPlayhead($stream); + + if ($header->playhead === $playhead) { + return [$message]; + } + + if ($header instanceof StreamHeader) { + return [ + $message->withHeader(new StreamHeader( + $header->streamName, + $playhead, + $header->recordedOn, + )), + ]; + } + + return [ + $message->withHeader(new AggregateHeader( + $header->aggregateName, + $header->aggregateId, + $playhead, + $header->recordedOn, + )), + ]; + } + + public function reset(): void + { + $this->index = []; + } + + /** @return positive-int */ + private function nextPlayhead(string $stream): int + { + if (!array_key_exists($stream, $this->index)) { + $this->index[$stream] = 1; + } else { + $this->index[$stream]++; + } + + return $this->index[$stream]; + } +} diff --git a/src/Pipeline/Middleware/ReplaceEventMiddleware.php b/src/Pipeline/Middleware/ReplaceEventMiddleware.php new file mode 100644 index 000000000..9dc92fdb5 --- /dev/null +++ b/src/Pipeline/Middleware/ReplaceEventMiddleware.php @@ -0,0 +1,40 @@ + $class + * @param callable(T $event):object $callable + */ + public function __construct( + private readonly string $class, + callable $callable, + ) { + $this->callable = $callable; + } + + /** @return list */ + public function __invoke(Message $message): array + { + $event = $message->event(); + + if (!$event instanceof $this->class) { + return [$message]; + } + + $callable = $this->callable; + $newEvent = $callable($event); + + return [Message::createWithHeaders($newEvent, $message->headers())]; + } +} diff --git a/src/Pipeline/Middleware/UntilEventMiddleware.php b/src/Pipeline/Middleware/UntilEventMiddleware.php new file mode 100644 index 000000000..44cc201ba --- /dev/null +++ b/src/Pipeline/Middleware/UntilEventMiddleware.php @@ -0,0 +1,41 @@ + */ + public function __invoke(Message $message): array + { + try { + $header = $message->header(AggregateHeader::class); + } catch (HeaderNotFound) { + try { + $header = $message->header(StreamHeader::class); + } catch (HeaderNotFound) { + return [$message]; + } + } + + $recordedOn = $header->recordedOn; + + if ($recordedOn < $this->until) { + return [$message]; + } + + return []; + } +} diff --git a/src/Pipeline/Pipeline.php b/src/Pipeline/Pipeline.php new file mode 100644 index 000000000..bc0cefc18 --- /dev/null +++ b/src/Pipeline/Pipeline.php @@ -0,0 +1,60 @@ +|Middleware $middlewares */ + public function __construct( + private readonly Source $source, + private readonly Target $target, + array|Middleware $middlewares = [], + private readonly int $bufferSize = 1_000, + ) { + if (is_array($middlewares)) { + $this->middleware = new ChainMiddleware($middlewares); + } else { + $this->middleware = $middlewares; + } + } + + public function run(): void + { + $buffer = []; + + foreach ($this->source->load() as $message) { + $result = ($this->middleware)($message); + + array_push($buffer, ...$result); + + if (count($buffer) >= $this->bufferSize) { + $this->target->save(...$result); + $buffer = []; + } + } + + if (count($buffer) > 0) { + $this->target->save(...$buffer); + } + } + + public static function execute( + Source $source, + Target $target, + array|Middleware $middlewares = [], + $bufferSize = 1_000, + ): void + { + $pipeline = new self($source, $target, $middlewares, $bufferSize); + $pipeline->run(); + } +} \ No newline at end of file diff --git a/src/Pipeline/Source/InMemorySource.php b/src/Pipeline/Source/InMemorySource.php new file mode 100644 index 000000000..720050140 --- /dev/null +++ b/src/Pipeline/Source/InMemorySource.php @@ -0,0 +1,22 @@ + $messages */ + public function __construct( + private readonly array $messages + ) { + } + + /** @return iterable */ + public function load(): iterable + { + return $this->messages; + } +} \ No newline at end of file diff --git a/src/Pipeline/Source/Source.php b/src/Pipeline/Source/Source.php new file mode 100644 index 000000000..1870b8d78 --- /dev/null +++ b/src/Pipeline/Source/Source.php @@ -0,0 +1,13 @@ + */ + public function load(): iterable; +} \ No newline at end of file diff --git a/src/Pipeline/Source/StoreSource.php b/src/Pipeline/Source/StoreSource.php new file mode 100644 index 000000000..a21825795 --- /dev/null +++ b/src/Pipeline/Source/StoreSource.php @@ -0,0 +1,21 @@ +store->load(); + } +} \ No newline at end of file diff --git a/src/Pipeline/Target/InMemoryTarget.php b/src/Pipeline/Target/InMemoryTarget.php new file mode 100644 index 000000000..ad7a102ac --- /dev/null +++ b/src/Pipeline/Target/InMemoryTarget.php @@ -0,0 +1,31 @@ + */ + private array $messages = []; + + public function save(Message ...$message): void + { + foreach ($message as $m) { + $this->messages[] = $m; + } + } + + /** @return list */ + public function messages(): array + { + return $this->messages; + } + + public function clear(): void + { + $this->messages = []; + } +} \ No newline at end of file diff --git a/src/Pipeline/Target/StoreTarget.php b/src/Pipeline/Target/StoreTarget.php new file mode 100644 index 000000000..ef9e60a9b --- /dev/null +++ b/src/Pipeline/Target/StoreTarget.php @@ -0,0 +1,21 @@ +store->save(...$message); + } +} \ No newline at end of file diff --git a/src/Pipeline/Target/Target.php b/src/Pipeline/Target/Target.php new file mode 100644 index 000000000..2a9be09df --- /dev/null +++ b/src/Pipeline/Target/Target.php @@ -0,0 +1,12 @@ +connection; + } + private function createTriggerFunctionName(): string { $tableConfig = explode('.', $this->config['table_name']); diff --git a/src/Store/StreamDoctrineDbalStore.php b/src/Store/StreamDoctrineDbalStore.php index d9c1cd166..703c561fb 100644 --- a/src/Store/StreamDoctrineDbalStore.php +++ b/src/Store/StreamDoctrineDbalStore.php @@ -435,6 +435,11 @@ public function setupSubscription(): void )); } + public function connection(): Connection + { + return $this->connection; + } + private function createTriggerFunctionName(): string { $tableConfig = explode('.', $this->config['table_name']); diff --git a/tests/Integration/Subscription/Subscriber/MigrateAggregateToStreamStoreSubscriber.php b/tests/Integration/Subscription/Subscriber/MigrateAggregateToStreamStoreSubscriber.php new file mode 100644 index 000000000..20d2b41bd --- /dev/null +++ b/tests/Integration/Subscription/Subscriber/MigrateAggregateToStreamStoreSubscriber.php @@ -0,0 +1,91 @@ + + */ + private array $messages = []; + + public function __construct( + private readonly StreamDoctrineDbalStore $targetStore, + ) { + $this->schemaDirector = new DoctrineSchemaDirector( + $targetStore->connection(), + new ChainDoctrineSchemaConfigurator([ + $targetStore, + ]), + ); + } + + #[Subscribe('*')] + public function handle(Message $message): void + { + $this->messages[] = $message; + } + + public function beginBatch(): void + { + $this->messages = []; + } + + public function commitBatch(): void + { + $messages = $this->messages; + $this->messages = []; + + Pipeline::execute( + new InMemorySource($messages), + new StoreTarget($this->targetStore), + new AggregateToStreamHeaderMiddleware(), + self::BATCH_SIZE * 10, // make sure we have only one batch + ); + } + + public function rollbackBatch(): void + { + $this->messages = []; + } + + public function forceCommit(): bool + { + return count($this->messages) >= self::BATCH_SIZE; + } + + #[Setup] + public function setup(): void + { + $this->schemaDirector->create(); + } + + #[Teardown] + public function teardown(): void + { + $this->schemaDirector->drop(); + } +} diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 08af8133e..fe5a45676 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -22,6 +22,7 @@ use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; +use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; @@ -33,6 +34,7 @@ use Patchlevel\EventSourcing\Subscription\Subscription; use Patchlevel\EventSourcing\Tests\DbalManager; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerSubscriber; +use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\MigrateAggregateToStreamStoreSubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileNewProjection; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProcessor; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProjection; @@ -927,6 +929,122 @@ public function testBlueGreenDeploymentRollback(): void ); } + public function testPipeline(): void + { + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $store = new DoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $targetStore = new StreamDoctrineDbalStore( + $this->projectionConnection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + config: ['table_name' => 'new_eventstore'], + ); + + $subscriptionStore = new DoctrineSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + new ChainDoctrineSchemaConfigurator([ + $store, + $subscriptionStore, + ]), + ); + + $schemaDirector->create(); + + $engine = new DefaultSubscriptionEngine( + $store, + $subscriptionStore, + new MetadataSubscriberAccessorRepository([new MigrateAggregateToStreamStoreSubscriber($targetStore)]), + ); + + self::assertEquals( + [ + new Subscription( + 'migrate', + 'default', + RunMode::Once, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $result = $engine->setup(); + + self::assertEquals([], $result->errors); + + self::assertTrue( + $this->projectionConnection->createSchemaManager()->tableExists('new_eventstore'), + ); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + + for ($i = 1; $i < 1_000; $i++) { + $profile->changeName(sprintf('John %d', $i)); + } + + $repository->save($profile); + + $result = $engine->boot(); + + self::assertEquals(1_000, $result->processedMessages); + + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'migrate', + 'default', + RunMode::Once, + Status::Finished, + 1_000, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + // target store check + + + $result = $engine->remove(); + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'migrate', + 'default', + RunMode::Once, + Status::New, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + self::assertFalse( + $this->projectionConnection->createSchemaManager()->tableExists('new_eventstore'), + ); + } + /** @param list $subscriptions */ private static function findSubscription(array $subscriptions, string $id): Subscription {