From b3e5fdf4c9fa6ed2d1355fd7e596dbe6ca007ce8 Mon Sep 17 00:00:00 2001 From: David Badura Date: Wed, 3 Apr 2024 16:53:17 +0200 Subject: [PATCH] update message / event bus docs --- docs/mkdocs.yml | 1 - docs/pages/event_bus.md | 22 ++- docs/pages/message.md | 249 +++++++++++++++++++++++--- docs/pages/pipeline.md | 377 ---------------------------------------- 4 files changed, 235 insertions(+), 414 deletions(-) delete mode 100644 docs/pages/pipeline.md diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 879fa0d65..a427d9f7a 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -92,7 +92,6 @@ nav: - Personal Data: personal_data.md - Upcasting: upcasting.md - Outbox: outbox.md - - Pipeline: pipeline.md - Message Decorator: message_decorator.md - Split Stream: split_stream.md - Time / Clock: clock.md diff --git a/docs/pages/event_bus.md b/docs/pages/event_bus.md index 68f51e549..472810c33 100644 --- a/docs/pages/event_bus.md +++ b/docs/pages/event_bus.md @@ -1,15 +1,19 @@ # Event Bus -This library uses the core principle called [event bus](https://martinfowler.com/articles/201701-event-driven.html). +Optionally you can use an event bus to dispatch events to listeners. For all events that are persisted (when the `save` method has been executed on the [repository](./repository.md)), -the event wrapped in a message will be dispatched to the `event bus`. All listeners are then called for each -message. +the event wrapped in a message will be dispatched to the `event bus`. +All listeners are then called for each message. + +!!! tip + + It is recommended to use the [subscription engine](subscription.md) to process the messages. + It is more powerful and flexible than the event bus. ## Event Bus -The event bus is responsible for dispatching the messages to the listeners. -The library also delivers a light-weight event bus for which you can register listeners and dispatch events. +The library delivers a light-weight event bus for which you can register listeners and dispatch events. ```php use Patchlevel\EventSourcing\EventBus\DefaultEventBus; @@ -134,7 +138,7 @@ $eventBus = new Psr14EventBus($psr14EventDispatcher); ## Learn more -* [How to decorate messages](message_decorator.md) -* [How to use outbox pattern](outbox.md) -* [How to use processor](subscription.md) -* [How to use subscriptions](subscription.md) +* [How to use messages](message.md) +* [How to use the subscription engine](subscription.md) +* [How to use repositories](repository.md) +* [How to use decorate messages](message_decorator.md) diff --git a/docs/pages/message.md b/docs/pages/message.md index 7612ef28e..875430ef1 100644 --- a/docs/pages/message.md +++ b/docs/pages/message.md @@ -1,6 +1,10 @@ # Message -A `Message` contains the event and related meta information as headers. +A message is a construct that contains additional meta information for each event in the form of headers. +The messages are created in the repository as soon as an aggregate is saved. +These messages are then stored in the store and dispatched to the event bus. + +Here is a simple example without headers: ```php use Patchlevel\EventSourcing\Message\Message; @@ -8,7 +12,12 @@ use Patchlevel\EventSourcing\Message\Message; $message = Message::create(new NameChanged('foo')); ``` +!!! note + + You don't have to create the message yourself, it is automatically created, saved and dispatched in + the [repository](repository.md). +You can add a header using `withHeader`: ```php use Patchlevel\EventSourcing\Aggregate\AggregateHeader; @@ -29,10 +38,17 @@ $message = Message::create(new NameChanged('foo')) The message object is immutable. It creates a new instance with the new data. -!!! note +You can also access the headers: - You don't have to create the message yourself, it is automatically created, saved and dispatched in - the [repository](repository.md). +```php +use Patchlevel\EventSourcing\Aggregate\AggregateHeader; +use Patchlevel\EventSourcing\Message\Message; + +/** @var Message $message */ +$message->header(AggregateHeader::class); // AggregateHeader object +$message->hasHeader(AggregateHeader::class); // true +$message->headers(); // [AggregateHeader object] +``` ## Built-in headers @@ -40,51 +56,230 @@ The message object has some built-in headers which are used internally. * `AggregateHeader` - Contains the aggregate name, aggregate id, playhead and recorded on. * `ArchivedHeader` - Flag if the message is archived. -* `NewStreamStartHeader` - Flag if the message is the first message in a new stream. +* `StreamStartHeader` - Flag if the message is the first message in a new stream. ## Custom headers -As already mentioned, you can enrich the `Message` with your own meta information. This is then accessible in the -message object and is also stored in the database. +You can also add custom headers to the message object. For example, you can add an application id. +To do this, you need to create a Header class. + +```php +use Patchlevel\EventSourcing\Attribute\Header; + +#[Header('application')] +class ApplicationHeader +{ + public function __construct( + private readonly string $id, + ) { + } +} +``` + +Then you can add the header to the message object. ```php use Patchlevel\EventSourcing\Message\Message; $message = Message::create(new NameChanged('foo')) - // ... - ->withHeader('application-id', 'app'); + ->withHeader(new ApplicationHeader('app')); ``` +!!! warning + + The header needs to be serializable. The library uses the hydrator to serialize and deserialize the headers. + So you can add normalize attributes to the properties if needed. + !!! note You can read about how to pass additional headers to the message object in the [message decorator](message_decorator.md) docs. -You can also access your custom headers. For this case there is also a method to only retrieve the headers which are not -used internally. +You can also access your custom headers: + +```php +use Patchlevel\EventSourcing\Message\Message; + +/** @var Message $message */ +$message->header(ApplicationHeader::class); +``` + +## Translator + +Translator can be used to manipulate, filter or expand messages or events. +This can be used for anti-corruption layers, data migration, or to fix errors in the event stream. + +### Exclude + +With this translator you can exclude certain events. + +```php +use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator; + +$translator = new ExcludeEventTranslator([EmailChanged::class]); +``` + +### Include + +With this translator you can only allow certain events. + +```php +use Patchlevel\EventSourcing\Message\Translator\IncludeEventTranslator; + +$translator = new IncludeEventTranslator([ProfileCreated::class]); +``` + +### Filter + +If the translator `ExcludeEventTranslator` and `IncludeEventTranslator` are not sufficient, +you can also write your own filter. +This translator expects a callback that returns either true to allow events or false to not allow them. + +```php +use Patchlevel\EventSourcing\Message\Translator\FilterEventTranslator; + +$translator = new FilterEventTranslator(static function (object $event) { + if (!$event instanceof ProfileCreated) { + return true; + } + + return $event->allowNewsletter(); +}); +``` + +### Exclude Events with Header + +With this translator you can exclude event with specific header. + +```php +use Patchlevel\EventSourcing\Message\Translator\ExcludeEventWithHeaderTranslator; +use Patchlevel\EventSourcing\Store\ArchivedHeader; + +$translator = new ExcludeEventWithHeaderTranslator(ArchivedHeader::class); +``` + +### Only Events with Header + +With this translator you can only allow events with a specific header. + +```php +use Patchlevel\EventSourcing\Message\Translator\IncludeEventWithHeaderTranslator; + +$translator = new IncludeEventWithHeaderTranslator(ArchivedHeader::class); +``` + +### Replace + +If you want to replace an event, you can use the `ReplaceEventTranslator`. +The first parameter you have to define is the event class that you want to replace. +And as a second parameter a callback, that the old event awaits and a new event returns. + +```php +use Patchlevel\EventSourcing\Message\Translator\ReplaceEventTranslator; + +$translator = new ReplaceEventTranslator(OldVisited::class, static function (OldVisited $oldVisited) { + return new NewVisited($oldVisited->profileId()); +}); +``` + +### Until + +A use case could also be that you want to look at the projection from a previous point in time. +You can use the `UntilEventTranslator` to only allow events that were `recorded` before this point in time. + +```php +use DateTimeImmutable; +use Patchlevel\EventSourcing\Message\Translator\UntilEventTranslator; + +$translator = new UntilEventTranslator(new DateTimeImmutable('2020-01-01 12:00:00')); +``` + +### Recalculate playhead + +This translator can be used to recalculate the playhead. +The playhead must always be in ascending order so that the data is valid. +Some translator can break this order and the translator `RecalculatePlayheadTranslator` can fix this problem. + +```php +use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator; + +$translator = new RecalculatePlayheadTranslator(); +``` + +!!! tip + + If you migrate your event stream, you can use the `RecalculatePlayheadTranslator` to fix the playhead. + +### Chain + +If you want to group your translator, you can use one or more `ChainTranslator`. ```php -$message->header('application-id'); // app -$message->customHeaders(); // ['application-id' => 'app'] +use Patchlevel\EventSourcing\Message\Translator\ChainTranslator; +use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator; +use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator; + +$translator = new ChainTranslator([ + new ExcludeEventTranslator([EmailChanged::class]), + new RecalculatePlayheadTranslator(), +]); ``` -If you want *all* the headers you can also retrieve them. + +### Custom Translator + +You can also write a custom translator. The translator gets a message and can return `n` messages. +There are the following possibilities: + +* Return only the message to an array to leave it unchanged. +* Put another message in the array to swap the message. +* Return an empty array to remove the message. +* Or return multiple messages to enrich the stream. + +In our case, the domain has changed a bit. +In the beginning we had a `ProfileCreated` event that just created a profile. +Now we have a `ProfileRegistered` and a `ProfileActivated` event, +which should replace the `ProfileCreated` event. ```php -$headers = $message->headers(); -/* -[ - 'aggregateName' => 'profile', - 'aggregateId' => '1', - // {...}, - 'application-id' => 'app' -] -*/ +use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Message\Translator\Translator; + +final class SplitProfileCreatedTranslator implements Translator +{ + public function __invoke(Message $message): array + { + $event = $message->event(); + + if (!$event instanceof ProfileCreated) { + return [$message]; + } + + $profileRegisteredMessage = Message::createWithHeaders( + new ProfileRegistered($event->id(), $event->name()), + $message->headers(), + ); + + $profileActivatedMessage = Message::createWithHeaders( + new ProfileActivated($event->id()), + $message->headers(), + ); + + return [$profileRegisteredMessage, $profileActivatedMessage]; + } +} ``` !!! warning - Relying on internal meta data could be dangerous as they could be changed. So be cautios if you want to implement logic on them. + Since we changed the number of messages, we have to recalculate the playhead. + +!!! tip + + You don't have to migrate the store directly for every change, + but you can also use the [upcasting](upcasting.md) feature. ## Learn more -* [How to decorate messages](message_decorator.md) -* [How to use outbox pattern](outbox.md) -* [How to use processor](subscription.md) * [How to use subscriptions](subscription.md) +* [How to use the repository](repository.md) +* [How to use the event bus](event_bus.md) +* [How to decorate messages](message_decorator.md) +* [How to use the normalizer](normalizer.md) +* [How to use the upcasting](upcasting.md) \ No newline at end of file diff --git a/docs/pages/pipeline.md b/docs/pages/pipeline.md deleted file mode 100644 index 94e8e0606..000000000 --- a/docs/pages/pipeline.md +++ /dev/null @@ -1,377 +0,0 @@ -# Pipeline - -A store is immutable, i.e. it cannot be changed afterwards. -This includes both manipulating events and deleting them. - -Instead, you can duplicate the store and manipulate the events in the process. -Thus the old store remains untouched and you can test the new store beforehand, -whether the migration worked. - -In this example the event `PrivacyAdded` is removed and the event `OldVisited` is replaced by `NewVisited`: - -```php -use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator; -use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator; -use Patchlevel\EventSourcing\Message\Translator\ReplaceEventTranslator; -use Patchlevel\EventSourcing\Pipeline\Pipeline; -use Patchlevel\EventSourcing\Pipeline\Source\StoreSource; -use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget; - -$pipeline = new Pipeline( - new StoreSource($oldStore), - new StoreTarget($newStore), - [ - new ExcludeEventTranslator([PrivacyAdded::class]), - new ReplaceEventTranslator(OldVisited::class, static function (OldVisited $oldVisited) { - return new NewVisited($oldVisited->profileId()); - }), - new RecalculatePlayheadTranslator(), - ], -); -``` -!!! danger - - Under no circumstances may the same store be used that is used for the source. - Otherwise the store will be broken afterwards! - -The pipeline can also be used to create or rebuild a projection: - -```php -use Patchlevel\EventSourcing\Pipeline\Pipeline; -use Patchlevel\EventSourcing\Pipeline\Source\StoreSource; -use Patchlevel\EventSourcing\Pipeline\Target\ConsumerTarget; - -$pipeline = new Pipeline( - new StoreSource($store), - ConsumerTarget::create([$projection]), -); -``` -The principle remains the same. -There is a source where the data comes from. -A target where the data should flow. -And any number of middlewares to do something with the data beforehand. - -## Source - -The first thing you need is a source of where the data should come from. - -### Store - -The `StoreSource` is the standard source to load all events from the database. - -```php -use Patchlevel\EventSourcing\Pipeline\Source\StoreSource; - -$source = new StoreSource($store); -``` -### In Memory - -There is an `InMemorySource` that receives the messages in an array. This source can be used to write pipeline tests. - -```php -use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Pipeline\Source\InMemorySource; - -$source = new InMemorySource([ - new Message( - Profile::class, - '1', - 1, - new ProfileCreated(Email::fromString('david.badura@patchlevel.de')), - ), - // ... -]); -``` -### Custom Source - -You can also create your own source class. It has to inherit from `Source`. -Here you can, for example, create a migration from another event sourcing system or similar system. - -```php -use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Pipeline\Source\Source; - -$source = new class implements Source { - /** @return Generator */ - public function load(): Generator - { - yield new Message( - Profile::class, - '1', - 0, - new ProfileCreated('1', ['name' => 'David']), - ); - } - - public function count(): int - { - return 1; - } -}; -``` -## Target - -After you have a source, you still need the destination of the pipeline. - -### Store - -You can use a store to save the final result. - -```php -use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget; - -$target = new StoreTarget($store); -``` -!!! danger - - Under no circumstances may the same store be used that is used for the source. - Otherwise the store will be broken afterwards! - -!!! note - - It does not matter whether the previous store was a SingleTable or a MultiTable. - You can switch back and forth between both store types using the pipeline. - -### Consumer - -A consumer can also be used as a target. - -```php -use Patchlevel\EventSourcing\Pipeline\Target\ConsumerTarget; - -$target = new ConsumerTarget($consumer); -``` -!!! tip - - You can also use it to build a new projection from scratch. - -!!! note - - More about the consumer can be found [here](event_bus.md). - -### In Memory - -There is also an in-memory variant for the target. This target can also be used for tests. -With the `messages` method you get all `Messages` that have reached the target. - -```php -use Patchlevel\EventSourcing\Pipeline\Target\InMemoryTarget; - -$target = new InMemoryTarget(); - -// run pipeline - -$messages = $target->messages(); -``` -### Custom Target - -You can also define your own target. To do this, you need to implement the `Target` interface. - -```php -use Patchlevel\EventSourcing\Message\Message; - -final class OtherStoreTarget implements Target -{ - public function __construct(private OtherStore $store) - { - } - - public function save(Message $message): void - { - $this->store->save($message); - } -} -``` -## Middlewares - -Middelwares can be used to manipulate, delete or expand messages or events during the process. - -!!! warning - - It is important to know that some middlewares require recalculation from the playhead, - if the target is a store. This is a numbering of the events that must be in ascending order. - A corresponding note is supplied with every middleware. - -### Exclude - -With this middleware you can exclude certain events. - -```php -use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator; - -$middleware = new ExcludeEventTranslator([EmailChanged::class]); -``` -!!! warning - - After this middleware, the playhead must be recalculated! - -### Include - -With this middleware you can only allow certain events. - -```php -use Patchlevel\EventSourcing\Message\Translator\IncludeEventTranslator; - -$middleware = new IncludeEventTranslator([ProfileCreated::class]); -``` -!!! warning - - After this middleware, the playhead must be recalculated! - -### Filter - -If the middlewares `ExcludeEventMiddleware` and `IncludeEventMiddleware` are not sufficient, -you can also write your own filter. -This middleware expects a callback that returns either true to allow events or false to not allow them. - -```php -use Patchlevel\EventSourcing\Aggregate\AggregateChanged; -use Patchlevel\EventSourcing\Message\Translator\FilterEventTranslator; - -$middleware = new FilterEventTranslator(static function (AggregateChanged $event) { - if (!$event instanceof ProfileCreated) { - return true; - } - - return $event->allowNewsletter(); -}); -``` -!!! warning - - After this middleware, the playhead must be recalculated! - -### Exclude Archived Events - -With this middleware you can exclude archived events. - -```php -use Patchlevel\EventSourcing\Message\Translator\ExcludeEventWithHeaderTranslator; - -$middleware = new ExcludeEventWithHeaderTranslator(); -``` -!!! warning - - After this middleware, the playhead must be recalculated! - -### Only Archived Events - -With this middleware you can only allow archived events. - -```php -use Patchlevel\EventSourcing\Message\Translator\IncludeEventWithHeaderTranslator; - -$middleware = new IncludeEventWithHeaderTranslator(); -``` -!!! warning - - After this middleware, the playhead must be recalculated! - -### Replace - -If you want to replace an event, you can use the `ReplaceEventMiddleware`. -The first parameter you have to define is the event class that you want to replace. -And as a second parameter a callback, that the old event awaits and a new event returns. - -```php -use Patchlevel\EventSourcing\Message\Translator\ReplaceEventTranslator; - -$middleware = new ReplaceEventTranslator(OldVisited::class, static function (OldVisited $oldVisited) { - return new NewVisited($oldVisited->profileId()); -}); -``` -!!! note - - The middleware takes over the playhead and recordedAt information. - -### Until - -A use case could also be that you want to look at the projection from a previous point in time. -You can use the `UntilEventMiddleware` to only allow events that were `recorded` before this point in time. - -```php -$middleware = new UntilEventMiddleware(new DateTimeImmutable('2020-01-01 12:00:00')); -``` -!!! warning - - After this middleware, the playhead must be recalculated! - -### Recalculate playhead - -This middleware can be used to recalculate the playhead. -The playhead must always be in ascending order so that the data is valid. -Some middleware can break this order and the middleware `RecalculatePlayheadMiddleware` can fix this problem. - -```php -use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator; - -$middleware = new RecalculatePlayheadTranslator(); -``` -!!! note - - You only need to add this middleware once at the end of the pipeline. - -### Chain - -If you want to group your middleware, you can use one or more `ChainMiddleware`. - -```php -use Patchlevel\EventSourcing\Message\Translator\ChainTranslator; -use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator; -use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator; - -$middleware = new ChainTranslator([ - new ExcludeEventTranslator([EmailChanged::class]), - new RecalculatePlayheadTranslator(), -]); -``` -### Custom middleware - -You can also write a custom middleware. The middleware gets a message and can return `N` messages. -There are the following possibilities: - -* Return only the message to an array to leave it unchanged. -* Put another message in the array to swap the message. -* Return an empty array to remove the message. -* Or return multiple messages to enrich the stream. - -In our case, the domain has changed a bit. -In the beginning we had a `ProfileCreated` event that just created a profile. -Now we have a `ProfileRegistered` and a `ProfileActivated` event, -which should replace the `ProfileCreated` event. - -```php -use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Message\Translator\Translator; - -final class SplitProfileCreatedMiddleware implements Translator -{ - public function __invoke(Message $message): array - { - $event = $message->event(); - - if (!$event instanceof ProfileCreated) { - return [$message]; - } - - $profileRegisteredMessage = Message::createWithHeaders( - new ProfileRegistered($event->id(), $event->name()), - $message->headers(), - ); - - $profileActivatedMessage = Message::createWithHeaders( - new ProfileActivated($event->id()), - $message->headers(), - ); - - return [$profileRegisteredMessage, $profileActivatedMessage]; - } -} -``` -!!! warning - - Since we changed the number of messages, we have to recalculate the playhead. - -!!! note - - You can find more about messages [here](event_bus.md). - \ No newline at end of file