diff --git a/src/EventBus/Message.php b/src/EventBus/Message.php index f020f4771..3236d14cc 100644 --- a/src/EventBus/Message.php +++ b/src/EventBus/Message.php @@ -60,7 +60,11 @@ public function event(): object return $this->event; } - /** @return class-string */ + /** + * @return class-string + * + * @throws HeaderNotFound + */ public function aggregateClass(): string { $value = $this->aggregateClass; @@ -81,6 +85,7 @@ public function withAggregateClass(string $value): self return $message; } + /** @throws HeaderNotFound */ public function aggregateId(): string { $value = $this->aggregateId; @@ -100,7 +105,11 @@ public function withAggregateId(string $value): self return $message; } - /** @return positive-int */ + /** + * @return positive-int + * + * @throws HeaderNotFound + */ public function playhead(): int { $value = $this->playhead; @@ -121,6 +130,7 @@ public function withPlayhead(int $value): self return $message; } + /** @throws HeaderNotFound */ public function recordedOn(): DateTimeImmutable { $value = $this->recordedOn; @@ -166,6 +176,7 @@ public function withArchived(bool $value): self return $message; } + /** @throws HeaderNotFound */ public function customHeader(string $name): mixed { if (array_keys($this->customHeaders, $name)) { diff --git a/src/Projection/Projectionist/Projectionist.php b/src/Projection/Projectionist/Projectionist.php index 99c580ecf..df2fb4737 100644 --- a/src/Projection/Projectionist/Projectionist.php +++ b/src/Projection/Projectionist/Projectionist.php @@ -9,13 +9,22 @@ interface Projectionist { + /** + * @throws ProjectionistError + * @throws ProjectorNotFound + */ public function boot( ProjectionCriteria $criteria = new ProjectionCriteria(), int|null $limit = null, bool $throwByError = false, ): void; - /** @param positive-int $limit */ + /** + * @param positive-int $limit + * + * @throws ProjectionistError + * @throws ProjectorNotFound + */ public function run( ProjectionCriteria $criteria = new ProjectionCriteria(), int|null $limit = null, diff --git a/src/Repository/AggregateAlreadyExists.php b/src/Repository/AggregateAlreadyExists.php new file mode 100644 index 000000000..b9cb48faf --- /dev/null +++ b/src/Repository/AggregateAlreadyExists.php @@ -0,0 +1,25 @@ + $aggregate */ + public function __construct(string $aggregate, AggregateRootId $id) + { + parent::__construct( + sprintf( + 'aggregate %s with id %s already exists', + $aggregate, + $id->toString(), + ), + ); + } +} diff --git a/src/Repository/AggregateOutdated.php b/src/Repository/AggregateOutdated.php new file mode 100644 index 000000000..d3d00b665 --- /dev/null +++ b/src/Repository/AggregateOutdated.php @@ -0,0 +1,25 @@ + $aggregate */ + public function __construct(string $aggregate, AggregateRootId $id) + { + parent::__construct( + sprintf( + 'Aggregate %s with id %s is outdated. There are new events in the store. Please reload the aggregate.', + $aggregate, + $id->toString(), + ), + ); + } +} diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index c6eb7fe28..de58dedfc 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -18,6 +18,7 @@ use Patchlevel\EventSourcing\Store\CriteriaBuilder; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Store\Stream; +use Patchlevel\EventSourcing\Store\UniqueConstraintViolation; use Psr\Clock\ClockInterface; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -142,8 +143,9 @@ public function save(AggregateRoot $aggregate): void } $playhead = $aggregate->playhead() - $eventCount; + $newAggregate = $playhead === 0; - if (!isset($this->aggregateIsValid[$aggregate]) && $playhead !== 0) { + if (!isset($this->aggregateIsValid[$aggregate]) && !$newAggregate) { throw new AggregateUnknown($aggregate::class, $aggregate->aggregateRootId()); } @@ -176,8 +178,17 @@ static function (object $event) use ($aggregate, &$playhead, $messageDecorator, $events, ); - $this->store->transactional(function () use ($messages): void { - $this->store->save(...$messages); + $this->store->transactional(function () use ($messages, $aggregate, $newAggregate): void { + try { + $this->store->save(...$messages); + } catch (UniqueConstraintViolation) { + if ($newAggregate) { + throw new AggregateAlreadyExists($aggregate::class, $aggregate->aggregateRootId()); + } + + throw new AggregateOutdated($aggregate::class, $aggregate->aggregateRootId()); + } + $this->archive(...$messages); $this->eventBus->dispatch(...$messages); }); diff --git a/src/Repository/Repository.php b/src/Repository/Repository.php index 1ad8f36c4..91483a7b8 100644 --- a/src/Repository/Repository.php +++ b/src/Repository/Repository.php @@ -10,11 +10,24 @@ /** @template T of AggregateRoot */ interface Repository { - /** @return T */ + /** + * @return T + * + * @throws AggregateNotFound + */ public function load(AggregateRootId $id): AggregateRoot; public function has(AggregateRootId $id): bool; - /** @param T $aggregate */ + /** + * @param T $aggregate + * + * @throws WrongAggregate + * @throws AggregateDetached + * @throws AggregateUnknown + * @throws PlayheadMismatch + * @throws AggregateAlreadyExists + * @throws AggregateOutdated + */ public function save(AggregateRoot $aggregate): void; } diff --git a/src/Serializer/Encoder/Encoder.php b/src/Serializer/Encoder/Encoder.php index 6bcdbe6bf..ed13dd4c5 100644 --- a/src/Serializer/Encoder/Encoder.php +++ b/src/Serializer/Encoder/Encoder.php @@ -11,6 +11,8 @@ interface Encoder /** * @param array $data * @param array $options + * + * @throws EncodeNotPossible */ public function encode(array $data, array $options = []): string; @@ -18,6 +20,8 @@ public function encode(array $data, array $options = []): string; * @param array $options * * @return array + * + * @throws DecodeNotPossible */ public function decode(string $data, array $options = []): array; } diff --git a/src/Snapshot/SnapshotStore.php b/src/Snapshot/SnapshotStore.php index d6d14d339..89c53addb 100644 --- a/src/Snapshot/SnapshotStore.php +++ b/src/Snapshot/SnapshotStore.php @@ -10,6 +10,10 @@ interface SnapshotStore { + /** + * @throws SnapshotNotConfigured + * @throws AdapterNotFound + */ public function save(AggregateRoot $aggregateRoot): void; /** @@ -18,6 +22,9 @@ public function save(AggregateRoot $aggregateRoot): void; * @return T * * @throws SnapshotNotFound + * @throws SnapshotVersionInvalid + * @throws SnapshotNotConfigured + * @throws AdapterNotFound * * @template T of AggregateRoot */ diff --git a/src/Store/DoctrineDbalStore.php b/src/Store/DoctrineDbalStore.php index 96e644020..0802f16ee 100644 --- a/src/Store/DoctrineDbalStore.php +++ b/src/Store/DoctrineDbalStore.php @@ -6,6 +6,7 @@ use Closure; use Doctrine\DBAL\Connection; +use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Types\Type; @@ -183,7 +184,11 @@ function (Connection $connection) use ($messages): void { implode("),\n(", $placeholders), ); - $connection->executeStatement($query, $parameters, $types); + try { + $connection->executeStatement($query, $parameters, $types); + } catch (UniqueConstraintViolationException $e) { + throw new UniqueConstraintViolation($e); + } }, ); } diff --git a/src/Store/Store.php b/src/Store/Store.php index 1c3091d62..1297006ef 100644 --- a/src/Store/Store.php +++ b/src/Store/Store.php @@ -18,6 +18,10 @@ public function load( public function count(Criteria|null $criteria = null): int; + /** + * @throws MissingDataForStorage + * @throws UniqueConstraintViolation + */ public function save(Message ...$messages): void; /** diff --git a/src/Store/UniqueConstraintViolation.php b/src/Store/UniqueConstraintViolation.php new file mode 100644 index 000000000..24cbaa1f8 --- /dev/null +++ b/src/Store/UniqueConstraintViolation.php @@ -0,0 +1,15 @@ +save($aggregate); } + public function testDuplicate(): void + { + $this->expectException(AggregateAlreadyExists::class); + + $store = $this->prophesize(Store::class); + $store->save( + Argument::type(Message::class), + )->willThrow(new UniqueConstraintViolation()); + + $store->transactional(Argument::any())->will( + /** @param array{0: callable} $args */ + static fn (array $args): mixed => $args[0]() + ); + + $eventBus = $this->prophesize(EventBus::class); + $eventBus->dispatch(Argument::type('object'))->shouldNotBeCalled(); + + $repository = new DefaultRepository( + $store->reveal(), + $eventBus->reveal(), + Profile::metadata(), + ); + + $aggregate = Profile::createProfile( + ProfileId::fromString('1'), + Email::fromString('hallo@patchlevel.de'), + ); + + $repository->save($aggregate); + } + + public function testOutdated(): void + { + $this->expectException(AggregateOutdated::class); + + $store = $this->prophesize(Store::class); + + $store->save( + Argument::that(static function (Message $message) { + return $message->playhead() === 1; + }), + )->shouldBeCalled(); + + $store->save( + Argument::that(static function (Message $message) { + return $message->playhead() === 2; + }), + )->willThrow(new UniqueConstraintViolation()); + + $store->transactional(Argument::any())->will( + /** @param array{0: callable} $args */ + static fn (array $args): mixed => $args[0]() + ); + + $eventBus = $this->prophesize(EventBus::class); + $eventBus->dispatch(Argument::type('object'))->shouldBeCalled(); + + $repository = new DefaultRepository( + $store->reveal(), + $eventBus->reveal(), + Profile::metadata(), + ); + + $aggregate = Profile::createProfile( + ProfileId::fromString('1'), + Email::fromString('hallo@patchlevel.de'), + ); + + $repository->save($aggregate); + + $aggregate->visitProfile(ProfileId::fromString('2')); + + $repository->save($aggregate); + } + public function testSaveAggregateWithSplitStream(): void { $store = $this->prophesize(Store::class);