diff --git a/docs/pages/event_bus.md b/docs/pages/event_bus.md index 045ca59f4..45f33f86e 100644 --- a/docs/pages/event_bus.md +++ b/docs/pages/event_bus.md @@ -51,7 +51,7 @@ use Patchlevel\EventSourcing\EventBus\Message; $message = Message::create(new NameChanged('foo')) // ... - ->withCustomHeader('application-id', 'app'); + ->withHeader('application-id', 'app'); ``` !!! note diff --git a/docs/pages/message_decorator.md b/docs/pages/message_decorator.md index c8b799043..258b515c1 100644 --- a/docs/pages/message_decorator.md +++ b/docs/pages/message_decorator.md @@ -74,7 +74,7 @@ final class OnSystemRecordedDecorator implements MessageDecorator { public function __invoke(Message $message): Message { - return $message->withCustomHeader('system', 'accounting_system'); + return $message->withHeader('system', 'accounting_system'); } } ``` diff --git a/src/EventBus/Decorator/SplitStreamDecorator.php b/src/EventBus/Decorator/SplitStreamDecorator.php index 04f3ae0e8..382ec8cec 100644 --- a/src/EventBus/Decorator/SplitStreamDecorator.php +++ b/src/EventBus/Decorator/SplitStreamDecorator.php @@ -19,10 +19,6 @@ public function __invoke(Message $message): Message $event = $message->event(); $metadata = $this->eventMetadataFactory->metadata($event::class); - if ($metadata->splitStream) { - return $message->withNewStreamStart(true); - } - - return $message; + return $message->withNewStreamStart($metadata->splitStream); } } diff --git a/src/EventBus/HeaderNotFound.php b/src/EventBus/HeaderNotFound.php index cf5367179..149133304 100644 --- a/src/EventBus/HeaderNotFound.php +++ b/src/EventBus/HeaderNotFound.php @@ -34,6 +34,16 @@ public static function recordedOn(): self return new self(Message::HEADER_RECORDED_ON); } + public static function newStreamStart(): self + { + return new self(Message::HEADER_NEW_STREAM_START); + } + + public static function archived(): self + { + return new self(Message::HEADER_ARCHIVED); + } + public static function custom(string $name): self { return new self($name); diff --git a/src/EventBus/Message.php b/src/EventBus/Message.php index 2c0f390a5..7c98c15c0 100644 --- a/src/EventBus/Message.php +++ b/src/EventBus/Message.php @@ -17,7 +17,8 @@ * playhead?: positive-int, * recordedOn?: DateTimeImmutable, * newStreamStart?: bool, - * archived?: bool + * archived?: bool, + * ... * } */ final class Message @@ -29,16 +30,8 @@ final class Message public const HEADER_ARCHIVED = 'archived'; public const HEADER_NEW_STREAM_START = 'newStreamStart'; - private string|null $aggregateName = null; - private string|null $aggregateId = null; - /** @var positive-int|null */ - private int|null $playhead = null; - private DateTimeImmutable|null $recordedOn = null; - private bool $newStreamStart = false; - private bool $archived = false; - - /** @var array */ - private array $customHeaders = []; + /** @var Headers */ + private array $headers = []; /** @param T $event */ public function __construct( @@ -58,6 +51,12 @@ public static function create(object $event): self return new self($event); } + /** @param Headers $headers */ + public static function createWithHeaders(object $event, array $headers): self + { + return self::create($event)->withHeaders($headers); + } + /** @return T */ public function event(): object { @@ -67,7 +66,7 @@ public function event(): object /** @throws HeaderNotFound */ public function aggregateName(): string { - $value = $this->aggregateName; + $value = $this->headers[self::HEADER_AGGREGATE_NAME] ?? null; if ($value === null) { throw HeaderNotFound::aggregateName(); @@ -78,16 +77,13 @@ public function aggregateName(): string public function withAggregateName(string $value): self { - $message = clone $this; - $message->aggregateName = $value; - - return $message; + return $this->withHeader(self::HEADER_AGGREGATE_NAME, $value); } /** @throws HeaderNotFound */ public function aggregateId(): string { - $value = $this->aggregateId; + $value = $this->headers[self::HEADER_AGGREGATE_ID] ?? null; if ($value === null) { throw HeaderNotFound::aggregateId(); @@ -98,10 +94,7 @@ public function aggregateId(): string public function withAggregateId(string $value): self { - $message = clone $this; - $message->aggregateId = $value; - - return $message; + return $this->withHeader(self::HEADER_AGGREGATE_ID, $value); } /** @@ -111,7 +104,7 @@ public function withAggregateId(string $value): self */ public function playhead(): int { - $value = $this->playhead; + $value = $this->headers[self::HEADER_PLAYHEAD] ?? null; if ($value === null) { throw HeaderNotFound::playhead(); @@ -123,16 +116,13 @@ public function playhead(): int /** @param positive-int $value */ public function withPlayhead(int $value): self { - $message = clone $this; - $message->playhead = $value; - - return $message; + return $this->withHeader(self::HEADER_PLAYHEAD, $value); } /** @throws HeaderNotFound */ public function recordedOn(): DateTimeImmutable { - $value = $this->recordedOn; + $value = $this->headers[self::HEADER_RECORDED_ON] ?? null; if ($value === null) { throw HeaderNotFound::recordedOn(); @@ -143,136 +133,88 @@ public function recordedOn(): DateTimeImmutable public function withRecordedOn(DateTimeImmutable $value): self { - $message = clone $this; - $message->recordedOn = $value; - - return $message; + return $this->withHeader(self::HEADER_RECORDED_ON, $value); } public function newStreamStart(): bool { - return $this->newStreamStart; + $value = $this->headers[self::HEADER_NEW_STREAM_START] ?? null; + + if ($value === null) { + throw HeaderNotFound::newStreamStart(); + } + + return $value; } public function withNewStreamStart(bool $value): self { - $message = clone $this; - $message->newStreamStart = $value; - - return $message; + return $this->withHeader(self::HEADER_NEW_STREAM_START, $value); } public function archived(): bool { - return $this->archived; + $value = $this->headers[self::HEADER_ARCHIVED] ?? null; + + if ($value === null) { + throw HeaderNotFound::archived(); + } + + return $value; } public function withArchived(bool $value): self { - $message = clone $this; - $message->archived = $value; - - return $message; + return $this->withHeader(self::HEADER_ARCHIVED, $value); } /** @throws HeaderNotFound */ - public function customHeader(string $name): mixed + public function header(string $name): mixed { - if (!array_key_exists($name, $this->customHeaders)) { + if (!array_key_exists($name, $this->headers)) { throw HeaderNotFound::custom($name); } - return $this->customHeaders[$name]; + return $this->headers[$name]; } - public function withCustomHeader(string $name, mixed $value): self + public function withHeader(string $name, mixed $value): self { $message = clone $this; - $message->customHeaders[$name] = $value; + $message->headers[$name] = $value; return $message; } - /** @return array */ - public function customHeaders(): array + /** @return Headers */ + public function headers(): array { - return $this->customHeaders; + return $this->headers; } - /** @param array $headers */ - public function withCustomHeaders(array $headers): self + /** @param Headers $headers */ + public function withHeaders(array $headers): self { $message = clone $this; - $message->customHeaders += $headers; + $message->headers += $headers; return $message; } - /** @return Headers */ - public function headers(): array - { - $headers = $this->customHeaders; - - if ($this->aggregateName !== null) { - $headers[self::HEADER_AGGREGATE_NAME] = $this->aggregateName; - } - - if ($this->aggregateId !== null) { - $headers[self::HEADER_AGGREGATE_ID] = $this->aggregateId; - } - - if ($this->playhead !== null) { - $headers[self::HEADER_PLAYHEAD] = $this->playhead; - } - - if ($this->recordedOn !== null) { - $headers[self::HEADER_RECORDED_ON] = $this->recordedOn; - } - - $headers[self::HEADER_NEW_STREAM_START] = $this->newStreamStart; - $headers[self::HEADER_ARCHIVED] = $this->archived; - - return $headers; - } - - /** @param Headers $headers */ - public static function createWithHeaders(object $event, array $headers): self + /** @return array */ + public function customHeaders(): array { - $message = self::create($event); - - if (array_key_exists(self::HEADER_AGGREGATE_NAME, $headers)) { - $message = $message->withAggregateName($headers[self::HEADER_AGGREGATE_NAME]); - } - - if (array_key_exists(self::HEADER_AGGREGATE_ID, $headers)) { - $message = $message->withAggregateId($headers[self::HEADER_AGGREGATE_ID]); - } - - if (array_key_exists(self::HEADER_PLAYHEAD, $headers)) { - $message = $message->withPlayhead($headers[self::HEADER_PLAYHEAD]); - } - - if (array_key_exists(self::HEADER_RECORDED_ON, $headers)) { - $message = $message->withRecordedOn($headers[self::HEADER_RECORDED_ON]); - } - - if (array_key_exists(self::HEADER_NEW_STREAM_START, $headers)) { - $message = $message->withNewStreamStart($headers[self::HEADER_NEW_STREAM_START]); - } - - if (array_key_exists(self::HEADER_ARCHIVED, $headers)) { - $message = $message->withArchived($headers[self::HEADER_ARCHIVED]); - } + $headers = $this->headers; unset( $headers[self::HEADER_AGGREGATE_NAME], $headers[self::HEADER_AGGREGATE_ID], $headers[self::HEADER_PLAYHEAD], $headers[self::HEADER_RECORDED_ON], - $headers[self::HEADER_NEW_STREAM_START], $headers[self::HEADER_ARCHIVED], + $headers[self::HEADER_NEW_STREAM_START], ); - return $message->withCustomHeaders($headers); + return $headers; } } diff --git a/src/Outbox/DoctrineOutboxStore.php b/src/Outbox/DoctrineOutboxStore.php index b24e53346..2f1db03b5 100644 --- a/src/Outbox/DoctrineOutboxStore.php +++ b/src/Outbox/DoctrineOutboxStore.php @@ -79,7 +79,7 @@ function (array $data) use ($platform) { ->withAggregateId($data['aggregate_id']) ->withPlayhead(DoctrineHelper::normalizePlayhead($data['playhead'], $platform)) ->withRecordedOn(DoctrineHelper::normalizeRecordedOn($data['recorded_on'], $platform)) - ->withCustomHeaders(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform)); + ->withHeaders(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform)); }, $result, ); diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index 2f8d8f800..ce475ba4c 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -9,6 +9,7 @@ use Patchlevel\EventSourcing\Clock\SystemClock; use Patchlevel\EventSourcing\EventBus\Decorator\MessageDecorator; use Patchlevel\EventSourcing\EventBus\EventBus; +use Patchlevel\EventSourcing\EventBus\HeaderNotFound; use Patchlevel\EventSourcing\EventBus\Message; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata; use Patchlevel\EventSourcing\Snapshot\SnapshotNotFound; @@ -222,7 +223,9 @@ static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $ ->withAggregateName($aggregateName) ->withAggregateId($aggregateId) ->withPlayhead(++$playhead) - ->withRecordedOn($clock->now()); + ->withRecordedOn($clock->now()) + ->withArchived(false) + ->withNewStreamStart(false); if ($messageDecorator) { return $messageDecorator($message); @@ -371,7 +374,13 @@ private function archive(Message ...$messages): void $lastMessageWithNewStreamStart = null; foreach ($messages as $message) { - if (!$message->newStreamStart()) { + try { + $newStreamStart = $message->newStreamStart(); + } catch (HeaderNotFound) { + continue; + } + + if (!$newStreamStart) { continue; } diff --git a/src/Store/DoctrineDbalStoreStream.php b/src/Store/DoctrineDbalStoreStream.php index 45c3ca979..fdfbd255b 100644 --- a/src/Store/DoctrineDbalStoreStream.php +++ b/src/Store/DoctrineDbalStoreStream.php @@ -103,7 +103,9 @@ private function buildGenerator( ->withAggregateId($data['aggregate_id']) ->withPlayhead(DoctrineHelper::normalizePlayhead($data['playhead'], $platform)) ->withRecordedOn(DoctrineHelper::normalizeRecordedOn($data['recorded_on'], $platform)) - ->withCustomHeaders(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform)); + ->withArchived(DoctrineHelper::normalizeArchived($data['archived'], $platform)) + ->withNewStreamStart(DoctrineHelper::normalizeNewStreamStart($data['new_stream_start'], $platform)) + ->withHeaders(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform)); } } } diff --git a/src/Store/DoctrineHelper.php b/src/Store/DoctrineHelper.php index a37e9cbe3..30e766f6c 100644 --- a/src/Store/DoctrineHelper.php +++ b/src/Store/DoctrineHelper.php @@ -48,4 +48,26 @@ public static function normalizeCustomHeaders(string $customHeaders, AbstractPla return $normalizedCustomHeaders; } + + public static function normalizeArchived(mixed $value, AbstractPlatform $platform): bool + { + $normalizedValue = Type::getType(Types::BOOLEAN)->convertToPHPValue($value, $platform); + + if (!is_bool($normalizedValue)) { + throw new InvalidType('archived', 'boolean'); + } + + return $normalizedValue; + } + + public static function normalizeNewStreamStart(mixed $value, AbstractPlatform $platform): bool + { + $normalizedValue = Type::getType(Types::BOOLEAN)->convertToPHPValue($value, $platform); + + if (!is_bool($normalizedValue)) { + throw new InvalidType('new_stream_start', 'boolean'); + } + + return $normalizedValue; + } } diff --git a/tests/Integration/BasicImplementation/MessageDecorator/FooMessageDecorator.php b/tests/Integration/BasicImplementation/MessageDecorator/FooMessageDecorator.php index 2ca0baaf2..08862446e 100644 --- a/tests/Integration/BasicImplementation/MessageDecorator/FooMessageDecorator.php +++ b/tests/Integration/BasicImplementation/MessageDecorator/FooMessageDecorator.php @@ -11,6 +11,6 @@ final class FooMessageDecorator implements MessageDecorator { public function __invoke(Message $message): Message { - return $message->withCustomHeader('foo', 'bar'); + return $message->withHeader('foo', 'bar'); } } diff --git a/tests/Unit/EventBus/Decorator/SplitStreamDecoratorTest.php b/tests/Unit/EventBus/Decorator/SplitStreamDecoratorTest.php index c77843e5f..6f4fcdaff 100644 --- a/tests/Unit/EventBus/Decorator/SplitStreamDecoratorTest.php +++ b/tests/Unit/EventBus/Decorator/SplitStreamDecoratorTest.php @@ -29,9 +29,9 @@ public function testWithoutSplittingStream(): void ); $decorator = new SplitStreamDecorator(new AttributeEventMetadataFactory()); - $decorator($message); + $decoratedMessage = $decorator($message); - self::assertFalse($message->newStreamStart()); + self::assertFalse($decoratedMessage->newStreamStart()); } public function testSplittingStream(): void diff --git a/tests/Unit/EventBus/MessageTest.php b/tests/Unit/EventBus/MessageTest.php index 0cb1901c5..bcfbc195b 100644 --- a/tests/Unit/EventBus/MessageTest.php +++ b/tests/Unit/EventBus/MessageTest.php @@ -71,9 +71,9 @@ public function testCreateMessageWithCustomHeader(): void { $message = Message::create(new class { }) - ->withCustomHeader('custom-field', 'foo-bar'); + ->withHeader('custom-field', 'foo-bar'); - self::assertEquals('foo-bar', $message->customHeader('custom-field')); + self::assertEquals('foo-bar', $message->header('custom-field')); self::assertEquals( ['custom-field' => 'foo-bar'], $message->customHeaders(), @@ -84,9 +84,9 @@ public function testCreateMessageWithCustomHeaders(): void { $message = Message::create(new class { }) - ->withCustomHeaders(['custom-field' => 'foo-bar']); + ->withHeaders(['custom-field' => 'foo-bar']); - self::assertEquals('foo-bar', $message->customHeader('custom-field')); + self::assertEquals('foo-bar', $message->header('custom-field')); self::assertEquals( ['custom-field' => 'foo-bar'], $message->customHeaders(), @@ -127,13 +127,7 @@ public function testEmptyAllHeaders(): void $message = Message::create(new class { }); - self::assertSame( - [ - 'newStreamStart' => false, - 'archived' => false, - ], - $message->headers(), - ); + self::assertSame([], $message->headers()); } public function testAllHeaders(): void @@ -148,17 +142,17 @@ public function testAllHeaders(): void ->withRecordedOn($recordedAt) ->withArchived(true) ->withNewStreamStart(true) - ->withCustomHeader('foo', 'bar'); + ->withHeader('foo', 'bar'); self::assertSame( [ - 'foo' => 'bar', 'aggregateName' => 'profile', 'aggregateId' => '1', 'playhead' => 3, 'recordedOn' => $recordedAt, - 'newStreamStart' => true, 'archived' => true, + 'newStreamStart' => true, + 'foo' => 'bar', ], $message->headers(), ); @@ -169,13 +163,7 @@ public function testCreateWithEmptyHeaders(): void $message = Message::createWithHeaders(new class { }, []); - self::assertSame( - [ - 'newStreamStart' => false, - 'archived' => false, - ], - $message->headers(), - ); + self::assertSame([], $message->headers()); } public function testCreateWithAllHeaders(): void @@ -236,6 +224,6 @@ public function testCustomHeaderNotFound(): void $this->expectException(HeaderNotFound::class); /** @psalm-suppress UnusedMethodCall */ - $message->customHeader('foo'); + $message->header('foo'); } } diff --git a/tests/Unit/Outbox/DoctrineOutboxStoreTest.php b/tests/Unit/Outbox/DoctrineOutboxStoreTest.php index 1929f1d54..d0196813e 100644 --- a/tests/Unit/Outbox/DoctrineOutboxStoreTest.php +++ b/tests/Unit/Outbox/DoctrineOutboxStoreTest.php @@ -191,9 +191,7 @@ public function testRetrieveOutboxMessages(): void ->withAggregateName('profile') ->withAggregateId('1') ->withPlayhead(1) - ->withRecordedOn($recordedOn) - ->withNewStreamStart(false) - ->withArchived(false); + ->withRecordedOn($recordedOn); $queryBuilder = $this->prophesize(QueryBuilder::class); $queryBuilder->select('*')->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); diff --git a/tests/Unit/Repository/DefaultRepositoryTest.php b/tests/Unit/Repository/DefaultRepositoryTest.php index 11da91146..24f795eba 100644 --- a/tests/Unit/Repository/DefaultRepositoryTest.php +++ b/tests/Unit/Repository/DefaultRepositoryTest.php @@ -211,7 +211,7 @@ public function testDecorator(): void return false; } - if ($message->customHeader('test') !== 'foo') { + if ($message->header('test') !== 'foo') { return false; } @@ -235,7 +235,7 @@ public function testDecorator(): void return false; } - if ($message->customHeader('test') !== 'foo') { + if ($message->header('test') !== 'foo') { return false; } @@ -246,7 +246,7 @@ public function testDecorator(): void $decorator = new class implements MessageDecorator { public function __invoke(Message $message): Message { - return $message->withCustomHeader('test', 'foo'); + return $message->withHeader('test', 'foo'); } }; diff --git a/tests/Unit/WatchServer/PhpNativeMessageSerializerTest.php b/tests/Unit/WatchServer/PhpNativeMessageSerializerTest.php index 23df1a614..ab5802465 100644 --- a/tests/Unit/WatchServer/PhpNativeMessageSerializerTest.php +++ b/tests/Unit/WatchServer/PhpNativeMessageSerializerTest.php @@ -38,7 +38,7 @@ public function testSerialize(): void $content = $nativeSerializer->serialize($message); - self::assertEquals('YTozOntzOjU6ImV2ZW50IjtzOjE1OiJwcm9maWxlLnZpc2l0ZWQiO3M6NzoicGF5bG9hZCI7czoyMDoieyJwcm9maWxlSWQiOiAiZm9vIn0iO3M6NzoiaGVhZGVycyI7YTozOntzOjEwOiJyZWNvcmRlZE9uIjtPOjE3OiJEYXRlVGltZUltbXV0YWJsZSI6Mzp7czo0OiJkYXRlIjtzOjI2OiIyMDIwLTAxLTAxIDIwOjAwOjAwLjAwMDAwMCI7czoxMzoidGltZXpvbmVfdHlwZSI7aToxO3M6ODoidGltZXpvbmUiO3M6NjoiKzAxOjAwIjt9czoxNDoibmV3U3RyZWFtU3RhcnQiO2I6MDtzOjg6ImFyY2hpdmVkIjtiOjA7fX0=', $content); + self::assertEquals('YTozOntzOjU6ImV2ZW50IjtzOjE1OiJwcm9maWxlLnZpc2l0ZWQiO3M6NzoicGF5bG9hZCI7czoyMDoieyJwcm9maWxlSWQiOiAiZm9vIn0iO3M6NzoiaGVhZGVycyI7YToxOntzOjEwOiJyZWNvcmRlZE9uIjtPOjE3OiJEYXRlVGltZUltbXV0YWJsZSI6Mzp7czo0OiJkYXRlIjtzOjI2OiIyMDIwLTAxLTAxIDIwOjAwOjAwLjAwMDAwMCI7czoxMzoidGltZXpvbmVfdHlwZSI7aToxO3M6ODoidGltZXpvbmUiO3M6NjoiKzAxOjAwIjt9fX0=', $content); } public function testDeserialize(): void