Skip to content

Commit

Permalink
rewrite split stream docs & small changes in other places
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Apr 17, 2024
1 parent 770a6f1 commit 4f4bc3a
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 97 deletions.
2 changes: 1 addition & 1 deletion docs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ nav:
- Repository: repository.md
- Message: message.md
- Store: store.md
- Event Bus: event_bus.md
- Subscription: subscription.md
- Event Bus: event_bus.md
- Advanced:
- Aggregate ID: aggregate_id.md
- Normalizer: normalizer.md
Expand Down
2 changes: 1 addition & 1 deletion docs/pages/clock.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@ $clock->sleep(10); // sleep 10 seconds

* [How to test with datetime](testing.md)
* [How to normalize datetime](normalizer.md)
* [How to use messages](event_bus.md)
* [How to use messages](message.md)
* [How to decorate messages](message_decorator.md)
2 changes: 1 addition & 1 deletion docs/pages/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ $eventRegistry = (new AttributeEventRegistryFactory())->create([/* paths... */])

* [How to normalize events](normalizer.md)
* [How to dispatch events](event_bus.md)
* [How to listen on events](processor.md)
* [How to listen on events](subscription.md)
* [How to store events](store.md)
* [How to split streams](split_stream.md)
* [How to upcast events](upcasting.md)
13 changes: 6 additions & 7 deletions docs/pages/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,17 +295,17 @@ $eventStore = new DoctrineDbalStore(

$hotelProjector = new HotelProjector($projectionConnection);

$projectorRepository = new MetadataSubscriberAccessorRepository([
$subscriberRepository = new MetadataSubscriberAccessorRepository([
$hotelProjector,
new SendCheckInEmailProcessor($mailer),
]);

$projectionStore = new DoctrineSubscriptionStore($connection);
$subscriptionStore = new DoctrineSubscriptionStore($connection);

$engine = new DefaultSubscriptionEngine(
$eventStore,
$projectionStore,
$projectorRepository,
$subscriptionStore,
$subscriberRepository,
);

$repositoryManager = new DefaultRepositoryManager(
Expand Down Expand Up @@ -382,7 +382,7 @@ $hotels = $hotelProjection->getHotels();
```
!!! warning

You need to run the subscription engine to update the projections.
You need to run the subscription engine to update the projections and execute the processors.

!!! note

Expand All @@ -403,6 +403,5 @@ $hotels = $hotelProjection->getHotels();
* [How to create an aggregate](aggregate.md)
* [How to create an event](events.md)
* [How to store aggregates](repository.md)
* [How to process events](subscription.md)
* [How to create a projection](subscription.md)
* [How to create a projection and processors](subscription.md)
* [How to setup the database](store.md)
1 change: 1 addition & 0 deletions docs/pages/message_decorator.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ final class OnSystemRecordedDecorator implements MessageDecorator

## Learn more

* [How to create messages](message.md)
* [How to define events](events.md)
* [How to use the event bus](event_bus.md)
* [How to configure repositories](repository.md)
Expand Down
20 changes: 17 additions & 3 deletions docs/pages/repository.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Repository

A `repository` takes care of storing and loading the `aggregates`.
He is also responsible for building [messages](event_bus.md) from the events
He is also responsible for building [messages](message.md) from the events
and optionally dispatching them to the event bus.

## Create a repository
Expand Down Expand Up @@ -63,7 +63,7 @@ $repository = $repositoryManager->get(Profile::class);
!!! warning

If you use the event bus, you should be aware that the events are dispatched synchronously.
You may encounter "at least once" problems.
You may encounter [at least once](https://softwaremill.com/message-delivery-and-deduplication-strategies/) problems.

!!! note

Expand Down Expand Up @@ -138,6 +138,10 @@ $repository = $repositoryManager->get(Profile::class);

You can find out more about message decorator [here](message_decorator.md).

!!! tip

If you have multiple decorators, you can use the `ChainMessageDecorator` to chain them.

## Use the repository

Each `repository` has three methods that are responsible for loading an `aggregate`,
Expand All @@ -159,9 +163,17 @@ $profile = Profile::create($id, '[email protected]');
/** @var Repository $repository */
$repository->save($profile);
```
!!! note
!!! Warning

All events are written to the database with one transaction in order to ensure data consistency.
If an exception occurs during the save process,
the transaction is rolled back and the aggregate is not valid anymore.
You can not save the aggregate again and you need to load it again.

!!! note

Due to the nature of the aggregate having a playhead,
we have a unique constraint that ensures that no race condition happens here.

### Load an aggregate

Expand Down Expand Up @@ -254,3 +266,5 @@ class ProfileRepository
* [How to work with the store](store.md)
* [How to use snapshots](snapshots.md)
* [How to split streams](split_stream.md)
* [How to use the event bus](event_bus.md)
* [How to create messages](message.md)
26 changes: 17 additions & 9 deletions docs/pages/snapshots.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ This is not a problem if there are a few hundred.
But if the number gets bigger at some point, then loading and rebuilding can become slow.
The `snapshot` system can be used to control this.

Normally, the events are all executed again on the aggregate in order to rebuild the current state.
!!! note

In oure benchmarks we can load 10 000 events for one aggregate in 50ms.
Of course, this can vary from system to system.

Normally, the events are all applied again on the aggregate in order to rebuild the current state.
With a `snapshot`, we can shorten the way in which we temporarily save the current state of the aggregate.
When loading it is checked whether the snapshot exists.
If a hit exists, the aggregate is built up with the help of the snapshot.
If a hit exists, the aggregate is created with the help of the snapshot.
A check is then made to see whether further events have existed since the snapshot
and these are then also executed on the aggregate.
and these are then also applied on the aggregate.
Here, however, only the last events are loaded from the database and not all.

## Configuration
Expand Down Expand Up @@ -106,13 +111,14 @@ final class Profile extends BasicAggregateRoot

!!! note

You can find more about normalizer [here](normalizer.md).
The [hydrator](https://github.com/patchlevel/hydrator) is used internally and you can use all of its features.
You can find more about normalizer also [here](normalizer.md).

### Snapshot batching

Since the loading of events in itself is quite fast and only becomes noticeably slower with thousands of events,
we do not need to create a snapshot after each event. That would also have a negative impact on performance.
Instead, we can also create a snapshot after `N` events.
Instead, we can also create a snapshot after `n` events.
The remaining events that are not in the snapshot are then loaded from store.

```php
Expand All @@ -132,9 +138,10 @@ final class Profile extends BasicAggregateRoot
Whenever something changes on the aggregate, the previous snapshot must be discarded.
You can do this by removing the entire snapshot cache when deploying.
But that can be quickly forgotten. It is much easier to specify a snapshot version.
This snapshot version is also saved. When loading, the versions are compared and if they do not match,
This snapshot version is also saved in the snapshot cache.
When loading, the versions are compared and if they do not match,
the snapshot is discarded and the aggregate is rebuilt from scratch.
The new aggregate is then saved again as a snapshot.
The new snapshot is then created automatically.

```php
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
Expand All @@ -151,6 +158,7 @@ final class Profile extends BasicAggregateRoot
!!! warning

If the snapshots are discarded, a load peak can occur since the aggregates have to be rebuilt.
You should update the snapshot version only when necessary.

!!! tip

Expand All @@ -169,7 +177,7 @@ Here are a few listed:
* [laminas cache](https://docs.laminas.dev/laminas-cache/)
* [scrapbook](https://www.scrapbook.cash/)

### psr6
### psr-6

A `Psr6SnapshotAdapter`, the associated documentation can be found [here](https://www.php-fig.org/psr/psr-6/).

Expand All @@ -180,7 +188,7 @@ use Psr\Cache\CacheItemPoolInterface;
/** @var CacheItemPoolInterface $cache */
$adapter = new Psr6SnapshotAdapter($cache);
```
### psr16
### psr-16

A `Psr16SnapshotAdapter`, the associated documentation can be found [here](https://www.php-fig.org/psr/psr-16/).

Expand Down
81 changes: 61 additions & 20 deletions docs/pages/split_stream.md
Original file line number Diff line number Diff line change
@@ -1,45 +1,86 @@
# Splitting the eventstream
# Split Stream

In some cases the business has rules which implies an restart of the event stream for an aggregate since the past events
are not relevant for the current state. For example a user decides to end his active subscription and the business rules
says if the user start a new subscription all past events should not be considered anymore. Another case could be a
banking scenario. There the business decides to save the current state every quarter for each banking account.
In some cases the business has rules which implies a restart of the event stream for an aggregate
since the past events are not relevant for the current state.
A bank is often used as an example. A bank account has hundreds of transactions,
but every bank makes a balance report at the end of the year.
In this step the current account balance is persisted.
This event is perfect to split the stream and start aggregating from this point.

Not only that some businesses requires such an action it also increases the performance for aggregate which would have a
really long event stream.
Not only that some businesses requires such an action
it also increases the performance for aggregate which would have a really long event stream.

## Flagging an event to split the stream
In the background the library will mark all past events as archived
and will not load them anymore for building the aggregate.
It will only load the events from the split event and onwards.
But subscriptions will still receive all events.
So you can create projections which are based on the full event stream.

To use this feature you need to add the `SplitStreamDecorator`. You will also need events which will trigger this
action. For that you can use the `#[SplitStream]` attribute. We decided that we are not literallty splitting the stream,
instead we are marking all past events as archived as soon as this event is saved. Then the past events will not be
loaded anymore for building the aggregate. This means that all needed data has to be present in these events which
should trigger the event split.
!!! tip

To split the stream is often a better solution than using snapshots.

## Configuration

To use this feature you need to add the `SplitStreamDecorator` in the repository manager.

```php
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Repository\MessageDecorator\SplitStreamDecorator;
use Patchlevel\EventSourcing\Store\Store;

/**
* @var AggregateRootRegistry $aggregateRootRegistry
* @var Store $store
* @var EventMetadataFactory $eventMetadataFactory
*/
$repositoryManager = new DefaultRepositoryManager(
$aggregateRootRegistry,
$store,
null,
null,
new SplitStreamDecorator($eventMetadataFactory),
);
```
!!! note

You can find out more about decorator [here](./message_decorator.md).

!!! tip

You can use multiple decorators with the `ChainMessageDecorator`.

## Usage

To use this feature you need to mark the event which should split the stream.
For that you can use the `#[SplitStream]` attribute.

```php
use Patchlevel\EventSourcing\Attribute\Event;
use Patchlevel\EventSourcing\Attribute\SplitStream;
use Patchlevel\EventSourcing\Serializer\Normalizer\IdNormalizer;

#[Event('bank_account.month_passed')]
#[Event('bank_account.balance_reported')]
#[SplitStream]
final class MonthPassed
final class BalanceReported
{
public function __construct(
#[IdNormalizer]
public AccountId $accountId,
public string $name,
public BankAccountId $bankAccountId,
public int $year,
public int $balanceInCents,
) {
}
}
```
!!! warning

The event needs all data which is relevant the aggregate to be used since all past event will not be loaded! Keep
this in mind if you want to use this feature.
The event needs all data which is relevant the aggregate to be used since all past event will not be loaded!
Keep this in mind if you want to use this feature.

!!! note

This archive flag only impacts the Store::load method which is used the build the aggregate from the stream.
This impacts only the aggregate loaded by the repository. Subscriptions will still receive all events.

3 changes: 2 additions & 1 deletion docs/pages/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ $store->transactional(static function () use ($command, $bankAccountRepository):

* [How to create events](events.md)
* [How to use repositories](repository.md)
* [How to dispatch events](event_bus.md)
* [How to create message](message.md)
* [How to create projections](subscription.md)
* [How to upcast events](upcasting.md)
* [How configure cli commands](cli.md)
3 changes: 1 addition & 2 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,5 @@ foreach ($subscriptions as $subscription) {
## Learn more

* [How to use CLI commands](./cli.md)
* [How to use Pipeline](./pipeline.md)
* [How to use Event Bus](./event_bus.md)
* [How to create Messages](./message.md)
* [How to Test](./testing.md)
53 changes: 1 addition & 52 deletions docs/pages/upcasting.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,55 +84,4 @@ $serializer = DefaultEventSerializer::createFromPaths(
['src/Domain'],
$upcaster,
);
```
## Update event stream

But what if we need it also in our stream because some other applications has also access on it? Or want to cleanup our
Upcasters since we have collected alot of them over the time? Then we can use our pipeline feature without any
middlewares to achive a complete rebuild of our stream with adjusted event data.

```php
use Patchlevel\EventSourcing\Pipeline\Pipeline;
use Patchlevel\EventSourcing\Pipeline\Source\StoreSource;
use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget;
use Patchlevel\EventSourcing\Store\Store;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(
name: 'event-stream:cleanup',
description: 'rebuild event stream',
)]
final class EventStreamCleanupCommand extends Command
{
public function __construct(
private readonly Store $sourceStore,
private readonly Store $targetStore,
) {
parent::__construct();
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$pipeline = new Pipeline(
new StoreSource($this->sourceStore),
new StoreTarget($this->targetStore),
);

$pipeline->run();

return Command::SUCCESS;
}
}
```
!!! danger

Under no circumstances may the same store be used that is used for the source.
Otherwise the store will be broken afterwards!

!!! note

You can find out more about the pipeline [here](pipeline.md).

```

0 comments on commit 4f4bc3a

Please sign in to comment.