From f69120c60a6ac168df45ce4be888573ff60288bd Mon Sep 17 00:00:00 2001 From: David Badura Date: Sat, 14 Dec 2024 14:36:58 +0100 Subject: [PATCH 1/7] poc replace child aggregates with micro aggregates --- src/Aggregate/StreamNameTranslator.php | 18 +++---- src/Attribute/Stream.php | 18 +++++++ .../AggregateRoot/AggregateRootMetadata.php | 1 + .../AttributeAggregateRootMetadataFactory.php | 20 +++++++ src/Repository/DefaultRepository.php | 9 ++-- .../ChildAggregateIntegrationTest.php | 53 +++++++++++-------- .../ChildAggregate/PersonalInformation.php | 25 ++++++--- tests/Integration/ChildAggregate/Profile.php | 18 ++----- 8 files changed, 104 insertions(+), 58 deletions(-) create mode 100644 src/Attribute/Stream.php diff --git a/src/Aggregate/StreamNameTranslator.php b/src/Aggregate/StreamNameTranslator.php index 00c74bf98..bb35f6a90 100644 --- a/src/Aggregate/StreamNameTranslator.php +++ b/src/Aggregate/StreamNameTranslator.php @@ -4,6 +4,9 @@ namespace Patchlevel\EventSourcing\Aggregate; +use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata; + +use function str_replace; use function strpos; use function substr; @@ -15,20 +18,13 @@ private function __construct() } /** @pure */ - public static function streamName(string $aggregate, string $aggregateId): string - { - return $aggregate . '-' . $aggregateId; - } - - public static function aggregateName(string $stream): string + public static function streamName(string|AggregateRootMetadata $aggregate, string $aggregateId): string { - $pos = strpos($stream, '-'); - - if ($pos === false) { - throw new InvalidAggregateStreamName($stream); + if ($aggregate instanceof AggregateRootMetadata && $aggregate->streamName !== null) { + return str_replace('{id}', $aggregateId, $aggregate->streamName); } - return substr($stream, 0, $pos); + return $aggregate . '-' . $aggregateId; } public static function aggregateId(string $stream): string diff --git a/src/Attribute/Stream.php b/src/Attribute/Stream.php new file mode 100644 index 000000000..c6bf8bea5 --- /dev/null +++ b/src/Attribute/Stream.php @@ -0,0 +1,18 @@ + $name */ + public function __construct( + public readonly string $name, + ) { + } +} diff --git a/src/Metadata/AggregateRoot/AggregateRootMetadata.php b/src/Metadata/AggregateRoot/AggregateRootMetadata.php index ce51220d6..45ada1f0d 100644 --- a/src/Metadata/AggregateRoot/AggregateRootMetadata.php +++ b/src/Metadata/AggregateRoot/AggregateRootMetadata.php @@ -22,6 +22,7 @@ public function __construct( public readonly Snapshot|null $snapshot, /** @var list */ public readonly array $childAggregates = [], + public readonly string|null $streamName = null, ) { } } diff --git a/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php b/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php index 4c62c3cc7..0d3ab327f 100644 --- a/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php +++ b/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php @@ -10,6 +10,7 @@ use Patchlevel\EventSourcing\Attribute\ChildAggregate; use Patchlevel\EventSourcing\Attribute\Id; use Patchlevel\EventSourcing\Attribute\Snapshot as AttributeSnapshot; +use Patchlevel\EventSourcing\Attribute\Stream; use Patchlevel\EventSourcing\Attribute\SuppressMissingApply; use ReflectionClass; use ReflectionIntersectionType; @@ -50,6 +51,7 @@ public function metadata(string $aggregate): AggregateRootMetadata [$suppressEvents, $suppressAll] = $this->findSuppressMissingApply($reflectionClass); $applyMethods = $this->findApplyMethods($reflectionClass, $aggregate, $childAggregates); $snapshot = $this->findSnapshot($reflectionClass); + $streamName = $this->findStreamName($reflectionClass); $metadata = new AggregateRootMetadata( $aggregate, @@ -60,6 +62,7 @@ public function metadata(string $aggregate): AggregateRootMetadata $suppressAll, $snapshot, array_map(static fn (array $list) => $list[0], $childAggregates), + $streamName ?? $aggregateName . '-{id}', ); $this->aggregateMetadata[$aggregate] = $metadata; @@ -139,6 +142,23 @@ private function findSnapshot(ReflectionClass $reflector): Snapshot|null ); } + private function findStreamName(ReflectionClass $reflector): string|null + { + $attributes = $reflector->getAttributes(Stream::class); + + if ($attributes === []) { + return null; + } + + $streamName = $attributes[0]->newInstance()->name; + + if (class_exists($streamName) && is_a($streamName, AggregateRoot::class, true)) { + return $this->metadata($streamName)->streamName; + } + + return $attributes[0]->newInstance()->name; + } + /** @return list */ private function findChildAggregates(ReflectionClass $reflector): array { diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index c5e1808e5..9971f5e9a 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -113,7 +113,7 @@ public function load(AggregateRootId $id): AggregateRoot if ($this->useStreamHeader) { $criteria = (new CriteriaBuilder()) - ->streamName(StreamNameTranslator::streamName($this->metadata->name, $id->toString())) + ->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString())) ->archived(false) ->build(); } else { @@ -178,7 +178,7 @@ public function has(AggregateRootId $id): bool { if ($this->useStreamHeader) { $criteria = (new CriteriaBuilder()) - ->streamName(StreamNameTranslator::streamName($this->metadata->name, $id->toString())) + ->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString())) ->build(); } else { $criteria = (new CriteriaBuilder()) @@ -242,8 +242,7 @@ public function save(AggregateRoot $aggregate): void $clock = $this->clock; $aggregateName = $this->metadata->name; - - $streamName = $this->useStreamHeader ? StreamNameTranslator::streamName($aggregateName, $aggregateId) : null; + $streamName = $this->useStreamHeader ? StreamNameTranslator::streamName($this->metadata, $aggregateId) : null; $messages = array_map( static function (object $event) use ( @@ -338,7 +337,7 @@ private function loadFromSnapshot(string $aggregateClass, AggregateRootId $id): if ($this->useStreamHeader) { $criteria = (new CriteriaBuilder()) - ->streamName(StreamNameTranslator::streamName($this->metadata->name, $id->toString())) + ->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString())) ->fromPlayhead($aggregate->playhead()) ->build(); } else { diff --git a/tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php b/tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php index a7f4c568c..84e8824e9 100644 --- a/tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php +++ b/tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php @@ -11,7 +11,7 @@ use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Snapshot\Adapter\InMemorySnapshotAdapter; use Patchlevel\EventSourcing\Snapshot\DefaultSnapshotStore; -use Patchlevel\EventSourcing\Store\DoctrineDbalStore; +use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\ThrowOnErrorSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Repository\RunSubscriptionEngineRepositoryManager; @@ -39,7 +39,7 @@ public function tearDown(): void public function testSuccessful(): void { - $store = new DoctrineDbalStore( + $store = new StreamDoctrineDbalStore( $this->connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), ); @@ -54,7 +54,10 @@ public function testSuccessful(): void $manager = new RunSubscriptionEngineRepositoryManager( new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), + new AggregateRootRegistry([ + 'profile' => Profile::class, + 'personal_information' => PersonalInformation::class, + ]), $store, null, null, @@ -62,7 +65,8 @@ public function testSuccessful(): void $engine, ); - $repository = $manager->get(Profile::class); + $profileRepository = $manager->get(Profile::class); + $personalInformationRepository = $manager->get(PersonalInformation::class); $schemaDirector = new DoctrineSchemaDirector( $this->connection, @@ -74,8 +78,11 @@ public function testSuccessful(): void $profileId = ProfileId::generate(); $profile = Profile::create($profileId, 'John'); - $profile->changeName('Snow'); - $repository->save($profile); + $profileRepository->save($profile); + + $personalInformation = $personalInformationRepository->load($profileId); + $personalInformation->changeName('Snow'); + $personalInformationRepository->save($personalInformation); $result = $this->connection->fetchAssociative( 'SELECT * FROM projection_profile WHERE id = ?', @@ -87,18 +94,18 @@ public function testSuccessful(): void self::assertSame($profileId->toString(), $result['id']); self::assertSame('Snow', $result['name']); - $repository = $manager->get(Profile::class); - $profile = $repository->load($profileId); + $profile = $profileRepository->load($profileId); + $personalInformation = $personalInformationRepository->load($profileId); self::assertInstanceOf(Profile::class, $profile); self::assertEquals($profileId, $profile->aggregateRootId()); self::assertSame(2, $profile->playhead()); - self::assertSame('Snow', $profile->name()); + self::assertSame('Snow', $personalInformation->name()); } public function testSnapshot(): void { - $store = new DoctrineDbalStore( + $store = new StreamDoctrineDbalStore( $this->connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), ); @@ -113,7 +120,10 @@ public function testSnapshot(): void $manager = new RunSubscriptionEngineRepositoryManager( new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), + new AggregateRootRegistry([ + 'profile' => Profile::class, + 'personal_information' => PersonalInformation::class, + ]), $store, null, new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), @@ -121,7 +131,8 @@ public function testSnapshot(): void $engine, ); - $repository = $manager->get(Profile::class); + $profileRepository = $manager->get(Profile::class); + $personalInformationRepository = $manager->get(PersonalInformation::class); $schemaDirector = new DoctrineSchemaDirector( $this->connection, @@ -133,7 +144,7 @@ public function testSnapshot(): void $profileId = ProfileId::generate(); $profile = Profile::create($profileId, 'John'); - $repository->save($profile); + $profileRepository->save($profile); $result = $this->connection->fetchAssociative( 'SELECT * FROM projection_profile WHERE id = ?', @@ -145,22 +156,22 @@ public function testSnapshot(): void self::assertSame($profileId->toString(), $result['id']); self::assertSame('John', $result['name']); - $repository = $manager->get(Profile::class); - // create snapshot - $repository->load($profileId); + $profileRepository->load($profileId); + $personalInformationRepository->load($profileId); // load from snapshot - $profile = $repository->load($profileId); + $personalInformation = $personalInformationRepository->load($profileId); - $profile->changeName('Snow'); - $repository->save($profile); + $personalInformation->changeName('Snow'); + $personalInformationRepository->save($personalInformation); - $profile = $repository->load($profileId); + $profile = $profileRepository->load($profileId); + $personalInformation = $personalInformationRepository->load($profileId); self::assertInstanceOf(Profile::class, $profile); self::assertEquals($profileId, $profile->aggregateRootId()); self::assertSame(2, $profile->playhead()); - self::assertSame('Snow', $profile->name()); + self::assertSame('Snow', $personalInformation->name()); } } diff --git a/tests/Integration/ChildAggregate/PersonalInformation.php b/tests/Integration/ChildAggregate/PersonalInformation.php index 89d61a31e..d1e83c520 100644 --- a/tests/Integration/ChildAggregate/PersonalInformation.php +++ b/tests/Integration/ChildAggregate/PersonalInformation.php @@ -4,19 +4,32 @@ namespace Patchlevel\EventSourcing\Tests\Integration\ChildAggregate; -use Patchlevel\EventSourcing\Aggregate\BasicChildAggregate; +use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot; +use Patchlevel\EventSourcing\Attribute\Aggregate; use Patchlevel\EventSourcing\Attribute\Apply; +use Patchlevel\EventSourcing\Attribute\Id; +use Patchlevel\EventSourcing\Attribute\Stream; use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged; +use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated; -final class PersonalInformation extends BasicChildAggregate +#[Aggregate('personal_information')] +#[Stream(Profile::class)] +final class PersonalInformation extends BasicAggregateRoot { - public function __construct( - private string $name, - ) { + #[Id] + private ProfileId $id; + + private string $name; + + #[Apply(ProfileCreated::class)] + protected function applyProfileCreated(ProfileCreated $event): void + { + $this->id = $event->profileId; + $this->name = $event->name; } #[Apply(NameChanged::class)] - public function applyNameChanged(NameChanged $event): void + protected function applyNameChanged(NameChanged $event): void { $this->name = $event->name; } diff --git a/tests/Integration/ChildAggregate/Profile.php b/tests/Integration/ChildAggregate/Profile.php index bd0b438cc..dc88958c3 100644 --- a/tests/Integration/ChildAggregate/Profile.php +++ b/tests/Integration/ChildAggregate/Profile.php @@ -7,21 +7,20 @@ use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot; use Patchlevel\EventSourcing\Attribute\Aggregate; use Patchlevel\EventSourcing\Attribute\Apply; -use Patchlevel\EventSourcing\Attribute\ChildAggregate; use Patchlevel\EventSourcing\Attribute\Id; use Patchlevel\EventSourcing\Attribute\Snapshot; +use Patchlevel\EventSourcing\Attribute\SuppressMissingApply; +use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged; use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated; #[Aggregate('profile')] #[Snapshot('default', 1)] +#[SuppressMissingApply([NameChanged::class])] final class Profile extends BasicAggregateRoot { #[Id] private ProfileId $id; - #[ChildAggregate] - protected PersonalInformation $personalInformation; - public static function create(ProfileId $id, string $name): self { $self = new self(); @@ -34,16 +33,5 @@ public static function create(ProfileId $id, string $name): self protected function applyProfileCreated(ProfileCreated $event): void { $this->id = $event->profileId; - $this->personalInformation = new PersonalInformation($event->name); - } - - public function name(): string - { - return $this->personalInformation->name(); - } - - public function changeName(string $name): void - { - $this->personalInformation->changeName($name); } } From 599187f5cf430172c9049da8301ae15f2c185e2a Mon Sep 17 00:00:00 2001 From: David Badura Date: Wed, 18 Dec 2024 12:14:47 +0100 Subject: [PATCH 2/7] use stream name everywhere --- src/Aggregate/AggregateHeader.php | 2 +- src/Aggregate/StreamNameTranslator.php | 40 ------------------- src/Console/Command/ShowAggregateCommand.php | 20 ++++++---- src/Console/Command/WatchCommand.php | 23 ++--------- .../AggregateRoot/AggregateRootMetadata.php | 21 +++++++++- .../AttributeAggregateRootMetadataFactory.php | 3 +- src/Repository/DefaultRepository.php | 9 ++--- .../AggregateIdArgumentResolver.php | 17 +------- .../ChildAggregate/Events/NameChanged.php | 2 + .../ChildAggregate/PersonalInformation.php | 2 +- .../Projection/ProfileProjector.php | 5 +-- 11 files changed, 48 insertions(+), 96 deletions(-) delete mode 100644 src/Aggregate/StreamNameTranslator.php diff --git a/src/Aggregate/AggregateHeader.php b/src/Aggregate/AggregateHeader.php index 9fe5f1ec3..6b23f1deb 100644 --- a/src/Aggregate/AggregateHeader.php +++ b/src/Aggregate/AggregateHeader.php @@ -22,6 +22,6 @@ public function __construct( public function streamName(): string { - return StreamNameTranslator::streamName($this->aggregateName, $this->aggregateId); + return $this->aggregateName . '-' . $this->aggregateId; } } diff --git a/src/Aggregate/StreamNameTranslator.php b/src/Aggregate/StreamNameTranslator.php deleted file mode 100644 index bb35f6a90..000000000 --- a/src/Aggregate/StreamNameTranslator.php +++ /dev/null @@ -1,40 +0,0 @@ -streamName !== null) { - return str_replace('{id}', $aggregateId, $aggregate->streamName); - } - - return $aggregate . '-' . $aggregateId; - } - - public static function aggregateId(string $stream): string - { - $pos = strpos($stream, '-'); - - if ($pos === false) { - throw new InvalidAggregateStreamName($stream); - } - - return substr($stream, $pos + 1); - } -} diff --git a/src/Console/Command/ShowAggregateCommand.php b/src/Console/Command/ShowAggregateCommand.php index 1b71c30ce..6cb5fe716 100644 --- a/src/Console/Command/ShowAggregateCommand.php +++ b/src/Console/Command/ShowAggregateCommand.php @@ -4,11 +4,12 @@ namespace Patchlevel\EventSourcing\Console\Command; -use Patchlevel\EventSourcing\Aggregate\StreamNameTranslator; use Patchlevel\EventSourcing\Console\InputHelper; use Patchlevel\EventSourcing\Console\OutputStyle; use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer; +use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadataFactory; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; +use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootMetadataFactory; use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion; use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion; @@ -38,6 +39,7 @@ public function __construct( private readonly EventSerializer $eventSerializer, private readonly HeadersSerializer $headersSerializer, private readonly AggregateRootRegistry $aggregateRootRegistry, + private readonly AggregateRootMetadataFactory $aggregateRootMetadataFactory = new AttributeAggregateRootMetadataFactory(), ) { parent::__construct(); } @@ -65,10 +67,6 @@ protected function execute(InputInterface $input, OutputInterface $output): int } $id = InputHelper::nullableString($input->getArgument('id')); - if ($id === null) { - $question = new Question('Enter the aggregate id'); - $id = InputHelper::string($console->askQuestion($question)); - } if (!$this->aggregateRootRegistry->hasAggregateName($aggregate)) { $console->error(sprintf('aggregate type "%s" not exists', $aggregate)); @@ -77,14 +75,20 @@ protected function execute(InputInterface $input, OutputInterface $output): int } if ($this->store instanceof StreamStore) { + $aggregateClass = $this->aggregateRootRegistry->aggregateClass($aggregate); + $streamName = $this->aggregateRootMetadataFactory->metadata($aggregateClass)->streamName($id); + $stream = $this->store->load( new Criteria( - new StreamCriterion( - StreamNameTranslator::streamName($aggregate, $id), - ), + new StreamCriterion($streamName), ), ); } else { + if ($id === null) { + $question = new Question('Enter the aggregate id'); + $id = InputHelper::string($console->askQuestion($question)); + } + $stream = $this->store->load( new Criteria( new AggregateNameCriterion($aggregate), diff --git a/src/Console/Command/WatchCommand.php b/src/Console/Command/WatchCommand.php index e7a82af6e..91fc67af8 100644 --- a/src/Console/Command/WatchCommand.php +++ b/src/Console/Command/WatchCommand.php @@ -4,7 +4,6 @@ namespace Patchlevel\EventSourcing\Console\Command; -use Patchlevel\EventSourcing\Aggregate\StreamNameTranslator; use Patchlevel\EventSourcing\Console\InputHelper; use Patchlevel\EventSourcing\Console\OutputStyle; use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer; @@ -12,7 +11,6 @@ use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Store; -use Patchlevel\EventSourcing\Store\StreamStore; use Patchlevel\EventSourcing\Store\SubscriptionStore; use Patchlevel\Worker\DefaultWorker; use Symfony\Component\Console\Attribute\AsCommand; @@ -88,24 +86,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int $criteriaBuilder = new CriteriaBuilder(); - if ($stream !== null) { - $criteriaBuilder->streamName($stream); - } - - if ($this->store instanceof StreamStore) { - if ($aggregate !== null || $aggregateId !== null) { - if ($aggregate === null || $aggregateId === null) { - $console->error('You must provide both aggregate and aggregate-id or none of them'); - - return 1; - } - - $criteriaBuilder->streamName(StreamNameTranslator::streamName($aggregate, $aggregateId)); - } - } else { - $criteriaBuilder->aggregateName($aggregate); - $criteriaBuilder->aggregateId($aggregateId); - } + $criteriaBuilder->streamName($stream); + $criteriaBuilder->aggregateName($aggregate); + $criteriaBuilder->aggregateId($aggregateId); $criteria = $criteriaBuilder->build(); diff --git a/src/Metadata/AggregateRoot/AggregateRootMetadata.php b/src/Metadata/AggregateRoot/AggregateRootMetadata.php index 45ada1f0d..4072e75f5 100644 --- a/src/Metadata/AggregateRoot/AggregateRootMetadata.php +++ b/src/Metadata/AggregateRoot/AggregateRootMetadata.php @@ -6,9 +6,14 @@ use Patchlevel\EventSourcing\Aggregate\AggregateRoot; +use function str_contains; +use function str_replace; + /** @template T of AggregateRoot */ final class AggregateRootMetadata { + public readonly string $streamName; + public function __construct( /** @var class-string */ public readonly string $className, @@ -22,7 +27,21 @@ public function __construct( public readonly Snapshot|null $snapshot, /** @var list */ public readonly array $childAggregates = [], - public readonly string|null $streamName = null, + string|null $streamName = null, ) { + $this->streamName = $streamName ?? $this->name . '-{id}'; + } + + public function streamName(string|null $aggregateId = null): string + { + if ($aggregateId === null) { + if (str_contains($this->streamName, '{id}')) { + throw new AggregateIdMissing($this->className); + } + + return $this->streamName; + } + + return str_replace('{id}', $aggregateId, $this->streamName); } } diff --git a/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php b/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php index 0d3ab327f..e622b4910 100644 --- a/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php +++ b/src/Metadata/AggregateRoot/AttributeAggregateRootMetadataFactory.php @@ -51,7 +51,6 @@ public function metadata(string $aggregate): AggregateRootMetadata [$suppressEvents, $suppressAll] = $this->findSuppressMissingApply($reflectionClass); $applyMethods = $this->findApplyMethods($reflectionClass, $aggregate, $childAggregates); $snapshot = $this->findSnapshot($reflectionClass); - $streamName = $this->findStreamName($reflectionClass); $metadata = new AggregateRootMetadata( $aggregate, @@ -62,7 +61,7 @@ public function metadata(string $aggregate): AggregateRootMetadata $suppressAll, $snapshot, array_map(static fn (array $list) => $list[0], $childAggregates), - $streamName ?? $aggregateName . '-{id}', + $this->findStreamName($reflectionClass), ); $this->aggregateMetadata[$aggregate] = $metadata; diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index 9971f5e9a..c17b3c4c9 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -7,7 +7,6 @@ use Patchlevel\EventSourcing\Aggregate\AggregateHeader; use Patchlevel\EventSourcing\Aggregate\AggregateRoot; use Patchlevel\EventSourcing\Aggregate\AggregateRootId; -use Patchlevel\EventSourcing\Aggregate\StreamNameTranslator; use Patchlevel\EventSourcing\Clock\SystemClock; use Patchlevel\EventSourcing\EventBus\EventBus; use Patchlevel\EventSourcing\Message\Message; @@ -113,7 +112,7 @@ public function load(AggregateRootId $id): AggregateRoot if ($this->useStreamHeader) { $criteria = (new CriteriaBuilder()) - ->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString())) + ->streamName($this->metadata->streamName($id->toString())) ->archived(false) ->build(); } else { @@ -178,7 +177,7 @@ public function has(AggregateRootId $id): bool { if ($this->useStreamHeader) { $criteria = (new CriteriaBuilder()) - ->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString())) + ->streamName($this->metadata->streamName($id->toString())) ->build(); } else { $criteria = (new CriteriaBuilder()) @@ -242,7 +241,7 @@ public function save(AggregateRoot $aggregate): void $clock = $this->clock; $aggregateName = $this->metadata->name; - $streamName = $this->useStreamHeader ? StreamNameTranslator::streamName($this->metadata, $aggregateId) : null; + $streamName = $this->useStreamHeader ? $this->metadata->streamName($aggregateId) : null; $messages = array_map( static function (object $event) use ( @@ -337,7 +336,7 @@ private function loadFromSnapshot(string $aggregateClass, AggregateRootId $id): if ($this->useStreamHeader) { $criteria = (new CriteriaBuilder()) - ->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString())) + ->streamName($this->metadata->streamName($id->toString())) ->fromPlayhead($aggregate->playhead()) ->build(); } else { diff --git a/src/Subscription/Subscriber/ArgumentResolver/AggregateIdArgumentResolver.php b/src/Subscription/Subscriber/ArgumentResolver/AggregateIdArgumentResolver.php index ef56019c4..2ed963cfd 100644 --- a/src/Subscription/Subscriber/ArgumentResolver/AggregateIdArgumentResolver.php +++ b/src/Subscription/Subscriber/ArgumentResolver/AggregateIdArgumentResolver.php @@ -6,11 +6,8 @@ use Patchlevel\EventSourcing\Aggregate\AggregateHeader; use Patchlevel\EventSourcing\Aggregate\AggregateRootId; -use Patchlevel\EventSourcing\Aggregate\StreamNameTranslator; -use Patchlevel\EventSourcing\Message\HeaderNotFound; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata; -use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use function class_exists; use function is_a; @@ -21,19 +18,9 @@ public function resolve(ArgumentMetadata $argument, Message $message): Aggregate { /** @var class-string $class */ $class = $argument->type; + $id = $message->header(AggregateHeader::class)->aggregateId; - try { - $id = $message->header(AggregateHeader::class)->aggregateId; - - return $class::fromString($id); - } catch (HeaderNotFound) { - // do nothing - } - - $stream = $message->header(StreamNameHeader::class)->streamName; - $aggregateId = StreamNameTranslator::aggregateId($stream); - - return $class::fromString($aggregateId); + return $class::fromString($id); } public function support(ArgumentMetadata $argument, string $eventClass): bool diff --git a/tests/Integration/ChildAggregate/Events/NameChanged.php b/tests/Integration/ChildAggregate/Events/NameChanged.php index 75c85e6f5..3e5c665d5 100644 --- a/tests/Integration/ChildAggregate/Events/NameChanged.php +++ b/tests/Integration/ChildAggregate/Events/NameChanged.php @@ -5,11 +5,13 @@ namespace Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events; use Patchlevel\EventSourcing\Attribute\Event; +use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\ProfileId; #[Event('profile.name_changed')] final class NameChanged { public function __construct( + public ProfileId $profileId, public string $name, ) { } diff --git a/tests/Integration/ChildAggregate/PersonalInformation.php b/tests/Integration/ChildAggregate/PersonalInformation.php index d1e83c520..dfee9bf16 100644 --- a/tests/Integration/ChildAggregate/PersonalInformation.php +++ b/tests/Integration/ChildAggregate/PersonalInformation.php @@ -41,6 +41,6 @@ public function name(): string public function changeName(string $name): void { - $this->recordThat(new NameChanged($name)); + $this->recordThat(new NameChanged($this->id, $name)); } } diff --git a/tests/Integration/ChildAggregate/Projection/ProfileProjector.php b/tests/Integration/ChildAggregate/Projection/ProfileProjector.php index 57dd12331..7ef232458 100644 --- a/tests/Integration/ChildAggregate/Projection/ProfileProjector.php +++ b/tests/Integration/ChildAggregate/Projection/ProfileProjector.php @@ -12,7 +12,6 @@ use Patchlevel\EventSourcing\Attribute\Teardown; use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged; use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated; -use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\ProfileId; #[Projector('profile-1')] final class ProfileProjector @@ -52,12 +51,12 @@ public function handleProfileCreated(ProfileCreated $profileCreated): void } #[Subscribe(NameChanged::class)] - public function handleNameChanged(NameChanged $nameChanged, ProfileId $profileId): void + public function handleNameChanged(NameChanged $nameChanged): void { $this->connection->update( 'projection_profile', ['name' => $nameChanged->name], - ['id' => $profileId->toString()], + ['id' => $nameChanged->profileId->toString()], ); } } From 45f5b76f77b90f09098b751617076a44c74d5c95 Mon Sep 17 00:00:00 2001 From: David Badura Date: Wed, 18 Dec 2024 13:37:02 +0100 Subject: [PATCH 3/7] add own integration tests --- .../ChildAggregateIntegrationTest.php | 53 +++--- .../ChildAggregate/Events/NameChanged.php | 2 - .../ChildAggregate/PersonalInformation.php | 27 +-- tests/Integration/ChildAggregate/Profile.php | 18 +- .../Projection/ProfileProjector.php | 5 +- .../ChildAggregateIntegrationTest.php | 177 ++++++++++++++++++ .../MicroAggregate/Events/NameChanged.php | 18 ++ .../MicroAggregate/Events/ProfileCreated.php | 18 ++ .../MicroAggregate/PersonalInformation.php | 46 +++++ tests/Integration/MicroAggregate/Profile.php | 37 ++++ .../Integration/MicroAggregate/ProfileId.php | 13 ++ .../Projection/ProfileProjector.php | 62 ++++++ .../MicroAggregate/SendEmailMock.php | 25 +++ 13 files changed, 442 insertions(+), 59 deletions(-) create mode 100644 tests/Integration/MicroAggregate/ChildAggregateIntegrationTest.php create mode 100644 tests/Integration/MicroAggregate/Events/NameChanged.php create mode 100644 tests/Integration/MicroAggregate/Events/ProfileCreated.php create mode 100644 tests/Integration/MicroAggregate/PersonalInformation.php create mode 100644 tests/Integration/MicroAggregate/Profile.php create mode 100644 tests/Integration/MicroAggregate/ProfileId.php create mode 100644 tests/Integration/MicroAggregate/Projection/ProfileProjector.php create mode 100644 tests/Integration/MicroAggregate/SendEmailMock.php diff --git a/tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php b/tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php index 84e8824e9..a7f4c568c 100644 --- a/tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php +++ b/tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php @@ -11,7 +11,7 @@ use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Snapshot\Adapter\InMemorySnapshotAdapter; use Patchlevel\EventSourcing\Snapshot\DefaultSnapshotStore; -use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; +use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\ThrowOnErrorSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Repository\RunSubscriptionEngineRepositoryManager; @@ -39,7 +39,7 @@ public function tearDown(): void public function testSuccessful(): void { - $store = new StreamDoctrineDbalStore( + $store = new DoctrineDbalStore( $this->connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), ); @@ -54,10 +54,7 @@ public function testSuccessful(): void $manager = new RunSubscriptionEngineRepositoryManager( new DefaultRepositoryManager( - new AggregateRootRegistry([ - 'profile' => Profile::class, - 'personal_information' => PersonalInformation::class, - ]), + new AggregateRootRegistry(['profile' => Profile::class]), $store, null, null, @@ -65,8 +62,7 @@ public function testSuccessful(): void $engine, ); - $profileRepository = $manager->get(Profile::class); - $personalInformationRepository = $manager->get(PersonalInformation::class); + $repository = $manager->get(Profile::class); $schemaDirector = new DoctrineSchemaDirector( $this->connection, @@ -78,11 +74,8 @@ public function testSuccessful(): void $profileId = ProfileId::generate(); $profile = Profile::create($profileId, 'John'); - $profileRepository->save($profile); - - $personalInformation = $personalInformationRepository->load($profileId); - $personalInformation->changeName('Snow'); - $personalInformationRepository->save($personalInformation); + $profile->changeName('Snow'); + $repository->save($profile); $result = $this->connection->fetchAssociative( 'SELECT * FROM projection_profile WHERE id = ?', @@ -94,18 +87,18 @@ public function testSuccessful(): void self::assertSame($profileId->toString(), $result['id']); self::assertSame('Snow', $result['name']); - $profile = $profileRepository->load($profileId); - $personalInformation = $personalInformationRepository->load($profileId); + $repository = $manager->get(Profile::class); + $profile = $repository->load($profileId); self::assertInstanceOf(Profile::class, $profile); self::assertEquals($profileId, $profile->aggregateRootId()); self::assertSame(2, $profile->playhead()); - self::assertSame('Snow', $personalInformation->name()); + self::assertSame('Snow', $profile->name()); } public function testSnapshot(): void { - $store = new StreamDoctrineDbalStore( + $store = new DoctrineDbalStore( $this->connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), ); @@ -120,10 +113,7 @@ public function testSnapshot(): void $manager = new RunSubscriptionEngineRepositoryManager( new DefaultRepositoryManager( - new AggregateRootRegistry([ - 'profile' => Profile::class, - 'personal_information' => PersonalInformation::class, - ]), + new AggregateRootRegistry(['profile' => Profile::class]), $store, null, new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), @@ -131,8 +121,7 @@ public function testSnapshot(): void $engine, ); - $profileRepository = $manager->get(Profile::class); - $personalInformationRepository = $manager->get(PersonalInformation::class); + $repository = $manager->get(Profile::class); $schemaDirector = new DoctrineSchemaDirector( $this->connection, @@ -144,7 +133,7 @@ public function testSnapshot(): void $profileId = ProfileId::generate(); $profile = Profile::create($profileId, 'John'); - $profileRepository->save($profile); + $repository->save($profile); $result = $this->connection->fetchAssociative( 'SELECT * FROM projection_profile WHERE id = ?', @@ -156,22 +145,22 @@ public function testSnapshot(): void self::assertSame($profileId->toString(), $result['id']); self::assertSame('John', $result['name']); + $repository = $manager->get(Profile::class); + // create snapshot - $profileRepository->load($profileId); - $personalInformationRepository->load($profileId); + $repository->load($profileId); // load from snapshot - $personalInformation = $personalInformationRepository->load($profileId); + $profile = $repository->load($profileId); - $personalInformation->changeName('Snow'); - $personalInformationRepository->save($personalInformation); + $profile->changeName('Snow'); + $repository->save($profile); - $profile = $profileRepository->load($profileId); - $personalInformation = $personalInformationRepository->load($profileId); + $profile = $repository->load($profileId); self::assertInstanceOf(Profile::class, $profile); self::assertEquals($profileId, $profile->aggregateRootId()); self::assertSame(2, $profile->playhead()); - self::assertSame('Snow', $personalInformation->name()); + self::assertSame('Snow', $profile->name()); } } diff --git a/tests/Integration/ChildAggregate/Events/NameChanged.php b/tests/Integration/ChildAggregate/Events/NameChanged.php index 3e5c665d5..75c85e6f5 100644 --- a/tests/Integration/ChildAggregate/Events/NameChanged.php +++ b/tests/Integration/ChildAggregate/Events/NameChanged.php @@ -5,13 +5,11 @@ namespace Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events; use Patchlevel\EventSourcing\Attribute\Event; -use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\ProfileId; #[Event('profile.name_changed')] final class NameChanged { public function __construct( - public ProfileId $profileId, public string $name, ) { } diff --git a/tests/Integration/ChildAggregate/PersonalInformation.php b/tests/Integration/ChildAggregate/PersonalInformation.php index dfee9bf16..89d61a31e 100644 --- a/tests/Integration/ChildAggregate/PersonalInformation.php +++ b/tests/Integration/ChildAggregate/PersonalInformation.php @@ -4,32 +4,19 @@ namespace Patchlevel\EventSourcing\Tests\Integration\ChildAggregate; -use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot; -use Patchlevel\EventSourcing\Attribute\Aggregate; +use Patchlevel\EventSourcing\Aggregate\BasicChildAggregate; use Patchlevel\EventSourcing\Attribute\Apply; -use Patchlevel\EventSourcing\Attribute\Id; -use Patchlevel\EventSourcing\Attribute\Stream; use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged; -use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated; -#[Aggregate('personal_information')] -#[Stream(Profile::class)] -final class PersonalInformation extends BasicAggregateRoot +final class PersonalInformation extends BasicChildAggregate { - #[Id] - private ProfileId $id; - - private string $name; - - #[Apply(ProfileCreated::class)] - protected function applyProfileCreated(ProfileCreated $event): void - { - $this->id = $event->profileId; - $this->name = $event->name; + public function __construct( + private string $name, + ) { } #[Apply(NameChanged::class)] - protected function applyNameChanged(NameChanged $event): void + public function applyNameChanged(NameChanged $event): void { $this->name = $event->name; } @@ -41,6 +28,6 @@ public function name(): string public function changeName(string $name): void { - $this->recordThat(new NameChanged($this->id, $name)); + $this->recordThat(new NameChanged($name)); } } diff --git a/tests/Integration/ChildAggregate/Profile.php b/tests/Integration/ChildAggregate/Profile.php index dc88958c3..bd0b438cc 100644 --- a/tests/Integration/ChildAggregate/Profile.php +++ b/tests/Integration/ChildAggregate/Profile.php @@ -7,20 +7,21 @@ use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot; use Patchlevel\EventSourcing\Attribute\Aggregate; use Patchlevel\EventSourcing\Attribute\Apply; +use Patchlevel\EventSourcing\Attribute\ChildAggregate; use Patchlevel\EventSourcing\Attribute\Id; use Patchlevel\EventSourcing\Attribute\Snapshot; -use Patchlevel\EventSourcing\Attribute\SuppressMissingApply; -use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged; use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated; #[Aggregate('profile')] #[Snapshot('default', 1)] -#[SuppressMissingApply([NameChanged::class])] final class Profile extends BasicAggregateRoot { #[Id] private ProfileId $id; + #[ChildAggregate] + protected PersonalInformation $personalInformation; + public static function create(ProfileId $id, string $name): self { $self = new self(); @@ -33,5 +34,16 @@ public static function create(ProfileId $id, string $name): self protected function applyProfileCreated(ProfileCreated $event): void { $this->id = $event->profileId; + $this->personalInformation = new PersonalInformation($event->name); + } + + public function name(): string + { + return $this->personalInformation->name(); + } + + public function changeName(string $name): void + { + $this->personalInformation->changeName($name); } } diff --git a/tests/Integration/ChildAggregate/Projection/ProfileProjector.php b/tests/Integration/ChildAggregate/Projection/ProfileProjector.php index 7ef232458..57dd12331 100644 --- a/tests/Integration/ChildAggregate/Projection/ProfileProjector.php +++ b/tests/Integration/ChildAggregate/Projection/ProfileProjector.php @@ -12,6 +12,7 @@ use Patchlevel\EventSourcing\Attribute\Teardown; use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged; use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated; +use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\ProfileId; #[Projector('profile-1')] final class ProfileProjector @@ -51,12 +52,12 @@ public function handleProfileCreated(ProfileCreated $profileCreated): void } #[Subscribe(NameChanged::class)] - public function handleNameChanged(NameChanged $nameChanged): void + public function handleNameChanged(NameChanged $nameChanged, ProfileId $profileId): void { $this->connection->update( 'projection_profile', ['name' => $nameChanged->name], - ['id' => $nameChanged->profileId->toString()], + ['id' => $profileId->toString()], ); } } diff --git a/tests/Integration/MicroAggregate/ChildAggregateIntegrationTest.php b/tests/Integration/MicroAggregate/ChildAggregateIntegrationTest.php new file mode 100644 index 000000000..17fd33a59 --- /dev/null +++ b/tests/Integration/MicroAggregate/ChildAggregateIntegrationTest.php @@ -0,0 +1,177 @@ +connection = DbalManager::createConnection(); + } + + public function tearDown(): void + { + $this->connection->close(); + SendEmailMock::reset(); + } + + public function testSuccessful(): void + { + $store = new StreamDoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $profileProjector = new ProfileProjector($this->connection); + + $engine = new ThrowOnErrorSubscriptionEngine(new DefaultSubscriptionEngine( + $store, + new InMemorySubscriptionStore(), + new MetadataSubscriberAccessorRepository([$profileProjector]), + )); + + $manager = new RunSubscriptionEngineRepositoryManager( + new DefaultRepositoryManager( + new AggregateRootRegistry([ + 'profile' => Profile::class, + 'personal_information' => PersonalInformation::class, + ]), + $store, + null, + null, + ), + $engine, + ); + + $profileRepository = $manager->get(Profile::class); + $personalInformationRepository = $manager->get(PersonalInformation::class); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + $store, + ); + + $schemaDirector->create(); + $engine->setup(skipBooting: true); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + $profileRepository->save($profile); + + $personalInformation = $personalInformationRepository->load($profileId); + $personalInformation->changeName('Snow'); + $personalInformationRepository->save($personalInformation); + + $result = $this->connection->fetchAssociative( + 'SELECT * FROM projection_profile WHERE id = ?', + [$profileId->toString()], + ); + + self::assertIsArray($result); + self::assertArrayHasKey('id', $result); + self::assertSame($profileId->toString(), $result['id']); + self::assertSame('Snow', $result['name']); + + $profile = $profileRepository->load($profileId); + $personalInformation = $personalInformationRepository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(2, $profile->playhead()); + self::assertSame('Snow', $personalInformation->name()); + } + + public function testSnapshot(): void + { + $store = new StreamDoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $profileProjection = new ProfileProjector($this->connection); + + $engine = new DefaultSubscriptionEngine( + $store, + new InMemorySubscriptionStore(), + new MetadataSubscriberAccessorRepository([$profileProjection]), + ); + + $manager = new RunSubscriptionEngineRepositoryManager( + new DefaultRepositoryManager( + new AggregateRootRegistry([ + 'profile' => Profile::class, + 'personal_information' => PersonalInformation::class, + ]), + $store, + null, + new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), + ), + $engine, + ); + + $profileRepository = $manager->get(Profile::class); + $personalInformationRepository = $manager->get(PersonalInformation::class); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + $store, + ); + + $schemaDirector->create(); + $engine->setup(skipBooting: true); + + $profileId = ProfileId::generate(); + $profile = Profile::create($profileId, 'John'); + $profileRepository->save($profile); + + $result = $this->connection->fetchAssociative( + 'SELECT * FROM projection_profile WHERE id = ?', + [$profileId->toString()], + ); + + self::assertIsArray($result); + self::assertArrayHasKey('id', $result); + self::assertSame($profileId->toString(), $result['id']); + self::assertSame('John', $result['name']); + + // create snapshot + $profileRepository->load($profileId); + $personalInformationRepository->load($profileId); + + // load from snapshot + $personalInformation = $personalInformationRepository->load($profileId); + + $personalInformation->changeName('Snow'); + $personalInformationRepository->save($personalInformation); + + $profile = $profileRepository->load($profileId); + $personalInformation = $personalInformationRepository->load($profileId); + + self::assertInstanceOf(Profile::class, $profile); + self::assertEquals($profileId, $profile->aggregateRootId()); + self::assertSame(2, $profile->playhead()); + self::assertSame('Snow', $personalInformation->name()); + } +} diff --git a/tests/Integration/MicroAggregate/Events/NameChanged.php b/tests/Integration/MicroAggregate/Events/NameChanged.php new file mode 100644 index 000000000..a3ce48da5 --- /dev/null +++ b/tests/Integration/MicroAggregate/Events/NameChanged.php @@ -0,0 +1,18 @@ +id = $event->profileId; + $this->name = $event->name; + } + + #[Apply(NameChanged::class)] + protected function applyNameChanged(NameChanged $event): void + { + $this->name = $event->name; + } + + public function name(): string + { + return $this->name; + } + + public function changeName(string $name): void + { + $this->recordThat(new NameChanged($this->id, $name)); + } +} diff --git a/tests/Integration/MicroAggregate/Profile.php b/tests/Integration/MicroAggregate/Profile.php new file mode 100644 index 000000000..42e52e7bc --- /dev/null +++ b/tests/Integration/MicroAggregate/Profile.php @@ -0,0 +1,37 @@ +recordThat(new ProfileCreated($id, $name)); + + return $self; + } + + #[Apply(ProfileCreated::class)] + protected function applyProfileCreated(ProfileCreated $event): void + { + $this->id = $event->profileId; + } +} diff --git a/tests/Integration/MicroAggregate/ProfileId.php b/tests/Integration/MicroAggregate/ProfileId.php new file mode 100644 index 000000000..eb0ee0191 --- /dev/null +++ b/tests/Integration/MicroAggregate/ProfileId.php @@ -0,0 +1,13 @@ +addColumn('id', 'string')->setLength(36); + $table->addColumn('name', 'string')->setLength(255); + $table->setPrimaryKey(['id']); + + $this->connection->createSchemaManager()->createTable($table); + } + + #[Teardown] + public function drop(): void + { + $this->connection->createSchemaManager()->dropTable('projection_profile'); + } + + #[Subscribe(ProfileCreated::class)] + public function handleProfileCreated(ProfileCreated $profileCreated): void + { + $this->connection->executeStatement( + 'INSERT INTO projection_profile (id, name) VALUES(:id, :name);', + [ + 'id' => $profileCreated->profileId->toString(), + 'name' => $profileCreated->name, + ], + ); + } + + #[Subscribe(NameChanged::class)] + public function handleNameChanged(NameChanged $nameChanged): void + { + $this->connection->update( + 'projection_profile', + ['name' => $nameChanged->name], + ['id' => $nameChanged->profileId->toString()], + ); + } +} diff --git a/tests/Integration/MicroAggregate/SendEmailMock.php b/tests/Integration/MicroAggregate/SendEmailMock.php new file mode 100644 index 000000000..3212ad12c --- /dev/null +++ b/tests/Integration/MicroAggregate/SendEmailMock.php @@ -0,0 +1,25 @@ + Date: Wed, 18 Dec 2024 13:47:16 +0100 Subject: [PATCH 4/7] fix ci --- src/Console/Command/ShowAggregateCommand.php | 10 +++++++++- .../AggregateRoot/AggregateRootMetadata.php | 2 +- .../MissingAggregateIdForStreamName.php | 17 +++++++++++++++++ ...st.php => MicroAggregateIntegrationTest.php} | 2 +- 4 files changed, 28 insertions(+), 3 deletions(-) create mode 100644 src/Metadata/AggregateRoot/MissingAggregateIdForStreamName.php rename tests/Integration/MicroAggregate/{ChildAggregateIntegrationTest.php => MicroAggregateIntegrationTest.php} (99%) diff --git a/src/Console/Command/ShowAggregateCommand.php b/src/Console/Command/ShowAggregateCommand.php index 6cb5fe716..94e7d3b59 100644 --- a/src/Console/Command/ShowAggregateCommand.php +++ b/src/Console/Command/ShowAggregateCommand.php @@ -74,6 +74,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int return 1; } + $streamName = null; + if ($this->store instanceof StreamStore) { $aggregateClass = $this->aggregateRootRegistry->aggregateClass($aggregate); $streamName = $this->aggregateRootMetadataFactory->metadata($aggregateClass)->streamName($id); @@ -109,7 +111,13 @@ protected function execute(InputInterface $input, OutputInterface $output): int return 0; } - $console->error(sprintf('aggregate "%s" => "%s" not found', $aggregate, $id)); + if ($id !== null) { + $console->error(sprintf('aggregate "%s" => "%s" not found', $aggregate, $id)); + } elseif ($streamName !== null) { + $console->error(sprintf('aggregate for stream "%s" not found', $streamName)); + } else { + $console->error('aggregate not found'); + } return 1; } diff --git a/src/Metadata/AggregateRoot/AggregateRootMetadata.php b/src/Metadata/AggregateRoot/AggregateRootMetadata.php index 4072e75f5..afe79ff98 100644 --- a/src/Metadata/AggregateRoot/AggregateRootMetadata.php +++ b/src/Metadata/AggregateRoot/AggregateRootMetadata.php @@ -36,7 +36,7 @@ public function streamName(string|null $aggregateId = null): string { if ($aggregateId === null) { if (str_contains($this->streamName, '{id}')) { - throw new AggregateIdMissing($this->className); + throw new MissingAggregateIdForStreamName($this->streamName); } return $this->streamName; diff --git a/src/Metadata/AggregateRoot/MissingAggregateIdForStreamName.php b/src/Metadata/AggregateRoot/MissingAggregateIdForStreamName.php new file mode 100644 index 000000000..cae407132 --- /dev/null +++ b/src/Metadata/AggregateRoot/MissingAggregateIdForStreamName.php @@ -0,0 +1,17 @@ + Date: Wed, 18 Dec 2024 15:18:54 +0100 Subject: [PATCH 5/7] add docs --- docs/pages/aggregate.md | 269 ++++++++++++++++++++++++++++++---------- 1 file changed, 201 insertions(+), 68 deletions(-) diff --git a/docs/pages/aggregate.md b/docs/pages/aggregate.md index fafe6209a..bf2b07a69 100644 --- a/docs/pages/aggregate.md +++ b/docs/pages/aggregate.md @@ -358,88 +358,45 @@ final class Profile extends BasicAggregateRoot When all events are suppressed, debugging becomes more difficult if you forget an apply method. -## Child Aggregates +## Stream Name -??? example "Experimental" - - This feature is still experimental and may change in the future. - Use it with caution. - -In some cases, it makes sense to split an aggregate into several smaller aggregates. -This can be the case if the aggregate becomes too large or if the aggregate is used in different contexts. -Child aggregates can be used for this purpose and work in the same way as the root aggregate. - -In the following example, we have an `Order` aggregate that has a `Shipping` child aggregate. +The stream name is the name of the stream in the event store. +By default, the stream name has the format `aggregateName-aggregateId`. +But you can also define your own stream name with the `Stream` attribute. +You can use the placeholder `{id}` to insert the aggregate id into the stream name. ```php -use Patchlevel\EventSourcing\Aggregate\BasicChildAggregate; +use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot; +use Patchlevel\EventSourcing\Attribute\Aggregate; +use Patchlevel\EventSourcing\Attribute\Stream; -final class Shipping extends BasicChildAggregate +#[Aggregate('profile')] +#[Stream('profile-{id}')] +final class Profile extends BasicAggregateRoot { - private bool $arrived = false; - - public function __construct( - private string $trackingId, - ) { - } - - public function arrive(): void - { - $this->recordThat(new Arrived()); - } - - #[Apply] - public function applyArrived(Arrived $event): void - { - $this->arrived = true; - } - - public function isArrived(): bool - { - return $this->arrived; - } + // ... } ``` -!!! warning - - The apply method must be public, otherwise the root aggregate cannot call it. - -!!! note - - Supress missing apply methods need to be defined in the root aggregate. - -And the `Order` aggregate root looks like this: +You can use also an aggregate class for the stream name. +In this case you use the stream name from another aggregate. +This is useful if you want to store multiple aggregates in the same stream, +for example if you want to use the micro aggregate pattern. ```php use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot; -use Patchlevel\EventSourcing\Aggregate\Uuid; -use Patchlevel\EventSourcing\Attribute\ChildAggregate; +use Patchlevel\EventSourcing\Attribute\Aggregate; -#[Aggregate('order')] -final class Order extends BasicAggregateRoot +#[Aggregate('guest_list')] +#[Stream(Meeting::class)] +final class GuestList extends BasicAggregateRoot { - #[ChildAggregate] - private Shipping $shipping; - - public static function create(Uuid $id, string $trackingId): static - { - $self = new static(); - $self->recordThat(new OrderCreated($id, $trackingId)); - - return $self; - } - - public function applyOrderCreated(OrderCreated $event): void - { - $this->shipping = new Shipping($event->trackingId); - } - - public function arrive(): void - { - $this->shipping->arrive(); - } + // ... } ``` +!!! tip + + You can find more about splitting aggregates [here](./aggregate.md#splitting-aggregates). + ## Business rules Usually, aggregates have business rules that must be observed. Like there may not be more than 10 people in a group. @@ -688,6 +645,182 @@ Or for test purposes the `FrozenClock`, which always returns the same time. You can find out more about clock [here](./clock.md). +## Splitting Aggregates + +In some cases, it makes sense to split an aggregate into several smaller aggregates. +This can be the case if the aggregate becomes too large or if the aggregate is used in different contexts. +We currently support two patterns for this: Micro Aggregates and Child Aggregates (experimental). + +### Micro Aggregates + +Micro Aggregates are a pattern to split an aggregate into several smaller aggregates. +Each of these aggregates is saved in the same stream. +This gives the Micro Aggregates the ability to independently manage their state and trigger their events, +but still allows the associated Micro Aggregates to listen to the events in order to enforce their own business rules. + +In the following example, we have an `Order` micro aggregate and a `Shipping` micro aggregate. +The order handle the order itself and the shipping handle the shipping of the order. + +```php +use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot; +use Patchlevel\EventSourcing\Aggregate\Uuid; +use Patchlevel\EventSourcing\Attribute\Aggregate; +use Patchlevel\EventSourcing\Attribute\Apply; +use Patchlevel\EventSourcing\Attribute\Id; + +#[Aggregate('order')] +final class Order extends BasicAggregateRoot +{ + #[Id] + private Uuid $id; + + public static function create(Uuid $id): static + { + $self = new static(); + $self->recordThat(new OrderCreated($id)); + + return $self; + } + + #[Apply] + public function applyOrderCreated(OrderCreated $event): void + { + $this->id = $event->id; + } +} +``` +With this pattern, the Shipping aggregate can listen to the events of the Order aggregate. +In this case, the `Shipping` aggregate listens to the `OrderCreated` event to initialize itself. + +```php +use Patchlevel\EventSourcing\Aggregate\Uuid; +use Patchlevel\EventSourcing\Attribute\Aggregate; +use Patchlevel\EventSourcing\Attribute\Apply; +use Patchlevel\EventSourcing\Attribute\Id; +use Patchlevel\EventSourcing\Attribute\Stream; + +#[Aggregate('shipping')] +#[Stream(Order::class)] +final class Shipping extends BasicChildAggregate +{ + #[Id] + private Uuid $id; + + private bool $arrived = false; + + public function arrive(): void + { + $this->recordThat(new Arrived()); + } + + #[Apply] + public function applyOrderCreated(OrderCreated $event): void + { + $this->id = $event->id; + } + + #[Apply] + public function applyArrived(Arrived $event): void + { + $this->arrived = true; + } + + public function isArrived(): bool + { + return $this->arrived; + } +} +``` +### Child Aggregates + +??? example "Experimental" + + This feature is still experimental and may change in the future. + Use it with caution. + +Another way to split an aggregate is to use child aggregates. +The difference to Micro Aggregates, child aggregates can only be accessed by the root aggregate +and are not separate aggregates. + +In the following example, we have an `Order` aggregate that has a `Shipping` child aggregate. + +```php +use Patchlevel\EventSourcing\Aggregate\BasicChildAggregate; +use Patchlevel\EventSourcing\Attribute\Apply; + +final class Shipping extends BasicChildAggregate +{ + private bool $arrived = false; + + public function __construct( + private string $trackingId, + ) { + } + + public function arrive(): void + { + $this->recordThat(new Arrived()); + } + + #[Apply] + public function applyArrived(Arrived $event): void + { + $this->arrived = true; + } + + public function isArrived(): bool + { + return $this->arrived; + } +} +``` +!!! warning + + The apply method must be public, otherwise the root aggregate cannot call it. + +!!! note + + Supress missing apply methods need to be defined in the root aggregate. + +And the `Order` aggregate root looks like this: + +```php +use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot; +use Patchlevel\EventSourcing\Aggregate\Uuid; +use Patchlevel\EventSourcing\Attribute\Aggregate; +use Patchlevel\EventSourcing\Attribute\Apply; +use Patchlevel\EventSourcing\Attribute\ChildAggregate; +use Patchlevel\EventSourcing\Attribute\Id; + +#[Aggregate('order')] +final class Order extends BasicAggregateRoot +{ + #[Id] + private Uuid $id; + + #[ChildAggregate] + private Shipping $shipping; + + public static function create(Uuid $id, string $trackingId): static + { + $self = new static(); + $self->recordThat(new OrderCreated($id, $trackingId)); + + return $self; + } + + #[Apply] + public function applyOrderCreated(OrderCreated $event): void + { + $this->shipping = new Shipping($event->trackingId); + } + + public function arrive(): void + { + $this->shipping->arrive(); + } +} +``` ## Aggregate Root Registry The library needs to know about all aggregates so that the correct aggregate class is used to load from the database. From efb789aceb0d6e00a47354f6c7926866c02e9b5a Mon Sep 17 00:00:00 2001 From: David Badura Date: Wed, 18 Dec 2024 15:23:56 +0100 Subject: [PATCH 6/7] fix deptrac --- deptrac-baseline.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/deptrac-baseline.yaml b/deptrac-baseline.yaml index 97559a96c..fe42e99fd 100644 --- a/deptrac-baseline.yaml +++ b/deptrac-baseline.yaml @@ -10,5 +10,7 @@ deptrac: - Patchlevel\EventSourcing\Subscription\RunMode Patchlevel\EventSourcing\Attribute\Projector: - Patchlevel\EventSourcing\Subscription\RunMode + Patchlevel\EventSourcing\Attribute\Stream: + - Patchlevel\EventSourcing\Aggregate\AggregateRoot Patchlevel\EventSourcing\Attribute\Subscriber: - Patchlevel\EventSourcing\Subscription\RunMode From 5ccd50d370fa9c679ca7894924f194f93bff43f0 Mon Sep 17 00:00:00 2001 From: David Badura Date: Fri, 20 Dec 2024 12:21:35 +0100 Subject: [PATCH 7/7] add more tests --- tests/Unit/Fixture/ProfileWithStream.php | 94 +++++++++++++++++++ .../AttributeAggregateMetadataFactoryTest.php | 13 +++ .../Unit/Repository/DefaultRepositoryTest.php | 74 +++++++++++++++ 3 files changed, 181 insertions(+) create mode 100644 tests/Unit/Fixture/ProfileWithStream.php diff --git a/tests/Unit/Fixture/ProfileWithStream.php b/tests/Unit/Fixture/ProfileWithStream.php new file mode 100644 index 000000000..7a29a69d2 --- /dev/null +++ b/tests/Unit/Fixture/ProfileWithStream.php @@ -0,0 +1,94 @@ +id; + } + + public function email(): Email + { + return $this->email; + } + + public function visited(): int + { + return $this->visits; + } + + public static function createProfile(ProfileId $id, Email $email): self + { + $self = new self(); + $self->recordThat(new ProfileCreated($id, $email)); + + return $self; + } + + public function publishMessage(Message $message): void + { + $this->recordThat(new MessagePublished( + $message, + )); + } + + public function deleteMessage(MessageId $messageId): void + { + $this->recordThat(new MessageDeleted( + $messageId, + )); + } + + public function visitProfile(ProfileId $profileId): void + { + $this->recordThat(new ProfileVisited($profileId)); + } + + public function splitIt(): void + { + $this->recordThat(new SplittingEvent($this->email, $this->visits)); + } + + #[Apply(ProfileCreated::class)] + #[Apply(ProfileVisited::class)] + protected function applyProfileCreated(ProfileCreated|ProfileVisited $event): void + { + if ($event instanceof ProfileCreated) { + $this->id = $event->profileId; + $this->email = $event->email; + + return; + } + + $this->visits++; + } + + #[Apply(NameChanged::class)] + protected function applyNameChanged(NameChanged|ProfileVisited $event): void + { + } + + #[Apply(SplittingEvent::class)] + protected function applySplittingEvent(SplittingEvent $event): void + { + $this->email = $event->email; + $this->visits = $event->visits; + } +} diff --git a/tests/Unit/Metadata/Aggregate/AttributeAggregateMetadataFactoryTest.php b/tests/Unit/Metadata/Aggregate/AttributeAggregateMetadataFactoryTest.php index 7bf5f6402..4a47588cc 100644 --- a/tests/Unit/Metadata/Aggregate/AttributeAggregateMetadataFactoryTest.php +++ b/tests/Unit/Metadata/Aggregate/AttributeAggregateMetadataFactoryTest.php @@ -18,6 +18,7 @@ use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileWithBrokenApplyMultipleApply; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileWithBrokenApplyNoType; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileWithEmptyApply; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileWithStream; use Patchlevel\EventSourcing\Tests\Unit\Fixture\SplittingEvent; use PHPUnit\Framework\TestCase; @@ -43,6 +44,9 @@ public function testProfile(): void [MessageDeleted::class => true], $metadata->suppressEvents, ); + + self::assertSame('profile-{id}', $metadata->streamName); + self::assertSame('profile-foo', $metadata->streamName('foo')); } public function testApplyWithNoEventClass(): void @@ -62,6 +66,15 @@ public function testApplyWithNoEventClass(): void self::assertSame([], $metadata->suppressEvents); } + public function streamName(): void + { + $metadataFactory = new AttributeAggregateRootMetadataFactory(); + $metadata = $metadataFactory->metadata(ProfileWithStream::class); + + self::assertSame('other-{id}', $metadata->streamName); + self::assertSame('other-foo', $metadata->streamName('foo')); + } + public function testBrokenApplyWithNoType(): void { $metadataFactory = new AttributeAggregateRootMetadataFactory(); diff --git a/tests/Unit/Repository/DefaultRepositoryTest.php b/tests/Unit/Repository/DefaultRepositoryTest.php index 300de0df7..b99af2bc1 100644 --- a/tests/Unit/Repository/DefaultRepositoryTest.php +++ b/tests/Unit/Repository/DefaultRepositoryTest.php @@ -27,7 +27,12 @@ use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; +use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; +use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; +use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; +use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; use Patchlevel\EventSourcing\Store\Store; +use Patchlevel\EventSourcing\Store\StreamStore; use Patchlevel\EventSourcing\Store\UniqueConstraintViolation; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Profile; @@ -35,6 +40,7 @@ use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileWithSnapshot; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileWithStream; use PHPUnit\Framework\TestCase; use Prophecy\Argument; use Prophecy\PhpUnit\ProphecyTrait; @@ -776,4 +782,72 @@ public function testLoadAggregateWithoutSnapshot(): void self::assertEquals(ProfileId::fromString('1'), $aggregate->id()); self::assertEquals(Email::fromString('hallo@patchlevel.de'), $aggregate->email()); } + + public function testSaveAggregateInOtherStream(): void + { + $store = $this->prophesize(Store::class); + $store->willImplement(StreamStore::class); + $store->save( + Argument::that(static function (Message $message) { + if ($message->header(StreamNameHeader::class)->streamName !== 'other-1') { + return false; + } + + return $message->header(PlayheadHeader::class)->playhead === 1; + }), + Argument::that(static function (Message $message) { + if ($message->header(StreamNameHeader::class)->streamName !== 'other-1') { + return false; + } + + return $message->header(PlayheadHeader::class)->playhead === 2; + }), + )->shouldBeCalled(); + + $repository = new DefaultRepository( + $store->reveal(), + ProfileWithStream::metadata(), + ); + + $aggregate = ProfileWithStream::createProfile( + ProfileId::fromString('1'), + Email::fromString('hallo@patchlevel.de'), + ); + + $aggregate->visitProfile(ProfileId::fromString('2')); + + $repository->save($aggregate); + } + + public function testLoadAggregateFromOtherStream(): void + { + $store = $this->prophesize(Store::class); + $store->willImplement(StreamStore::class); + + $store->load(new Criteria( + new StreamCriterion('other-1'), + new ArchivedCriterion(false), + ))->willReturn(new ArrayStream([ + Message::create( + new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('hallo@patchlevel.de'), + ), + )->withHeader(new StreamNameHeader('other-1')) + ->withHeader(new PlayheadHeader(1)) + ->withHeader(new RecordedOnHeader(new DateTimeImmutable())), + ])); + + $repository = new DefaultRepository( + $store->reveal(), + ProfileWithStream::metadata(), + ); + + $aggregate = $repository->load(ProfileId::fromString('1')); + + self::assertInstanceOf(ProfileWithStream::class, $aggregate); + self::assertSame(1, $aggregate->playhead()); + self::assertEquals(ProfileId::fromString('1'), $aggregate->id()); + self::assertEquals(Email::fromString('hallo@patchlevel.de'), $aggregate->email()); + } }