Skip to content

Commit

Permalink
update message / event bus docs
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Apr 3, 2024
1 parent c788813 commit 3abd233
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 417 deletions.
1 change: 0 additions & 1 deletion docs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ nav:
- Personal Data: personal_data.md
- Upcasting: upcasting.md
- Outbox: outbox.md
- Pipeline: pipeline.md
- Message Decorator: message_decorator.md
- Split Stream: split_stream.md
- Time / Clock: clock.md
Expand Down
22 changes: 13 additions & 9 deletions docs/pages/event_bus.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
# Event Bus

This library uses the core principle called [event bus](https://martinfowler.com/articles/201701-event-driven.html).
Optionally you can use an event bus to dispatch events to listeners.

For all events that are persisted (when the `save` method has been executed on the [repository](./repository.md)),
the event wrapped in a message will be dispatched to the `event bus`. All listeners are then called for each
message.
the event wrapped in a message will be dispatched to the `event bus`.
All listeners are then called for each message.

!!! tip

It is recommended to use the [subscription engine](subscription.md) to process the messages.
It is more powerful and flexible than the event bus.

## Event Bus

The event bus is responsible for dispatching the messages to the listeners.
The library also delivers a light-weight event bus for which you can register listeners and dispatch events.
The library delivers a light-weight event bus for which you can register listeners and dispatch events.

```php
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
Expand Down Expand Up @@ -134,7 +138,7 @@ $eventBus = new Psr14EventBus($psr14EventDispatcher);

## Learn more

* [How to decorate messages](message_decorator.md)
* [How to use outbox pattern](outbox.md)
* [How to use processor](subscription.md)
* [How to use subscriptions](subscription.md)
* [How to use messages](message.md)
* [How to use the subscription engine](subscription.md)
* [How to use repositories](repository.md)
* [How to use decorate messages](message_decorator.md)
240 changes: 210 additions & 30 deletions docs/pages/message.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
# Message

A `Message` contains the event and related meta information as headers.
A message is a construct that contains additional meta information for each event in the form of headers.
The messages are created in the repository as soon as an aggregate is saved.
These messages are then stored in the store and dispatched to the event bus.

Here is a simple example without headers:

```php
use Patchlevel\EventSourcing\Message\Message;

$message = Message::create(new NameChanged('foo'));
```
!!! note


You don't have to create the message yourself, it is automatically created, saved and dispatched in
the [repository](repository.md).

You can add a header using `withHeader`:

```php
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
Expand All @@ -24,67 +32,239 @@ $message = Message::create(new NameChanged('foo'))
recordedOn: $clock->now(),
));
```

!!! note

The message object is immutable. It creates a new instance with the new data.

You can also access the headers:

!!! note

You don't have to create the message yourself, it is automatically created, saved and dispatched in
the [repository](repository.md).
```php
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\Message;

/** @var Message $message */
$message->header(AggregateHeader::class); // AggregateHeader object
$message->hasHeader(AggregateHeader::class); // true
$message->headers(); // [AggregateHeader object]
```
## Built-in headers

The message object has some built-in headers which are used internally.

* `AggregateHeader` - Contains the aggregate name, aggregate id, playhead and recorded on.
* `ArchivedHeader` - Flag if the message is archived.
* `NewStreamStartHeader` - Flag if the message is the first message in a new stream.
* `StreamStartHeader` - Flag if the message is the first message in a new stream.

## Custom headers

As already mentioned, you can enrich the `Message` with your own meta information. This is then accessible in the
message object and is also stored in the database.
You can also add custom headers to the message object. For example, you can add an application id.
To do this, you need to create a Header class.

```php
use Patchlevel\EventSourcing\Attribute\Header;

#[Header('application')]
class ApplicationHeader
{
public function __construct(
private readonly string $id,
) {
}
}
```
Then you can add the header to the message object.

```php
use Patchlevel\EventSourcing\Message\Message;

$message = Message::create(new NameChanged('foo'))
// ...
->withHeader('application-id', 'app');
->withHeader(new ApplicationHeader('app'));
```
!!! warning

The header needs to be serializable. The library uses the hydrator to serialize and deserialize the headers.
So you can add normalize attributes to the properties if needed.

!!! note

You can read about how to pass additional headers to the message object in the [message decorator](message_decorator.md) docs.

You can also access your custom headers. For this case there is also a method to only retrieve the headers which are not
used internally.
You can also access your custom headers:

```php
use Patchlevel\EventSourcing\Message\Message;

/** @var Message $message */
$message->header(ApplicationHeader::class);
```
## Translator

Translator can be used to manipulate, filter or expand messages or events.
This can be used for anti-corruption layers, data migration, or to fix errors in the event stream.

### Exclude

With this translator you can exclude certain events.

```php
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator;

$translator = new ExcludeEventTranslator([EmailChanged::class]);
```
### Include

With this translator you can only allow certain events.

```php
use Patchlevel\EventSourcing\Message\Translator\IncludeEventTranslator;

$translator = new IncludeEventTranslator([ProfileCreated::class]);
```
### Filter

If the translator `ExcludeEventTranslator` and `IncludeEventTranslator` are not sufficient,
you can also write your own filter.
This translator expects a callback that returns either true to allow events or false to not allow them.

```php
use Patchlevel\EventSourcing\Message\Translator\FilterEventTranslator;

$translator = new FilterEventTranslator(static function (object $event) {
if (!$event instanceof ProfileCreated) {
return true;
}

return $event->allowNewsletter();
});
```
### Exclude Events with Header

With this translator you can exclude event with specific header.

```php
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventWithHeaderTranslator;
use Patchlevel\EventSourcing\Store\ArchivedHeader;

$translator = new ExcludeEventWithHeaderTranslator(ArchivedHeader::class);
```
### Only Events with Header

With this translator you can only allow events with a specific header.

```php
use Patchlevel\EventSourcing\Message\Translator\IncludeEventWithHeaderTranslator;

$translator = new IncludeEventWithHeaderTranslator(ArchivedHeader::class);
```
### Replace

If you want to replace an event, you can use the `ReplaceEventTranslator`.
The first parameter you have to define is the event class that you want to replace.
And as a second parameter a callback, that the old event awaits and a new event returns.

```php
use Patchlevel\EventSourcing\Message\Translator\ReplaceEventTranslator;

$translator = new ReplaceEventTranslator(OldVisited::class, static function (OldVisited $oldVisited) {
return new NewVisited($oldVisited->profileId());
});
```
### Until

A use case could also be that you want to look at the projection from a previous point in time.
You can use the `UntilEventTranslator` to only allow events that were `recorded` before this point in time.

```php
use Patchlevel\EventSourcing\Message\Translator\UntilEventTranslator;

$translator = new UntilEventTranslator(new DateTimeImmutable('2020-01-01 12:00:00'));
```
### Recalculate playhead

This translator can be used to recalculate the playhead.
The playhead must always be in ascending order so that the data is valid.
Some translator can break this order and the translator `RecalculatePlayheadTranslator` can fix this problem.

```php
use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;

$translator = new RecalculatePlayheadTranslator();
```
!!! tip

If you migrate your event stream, you can use the `RecalculatePlayheadTranslator` to fix the playhead.

### Chain

If you want to group your translator, you can use one or more `ChainTranslator`.

```php
$message->header('application-id'); // app
$message->customHeaders(); // ['application-id' => 'app']
use Patchlevel\EventSourcing\Message\Translator\ChainTranslator;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;

$translator = new ChainTranslator([
new ExcludeEventTranslator([EmailChanged::class]),
new RecalculatePlayheadTranslator(),
]);
```
If you want *all* the headers you can also retrieve them.
### Custom Translator

You can also write a custom translator. The translator gets a message and can return `n` messages.
There are the following possibilities:

* Return only the message to an array to leave it unchanged.
* Put another message in the array to swap the message.
* Return an empty array to remove the message.
* Or return multiple messages to enrich the stream.

In our case, the domain has changed a bit.
In the beginning we had a `ProfileCreated` event that just created a profile.
Now we have a `ProfileRegistered` and a `ProfileActivated` event,
which should replace the `ProfileCreated` event.

```php
$headers = $message->headers();
/*
[
'aggregateName' => 'profile',
'aggregateId' => '1',
// {...},
'application-id' => 'app'
]
*/
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Message\Translator\Translator;

final class SplitProfileCreatedTranslator implements Translator
{
public function __invoke(Message $message): array
{
$event = $message->event();

if (!$event instanceof ProfileCreated) {
return [$message];
}

$profileRegisteredMessage = Message::createWithHeaders(
new ProfileRegistered($event->id(), $event->name()),
$message->headers(),
);

$profileActivatedMessage = Message::createWithHeaders(
new ProfileActivated($event->id()),
$message->headers(),
);

return [$profileRegisteredMessage, $profileActivatedMessage];
}
}
```
!!! warning

Relying on internal meta data could be dangerous as they could be changed. So be cautios if you want to implement logic on them.
Since we changed the number of messages, we have to recalculate the playhead.

!!! tip

You don't have to migrate the store directly for every change,
but you can also use the [upcasting](upcasting.md) feature.

## Learn more

* [How to decorate messages](message_decorator.md)
* [How to use outbox pattern](outbox.md)
* [How to use processor](subscription.md)
* [How to use subscriptions](subscription.md)
* [How to use the repository](repository.md)
* [How to use the event bus](event_bus.md)
* [How to decorate messages](message_decorator.md)
* [How to use the normalizer](normalizer.md)
* [How to use the upcasting](upcasting.md)
Loading

0 comments on commit 3abd233

Please sign in to comment.