diff --git a/docs/pages/getting_started.md b/docs/pages/getting_started.md index 9318678d9..15d3f0fbe 100644 --- a/docs/pages/getting_started.md +++ b/docs/pages/getting_started.md @@ -278,7 +278,6 @@ After we have defined everything, we still have to plug the whole thing together ```php use Doctrine\DBAL\DriverManager; -use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Projection\Engine\DefaultSubscriptionEngine; use Patchlevel\EventSourcing\Projection\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Projection\Store\DoctrineSubscriptionStore; @@ -320,12 +319,9 @@ $projectionist = new DefaultSubscriptionEngine( $projectorRepository, ); -$eventBus = DefaultEventBus::create(); - $repositoryManager = new DefaultRepositoryManager( $aggregateRegistry, $eventStore, - $eventBus, ); $hotelRepository = $repositoryManager->get(Hotel::class); diff --git a/docs/pages/repository.md b/docs/pages/repository.md index dd3adf485..0fee341f8 100644 --- a/docs/pages/repository.md +++ b/docs/pages/repository.md @@ -1,10 +1,8 @@ # Repository A `repository` takes care of storing and loading the `aggregates`. -He is also responsible for building [messages](event_bus.md) from the events and then dispatching them to the event bus. - -Every aggregate needs a repository to be stored. -And each repository is only responsible for one aggregate. +He is also responsible for building [messages](event_bus.md) from the events +and optionally dispatching them to the event bus. ## Create a repository @@ -13,8 +11,7 @@ This helps to build the repository correctly. The `DefaultRepositoryManager` needs some services to work. For one, it needs [AggregateRootRegistry](aggregate.md#aggregate-root-registry) so that it knows which aggregates exist. -The [store](store.md), which is then given to the repository so that it can save and load the events at the end. -And the [EventBus](event_bus.md) to publish the new events. +And the [store](store.md), which is then given to the repository so that it can save and load the events at the end. After plugging the `DefaultRepositoryManager` together, you can create the repository associated with the aggregate. @@ -24,7 +21,6 @@ use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; $repositoryManager = new DefaultRepositoryManager( $aggregateRootRegistry, $store, - $eventBus ); $repository = $repositoryManager->get(Profile::class); @@ -34,6 +30,41 @@ $repository = $repositoryManager->get(Profile::class); The same repository instance is always returned for a specific aggregate. +### Event Bus + +You can pass an event bus to the `DefaultRepositoryManager` to dispatch events synchronously. +This is useful if you want to react to events in the same transaction. + +```php +use Patchlevel\EventSourcing\EventBus\DefaultEventBus; +use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; + +$eventBus = DefaultEventBus::create([/* listeners */]); + +$repositoryManager = new DefaultRepositoryManager( + $aggregateRootRegistry, + $store, + $eventBus, +); + +$repository = $repositoryManager->get(Profile::class); +``` + +!!! warning + + If you use the event bus, you should be aware that the events are dispatched synchronously. + You may encounter "at least once" problems. + +!!! note + + You can find out more about event bus [here](event_bus.md). + +!!! tip + + In most cases it is better to react to events asynchronously, + that's why we recommend the subscription engine. + More information can be found [here](subscription.md). + ### Snapshots Loading events for an aggregate is superfast. @@ -54,7 +85,7 @@ $snapshotStore = new DefaultSnapshotStore([ $repositoryManager = new DefaultRepositoryManager( $aggregateRootRegistry, $store, - $eventBus, + null, $snapshotStore ); @@ -77,7 +108,7 @@ $decorator = new ApplicationIdDecorator(); $repositoryManager = new DefaultRepositoryManager( $aggregateRootRegistry, $store, - $eventBus, + null, null, $decorator ); @@ -98,9 +129,7 @@ saving it or checking whether it exists. An `aggregate` can be `saved`. All new events that have not yet been written to the database are fetched from the aggregate. -These events are then also append to the database. -After the events have been written, -the new events are dispatched on the [event bus](./event_bus.md). +These events are then also append to the database. ```php use Patchlevel\EventSourcing\Aggregate\Uuid; @@ -115,11 +144,6 @@ $repository->save($profile); All events are written to the database with one transaction in order to ensure data consistency. -!!! tip - - If you want to make sure that dispatching events and storing events is transaction safe, - then you should look at the [outbox](outbox.md) pattern. - ### Load an aggregate An `aggregate` can be loaded using the `load` method. diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index a98d1f89e..75019e6c9 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -49,8 +49,8 @@ final class DefaultRepository implements Repository /** @param AggregateRootMetadata $metadata */ public function __construct( private Store $store, - private EventBus $eventBus, private readonly AggregateRootMetadata $metadata, + private EventBus|null $eventBus = null, private SnapshotStore|null $snapshotStore = null, private MessageDecorator|null $messageDecorator = null, ClockInterface|null $clock = null, @@ -267,7 +267,7 @@ static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $ } $this->archive(...$messages); - $this->eventBus->dispatch(...$messages); + $this->eventBus?->dispatch(...$messages); }); $this->aggregateIsValid[$aggregate] = true; diff --git a/src/Repository/DefaultRepositoryManager.php b/src/Repository/DefaultRepositoryManager.php index afabf5daa..cdca65745 100644 --- a/src/Repository/DefaultRepositoryManager.php +++ b/src/Repository/DefaultRepositoryManager.php @@ -32,7 +32,7 @@ final class DefaultRepositoryManager implements RepositoryManager public function __construct( private AggregateRootRegistry $aggregateRootRegistry, private Store $store, - private EventBus $eventBus, + private EventBus|null $eventBus = null, private SnapshotStore|null $snapshotStore = null, private MessageDecorator|null $messageDecorator = null, ClockInterface|null $clock = null, @@ -66,8 +66,8 @@ public function get(string $aggregateClass): Repository return $this->instances[$aggregateClass] = new DefaultRepository( $this->store, - $this->eventBus, $this->metadataFactory->metadata($aggregateClass), + $this->eventBus, $this->snapshotStore, $this->messageDecorator, $this->clock, diff --git a/tests/Benchmark/SimpleSetupBench.php b/tests/Benchmark/SimpleSetupBench.php index e6a13c7f5..52a4f20a5 100644 --- a/tests/Benchmark/SimpleSetupBench.php +++ b/tests/Benchmark/SimpleSetupBench.php @@ -5,8 +5,6 @@ namespace Patchlevel\EventSourcing\Tests\Benchmark; use Patchlevel\EventSourcing\Aggregate\AggregateRootId; -use Patchlevel\EventSourcing\EventBus\DefaultEventBus; -use Patchlevel\EventSourcing\EventBus\EventBus; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; @@ -23,7 +21,6 @@ final class SimpleSetupBench { private Store $store; - private EventBus $bus; private Repository $repository; private AggregateRootId $id; @@ -32,8 +29,6 @@ public function setUp(): void { $connection = DbalManager::createConnection(); - $this->bus = DefaultEventBus::create(); - $this->store = new DoctrineDbalStore( $connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), @@ -44,7 +39,7 @@ public function setUp(): void 'eventstore', ); - $this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata()); + $this->repository = new DefaultRepository($this->store, Profile::metadata()); $schemaDirector = new DoctrineSchemaDirector( $connection, diff --git a/tests/Benchmark/SnapshotsBench.php b/tests/Benchmark/SnapshotsBench.php index a8c898b31..755876037 100644 --- a/tests/Benchmark/SnapshotsBench.php +++ b/tests/Benchmark/SnapshotsBench.php @@ -5,8 +5,6 @@ namespace Patchlevel\EventSourcing\Tests\Benchmark; use Patchlevel\EventSourcing\Aggregate\AggregateRootId; -use Patchlevel\EventSourcing\EventBus\DefaultEventBus; -use Patchlevel\EventSourcing\EventBus\EventBus; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; @@ -26,7 +24,6 @@ final class SnapshotsBench { private Store $store; - private EventBus $bus; private SnapshotStore $snapshotStore; private Repository $repository; @@ -38,8 +35,6 @@ public function setUp(): void { $connection = DbalManager::createConnection(); - $this->bus = DefaultEventBus::create(); - $this->store = new DoctrineDbalStore( $connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), @@ -54,7 +49,7 @@ public function setUp(): void $this->snapshotStore = new DefaultSnapshotStore(['default' => $this->adapter]); - $this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata(), $this->snapshotStore); + $this->repository = new DefaultRepository($this->store, Profile::metadata(), null, $this->snapshotStore); $schemaDirector = new DoctrineSchemaDirector( $connection, diff --git a/tests/Benchmark/SplitStreamBench.php b/tests/Benchmark/SplitStreamBench.php index c0b1ced5f..7bb3737b5 100644 --- a/tests/Benchmark/SplitStreamBench.php +++ b/tests/Benchmark/SplitStreamBench.php @@ -5,8 +5,6 @@ namespace Patchlevel\EventSourcing\Tests\Benchmark; use Patchlevel\EventSourcing\Aggregate\AggregateRootId; -use Patchlevel\EventSourcing\EventBus\DefaultEventBus; -use Patchlevel\EventSourcing\EventBus\EventBus; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; use Patchlevel\EventSourcing\Repository\DefaultRepository; @@ -27,7 +25,6 @@ final class SplitStreamBench { private Store $store; - private EventBus $bus; private Repository $repository; private AggregateRootId $id; @@ -36,8 +33,6 @@ public function setUp(): void { $connection = DbalManager::createConnection(); - $this->bus = DefaultEventBus::create(); - $this->store = new DoctrineDbalStore( $connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), @@ -50,9 +45,9 @@ public function setUp(): void $this->repository = new DefaultRepository( $this->store, - $this->bus, Profile::metadata(), null, + null, new SplitStreamDecorator( new AttributeEventMetadataFactory(), ), diff --git a/tests/Benchmark/SubscriptionEngineBench.php b/tests/Benchmark/SubscriptionEngineBench.php index c30a0567d..c915b2581 100644 --- a/tests/Benchmark/SubscriptionEngineBench.php +++ b/tests/Benchmark/SubscriptionEngineBench.php @@ -5,8 +5,6 @@ namespace Patchlevel\EventSourcing\Tests\Benchmark; use Patchlevel\EventSourcing\Aggregate\AggregateRootId; -use Patchlevel\EventSourcing\EventBus\DefaultEventBus; -use Patchlevel\EventSourcing\EventBus\EventBus; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; @@ -30,7 +28,6 @@ final class SubscriptionEngineBench { private Store $store; - private EventBus $bus; private Repository $repository; private SubscriptionEngine $subscriptionEngine; @@ -41,8 +38,6 @@ public function setUp(): void { $connection = DbalManager::createConnection(); - $this->bus = DefaultEventBus::create(); - $this->store = new DoctrineDbalStore( $connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), @@ -55,7 +50,7 @@ public function setUp(): void 'eventstore', ); - $this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata()); + $this->repository = new DefaultRepository($this->store, Profile::metadata()); $subscriptionStore = new DoctrineSubscriptionStore( $connection, diff --git a/tests/Benchmark/blackfire.php b/tests/Benchmark/blackfire.php index aa785c9dc..9236b24bf 100644 --- a/tests/Benchmark/blackfire.php +++ b/tests/Benchmark/blackfire.php @@ -4,7 +4,6 @@ use Doctrine\DBAL\Driver\PDO\SQLite\Driver; use Doctrine\DBAL\DriverManager; -use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; @@ -26,8 +25,6 @@ 'path' => DB_PATH, ]); -$bus = DefaultEventBus::create(); - $store = new DoctrineDbalStore( $connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), @@ -38,7 +35,7 @@ 'eventstore', ); -$repository = new DefaultRepository($store, $bus, Profile::metadata()); +$repository = new DefaultRepository($store, Profile::metadata()); $schemaDirector = new DoctrineSchemaDirector( $connection, diff --git a/tests/Integration/BankAccountSplitStream/IntegrationTest.php b/tests/Integration/BankAccountSplitStream/IntegrationTest.php index 50a48bc7a..f4194b908 100644 --- a/tests/Integration/BankAccountSplitStream/IntegrationTest.php +++ b/tests/Integration/BankAccountSplitStream/IntegrationTest.php @@ -5,7 +5,6 @@ namespace Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream; use Doctrine\DBAL\Connection; -use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; @@ -63,12 +62,10 @@ public function testSuccessful(): void new MetadataSubscriberAccessorRepository([$bankAccountProjector]), ); - $eventBus = DefaultEventBus::create([$bankAccountProjector]); - $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), $store, - $eventBus, + null, null, new ChainMessageDecorator([ new SplitStreamDecorator(new AttributeEventMetadataFactory()), @@ -91,6 +88,8 @@ public function testSuccessful(): void $bankAccount->addBalance(500); $repository->save($bankAccount); + $engine->run(); + $result = $this->connection->fetchAssociative('SELECT * FROM projection_bank_account WHERE id = ?', ['1']); self::assertIsArray($result); @@ -102,7 +101,7 @@ public function testSuccessful(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), $store, - $eventBus, + null, null, new ChainMessageDecorator([ new SplitStreamDecorator(new AttributeEventMetadataFactory()), @@ -125,6 +124,8 @@ public function testSuccessful(): void $bankAccount->addBalance(200); $repository->save($bankAccount); + $engine->run(); + $result = $this->connection->fetchAssociative('SELECT * FROM projection_bank_account WHERE id = ?', ['1']); self::assertIsArray($result); @@ -136,7 +137,7 @@ public function testSuccessful(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), $store, - $eventBus, + null, null, new ChainMessageDecorator([ new SplitStreamDecorator(new AttributeEventMetadataFactory()), diff --git a/tests/Integration/Pipeline/PipelineChangeStoreTest.php b/tests/Integration/Pipeline/PipelineChangeStoreTest.php index 565ed9b98..cb9ceb23d 100644 --- a/tests/Integration/Pipeline/PipelineChangeStoreTest.php +++ b/tests/Integration/Pipeline/PipelineChangeStoreTest.php @@ -5,7 +5,6 @@ namespace Patchlevel\EventSourcing\Tests\Integration\Pipeline; use Doctrine\DBAL\Connection; -use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware; use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware; @@ -78,8 +77,8 @@ public function testSuccessful(): void $newSchemaDirector->create(); - $oldRepository = new DefaultRepository($oldStore, DefaultEventBus::create(), Profile::metadata()); - $newRepository = new DefaultRepository($newStore, DefaultEventBus::create(), Profile::metadata()); + $oldRepository = new DefaultRepository($oldStore, Profile::metadata()); + $newRepository = new DefaultRepository($newStore, Profile::metadata()); $profileId = ProfileId::fromString('1'); $profile = Profile::create($profileId); diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 3640c7c4f..4023f9a82 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -11,7 +11,6 @@ use Patchlevel\EventSourcing\Debug\Trace\TraceDecorator; use Patchlevel\EventSourcing\Debug\Trace\TraceHeader; use Patchlevel\EventSourcing\Debug\Trace\TraceStack; -use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; @@ -81,7 +80,6 @@ public function testHappyPath(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), $store, - DefaultEventBus::create(), ); $repository = $manager->get(Profile::class); @@ -210,7 +208,6 @@ public function testErrorHandling(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), $store, - DefaultEventBus::create(), ); $subscriber = new ErrorProducerSubscriber(); @@ -337,7 +334,7 @@ public function testProcessor(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), $store, - DefaultEventBus::create(), + null, null, new TraceDecorator($traceStack), ); @@ -450,7 +447,6 @@ public function testBlueGreenDeployment(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), $store, - DefaultEventBus::create(), ); $repository = $manager->get(Profile::class); @@ -612,7 +608,6 @@ public function testBlueGreenDeploymentRollback(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), $store, - DefaultEventBus::create(), ); $repository = $manager->get(Profile::class); diff --git a/tests/Unit/Repository/DefaultRepositoryTest.php b/tests/Unit/Repository/DefaultRepositoryTest.php index ec6d2e894..f45ebde4d 100644 --- a/tests/Unit/Repository/DefaultRepositoryTest.php +++ b/tests/Unit/Repository/DefaultRepositoryTest.php @@ -76,8 +76,25 @@ public function testSaveAggregate(): void static fn (array $args): mixed => $args[0](), ); - $eventBus = $this->prophesize(EventBus::class); - $eventBus->dispatch( + $repository = new DefaultRepository( + $store->reveal(), + Profile::metadata(), + ); + + $aggregate = Profile::createProfile( + ProfileId::fromString('1'), + Email::fromString('hallo@patchlevel.de'), + ); + + $aggregate->visitProfile(ProfileId::fromString('2')); + + $repository->save($aggregate); + } + + public function testUpdateAggregate(): void + { + $store = $this->prophesize(Store::class); + $store->save( Argument::that(static function (Message $message) { if ($message->header(AggregateHeader::class)->aggregateName !== 'profile') { return false; @@ -89,6 +106,9 @@ public function testSaveAggregate(): void return $message->header(AggregateHeader::class)->playhead === 1; }), + )->shouldBeCalled(); + + $store->save( Argument::that(static function (Message $message) { if ($message->header(AggregateHeader::class)->aggregateName !== 'profile') { return false; @@ -102,9 +122,13 @@ public function testSaveAggregate(): void }), )->shouldBeCalled(); + $store->transactional(Argument::any())->will( + /** @param array{0: callable} $args */ + static fn (array $args): mixed => $args[0](), + ); + $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), ); @@ -113,12 +137,14 @@ public function testSaveAggregate(): void Email::fromString('hallo@patchlevel.de'), ); + $repository->save($aggregate); + $aggregate->visitProfile(ProfileId::fromString('2')); $repository->save($aggregate); } - public function testUpdateAggregate(): void + public function testEventBus(): void { $store = $this->prophesize(Store::class); $store->save( @@ -185,8 +211,8 @@ public function testUpdateAggregate(): void $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), + $eventBus->reveal(), ); $aggregate = Profile::createProfile( @@ -227,25 +253,6 @@ public function testDecorator(): void static fn (array $args): mixed => $args[0](), ); - $eventBus = $this->prophesize(EventBus::class); - $eventBus->dispatch( - Argument::that(static function (Message $message) { - if ($message->header(AggregateHeader::class)->aggregateName !== 'profile') { - return false; - } - - if ($message->header(AggregateHeader::class)->aggregateId !== '1') { - return false; - } - - if ($message->header(ArchivedHeader::class)->archived !== false) { - return false; - } - - return $message->header(AggregateHeader::class)->playhead === 1; - }), - )->shouldBeCalled(); - $decorator = new class implements MessageDecorator { public function __invoke(Message $message): Message { @@ -255,9 +262,9 @@ public function __invoke(Message $message): Message $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), null, + null, $decorator, ); @@ -272,11 +279,9 @@ public function __invoke(Message $message): Message public function testSaveWrongAggregate(): void { $store = $this->prophesize(Store::class); - $eventBus = $this->prophesize(EventBus::class); $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), ); @@ -313,24 +318,8 @@ public function testSaveAggregateWithEmptyEventStream(): void static fn (array $args): mixed => $args[0](), ); - $eventBus = $this->prophesize(EventBus::class); - $eventBus->dispatch( - Argument::that(static function (Message $message) { - if ($message->header(AggregateHeader::class)->aggregateName !== 'profile') { - return false; - } - - if ($message->header(AggregateHeader::class)->aggregateId !== '1') { - return false; - } - - return $message->header(AggregateHeader::class)->playhead === 1; - }), - )->shouldBeCalledOnce(); - $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), ); @@ -355,12 +344,8 @@ public function testDetachedException(): void static fn (array $args): mixed => $args[0](), ); - $eventBus = $this->prophesize(EventBus::class); - $eventBus->dispatch(Argument::type('object'))->shouldNotBeCalled(); - $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), ); @@ -389,12 +374,8 @@ public function testUnknownException(): void Argument::type(Message::class), )->shouldNotBeCalled(); - $eventBus = $this->prophesize(EventBus::class); - $eventBus->dispatch(Argument::type('object'))->shouldNotBeCalled(); - $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), ); @@ -424,12 +405,8 @@ public function testDuplicate(): void static fn (array $args): mixed => $args[0](), ); - $eventBus = $this->prophesize(EventBus::class); - $eventBus->dispatch(Argument::type('object'))->shouldNotBeCalled(); - $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), ); @@ -464,12 +441,8 @@ public function testOutdated(): void static fn (array $args): mixed => $args[0](), ); - $eventBus = $this->prophesize(EventBus::class); - $eventBus->dispatch(Argument::type('object'))->shouldBeCalled(); - $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), ); @@ -530,48 +503,11 @@ public function testSaveAggregateWithSplitStream(): void static fn (array $args): mixed => $args[0](), ); - $eventBus = $this->prophesize(EventBus::class); - $eventBus->dispatch( - Argument::that(static function (Message $message) { - if ($message->header(AggregateHeader::class)->aggregateName !== 'profile') { - return false; - } - - if ($message->header(AggregateHeader::class)->aggregateId !== '1') { - return false; - } - - return $message->header(AggregateHeader::class)->playhead === 1; - }), - Argument::that(static function (Message $message) { - if ($message->header(AggregateHeader::class)->aggregateName !== 'profile') { - return false; - } - - if ($message->header(AggregateHeader::class)->aggregateId !== '1') { - return false; - } - - return $message->header(AggregateHeader::class)->playhead === 2; - }), - Argument::that(static function (Message $message) { - if ($message->header(AggregateHeader::class)->aggregateName !== 'profile') { - return false; - } - - if ($message->header(AggregateHeader::class)->aggregateId !== '1') { - return false; - } - - return $message->header(AggregateHeader::class)->playhead === 3; - }), - )->shouldBeCalled(); - $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), null, + null, new SplitStreamDecorator(new AttributeEventMetadataFactory()), ); @@ -600,11 +536,8 @@ public function testLoadAggregate(): void )->withHeader(new AggregateHeader('profile', '1', 1, new DateTimeImmutable())), ])); - $eventBus = $this->prophesize(EventBus::class); - $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), ); @@ -641,11 +574,8 @@ public function testLoadAggregateTwice(): void ]), ); - $eventBus = $this->prophesize(EventBus::class); - $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), ); @@ -666,11 +596,8 @@ public function testAggregateNotFound(): void '1', ))->willReturn(new ArrayStream()); - $eventBus = $this->prophesize(EventBus::class); - $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), ); @@ -685,11 +612,8 @@ public function testHasAggregate(): void '1', ))->willReturn(1); - $eventBus = $this->prophesize(EventBus::class); - $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), ); @@ -704,11 +628,8 @@ public function testNotHasAggregate(): void '1', ))->willReturn(0); - $eventBus = $this->prophesize(EventBus::class); - $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), Profile::metadata(), ); @@ -732,8 +653,6 @@ public function testLoadAggregateWithSnapshot(): void 1, ))->willReturn(new ArrayStream()); - $eventBus = $this->prophesize(EventBus::class); - $snapshotStore = $this->prophesize(SnapshotStore::class); $snapshotStore->load( ProfileWithSnapshot::class, @@ -742,8 +661,8 @@ public function testLoadAggregateWithSnapshot(): void $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), ProfileWithSnapshot::metadata(), + null, $snapshotStore->reveal(), ); @@ -784,8 +703,6 @@ public function testLoadAggregateWithSnapshotFirstTime(): void ]), ); - $eventBus = $this->prophesize(EventBus::class); - $snapshotStore = $this->prophesize(SnapshotStore::class); $snapshotStore->load( ProfileWithSnapshot::class, @@ -796,8 +713,8 @@ public function testLoadAggregateWithSnapshotFirstTime(): void $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), ProfileWithSnapshot::metadata(), + null, $snapshotStore->reveal(), ); @@ -842,8 +759,6 @@ public function testLoadAggregateWithSnapshotAndSaveNewVersion(): void )->withHeader(new AggregateHeader('profile', '1', 3, new DateTimeImmutable())), ])); - $eventBus = $this->prophesize(EventBus::class); - $snapshotStore = $this->prophesize(SnapshotStore::class); $snapshotStore->load( ProfileWithSnapshot::class, @@ -854,8 +769,8 @@ public function testLoadAggregateWithSnapshotAndSaveNewVersion(): void $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), ProfileWithSnapshot::metadata(), + null, $snapshotStore->reveal(), ); @@ -880,16 +795,14 @@ public function testLoadAggregateWithoutSnapshot(): void )->withHeader(new AggregateHeader('profile', '1', 1, new DateTimeImmutable())), ])); - $eventBus = $this->prophesize(EventBus::class); - $snapshotStore = $this->prophesize(SnapshotStore::class); $snapshotStore->load(ProfileWithSnapshot::class, ProfileId::fromString('1')) ->willThrow(SnapshotNotFound::class); $repository = new DefaultRepository( $store->reveal(), - $eventBus->reveal(), ProfileWithSnapshot::metadata(), + null, $snapshotStore->reveal(), );