diff --git a/baseline.xml b/baseline.xml index 258e72349..6f24145b1 100644 --- a/baseline.xml +++ b/baseline.xml @@ -28,6 +28,11 @@ Headers + + + + + $aggregate::metadata() diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 31afe8e31..9f953bade 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -11,6 +11,7 @@ xsi:noNamespaceSchemaLocation="/vendor/phpunit/phpunit/phpunit.xsd" cacheDirectory=".phpunit.cache" requireCoverageMetadata="false" + displayDetailsOnTestsThatTriggerWarnings="true" > diff --git a/src/Console/InvalidArgumentGiven.php b/src/Console/InvalidArgumentGiven.php index 0e4a76560..9ad39b202 100644 --- a/src/Console/InvalidArgumentGiven.php +++ b/src/Console/InvalidArgumentGiven.php @@ -6,7 +6,7 @@ use InvalidArgumentException; -use function gettype; +use function get_debug_type; use function sprintf; final class InvalidArgumentGiven extends InvalidArgumentException @@ -19,7 +19,7 @@ public function __construct( sprintf( 'Invalid argument given: need type "%s" got "%s"', $need, - gettype($value), + get_debug_type($value), ), ); } diff --git a/src/Console/OutputStyle.php b/src/Console/OutputStyle.php index 8e5ad0a5f..69070ca3d 100644 --- a/src/Console/OutputStyle.php +++ b/src/Console/OutputStyle.php @@ -11,6 +11,8 @@ use Symfony\Component\Console\Style\SymfonyStyle; use Throwable; +use function array_keys; +use function array_values; use function sprintf; final class OutputStyle extends SymfonyStyle @@ -18,25 +20,34 @@ final class OutputStyle extends SymfonyStyle public function message(EventSerializer $serializer, Message $message): void { $event = $message->event(); - $data = $serializer->serialize($event, [Encoder::OPTION_PRETTY_PRINT => true]); + + try { + $data = $serializer->serialize($event, [Encoder::OPTION_PRETTY_PRINT => true]); + } catch (Throwable $error) { + $this->error( + sprintf( + 'Error while serializing event "%s": %s', + $message->event()::class, + $error->getMessage(), + ), + ); + + if ($this->isVeryVerbose()) { + $this->throwable($error); + } + + return; + } $this->title($data->name); - $this->horizontalTable([ - 'eventClass', - 'aggregateName', - 'aggregateId', - 'playhead', - 'recordedOn', - ], [ - [ - $event::class, - $message->aggregateName(), - $message->aggregateId(), - $message->playhead(), - $message->recordedOn()->format(DateTimeInterface::ATOM), - ], - ]); + $headers = $message->headers(); + + if (isset($headers['recordedOn']) && $headers['recordedOn'] instanceof DateTimeInterface) { + $headers['recordedOn'] = $headers['recordedOn']->format(DateTimeInterface::ATOM); + } + + $this->horizontalTable(array_keys($headers), [array_values($headers)]); $this->block($data->payload); } diff --git a/src/EventBus/Serializer/DeserializeFailed.php b/src/EventBus/Serializer/DeserializeFailed.php new file mode 100644 index 000000000..0a7aea809 --- /dev/null +++ b/src/EventBus/Serializer/DeserializeFailed.php @@ -0,0 +1,28 @@ +eventSerializer->serialize($message->event()); + + return base64_encode( + serialize( + [ + 'serializedEvent' => $serializedEvent, + 'headers' => $message->headers(), + ], + ), + ); + } + + public function deserialize(string $content): Message + { + $decodedString = base64_decode($content, true); + + if (!is_string($decodedString)) { + throw DeserializeFailed::decodeFailed(); + } + + $data = unserialize( + $decodedString, + [ + 'allowed_classes' => [ + SerializedEvent::class, + DateTimeImmutable::class, + ], + ], + ); + + if ( + !is_array($data) + || !isset($data['serializedEvent'], $data['headers']) + || !$data['serializedEvent'] instanceof SerializedEvent + || !is_array($data['headers']) + ) { + throw DeserializeFailed::invalidData($data); + } + + $event = $this->eventSerializer->deserialize($data['serializedEvent']); + + return Message::createWithHeaders($event, $data['headers']); + } +} diff --git a/src/WatchServer/MessageSerializer.php b/src/EventBus/Serializer/MessageSerializer.php similarity index 80% rename from src/WatchServer/MessageSerializer.php rename to src/EventBus/Serializer/MessageSerializer.php index 3787051fc..0cfca701d 100644 --- a/src/WatchServer/MessageSerializer.php +++ b/src/EventBus/Serializer/MessageSerializer.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Patchlevel\EventSourcing\WatchServer; +namespace Patchlevel\EventSourcing\EventBus\Serializer; use Patchlevel\EventSourcing\EventBus\Message; diff --git a/src/EventBus/Serializer/PhpNativeMessageSerializer.php b/src/EventBus/Serializer/PhpNativeMessageSerializer.php new file mode 100644 index 000000000..d2791d044 --- /dev/null +++ b/src/EventBus/Serializer/PhpNativeMessageSerializer.php @@ -0,0 +1,41 @@ + true], + ); + + if (!$message instanceof Message) { + throw DeserializeFailed::invalidData($message); + } + + return $message; + } +} diff --git a/src/Outbox/DoctrineOutboxStore.php b/src/Outbox/DoctrineOutboxStore.php index b24e53346..a722211c9 100644 --- a/src/Outbox/DoctrineOutboxStore.php +++ b/src/Outbox/DoctrineOutboxStore.php @@ -7,11 +7,10 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Types\Types; +use Patchlevel\EventSourcing\EventBus\HeaderNotFound; use Patchlevel\EventSourcing\EventBus\Message; +use Patchlevel\EventSourcing\EventBus\Serializer\MessageSerializer; use Patchlevel\EventSourcing\Schema\SchemaConfigurator; -use Patchlevel\EventSourcing\Serializer\EventSerializer; -use Patchlevel\EventSourcing\Serializer\SerializedEvent; -use Patchlevel\EventSourcing\Store\DoctrineHelper; use Patchlevel\EventSourcing\Store\WrongQueryResult; use function array_map; @@ -20,9 +19,11 @@ final class DoctrineOutboxStore implements OutboxStore, SchemaConfigurator { + public const HEADER_OUTBOX_IDENTIFIER = 'outboxIdentifier'; + public function __construct( private readonly Connection $connection, - private readonly EventSerializer $serializer, + private readonly MessageSerializer $messageSerializer, private readonly string $outboxTable = 'outbox', ) { } @@ -32,24 +33,10 @@ public function saveOutboxMessage(Message ...$messages): void $this->connection->transactional( function (Connection $connection) use ($messages): void { foreach ($messages as $message) { - $event = $message->event(); - - $data = $this->serializer->serialize($event); - $connection->insert( $this->outboxTable, [ - 'aggregate' => $message->aggregateName(), - 'aggregate_id' => $message->aggregateId(), - 'playhead' => $message->playhead(), - 'event' => $data->name, - 'payload' => $data->payload, - 'recorded_on' => $message->recordedOn(), - 'custom_headers' => $message->customHeaders(), - ], - [ - 'recorded_on' => Types::DATETIMETZ_IMMUTABLE, - 'custom_headers' => Types::JSON, + 'message' => $this->messageSerializer->serialize($message), ], ); } @@ -66,20 +53,14 @@ public function retrieveOutboxMessages(int|null $limit = null): array ->setMaxResults($limit) ->getSQL(); - /** @var list $result */ + /** @var list $result */ $result = $this->connection->fetchAllAssociative($sql); - $platform = $this->connection->getDatabasePlatform(); return array_map( - function (array $data) use ($platform) { - $event = $this->serializer->deserialize(new SerializedEvent($data['event'], $data['payload'])); - - return Message::create($event) - ->withAggregateName($data['aggregate']) - ->withAggregateId($data['aggregate_id']) - ->withPlayhead(DoctrineHelper::normalizePlayhead($data['playhead'], $platform)) - ->withRecordedOn(DoctrineHelper::normalizeRecordedOn($data['recorded_on'], $platform)) - ->withCustomHeaders(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform)); + function (array $data) { + $message = $this->messageSerializer->deserialize($data['message']); + + return $message->withCustomHeader(self::HEADER_OUTBOX_IDENTIFIER, $data['id']); }, $result, ); @@ -90,14 +71,8 @@ public function markOutboxMessageConsumed(Message ...$messages): void $this->connection->transactional( function (Connection $connection) use ($messages): void { foreach ($messages as $message) { - $connection->delete( - $this->outboxTable, - [ - 'aggregate' => $message->aggregateName(), - 'aggregate_id' => $message->aggregateId(), - 'playhead' => $message->playhead(), - ], - ); + $id = $this->extractId($message); + $connection->delete($this->outboxTable, ['id' => $id]); } }, ); @@ -123,24 +98,28 @@ public function configureSchema(Schema $schema, Connection $connection): void { $table = $schema->createTable($this->outboxTable); - $table->addColumn('aggregate', Types::STRING) - ->setLength(255) - ->setNotnull(true); - $table->addColumn('aggregate_id', Types::STRING) - ->setLength(32) - ->setNotnull(true); - $table->addColumn('playhead', Types::INTEGER) - ->setNotnull(true); - $table->addColumn('event', Types::STRING) - ->setLength(255) - ->setNotnull(true); - $table->addColumn('payload', Types::JSON) - ->setNotnull(true); - $table->addColumn('recorded_on', Types::DATETIMETZ_IMMUTABLE) - ->setNotnull(false); - $table->addColumn('custom_headers', Types::JSON) - ->setNotnull(true); - - $table->setPrimaryKey(['aggregate', 'aggregate_id', 'playhead']); + $table->addColumn('id', Types::INTEGER) + ->setNotnull(true) + ->setAutoincrement(true); + $table->addColumn('message', Types::STRING) + ->setNotnull(true) + ->setLength(16_000); + + $table->setPrimaryKey(['id']); + } + + private function extractId(Message $message): int + { + try { + $value = $message->customHeader(self::HEADER_OUTBOX_IDENTIFIER); + } catch (HeaderNotFound) { + throw OutboxHeaderIssue::missingHeader(self::HEADER_OUTBOX_IDENTIFIER); + } + + if (!is_int($value)) { + throw OutboxHeaderIssue::invalidHeaderType($value); + } + + return $value; } } diff --git a/src/Outbox/OutboxHeaderIssue.php b/src/Outbox/OutboxHeaderIssue.php new file mode 100644 index 000000000..0a755d51b --- /dev/null +++ b/src/Outbox/OutboxHeaderIssue.php @@ -0,0 +1,23 @@ +event(); - $serializedEvent = $this->serializer->serialize($event); - - $data = [ - 'event' => $serializedEvent->name, - 'payload' => $serializedEvent->payload, - 'headers' => $message->headers(), - ]; - - return base64_encode(serialize($data)); - } - - public function deserialize(string $content): Message - { - /** @var array{event: class-string, payload: string, headers: Headers} $data */ - $data = unserialize( - base64_decode($content), - ['allowed_classes' => [DateTimeImmutable::class]], - ); - - return Message::createWithHeaders( - $this->serializer->deserialize(new SerializedEvent($data['event'], $data['payload'])), - $data['headers'], - ); - } -} diff --git a/src/WatchServer/SocketWatchServer.php b/src/WatchServer/SocketWatchServer.php index e8b9f1c99..1f4862abc 100644 --- a/src/WatchServer/SocketWatchServer.php +++ b/src/WatchServer/SocketWatchServer.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\WatchServer; use Closure; +use Patchlevel\EventSourcing\EventBus\Serializer\MessageSerializer; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use RuntimeException; diff --git a/src/WatchServer/SocketWatchServerClient.php b/src/WatchServer/SocketWatchServerClient.php index 1a2e5b34e..6ea61f55e 100644 --- a/src/WatchServer/SocketWatchServerClient.php +++ b/src/WatchServer/SocketWatchServerClient.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\WatchServer; use Patchlevel\EventSourcing\EventBus\Message; +use Patchlevel\EventSourcing\EventBus\Serializer\MessageSerializer; use function fclose; use function restore_error_handler; diff --git a/src/WatchServer/WatchEventBus.php b/src/WatchServer/WatchEventBus.php new file mode 100644 index 000000000..82a83d7c8 --- /dev/null +++ b/src/WatchServer/WatchEventBus.php @@ -0,0 +1,27 @@ +watchServerClient->send($message); + } + } catch (SendingFailed) { + // to nothing + } + } +} diff --git a/src/WatchServer/WatchListener.php b/src/WatchServer/WatchListener.php deleted file mode 100644 index b2a861ced..000000000 --- a/src/WatchServer/WatchListener.php +++ /dev/null @@ -1,26 +0,0 @@ -client->send($message); - } catch (SendingFailed) { - // to nothing - } - } -} diff --git a/tests/Integration/Outbox/OutboxTest.php b/tests/Integration/Outbox/OutboxTest.php index 6146901b0..97f412c66 100644 --- a/tests/Integration/Outbox/OutboxTest.php +++ b/tests/Integration/Outbox/OutboxTest.php @@ -7,6 +7,7 @@ use Doctrine\DBAL\Connection; use Patchlevel\EventSourcing\EventBus\ChainEventBus; use Patchlevel\EventSourcing\EventBus\DefaultConsumer; +use Patchlevel\EventSourcing\EventBus\Serializer\PhpNativeMessageSerializer; use Patchlevel\EventSourcing\Outbox\DoctrineOutboxStore; use Patchlevel\EventSourcing\Outbox\EventBusPublisher; use Patchlevel\EventSourcing\Outbox\OutboxEventBus; @@ -58,7 +59,7 @@ public function testSuccessful(): void $outboxStore = new DoctrineOutboxStore( $this->connection, - $serializer, + new PhpNativeMessageSerializer(), 'outbox', ); diff --git a/tests/Unit/Console/InputHelperTest.php b/tests/Unit/Console/InputHelperTest.php index 44610b1da..66f1113f3 100644 --- a/tests/Unit/Console/InputHelperTest.php +++ b/tests/Unit/Console/InputHelperTest.php @@ -19,7 +19,7 @@ public function testValidString(): void public function testInvalidString(): void { $this->expectException(InvalidArgumentGiven::class); - $this->expectExceptionMessage('Invalid argument given: need type "string" got "integer"'); + $this->expectExceptionMessage('Invalid argument given: need type "string" got "int"'); InputHelper::string(1); } @@ -37,7 +37,7 @@ public function testValidNullableStringIsNull(): void public function testInvalidNullableString(): void { $this->expectException(InvalidArgumentGiven::class); - $this->expectExceptionMessage('Invalid argument given: need type "string|null" got "integer"'); + $this->expectExceptionMessage('Invalid argument given: need type "string|null" got "int"'); InputHelper::nullableString(1); } @@ -50,7 +50,7 @@ public function testValidBoolean(): void public function testInvalidBoolean(): void { $this->expectException(InvalidArgumentGiven::class); - $this->expectExceptionMessage('Invalid argument given: need type "bool" got "integer"'); + $this->expectExceptionMessage('Invalid argument given: need type "bool" got "int"'); InputHelper::bool(1); } @@ -68,7 +68,7 @@ public function testValidNullInt(): void public function testInvalidInt(): void { $this->expectException(InvalidArgumentGiven::class); - $this->expectExceptionMessage('Invalid argument given: need type "int|null" got "boolean"'); + $this->expectExceptionMessage('Invalid argument given: need type "int|null" got "bool"'); InputHelper::nullableInt(true); } diff --git a/tests/Unit/Console/OutputStyleTest.php b/tests/Unit/Console/OutputStyleTest.php index 4b6dc6df0..d2a29f02c 100644 --- a/tests/Unit/Console/OutputStyleTest.php +++ b/tests/Unit/Console/OutputStyleTest.php @@ -15,6 +15,7 @@ use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use PHPUnit\Framework\TestCase; use Prophecy\PhpUnit\ProphecyTrait; +use RuntimeException; use Symfony\Component\Console\Input\ArrayInput; use Symfony\Component\Console\Output\BufferedOutput; @@ -23,7 +24,7 @@ final class OutputStyleTest extends TestCase { use ProphecyTrait; - public function testWrite(): void + public function testMessage(): void { $input = new ArrayInput([]); $output = new BufferedOutput(); @@ -52,8 +53,38 @@ public function testWrite(): void $content = $output->fetch(); self::assertStringContainsString('profile.created', $content); - self::assertStringContainsString('Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated', $content); - self::assertStringContainsString('Patchlevel\EventSourcing\Tests\Unit\Fixture\Profile', $content); + self::assertStringContainsString('profile', $content); self::assertStringContainsString('{"id":"1","email":"foo@bar.com"}', $content); } + + public function testMessageWithError(): void + { + $input = new ArrayInput([]); + $output = new BufferedOutput(); + + $event = new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('foo@bar.com'), + ); + + $serializer = $this->prophesize(EventSerializer::class); + $serializer + ->serialize($event, [Encoder::OPTION_PRETTY_PRINT => true]) + ->willThrow(new RuntimeException('Unknown Error')); + + $message = Message::create($event) + ->withAggregateName('profile') + ->withAggregateId('1') + ->withPlayhead(1) + ->withRecordedOn(new DateTimeImmutable()); + + $console = new OutputStyle($input, $output); + + $console->message($serializer->reveal(), $message); + + $content = $output->fetch(); + + self::assertStringContainsString('Unknown Error', $content); + self::assertStringContainsString(ProfileCreated::class, $content); + } } diff --git a/tests/Unit/EventBus/Serializer/EventSerializerMessageSerializerTest.php b/tests/Unit/EventBus/Serializer/EventSerializerMessageSerializerTest.php new file mode 100644 index 000000000..6b34a521f --- /dev/null +++ b/tests/Unit/EventBus/Serializer/EventSerializerMessageSerializerTest.php @@ -0,0 +1,113 @@ +withRecordedOn(new DateTimeImmutable('2020-01-01T20:00:00.000000+0100')); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $eventSerializer->serialize($event)->shouldBeCalledOnce()->willReturn(new SerializedEvent( + 'profile_visited', + '{id: foo}', + )); + + $serializer = new EventSerializerMessageSerializer( + $eventSerializer->reveal(), + ); + + $content = $serializer->serialize($message); + + self::assertEquals('YToyOntzOjE1OiJzZXJpYWxpemVkRXZlbnQiO086NTE6IlBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xTZXJpYWxpemVyXFNlcmlhbGl6ZWRFdmVudCI6Mjp7czo0OiJuYW1lIjtzOjE1OiJwcm9maWxlX3Zpc2l0ZWQiO3M6NzoicGF5bG9hZCI7czo5OiJ7aWQ6IGZvb30iO31zOjc6ImhlYWRlcnMiO2E6Mzp7czoxMDoicmVjb3JkZWRPbiI7TzoxNzoiRGF0ZVRpbWVJbW11dGFibGUiOjM6e3M6NDoiZGF0ZSI7czoyNjoiMjAyMC0wMS0wMSAyMDowMDowMC4wMDAwMDAiO3M6MTM6InRpbWV6b25lX3R5cGUiO2k6MTtzOjg6InRpbWV6b25lIjtzOjY6IiswMTowMCI7fXM6MTQ6Im5ld1N0cmVhbVN0YXJ0IjtiOjA7czo4OiJhcmNoaXZlZCI7YjowO319', $content); + } + + public function testDeserialize(): void + { + $event = new ProfileVisited( + ProfileId::fromString('foo'), + ); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $eventSerializer->deserialize(new SerializedEvent( + 'profile_visited', + '{id: foo}', + ))->shouldBeCalledOnce()->willReturn($event); + + $serializer = new EventSerializerMessageSerializer( + $eventSerializer->reveal(), + ); + + $message = $serializer->deserialize('YToyOntzOjE1OiJzZXJpYWxpemVkRXZlbnQiO086NTE6IlBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xTZXJpYWxpemVyXFNlcmlhbGl6ZWRFdmVudCI6Mjp7czo0OiJuYW1lIjtzOjE1OiJwcm9maWxlX3Zpc2l0ZWQiO3M6NzoicGF5bG9hZCI7czo5OiJ7aWQ6IGZvb30iO31zOjc6ImhlYWRlcnMiO2E6Mzp7czoxMDoicmVjb3JkZWRPbiI7TzoxNzoiRGF0ZVRpbWVJbW11dGFibGUiOjM6e3M6NDoiZGF0ZSI7czoyNjoiMjAyMC0wMS0wMSAyMDowMDowMC4wMDAwMDAiO3M6MTM6InRpbWV6b25lX3R5cGUiO2k6MTtzOjg6InRpbWV6b25lIjtzOjY6IiswMTowMCI7fXM6MTQ6Im5ld1N0cmVhbVN0YXJ0IjtiOjA7czo4OiJhcmNoaXZlZCI7YjowO319'); + + self::assertEquals($event, $message->event()); + self::assertEquals([ + 'recordedOn' => new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'), + 'newStreamStart' => false, + 'archived' => false, + ], $message->headers()); + } + + public function testDeserializeDecodeFailed(): void + { + $this->expectException(DeserializeFailed::class); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $serializer = new EventSerializerMessageSerializer( + $eventSerializer->reveal(), + ); + + $serializer->deserialize('!@#%$^&*()'); + } + + public function testEquals(): void + { + $event = new ProfileVisited( + ProfileId::fromString('foo'), + ); + + $message = Message::create($event) + ->withRecordedOn(new DateTimeImmutable('2020-01-01T20:00:00.000000+0100')); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $eventSerializer->serialize($event)->shouldBeCalledOnce()->willReturn(new SerializedEvent( + 'profile_visited', + '{id: foo}', + )); + $eventSerializer->deserialize(new SerializedEvent( + 'profile_visited', + '{id: foo}', + ))->shouldBeCalledOnce()->willReturn($event); + + $serializer = new EventSerializerMessageSerializer( + $eventSerializer->reveal(), + ); + + $content = $serializer->serialize($message); + $clonedMessage = $serializer->deserialize($content); + + self::assertEquals($message, $clonedMessage); + } +} diff --git a/tests/Unit/EventBus/Serializer/PhpNativeMessageSerializerTest.php b/tests/Unit/EventBus/Serializer/PhpNativeMessageSerializerTest.php new file mode 100644 index 000000000..05d3c16ba --- /dev/null +++ b/tests/Unit/EventBus/Serializer/PhpNativeMessageSerializerTest.php @@ -0,0 +1,89 @@ +withRecordedOn(new DateTimeImmutable('2020-01-01T20:00:00.000000+0100')); + + $nativeSerializer = new PhpNativeMessageSerializer(); + + $content = $nativeSerializer->serialize($message); + + self::assertEquals('Tzo0MToiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UiOjg6e3M6NTY6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBhZ2dyZWdhdGVOYW1lIjtOO3M6NTQ6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBhZ2dyZWdhdGVJZCI7TjtzOjUxOiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAcGxheWhlYWQiO047czo1MzoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAHJlY29yZGVkT24iO086MTc6IkRhdGVUaW1lSW1tdXRhYmxlIjozOntzOjQ6ImRhdGUiO3M6MjY6IjIwMjAtMDEtMDEgMjA6MDA6MDAuMDAwMDAwIjtzOjEzOiJ0aW1lem9uZV90eXBlIjtpOjE7czo4OiJ0aW1lem9uZSI7czo2OiIrMDE6MDAiO31zOjU3OiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAbmV3U3RyZWFtU3RhcnQiO2I6MDtzOjUxOiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAYXJjaGl2ZWQiO2I6MDtzOjU2OiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAY3VzdG9tSGVhZGVycyI7YTowOnt9czo0ODoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAGV2ZW50IjtPOjU4OiJQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVWaXNpdGVkIjoxOntzOjk6InZpc2l0b3JJZCI7Tzo1MzoiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXFRlc3RzXFVuaXRcRml4dHVyZVxQcm9maWxlSWQiOjE6e3M6NTc6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVJZABpZCI7czozOiJmb28iO319fQ==', $content); + } + + public function testDeserialize(): void + { + $event = new ProfileVisited( + ProfileId::fromString('foo'), + ); + + $nativeSerializer = new PhpNativeMessageSerializer(); + + $message = $nativeSerializer->deserialize('Tzo0MToiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UiOjg6e3M6NTY6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBhZ2dyZWdhdGVOYW1lIjtOO3M6NTQ6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBhZ2dyZWdhdGVJZCI7TjtzOjUxOiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAcGxheWhlYWQiO047czo1MzoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAHJlY29yZGVkT24iO086MTc6IkRhdGVUaW1lSW1tdXRhYmxlIjozOntzOjQ6ImRhdGUiO3M6MjY6IjIwMjAtMDEtMDEgMjA6MDA6MDAuMDAwMDAwIjtzOjEzOiJ0aW1lem9uZV90eXBlIjtpOjE7czo4OiJ0aW1lem9uZSI7czo2OiIrMDE6MDAiO31zOjU3OiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAbmV3U3RyZWFtU3RhcnQiO2I6MDtzOjUxOiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAYXJjaGl2ZWQiO2I6MDtzOjU2OiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAY3VzdG9tSGVhZGVycyI7YTowOnt9czo0ODoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAGV2ZW50IjtPOjU4OiJQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVWaXNpdGVkIjoxOntzOjk6InZpc2l0b3JJZCI7Tzo1MzoiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXFRlc3RzXFVuaXRcRml4dHVyZVxQcm9maWxlSWQiOjE6e3M6NTc6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVJZABpZCI7czozOiJmb28iO319fQ=='); + + self::assertEquals($event, $message->event()); + self::assertEquals([ + 'recordedOn' => new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'), + 'newStreamStart' => false, + 'archived' => false, + ], $message->headers()); + } + + public function testDeserializeDecodeFailed(): void + { + $this->expectException(DeserializeFailed::class); + + $nativeSerializer = new PhpNativeMessageSerializer(); + + $nativeSerializer->deserialize('!@#%$^&*()'); + } + + public function testDeserializeNotAMessage(): void + { + $this->expectException(DeserializeFailed::class); + + $nativeSerializer = new PhpNativeMessageSerializer(); + + $nativeSerializer->deserialize('Tzo4OiJzdGRDbGFzcyI6MDp7fQ=='); + } + + public function testEquals(): void + { + $event = new ProfileVisited( + ProfileId::fromString('foo'), + ); + + $message = Message::create($event) + ->withRecordedOn(new DateTimeImmutable('2020-01-01T20:00:00.000000+0100')); + + $nativeSerializer = new PhpNativeMessageSerializer(); + + $content = $nativeSerializer->serialize($message); + $clonedMessage = $nativeSerializer->deserialize($content); + + self::assertEquals($message, $clonedMessage); + } +} diff --git a/tests/Unit/Outbox/DoctrineOutboxStoreTest.php b/tests/Unit/Outbox/DoctrineOutboxStoreTest.php index 1929f1d54..edf1155d1 100644 --- a/tests/Unit/Outbox/DoctrineOutboxStoreTest.php +++ b/tests/Unit/Outbox/DoctrineOutboxStoreTest.php @@ -7,16 +7,12 @@ use DateTimeImmutable; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Driver; -use Doctrine\DBAL\Platforms\AbstractPlatform; use Doctrine\DBAL\Query\QueryBuilder; -use Doctrine\DBAL\Schema\Column; use Doctrine\DBAL\Schema\Schema; -use Doctrine\DBAL\Schema\Table; use Doctrine\DBAL\Types\Types; use Patchlevel\EventSourcing\EventBus\Message; +use Patchlevel\EventSourcing\EventBus\Serializer\MessageSerializer; use Patchlevel\EventSourcing\Outbox\DoctrineOutboxStore; -use Patchlevel\EventSourcing\Serializer\EventSerializer; -use Patchlevel\EventSourcing\Serializer\SerializedEvent; use Patchlevel\EventSourcing\Store\WrongQueryResult; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; @@ -44,16 +40,7 @@ public function testSaveOutboxMessage(): void $innerMockedConnection = $this->prophesize(Connection::class); $innerMockedConnection->insert( 'outbox', - [ - 'aggregate' => 'profile', - 'aggregate_id' => '1', - 'playhead' => 1, - 'event' => 'profile_created', - 'payload' => '', - 'recorded_on' => $recordedOn, - 'custom_headers' => [], - ], - ['recorded_on' => 'datetimetz_immutable', 'custom_headers' => 'json'], + ['message' => 'serialized'], )->shouldBeCalledOnce(); $driver = $this->prophesize(Driver::class); @@ -65,8 +52,8 @@ public function testSaveOutboxMessage(): void static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()) ); - $serializer = $this->prophesize(EventSerializer::class); - $serializer->serialize($message->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent('profile_created', '')); + $serializer = $this->prophesize(MessageSerializer::class); + $serializer->serialize($message)->shouldBeCalledOnce()->willReturn('serialized'); $doctrineOutboxStore = new DoctrineOutboxStore( $mockedConnection->reveal(), @@ -85,12 +72,13 @@ public function testMarkOutboxMessageConsumed(): void ->withPlayhead(1) ->withRecordedOn($recordedOn) ->withNewStreamStart(false) - ->withArchived(false); + ->withArchived(false) + ->withCustomHeader(DoctrineOutboxStore::HEADER_OUTBOX_IDENTIFIER, 42); $innerMockedConnection = $this->prophesize(Connection::class); $innerMockedConnection->delete( 'outbox', - ['aggregate' => 'profile', 'aggregate_id' => '1', 'playhead' => 1], + ['id' => 42], )->shouldBeCalledOnce(); $driver = $this->prophesize(Driver::class); @@ -102,7 +90,7 @@ public function testMarkOutboxMessageConsumed(): void static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()) ); - $serializer = $this->prophesize(EventSerializer::class); + $serializer = $this->prophesize(MessageSerializer::class); $doctrineOutboxStore = new DoctrineOutboxStore( $mockedConnection->reveal(), @@ -123,7 +111,7 @@ public function testCountOutboxMessages(): void $connection->createQueryBuilder()->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); $connection->fetchOne('this sql')->shouldBeCalledOnce()->willReturn('1'); - $serializer = $this->prophesize(EventSerializer::class); + $serializer = $this->prophesize(MessageSerializer::class); $doctrineOutboxStore = new DoctrineOutboxStore( $connection->reveal(), @@ -145,7 +133,7 @@ public function testCountOutboxMessagesFailure(): void $connection->createQueryBuilder()->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); $connection->fetchOne('this sql')->shouldBeCalledOnce()->willReturn([]); - $serializer = $this->prophesize(EventSerializer::class); + $serializer = $this->prophesize(MessageSerializer::class); $doctrineOutboxStore = new DoctrineOutboxStore( $connection->reveal(), @@ -168,11 +156,7 @@ public function testRetrieveOutboxMessagesNoResult(): void $connection->createQueryBuilder()->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); $connection->fetchAllAssociative('this sql')->shouldBeCalledOnce()->willReturn([]); - $platform = $this->prophesize(AbstractPlatform::class); - $connection->getDatabasePlatform()->shouldBeCalledOnce()->willReturn($platform->reveal()); - - $serializer = $this->prophesize(EventSerializer::class); - $serializer->deserialize(Argument::type(SerializedEvent::class))->shouldNotBeCalled(); + $serializer = $this->prophesize(MessageSerializer::class); $doctrineOutboxStore = new DoctrineOutboxStore( $connection->reveal(), @@ -193,7 +177,8 @@ public function testRetrieveOutboxMessages(): void ->withPlayhead(1) ->withRecordedOn($recordedOn) ->withNewStreamStart(false) - ->withArchived(false); + ->withArchived(false) + ->withCustomHeader(DoctrineOutboxStore::HEADER_OUTBOX_IDENTIFIER, 42); $queryBuilder = $this->prophesize(QueryBuilder::class); $queryBuilder->select('*')->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); @@ -205,23 +190,13 @@ public function testRetrieveOutboxMessages(): void $connection->createQueryBuilder()->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); $connection->fetchAllAssociative('this sql')->shouldBeCalledOnce()->willReturn([ [ - 'event' => 'profile_created', - 'payload' => '{"profile_id": "1", "email": "s"}', - 'aggregate' => 'profile', - 'aggregate_id' => '1', - 'playhead' => 1, - 'recorded_on' => $recordedOn->format('Y-m-d\TH:i:s.ue'), - 'custom_headers' => '{}', + 'id' => 42, + 'message' => 'serialized', ], ]); - $platform = $this->prophesize(AbstractPlatform::class); - $platform->getDateTimeTzFormatString()->shouldBeCalledOnce()->willReturn('Y-m-d\TH:i:s.ue'); - - $connection->getDatabasePlatform()->shouldBeCalledOnce()->willReturn($platform->reveal()); - - $serializer = $this->prophesize(EventSerializer::class); - $serializer->deserialize(new SerializedEvent('profile_created', '{"profile_id": "1", "email": "s"}'))->shouldBeCalledOnce()->willReturn($event); + $serializer = $this->prophesize(MessageSerializer::class); + $serializer->deserialize('serialized')->shouldBeCalledOnce()->willReturn($message); $doctrineOutboxStore = new DoctrineOutboxStore( $connection->reveal(), @@ -235,78 +210,28 @@ public function testRetrieveOutboxMessages(): void public function testConfigureSchema(): void { $connection = $this->prophesize(Connection::class); - $serializer = $this->prophesize(EventSerializer::class); + $serializer = $this->prophesize(MessageSerializer::class); $doctrineOutboxStore = new DoctrineOutboxStore( $connection->reveal(), $serializer->reveal(), ); - $table = $this->prophesize(Table::class); - $column = $this->prophesize(Column::class); - $table->addColumn('aggregate', Types::STRING)->shouldBeCalledOnce() - ->willReturn( - $column - ->setNotnull(true)->shouldBeCalledOnce() - ->getObjectProphecy()->setLength(255)->shouldBeCalledOnce()->willReturn($column->reveal()) - ->getObjectProphecy()->reveal(), - ); - - $column = $this->prophesize(Column::class); - $table->addColumn('aggregate_id', Types::STRING)->shouldBeCalledOnce() - ->willReturn( - $column - ->setNotnull(true)->shouldBeCalledOnce() - ->getObjectProphecy()->setLength(32)->shouldBeCalledOnce()->willReturn($column->reveal()) - ->getObjectProphecy()->reveal(), - ); - - $column = $this->prophesize(Column::class); - $table->addColumn('playhead', Types::INTEGER)->shouldBeCalledOnce() - ->willReturn( - $column - ->setNotnull(true)->shouldBeCalledOnce() - ->getObjectProphecy()->reveal(), - ); - - $column = $this->prophesize(Column::class); - $table->addColumn('event', Types::STRING)->shouldBeCalledOnce() - ->willReturn( - $column - ->setNotnull(true)->shouldBeCalledOnce() - ->getObjectProphecy()->setLength(255)->shouldBeCalledOnce()->willReturn($column->reveal()) - ->getObjectProphecy()->reveal(), - ); - - $column = $this->prophesize(Column::class); - $table->addColumn('payload', Types::JSON)->shouldBeCalledOnce() - ->willReturn( - $column - ->setNotnull(true)->shouldBeCalledOnce() - ->getObjectProphecy()->reveal(), - ); - - $column = $this->prophesize(Column::class); - $table->addColumn('recorded_on', Types::DATETIMETZ_IMMUTABLE)->shouldBeCalledOnce() - ->willReturn( - $column - ->setNotnull(false)->shouldBeCalledOnce() - ->getObjectProphecy()->reveal(), - ); - - $column = $this->prophesize(Column::class); - $table->addColumn('custom_headers', Types::JSON)->shouldBeCalledOnce() - ->willReturn( - $column - ->setNotnull(true)->shouldBeCalledOnce() - ->getObjectProphecy()->reveal(), - ); - - $table->setPrimaryKey(['aggregate', 'aggregate_id', 'playhead'])->shouldBeCalledOnce(); - - $schema = $this->prophesize(Schema::class); - $schema->createTable('outbox')->shouldBeCalledOnce()->willReturn($table->reveal()); - - $doctrineOutboxStore->configureSchema($schema->reveal(), $connection->reveal()); + $expectedSchema = new Schema(); + $table = $expectedSchema->createTable('outbox'); + + $table->addColumn('id', Types::INTEGER) + ->setNotnull(true) + ->setAutoincrement(true); + $table->addColumn('message', Types::STRING) + ->setNotnull(true) + ->setLength(16_000); + + $table->setPrimaryKey(['id']); + + $schema = new Schema(); + $doctrineOutboxStore->configureSchema($schema, $connection->reveal()); + + $this->assertEquals($expectedSchema, $schema); } } diff --git a/tests/Unit/WatchServer/PhpNativeMessageSerializerTest.php b/tests/Unit/WatchServer/PhpNativeMessageSerializerTest.php deleted file mode 100644 index 23df1a614..000000000 --- a/tests/Unit/WatchServer/PhpNativeMessageSerializerTest.php +++ /dev/null @@ -1,96 +0,0 @@ -withRecordedOn(new DateTimeImmutable('2020-01-01T20:00:00.000000+0100')); - - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer - ->serialize($event) - ->willReturn(new SerializedEvent('profile.visited', '{"profileId": "foo"}')) - ->shouldBeCalledOnce(); - - $nativeSerializer = new PhpNativeMessageSerializer($eventSerializer->reveal()); - - $content = $nativeSerializer->serialize($message); - - self::assertEquals('YTozOntzOjU6ImV2ZW50IjtzOjE1OiJwcm9maWxlLnZpc2l0ZWQiO3M6NzoicGF5bG9hZCI7czoyMDoieyJwcm9maWxlSWQiOiAiZm9vIn0iO3M6NzoiaGVhZGVycyI7YTozOntzOjEwOiJyZWNvcmRlZE9uIjtPOjE3OiJEYXRlVGltZUltbXV0YWJsZSI6Mzp7czo0OiJkYXRlIjtzOjI2OiIyMDIwLTAxLTAxIDIwOjAwOjAwLjAwMDAwMCI7czoxMzoidGltZXpvbmVfdHlwZSI7aToxO3M6ODoidGltZXpvbmUiO3M6NjoiKzAxOjAwIjt9czoxNDoibmV3U3RyZWFtU3RhcnQiO2I6MDtzOjg6ImFyY2hpdmVkIjtiOjA7fX0=', $content); - } - - public function testDeserialize(): void - { - $event = new ProfileVisited( - ProfileId::fromString('foo'), - ); - - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer - ->deserialize(new SerializedEvent('profile.visited', '{"profileId": "foo"}')) - ->willReturn($event) - ->shouldBeCalledOnce(); - - $nativeSerializer = new PhpNativeMessageSerializer($eventSerializer->reveal()); - - $message = $nativeSerializer->deserialize('YTozOntzOjU6ImV2ZW50IjtzOjE1OiJwcm9maWxlLnZpc2l0ZWQiO3M6NzoicGF5bG9hZCI7czoyMDoieyJwcm9maWxlSWQiOiAiZm9vIn0iO3M6NzoiaGVhZGVycyI7YTozOntzOjEwOiJyZWNvcmRlZE9uIjtPOjE3OiJEYXRlVGltZUltbXV0YWJsZSI6Mzp7czo0OiJkYXRlIjtzOjI2OiIyMDIwLTAxLTAxIDIwOjAwOjAwLjAwMDAwMCI7czoxMzoidGltZXpvbmVfdHlwZSI7aToxO3M6ODoidGltZXpvbmUiO3M6NjoiKzAxOjAwIjt9czoxNDoibmV3U3RyZWFtU3RhcnQiO2I6MDtzOjg6ImFyY2hpdmVkIjtiOjA7fX0='); - - self::assertEquals($event, $message->event()); - self::assertEquals([ - 'recordedOn' => new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'), - 'newStreamStart' => false, - 'archived' => false, - ], $message->headers()); - } - - public function testEquals(): void - { - $event = new ProfileVisited( - ProfileId::fromString('foo'), - ); - - $message = Message::create($event) - ->withRecordedOn(new DateTimeImmutable('2020-01-01T20:00:00.000000+0100')); - - $eventSerializer = $this->prophesize(EventSerializer::class); - - $eventSerializer - ->serialize($event) - ->willReturn(new SerializedEvent('profile.visited', '{"profileId": "foo"}')) - ->shouldBeCalledOnce(); - - $eventSerializer - ->deserialize(new SerializedEvent('profile.visited', '{"profileId": "foo"}')) - ->willReturn($event) - ->shouldBeCalledOnce(); - - $nativeSerializer = new PhpNativeMessageSerializer($eventSerializer->reveal()); - - $content = $nativeSerializer->serialize($message); - $clonedMessage = $nativeSerializer->deserialize($content); - - self::assertEquals($message, $clonedMessage); - } -} diff --git a/tests/Unit/WatchServer/WatchListenerTest.php b/tests/Unit/WatchServer/WatchEventBusTest.php similarity index 73% rename from tests/Unit/WatchServer/WatchListenerTest.php rename to tests/Unit/WatchServer/WatchEventBusTest.php index c84deaeba..93f7ae786 100644 --- a/tests/Unit/WatchServer/WatchListenerTest.php +++ b/tests/Unit/WatchServer/WatchEventBusTest.php @@ -8,13 +8,13 @@ use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use Patchlevel\EventSourcing\WatchServer\SendingFailed; -use Patchlevel\EventSourcing\WatchServer\WatchListener; +use Patchlevel\EventSourcing\WatchServer\WatchEventBus; use Patchlevel\EventSourcing\WatchServer\WatchServerClient; use PHPUnit\Framework\TestCase; use Prophecy\PhpUnit\ProphecyTrait; -/** @covers \Patchlevel\EventSourcing\WatchServer\WatchListener */ -final class WatchListenerTest extends TestCase +/** @covers \Patchlevel\EventSourcing\WatchServer\WatchEventBus */ +final class WatchEventBusTest extends TestCase { use ProphecyTrait; @@ -25,8 +25,8 @@ public function testListener(): void $client = $this->prophesize(WatchServerClient::class); $client->send($message)->shouldBeCalled(); - $listener = new WatchListener($client->reveal()); - $listener->__invoke($message); + $bus = new WatchEventBus($client->reveal()); + $bus->dispatch($message); } public function testIgnoreErrors(): void @@ -36,7 +36,7 @@ public function testIgnoreErrors(): void $client = $this->prophesize(WatchServerClient::class); $client->send($message)->shouldBeCalled()->willThrow(SendingFailed::class); - $listener = new WatchListener($client->reveal()); - $listener->__invoke($message); + $bus = new WatchEventBus($client->reveal()); + $bus->dispatch($message); } }