From 4f4bc3a70654e72182ac4c01843f03c48664aee3 Mon Sep 17 00:00:00 2001 From: David Badura Date: Wed, 17 Apr 2024 11:53:33 +0200 Subject: [PATCH] rewrite split stream docs & small changes in other places --- docs/mkdocs.yml | 2 +- docs/pages/clock.md | 2 +- docs/pages/events.md | 2 +- docs/pages/getting_started.md | 13 +++--- docs/pages/message_decorator.md | 1 + docs/pages/repository.md | 20 ++++++-- docs/pages/snapshots.md | 26 +++++++---- docs/pages/split_stream.md | 81 +++++++++++++++++++++++++-------- docs/pages/store.md | 3 +- docs/pages/subscription.md | 3 +- docs/pages/upcasting.md | 53 +-------------------- 11 files changed, 109 insertions(+), 97 deletions(-) diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index a427d9f7a..3de7c1bc0 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -83,8 +83,8 @@ nav: - Repository: repository.md - Message: message.md - Store: store.md - - Event Bus: event_bus.md - Subscription: subscription.md + - Event Bus: event_bus.md - Advanced: - Aggregate ID: aggregate_id.md - Normalizer: normalizer.md diff --git a/docs/pages/clock.md b/docs/pages/clock.md index 65c6992bd..85e5de7d5 100644 --- a/docs/pages/clock.md +++ b/docs/pages/clock.md @@ -69,5 +69,5 @@ $clock->sleep(10); // sleep 10 seconds * [How to test with datetime](testing.md) * [How to normalize datetime](normalizer.md) -* [How to use messages](event_bus.md) +* [How to use messages](message.md) * [How to decorate messages](message_decorator.md) diff --git a/docs/pages/events.md b/docs/pages/events.md index 3a995aabe..2d7ad27b8 100644 --- a/docs/pages/events.md +++ b/docs/pages/events.md @@ -114,7 +114,7 @@ $eventRegistry = (new AttributeEventRegistryFactory())->create([/* paths... */]) * [How to normalize events](normalizer.md) * [How to dispatch events](event_bus.md) -* [How to listen on events](processor.md) +* [How to listen on events](subscription.md) * [How to store events](store.md) * [How to split streams](split_stream.md) * [How to upcast events](upcasting.md) diff --git a/docs/pages/getting_started.md b/docs/pages/getting_started.md index ff492aea5..c40060684 100644 --- a/docs/pages/getting_started.md +++ b/docs/pages/getting_started.md @@ -295,17 +295,17 @@ $eventStore = new DoctrineDbalStore( $hotelProjector = new HotelProjector($projectionConnection); -$projectorRepository = new MetadataSubscriberAccessorRepository([ +$subscriberRepository = new MetadataSubscriberAccessorRepository([ $hotelProjector, new SendCheckInEmailProcessor($mailer), ]); -$projectionStore = new DoctrineSubscriptionStore($connection); +$subscriptionStore = new DoctrineSubscriptionStore($connection); $engine = new DefaultSubscriptionEngine( $eventStore, - $projectionStore, - $projectorRepository, + $subscriptionStore, + $subscriberRepository, ); $repositoryManager = new DefaultRepositoryManager( @@ -382,7 +382,7 @@ $hotels = $hotelProjection->getHotels(); ``` !!! warning - You need to run the subscription engine to update the projections. + You need to run the subscription engine to update the projections and execute the processors. !!! note @@ -403,6 +403,5 @@ $hotels = $hotelProjection->getHotels(); * [How to create an aggregate](aggregate.md) * [How to create an event](events.md) * [How to store aggregates](repository.md) -* [How to process events](subscription.md) -* [How to create a projection](subscription.md) +* [How to create a projection and processors](subscription.md) * [How to setup the database](store.md) diff --git a/docs/pages/message_decorator.md b/docs/pages/message_decorator.md index 58c0a9d29..4d0e4a103 100644 --- a/docs/pages/message_decorator.md +++ b/docs/pages/message_decorator.md @@ -107,6 +107,7 @@ final class OnSystemRecordedDecorator implements MessageDecorator ## Learn more +* [How to create messages](message.md) * [How to define events](events.md) * [How to use the event bus](event_bus.md) * [How to configure repositories](repository.md) diff --git a/docs/pages/repository.md b/docs/pages/repository.md index 795b8d176..beceec9e2 100644 --- a/docs/pages/repository.md +++ b/docs/pages/repository.md @@ -1,7 +1,7 @@ # Repository A `repository` takes care of storing and loading the `aggregates`. -He is also responsible for building [messages](event_bus.md) from the events +He is also responsible for building [messages](message.md) from the events and optionally dispatching them to the event bus. ## Create a repository @@ -63,7 +63,7 @@ $repository = $repositoryManager->get(Profile::class); !!! warning If you use the event bus, you should be aware that the events are dispatched synchronously. - You may encounter "at least once" problems. + You may encounter [at least once](https://softwaremill.com/message-delivery-and-deduplication-strategies/) problems. !!! note @@ -138,6 +138,10 @@ $repository = $repositoryManager->get(Profile::class); You can find out more about message decorator [here](message_decorator.md). +!!! tip + + If you have multiple decorators, you can use the `ChainMessageDecorator` to chain them. + ## Use the repository Each `repository` has three methods that are responsible for loading an `aggregate`, @@ -159,9 +163,17 @@ $profile = Profile::create($id, 'david.badura@patchlevel.de'); /** @var Repository $repository */ $repository->save($profile); ``` -!!! note +!!! Warning All events are written to the database with one transaction in order to ensure data consistency. + If an exception occurs during the save process, + the transaction is rolled back and the aggregate is not valid anymore. + You can not save the aggregate again and you need to load it again. + +!!! note + + Due to the nature of the aggregate having a playhead, + we have a unique constraint that ensures that no race condition happens here. ### Load an aggregate @@ -254,3 +266,5 @@ class ProfileRepository * [How to work with the store](store.md) * [How to use snapshots](snapshots.md) * [How to split streams](split_stream.md) +* [How to use the event bus](event_bus.md) +* [How to create messages](message.md) diff --git a/docs/pages/snapshots.md b/docs/pages/snapshots.md index ad5853432..a6c37a315 100644 --- a/docs/pages/snapshots.md +++ b/docs/pages/snapshots.md @@ -5,12 +5,17 @@ This is not a problem if there are a few hundred. But if the number gets bigger at some point, then loading and rebuilding can become slow. The `snapshot` system can be used to control this. -Normally, the events are all executed again on the aggregate in order to rebuild the current state. +!!! note + + In oure benchmarks we can load 10 000 events for one aggregate in 50ms. + Of course, this can vary from system to system. + +Normally, the events are all applied again on the aggregate in order to rebuild the current state. With a `snapshot`, we can shorten the way in which we temporarily save the current state of the aggregate. When loading it is checked whether the snapshot exists. -If a hit exists, the aggregate is built up with the help of the snapshot. +If a hit exists, the aggregate is created with the help of the snapshot. A check is then made to see whether further events have existed since the snapshot -and these are then also executed on the aggregate. +and these are then also applied on the aggregate. Here, however, only the last events are loaded from the database and not all. ## Configuration @@ -106,13 +111,14 @@ final class Profile extends BasicAggregateRoot !!! note - You can find more about normalizer [here](normalizer.md). + The [hydrator](https://github.com/patchlevel/hydrator) is used internally and you can use all of its features. + You can find more about normalizer also [here](normalizer.md). ### Snapshot batching Since the loading of events in itself is quite fast and only becomes noticeably slower with thousands of events, we do not need to create a snapshot after each event. That would also have a negative impact on performance. -Instead, we can also create a snapshot after `N` events. +Instead, we can also create a snapshot after `n` events. The remaining events that are not in the snapshot are then loaded from store. ```php @@ -132,9 +138,10 @@ final class Profile extends BasicAggregateRoot Whenever something changes on the aggregate, the previous snapshot must be discarded. You can do this by removing the entire snapshot cache when deploying. But that can be quickly forgotten. It is much easier to specify a snapshot version. -This snapshot version is also saved. When loading, the versions are compared and if they do not match, +This snapshot version is also saved in the snapshot cache. +When loading, the versions are compared and if they do not match, the snapshot is discarded and the aggregate is rebuilt from scratch. -The new aggregate is then saved again as a snapshot. +The new snapshot is then created automatically. ```php use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot; @@ -151,6 +158,7 @@ final class Profile extends BasicAggregateRoot !!! warning If the snapshots are discarded, a load peak can occur since the aggregates have to be rebuilt. + You should update the snapshot version only when necessary. !!! tip @@ -169,7 +177,7 @@ Here are a few listed: * [laminas cache](https://docs.laminas.dev/laminas-cache/) * [scrapbook](https://www.scrapbook.cash/) -### psr6 +### psr-6 A `Psr6SnapshotAdapter`, the associated documentation can be found [here](https://www.php-fig.org/psr/psr-6/). @@ -180,7 +188,7 @@ use Psr\Cache\CacheItemPoolInterface; /** @var CacheItemPoolInterface $cache */ $adapter = new Psr6SnapshotAdapter($cache); ``` -### psr16 +### psr-16 A `Psr16SnapshotAdapter`, the associated documentation can be found [here](https://www.php-fig.org/psr/psr-16/). diff --git a/docs/pages/split_stream.md b/docs/pages/split_stream.md index 6ec833a90..91c35cbdf 100644 --- a/docs/pages/split_stream.md +++ b/docs/pages/split_stream.md @@ -1,34 +1,75 @@ -# Splitting the eventstream +# Split Stream -In some cases the business has rules which implies an restart of the event stream for an aggregate since the past events -are not relevant for the current state. For example a user decides to end his active subscription and the business rules -says if the user start a new subscription all past events should not be considered anymore. Another case could be a -banking scenario. There the business decides to save the current state every quarter for each banking account. +In some cases the business has rules which implies a restart of the event stream for an aggregate +since the past events are not relevant for the current state. +A bank is often used as an example. A bank account has hundreds of transactions, +but every bank makes a balance report at the end of the year. +In this step the current account balance is persisted. +This event is perfect to split the stream and start aggregating from this point. -Not only that some businesses requires such an action it also increases the performance for aggregate which would have a -really long event stream. +Not only that some businesses requires such an action +it also increases the performance for aggregate which would have a really long event stream. -## Flagging an event to split the stream +In the background the library will mark all past events as archived +and will not load them anymore for building the aggregate. +It will only load the events from the split event and onwards. +But subscriptions will still receive all events. +So you can create projections which are based on the full event stream. -To use this feature you need to add the `SplitStreamDecorator`. You will also need events which will trigger this -action. For that you can use the `#[SplitStream]` attribute. We decided that we are not literallty splitting the stream, -instead we are marking all past events as archived as soon as this event is saved. Then the past events will not be -loaded anymore for building the aggregate. This means that all needed data has to be present in these events which -should trigger the event split. +!!! tip + + To split the stream is often a better solution than using snapshots. + +## Configuration + +To use this feature you need to add the `SplitStreamDecorator` in the repository manager. + +```php +use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; +use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory; +use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; +use Patchlevel\EventSourcing\Repository\MessageDecorator\SplitStreamDecorator; +use Patchlevel\EventSourcing\Store\Store; + +/** + * @var AggregateRootRegistry $aggregateRootRegistry + * @var Store $store + * @var EventMetadataFactory $eventMetadataFactory + */ +$repositoryManager = new DefaultRepositoryManager( + $aggregateRootRegistry, + $store, + null, + null, + new SplitStreamDecorator($eventMetadataFactory), +); +``` +!!! note + + You can find out more about decorator [here](./message_decorator.md). + +!!! tip + + You can use multiple decorators with the `ChainMessageDecorator`. + +## Usage + +To use this feature you need to mark the event which should split the stream. +For that you can use the `#[SplitStream]` attribute. ```php use Patchlevel\EventSourcing\Attribute\Event; use Patchlevel\EventSourcing\Attribute\SplitStream; use Patchlevel\EventSourcing\Serializer\Normalizer\IdNormalizer; -#[Event('bank_account.month_passed')] +#[Event('bank_account.balance_reported')] #[SplitStream] -final class MonthPassed +final class BalanceReported { public function __construct( #[IdNormalizer] - public AccountId $accountId, - public string $name, + public BankAccountId $bankAccountId, + public int $year, public int $balanceInCents, ) { } @@ -36,10 +77,10 @@ final class MonthPassed ``` !!! warning - The event needs all data which is relevant the aggregate to be used since all past event will not be loaded! Keep - this in mind if you want to use this feature. + The event needs all data which is relevant the aggregate to be used since all past event will not be loaded! + Keep this in mind if you want to use this feature. !!! note - This archive flag only impacts the Store::load method which is used the build the aggregate from the stream. + This impacts only the aggregate loaded by the repository. Subscriptions will still receive all events. \ No newline at end of file diff --git a/docs/pages/store.md b/docs/pages/store.md index 6004e8908..b6cfd9f68 100644 --- a/docs/pages/store.md +++ b/docs/pages/store.md @@ -348,6 +348,7 @@ $store->transactional(static function () use ($command, $bankAccountRepository): * [How to create events](events.md) * [How to use repositories](repository.md) -* [How to dispatch events](event_bus.md) +* [How to create message](message.md) +* [How to create projections](subscription.md) * [How to upcast events](upcasting.md) * [How configure cli commands](cli.md) diff --git a/docs/pages/subscription.md b/docs/pages/subscription.md index 9e57c9ba3..0f64fb034 100644 --- a/docs/pages/subscription.md +++ b/docs/pages/subscription.md @@ -803,6 +803,5 @@ foreach ($subscriptions as $subscription) { ## Learn more * [How to use CLI commands](./cli.md) -* [How to use Pipeline](./pipeline.md) -* [How to use Event Bus](./event_bus.md) +* [How to create Messages](./message.md) * [How to Test](./testing.md) diff --git a/docs/pages/upcasting.md b/docs/pages/upcasting.md index edd55401c..7bd9237e4 100644 --- a/docs/pages/upcasting.md +++ b/docs/pages/upcasting.md @@ -84,55 +84,4 @@ $serializer = DefaultEventSerializer::createFromPaths( ['src/Domain'], $upcaster, ); -``` -## Update event stream - -But what if we need it also in our stream because some other applications has also access on it? Or want to cleanup our -Upcasters since we have collected alot of them over the time? Then we can use our pipeline feature without any -middlewares to achive a complete rebuild of our stream with adjusted event data. - -```php -use Patchlevel\EventSourcing\Pipeline\Pipeline; -use Patchlevel\EventSourcing\Pipeline\Source\StoreSource; -use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget; -use Patchlevel\EventSourcing\Store\Store; -use Symfony\Component\Console\Attribute\AsCommand; -use Symfony\Component\Console\Command\Command; -use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Output\OutputInterface; - -#[AsCommand( - name: 'event-stream:cleanup', - description: 'rebuild event stream', -)] -final class EventStreamCleanupCommand extends Command -{ - public function __construct( - private readonly Store $sourceStore, - private readonly Store $targetStore, - ) { - parent::__construct(); - } - - protected function execute(InputInterface $input, OutputInterface $output): int - { - $pipeline = new Pipeline( - new StoreSource($this->sourceStore), - new StoreTarget($this->targetStore), - ); - - $pipeline->run(); - - return Command::SUCCESS; - } -} -``` -!!! danger - - Under no circumstances may the same store be used that is used for the source. - Otherwise the store will be broken afterwards! - -!!! note - - You can find out more about the pipeline [here](pipeline.md). - \ No newline at end of file +``` \ No newline at end of file