Skip to content

Commit

Permalink
poc replace child aggregates with micro aggregates
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Dec 18, 2024
1 parent 471ad54 commit 13d9b9c
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 58 deletions.
18 changes: 7 additions & 11 deletions src/Aggregate/StreamNameTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

namespace Patchlevel\EventSourcing\Aggregate;

use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata;

use function str_replace;
use function strpos;
use function substr;

Expand All @@ -15,20 +18,13 @@ private function __construct()
}

/** @pure */
public static function streamName(string $aggregate, string $aggregateId): string
{
return $aggregate . '-' . $aggregateId;
}

public static function aggregateName(string $stream): string
public static function streamName(string|AggregateRootMetadata $aggregate, string $aggregateId): string
{
$pos = strpos($stream, '-');

if ($pos === false) {
throw new InvalidAggregateStreamName($stream);
if ($aggregate instanceof AggregateRootMetadata && $aggregate->streamName !== null) {

Check failure on line 23 in src/Aggregate/StreamNameTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

ImpurePropertyFetch

src/Aggregate/StreamNameTranslator.php:23:60: ImpurePropertyFetch: Cannot access a property on a mutable object from a pure context (see https://psalm.dev/234)
return str_replace('{id}', $aggregateId, $aggregate->streamName);

Check failure on line 24 in src/Aggregate/StreamNameTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

ImpurePropertyFetch

src/Aggregate/StreamNameTranslator.php:24:54: ImpurePropertyFetch: Cannot access a property on a mutable object from a pure context (see https://psalm.dev/234)
}

return substr($stream, 0, $pos);
return $aggregate . '-' . $aggregateId;

Check failure on line 27 in src/Aggregate/StreamNameTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Binary operation "." between Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata|string and '-' results in an error.

Check failure on line 27 in src/Aggregate/StreamNameTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

PossiblyInvalidOperand

src/Aggregate/StreamNameTranslator.php:27:16: PossiblyInvalidOperand: Cannot concatenate with a Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata<Patchlevel\EventSourcing\Aggregate\AggregateRoot>|string (see https://psalm.dev/163)
}

public static function aggregateId(string $stream): string
Expand Down
18 changes: 18 additions & 0 deletions src/Attribute/Stream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Attribute;

use Attribute;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;

#[Attribute(Attribute::TARGET_CLASS)]
final class Stream
{
/** @param string|class-string<AggregateRoot> $name */

Check failure on line 13 in src/Attribute/Stream.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Deptrac (locked, 8.3, ubuntu-latest)

Patchlevel\EventSourcing\Attribute\Stream must not depend on Patchlevel\EventSourcing\Aggregate\AggregateRoot (Attribute on Aggregate)
public function __construct(
public readonly string $name,
) {
}
}
1 change: 1 addition & 0 deletions src/Metadata/AggregateRoot/AggregateRootMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public function __construct(
public readonly Snapshot|null $snapshot,
/** @var list<string> */
public readonly array $childAggregates = [],
public readonly string|null $streamName = null,
) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Patchlevel\EventSourcing\Attribute\ChildAggregate;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Attribute\Snapshot as AttributeSnapshot;
use Patchlevel\EventSourcing\Attribute\Stream;
use Patchlevel\EventSourcing\Attribute\SuppressMissingApply;
use ReflectionClass;
use ReflectionIntersectionType;
Expand Down Expand Up @@ -50,6 +51,7 @@ public function metadata(string $aggregate): AggregateRootMetadata
[$suppressEvents, $suppressAll] = $this->findSuppressMissingApply($reflectionClass);
$applyMethods = $this->findApplyMethods($reflectionClass, $aggregate, $childAggregates);
$snapshot = $this->findSnapshot($reflectionClass);
$streamName = $this->findStreamName($reflectionClass);

$metadata = new AggregateRootMetadata(
$aggregate,
Expand All @@ -60,6 +62,7 @@ public function metadata(string $aggregate): AggregateRootMetadata
$suppressAll,
$snapshot,
array_map(static fn (array $list) => $list[0], $childAggregates),
$streamName ?? $aggregateName . '-{id}',
);

$this->aggregateMetadata[$aggregate] = $metadata;
Expand Down Expand Up @@ -139,6 +142,23 @@ private function findSnapshot(ReflectionClass $reflector): Snapshot|null
);
}

private function findStreamName(ReflectionClass $reflector): string|null
{
$attributes = $reflector->getAttributes(Stream::class);

if ($attributes === []) {
return null;
}

$streamName = $attributes[0]->newInstance()->name;

if (class_exists($streamName) && is_a($streamName, AggregateRoot::class, true)) {
return $this->metadata($streamName)->streamName;
}

return $attributes[0]->newInstance()->name;
}

/** @return list<array{string, ReflectionClass}> */
private function findChildAggregates(ReflectionClass $reflector): array
{
Expand Down
9 changes: 4 additions & 5 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public function load(AggregateRootId $id): AggregateRoot

if ($this->useStreamHeader) {
$criteria = (new CriteriaBuilder())
->streamName(StreamNameTranslator::streamName($this->metadata->name, $id->toString()))
->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString()))
->archived(false)
->build();
} else {
Expand Down Expand Up @@ -178,7 +178,7 @@ public function has(AggregateRootId $id): bool
{
if ($this->useStreamHeader) {
$criteria = (new CriteriaBuilder())
->streamName(StreamNameTranslator::streamName($this->metadata->name, $id->toString()))
->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString()))
->build();
} else {
$criteria = (new CriteriaBuilder())
Expand Down Expand Up @@ -242,8 +242,7 @@ public function save(AggregateRoot $aggregate): void
$clock = $this->clock;

$aggregateName = $this->metadata->name;

$streamName = $this->useStreamHeader ? StreamNameTranslator::streamName($aggregateName, $aggregateId) : null;
$streamName = $this->useStreamHeader ? StreamNameTranslator::streamName($this->metadata, $aggregateId) : null;

$messages = array_map(
static function (object $event) use (
Expand Down Expand Up @@ -338,7 +337,7 @@ private function loadFromSnapshot(string $aggregateClass, AggregateRootId $id):

if ($this->useStreamHeader) {
$criteria = (new CriteriaBuilder())
->streamName(StreamNameTranslator::streamName($this->metadata->name, $id->toString()))
->streamName(StreamNameTranslator::streamName($this->metadata, $id->toString()))
->fromPlayhead($aggregate->playhead())
->build();
} else {
Expand Down
53 changes: 32 additions & 21 deletions tests/Integration/ChildAggregate/ChildAggregateIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Snapshot\Adapter\InMemorySnapshotAdapter;
use Patchlevel\EventSourcing\Snapshot\DefaultSnapshotStore;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\ThrowOnErrorSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Repository\RunSubscriptionEngineRepositoryManager;
Expand Down Expand Up @@ -39,7 +39,7 @@ public function tearDown(): void

public function testSuccessful(): void
{
$store = new DoctrineDbalStore(
$store = new StreamDoctrineDbalStore(
$this->connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
);
Expand All @@ -54,15 +54,19 @@ public function testSuccessful(): void

$manager = new RunSubscriptionEngineRepositoryManager(
new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
new AggregateRootRegistry([
'profile' => Profile::class,
'personal_information' => PersonalInformation::class,
]),
$store,
null,
null,
),
$engine,
);

$repository = $manager->get(Profile::class);
$profileRepository = $manager->get(Profile::class);
$personalInformationRepository = $manager->get(PersonalInformation::class);

$schemaDirector = new DoctrineSchemaDirector(
$this->connection,
Expand All @@ -74,8 +78,11 @@ public function testSuccessful(): void

$profileId = ProfileId::generate();
$profile = Profile::create($profileId, 'John');
$profile->changeName('Snow');
$repository->save($profile);
$profileRepository->save($profile);

$personalInformation = $personalInformationRepository->load($profileId);
$personalInformation->changeName('Snow');
$personalInformationRepository->save($personalInformation);

$result = $this->connection->fetchAssociative(
'SELECT * FROM projection_profile WHERE id = ?',
Expand All @@ -87,18 +94,18 @@ public function testSuccessful(): void
self::assertSame($profileId->toString(), $result['id']);
self::assertSame('Snow', $result['name']);

$repository = $manager->get(Profile::class);
$profile = $repository->load($profileId);
$profile = $profileRepository->load($profileId);
$personalInformation = $personalInformationRepository->load($profileId);

self::assertInstanceOf(Profile::class, $profile);
self::assertEquals($profileId, $profile->aggregateRootId());
self::assertSame(2, $profile->playhead());
self::assertSame('Snow', $profile->name());
self::assertSame('Snow', $personalInformation->name());
}

public function testSnapshot(): void
{
$store = new DoctrineDbalStore(
$store = new StreamDoctrineDbalStore(
$this->connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
);
Expand All @@ -113,15 +120,19 @@ public function testSnapshot(): void

$manager = new RunSubscriptionEngineRepositoryManager(
new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
new AggregateRootRegistry([
'profile' => Profile::class,
'personal_information' => PersonalInformation::class,
]),
$store,
null,
new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]),
),
$engine,
);

$repository = $manager->get(Profile::class);
$profileRepository = $manager->get(Profile::class);
$personalInformationRepository = $manager->get(PersonalInformation::class);

$schemaDirector = new DoctrineSchemaDirector(
$this->connection,
Expand All @@ -133,7 +144,7 @@ public function testSnapshot(): void

$profileId = ProfileId::generate();
$profile = Profile::create($profileId, 'John');
$repository->save($profile);
$profileRepository->save($profile);

$result = $this->connection->fetchAssociative(
'SELECT * FROM projection_profile WHERE id = ?',
Expand All @@ -145,22 +156,22 @@ public function testSnapshot(): void
self::assertSame($profileId->toString(), $result['id']);
self::assertSame('John', $result['name']);

$repository = $manager->get(Profile::class);

// create snapshot
$repository->load($profileId);
$profileRepository->load($profileId);
$personalInformationRepository->load($profileId);

// load from snapshot
$profile = $repository->load($profileId);
$personalInformation = $personalInformationRepository->load($profileId);

$profile->changeName('Snow');
$repository->save($profile);
$personalInformation->changeName('Snow');
$personalInformationRepository->save($personalInformation);

$profile = $repository->load($profileId);
$profile = $profileRepository->load($profileId);
$personalInformation = $personalInformationRepository->load($profileId);

self::assertInstanceOf(Profile::class, $profile);
self::assertEquals($profileId, $profile->aggregateRootId());
self::assertSame(2, $profile->playhead());
self::assertSame('Snow', $profile->name());
self::assertSame('Snow', $personalInformation->name());
}
}
25 changes: 19 additions & 6 deletions tests/Integration/ChildAggregate/PersonalInformation.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,32 @@

namespace Patchlevel\EventSourcing\Tests\Integration\ChildAggregate;

use Patchlevel\EventSourcing\Aggregate\BasicChildAggregate;
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Attribute\Stream;
use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged;
use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated;

final class PersonalInformation extends BasicChildAggregate
#[Aggregate('personal_information')]
#[Stream(Profile::class)]
final class PersonalInformation extends BasicAggregateRoot
{
public function __construct(
private string $name,
) {
#[Id]
private ProfileId $id;

private string $name;

#[Apply(ProfileCreated::class)]
protected function applyProfileCreated(ProfileCreated $event): void
{
$this->id = $event->profileId;
$this->name = $event->name;
}

#[Apply(NameChanged::class)]
public function applyNameChanged(NameChanged $event): void
protected function applyNameChanged(NameChanged $event): void
{
$this->name = $event->name;
}
Expand Down
18 changes: 3 additions & 15 deletions tests/Integration/ChildAggregate/Profile.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
use Patchlevel\EventSourcing\Attribute\Aggregate;
use Patchlevel\EventSourcing\Attribute\Apply;
use Patchlevel\EventSourcing\Attribute\ChildAggregate;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Attribute\Snapshot;
use Patchlevel\EventSourcing\Attribute\SuppressMissingApply;
use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\NameChanged;
use Patchlevel\EventSourcing\Tests\Integration\ChildAggregate\Events\ProfileCreated;

#[Aggregate('profile')]
#[Snapshot('default', 1)]
#[SuppressMissingApply([NameChanged::class])]
final class Profile extends BasicAggregateRoot
{
#[Id]
private ProfileId $id;

#[ChildAggregate]
protected PersonalInformation $personalInformation;

public static function create(ProfileId $id, string $name): self
{
$self = new self();
Expand All @@ -34,16 +33,5 @@ public static function create(ProfileId $id, string $name): self
protected function applyProfileCreated(ProfileCreated $event): void
{
$this->id = $event->profileId;
$this->personalInformation = new PersonalInformation($event->name);
}

public function name(): string
{
return $this->personalInformation->name();
}

public function changeName(string $name): void
{
$this->personalInformation->changeName($name);
}
}

0 comments on commit 13d9b9c

Please sign in to comment.