diff --git a/docs/pages/cli.md b/docs/pages/cli.md index 654b01e38..6f017744e 100644 --- a/docs/pages/cli.md +++ b/docs/pages/cli.md @@ -7,7 +7,6 @@ You can: * Create and delete `databases` * Create, update and delete `schemas` * Manage `projections` -* Consume `outbox` messages ## Database commands @@ -45,17 +44,6 @@ To manage your projectors there are the following cli commands. You can find out more about projections [here](projection.md). -## Outbox commands - -Interacting with the outbox store is also possible via the cli. - -* OutboxInfoCommand: `event-sourcing:outbox:info` -* OutboxConsumeCommand: `event-sourcing:outbox:consume` - -!!! note - - You can find out more about outbox [here](outbox.md). - ## Inspector commands The inspector is a tool to inspect the event streams. diff --git a/docs/pages/event_bus.md b/docs/pages/event_bus.md index 95d5642e8..b8508edfb 100644 --- a/docs/pages/event_bus.md +++ b/docs/pages/event_bus.md @@ -228,18 +228,6 @@ $eventBus = new Psr14EventBus($psr14EventDispatcher); You can't use the `Subscribe` attribute with the psr-14 event bus. -## Chain Event Bus - -If you want to use multiple event buses, you can use the `ChainEventBus`. - -```php -use Patchlevel\EventSourcing\EventBus\ChainEventBus; - -$eventBus = new ChainEventBus([ - $eventBus1, - $eventBus2, -]); -``` ## Learn more diff --git a/src/Console/Command/OutboxConsumeCommand.php b/src/Console/Command/OutboxConsumeCommand.php deleted file mode 100644 index 484ac68a8..000000000 --- a/src/Console/Command/OutboxConsumeCommand.php +++ /dev/null @@ -1,93 +0,0 @@ -addOption( - 'message-limit', - null, - InputOption::VALUE_REQUIRED, - 'How many messages should be consumed in one run', - 100, - ) - ->addOption( - 'run-limit', - null, - InputOption::VALUE_OPTIONAL, - 'The maximum number of runs this command should execute', - 1, - ) - ->addOption( - 'memory-limit', - null, - InputOption::VALUE_REQUIRED, - 'How much memory consumption should the worker be terminated (e.g. 250MB)', - ) - ->addOption( - 'time-limit', - null, - InputOption::VALUE_REQUIRED, - 'What is the maximum time the worker can run in seconds', - ) - ->addOption( - 'sleep', - null, - InputOption::VALUE_REQUIRED, - 'How much time should elapse before the next job is executed in milliseconds', - 1000, - ); - } - - protected function execute(InputInterface $input, OutputInterface $output): int - { - $messageLimit = InputHelper::int($input->getOption('message-limit')); - $runLimit = InputHelper::nullablePositiveInt($input->getOption('run-limit')); - $memoryLimit = InputHelper::nullableString($input->getOption('memory-limit')); - $timeLimit = InputHelper::nullablePositiveInt($input->getOption('time-limit')); - $sleep = InputHelper::positiveIntOrZero($input->getOption('sleep')); - - $logger = new ConsoleLogger($output); - - $worker = DefaultWorker::create( - function () use ($messageLimit): void { - $this->processor->process($messageLimit); - }, - [ - 'runLimit' => $runLimit, - 'memoryLimit' => $memoryLimit, - 'timeLimit' => $timeLimit, - ], - $logger, - ); - - $worker->run($sleep); - - return 0; - } -} diff --git a/src/Console/Command/OutboxInfoCommand.php b/src/Console/Command/OutboxInfoCommand.php deleted file mode 100644 index 84f9f55e8..000000000 --- a/src/Console/Command/OutboxInfoCommand.php +++ /dev/null @@ -1,52 +0,0 @@ -addOption('limit', null, InputOption::VALUE_REQUIRED, 'Maximum number of messages to be displayed'); - } - - protected function execute(InputInterface $input, OutputInterface $output): int - { - $console = new OutputStyle($input, $output); - - $limit = InputHelper::nullableInt($input->getOption('limit')); - - $messages = $this->store->retrieveOutboxMessages($limit); - - foreach ($messages as $message) { - $console->message($this->eventSerializer, $this->headersSerializer, $message); - } - - return 0; - } -} diff --git a/src/EventBus/ChainEventBus.php b/src/EventBus/ChainEventBus.php deleted file mode 100644 index 5077293a1..000000000 --- a/src/EventBus/ChainEventBus.php +++ /dev/null @@ -1,21 +0,0 @@ - $eventBuses */ - public function __construct( - private readonly iterable $eventBuses, - ) { - } - - public function dispatch(Message ...$messages): void - { - foreach ($this->eventBuses as $eventBus) { - $eventBus->dispatch(...$messages); - } - } -} diff --git a/src/EventBus/Serializer/EventSerializerMessageSerializer.php b/src/EventBus/Serializer/EventSerializerMessageSerializer.php deleted file mode 100644 index 98fbfa83e..000000000 --- a/src/EventBus/Serializer/EventSerializerMessageSerializer.php +++ /dev/null @@ -1,55 +0,0 @@ -} */ -final class EventSerializerMessageSerializer implements MessageSerializer -{ - public function __construct( - private readonly EventSerializer $eventSerializer, - private readonly HeadersSerializer $headersSerializer, - private readonly Encoder $encoder, - ) { - } - - public function serialize(Message $message): string - { - return $this->encoder->encode( - [ - 'serializedEvent' => $this->eventSerializer->serialize($message->event()), - 'headers' => $this->headersSerializer->serialize($message->headers()), - ], - ); - } - - public function deserialize(string $content): Message - { - $messageData = $this->encoder->decode($content); - - if ( - !isset($messageData['serializedEvent'], $messageData['headers']) - || !is_array($messageData['serializedEvent']) - || !is_array($messageData['headers']) - || !isset($messageData['serializedEvent']['name'], $messageData['serializedEvent']['payload']) - || !is_string($messageData['serializedEvent']['name']) - || !is_string($messageData['serializedEvent']['payload']) - ) { - throw DeserializeFailed::invalidData($messageData); - } - - $event = $this->eventSerializer->deserialize(new SerializedEvent($messageData['serializedEvent']['name'], $messageData['serializedEvent']['payload'])); - $headers = $this->headersSerializer->deserialize($messageData['headers']); - - return Message::createWithHeaders($event, $headers); - } -} diff --git a/src/EventBus/Serializer/MessageSerializer.php b/src/EventBus/Serializer/MessageSerializer.php deleted file mode 100644 index 0cfca701d..000000000 --- a/src/EventBus/Serializer/MessageSerializer.php +++ /dev/null @@ -1,14 +0,0 @@ - true], - ); - - if (!$message instanceof Message) { - throw DeserializeFailed::invalidData($message); - } - - return $message; - } -} diff --git a/src/Outbox/DoctrineOutboxStore.php b/src/Outbox/DoctrineOutboxStore.php deleted file mode 100644 index 0cdd2cff4..000000000 --- a/src/Outbox/DoctrineOutboxStore.php +++ /dev/null @@ -1,121 +0,0 @@ -connection->transactional( - function (Connection $connection) use ($messages): void { - foreach ($messages as $message) { - $connection->insert( - $this->outboxTable, - [ - 'message' => $this->messageSerializer->serialize($message), - ], - ); - } - }, - ); - } - - /** @return list */ - public function retrieveOutboxMessages(int|null $limit = null): array - { - $sql = $this->connection->createQueryBuilder() - ->select('*') - ->from($this->outboxTable) - ->setMaxResults($limit) - ->getSQL(); - - /** @var list $result */ - $result = $this->connection->fetchAllAssociative($sql); - - return array_map( - function (array $data) { - $message = $this->messageSerializer->deserialize($data['message']); - - return $message->withHeader(new OutboxHeader($data['id'])); - }, - $result, - ); - } - - public function markOutboxMessageConsumed(Message ...$messages): void - { - $this->connection->transactional( - function (Connection $connection) use ($messages): void { - foreach ($messages as $message) { - $id = $this->extractId($message); - $connection->delete($this->outboxTable, ['id' => $id]); - } - }, - ); - } - - public function countOutboxMessages(): int - { - $sql = $this->connection->createQueryBuilder() - ->select('COUNT(*)') - ->from($this->outboxTable) - ->getSQL(); - - $result = $this->connection->fetchOne($sql); - - if (!is_int($result) && !is_string($result)) { - throw new WrongQueryResult(); - } - - return (int)$result; - } - - public function configureSchema(Schema $schema, Connection $connection): void - { - $table = $schema->createTable($this->outboxTable); - - $table->addColumn('id', Types::INTEGER) - ->setNotnull(true) - ->setAutoincrement(true); - $table->addColumn('message', Types::STRING) - ->setNotnull(true) - ->setLength(16_000); - - $table->setPrimaryKey(['id']); - } - - private function extractId(Message $message): int - { - try { - $outboxHeader = $message->header(OutboxHeader::class); - } catch (HeaderNotFound) { - throw OutboxHeaderIssue::missingHeader(self::HEADER_OUTBOX_IDENTIFIER); - } - - return $outboxHeader->id; - } -} diff --git a/src/Outbox/EventBusPublisher.php b/src/Outbox/EventBusPublisher.php deleted file mode 100644 index 1c8781b50..000000000 --- a/src/Outbox/EventBusPublisher.php +++ /dev/null @@ -1,21 +0,0 @@ -consumer->consume($message); - } -} diff --git a/src/Outbox/OutboxEventBus.php b/src/Outbox/OutboxEventBus.php deleted file mode 100644 index ea751eb81..000000000 --- a/src/Outbox/OutboxEventBus.php +++ /dev/null @@ -1,32 +0,0 @@ -store->saveOutboxMessage(...$messages); - - foreach ($messages as $message) { - $this->logger?->debug(sprintf( - 'EventBus: Message "%s" added to queue.', - $message->event()::class, - )); - } - } -} diff --git a/src/Outbox/OutboxHeader.php b/src/Outbox/OutboxHeader.php deleted file mode 100644 index 49588e489..000000000 --- a/src/Outbox/OutboxHeader.php +++ /dev/null @@ -1,17 +0,0 @@ - */ - public function retrieveOutboxMessages(int|null $limit = null): array; - - public function markOutboxMessageConsumed(Message ...$messages): void; - - public function countOutboxMessages(): int; -} diff --git a/src/Outbox/StoreOutboxProcessor.php b/src/Outbox/StoreOutboxProcessor.php deleted file mode 100644 index 3cd14da91..000000000 --- a/src/Outbox/StoreOutboxProcessor.php +++ /dev/null @@ -1,24 +0,0 @@ -store->retrieveOutboxMessages($limit); - - foreach ($messages as $message) { - $this->publisher->publish($message); - $this->store->markOutboxMessageConsumed($message); - } - } -} diff --git a/tests/Integration/Outbox/Aggregate/Profile.php b/tests/Integration/Outbox/Aggregate/Profile.php deleted file mode 100644 index 0c48f70a5..000000000 --- a/tests/Integration/Outbox/Aggregate/Profile.php +++ /dev/null @@ -1,44 +0,0 @@ -recordThat(new ProfileCreated($id, $name)); - - return $self; - } - - #[Apply(ProfileCreated::class)] - protected function applyProfileCreated(ProfileCreated $event): void - { - $this->id = $event->profileId; - $this->name = $event->name; - } - - public function name(): string - { - return $this->name; - } -} diff --git a/tests/Integration/Outbox/Events/ProfileCreated.php b/tests/Integration/Outbox/Events/ProfileCreated.php deleted file mode 100644 index 597b006e8..000000000 --- a/tests/Integration/Outbox/Events/ProfileCreated.php +++ /dev/null @@ -1,20 +0,0 @@ -connection = DbalManager::createConnection(); - } - - public function tearDown(): void - { - $this->connection->close(); - SendEmailMock::reset(); - } - - public function testSuccessful(): void - { - $eventSerializer = DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']); - $headerSerializer = DefaultHeadersSerializer::createFromPaths([ - __DIR__ . '/../../../src', - __DIR__, - ]); - - $store = new DoctrineDbalStore( - $this->connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), - DefaultHeadersSerializer::createFromPaths([ - __DIR__ . '/../../../src', - __DIR__, - ]), - 'eventstore', - ); - - $outboxStore = new DoctrineOutboxStore( - $this->connection, - new PhpNativeMessageSerializer(), - 'outbox', - ); - - $outboxEventBus = new OutboxEventBus($outboxStore); - - $eventBusConsumer = DefaultConsumer::create([new SendEmailProcessor()]); - - $eventBus = $outboxEventBus; - - $repository = new DefaultRepository($store, $eventBus, Profile::metadata()); - - $schemaDirector = new DoctrineSchemaDirector( - $this->connection, - new ChainDoctrineSchemaConfigurator([ - $store, - $outboxStore, - ]), - ); - - $schemaDirector->create(); - - $profile = Profile::create(ProfileId::fromString('1'), 'John'); - $repository->save($profile); - - self::assertSame(1, $outboxStore->countOutboxMessages()); - - $messages = $outboxStore->retrieveOutboxMessages(); - - self::assertCount(1, $messages); - - $message = $messages[0]; - - $aggregateHeader = $message->header(AggregateHeader::class); - - self::assertSame('1', $aggregateHeader->aggregateId); - self::assertSame('profile', $aggregateHeader->aggregateName); - self::assertSame(1, $aggregateHeader->playhead); - self::assertEquals( - new ProfileCreated(ProfileId::fromString('1'), 'John'), - $message->event(), - ); - - $consumer = new StoreOutboxProcessor( - $outboxStore, - new EventBusPublisher($eventBusConsumer), - ); - - $consumer->process(); - - self::assertSame(0, $outboxStore->countOutboxMessages()); - self::assertCount(0, $outboxStore->retrieveOutboxMessages()); - } - - public function testSuccessfulWithEventSerializerMessageSerializer(): void - { - $eventSerializer = DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']); - $headerSerializer = DefaultHeadersSerializer::createFromPaths([ - __DIR__ . '/../../../src', - __DIR__, - ]); - - $store = new DoctrineDbalStore( - $this->connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), - DefaultHeadersSerializer::createFromPaths([ - __DIR__ . '/../../../src', - __DIR__, - ]), - 'eventstore', - ); - - $outboxStore = new DoctrineOutboxStore( - $this->connection, - new EventSerializerMessageSerializer( - $eventSerializer, - $headerSerializer, - new JsonEncoder(), - ), - 'outbox', - ); - - $outboxEventBus = new OutboxEventBus($outboxStore); - - $eventBusConsumer = DefaultConsumer::create([new SendEmailProcessor()]); - - $eventBus = $outboxEventBus; - - $repository = new DefaultRepository($store, $eventBus, Profile::metadata()); - - $schemaDirector = new DoctrineSchemaDirector( - $this->connection, - new ChainDoctrineSchemaConfigurator([ - $store, - $outboxStore, - ]), - ); - - $schemaDirector->create(); - - $profile = Profile::create(ProfileId::fromString('1'), 'John'); - $repository->save($profile); - - self::assertSame(1, $outboxStore->countOutboxMessages()); - - $messages = $outboxStore->retrieveOutboxMessages(); - - self::assertCount(1, $messages); - - $message = $messages[0]; - - $aggregateHeader = $message->header(AggregateHeader::class); - - self::assertSame('1', $aggregateHeader->aggregateId); - self::assertSame('profile', $aggregateHeader->aggregateName); - self::assertSame(1, $aggregateHeader->playhead); - self::assertEquals( - new ProfileCreated(ProfileId::fromString('1'), 'John'), - $message->event(), - ); - - $consumer = new StoreOutboxProcessor( - $outboxStore, - new EventBusPublisher($eventBusConsumer), - ); - - $consumer->process(); - - self::assertSame(0, $outboxStore->countOutboxMessages()); - self::assertCount(0, $outboxStore->retrieveOutboxMessages()); - } -} diff --git a/tests/Integration/Outbox/Processor/SendEmailProcessor.php b/tests/Integration/Outbox/Processor/SendEmailProcessor.php deleted file mode 100644 index 381ea256a..000000000 --- a/tests/Integration/Outbox/Processor/SendEmailProcessor.php +++ /dev/null @@ -1,19 +0,0 @@ -id; - } -} diff --git a/tests/Integration/Outbox/Projection/ProfileProjector.php b/tests/Integration/Outbox/Projection/ProfileProjector.php deleted file mode 100644 index 35b97e809..000000000 --- a/tests/Integration/Outbox/Projection/ProfileProjector.php +++ /dev/null @@ -1,58 +0,0 @@ -addColumn('id', 'string')->setLength(36); - $table->addColumn('name', 'string')->setLength(255); - $table->setPrimaryKey(['id']); - - $this->connection->createSchemaManager()->createTable($table); - } - - #[Teardown] - public function drop(): void - { - $this->connection->createSchemaManager()->dropTable('projection_profile'); - } - - #[Subscribe(ProfileCreated::class)] - public function handleProfileCreated(Message $message): void - { - $profileCreated = $message->event(); - - assert($profileCreated instanceof ProfileCreated); - - $this->connection->executeStatement( - 'INSERT INTO projection_profile (id, name) VALUES(:id, :name);', - [ - 'id' => $profileCreated->profileId->toString(), - 'name' => $profileCreated->name, - ], - ); - } -} diff --git a/tests/Integration/Outbox/SendEmailMock.php b/tests/Integration/Outbox/SendEmailMock.php deleted file mode 100644 index 0329c1e69..000000000 --- a/tests/Integration/Outbox/SendEmailMock.php +++ /dev/null @@ -1,25 +0,0 @@ -prophesize(OutboxProcessor::class); - $consumer->process(100)->shouldBeCalled(); - - $command = new OutboxConsumeCommand( - $consumer->reveal(), - ); - - $input = new ArrayInput([]); - $output = new BufferedOutput(); - - $exitCode = $command->run($input, $output); - - self::assertSame(0, $exitCode); - } - - public function testSuccessfulWithAllLimits(): void - { - $consumer = $this->prophesize(OutboxProcessor::class); - $consumer->process(200)->shouldBeCalled(); - - $command = new OutboxConsumeCommand( - $consumer->reveal(), - ); - - $input = new ArrayInput([ - '--message-limit' => 200, - '--run-limit' => 1, - '--memory-limit' => '10GB', - '--time-limit' => 3600, - '--sleep' => 1000, - ]); - - $output = new BufferedOutput(); - - $exitCode = $command->run($input, $output); - - self::assertSame(0, $exitCode); - } -} diff --git a/tests/Unit/Console/Command/OutboxInfoCommandTest.php b/tests/Unit/Console/Command/OutboxInfoCommandTest.php deleted file mode 100644 index 1efd8ae0b..000000000 --- a/tests/Unit/Console/Command/OutboxInfoCommandTest.php +++ /dev/null @@ -1,107 +0,0 @@ -withHeader(new AggregateHeader('profile', '1', 1, new DateTimeImmutable())); - - $store = $this->prophesize(OutboxStore::class); - $store->retrieveOutboxMessages(null)->willReturn([$message]); - - $serializer = $this->prophesize(EventSerializer::class); - $serializer->serialize($event, [Encoder::OPTION_PRETTY_PRINT => true])->willReturn( - new SerializedEvent( - 'profile.visited', - '{"visitorId": "1"}', - ), - ); - - $headersSerializer = $this->prophesize(HeadersSerializer::class); - $headersSerializer->serialize($message->headers(), [Encoder::OPTION_PRETTY_PRINT => true])->willReturn( - ['aggregate' => '{"aggregateName":"profile","aggregateId":"1","playhead":1,"recordedOn":"2020-01-01T20:00:00+01:00"}'], - ); - - $command = new OutboxInfoCommand( - $store->reveal(), - $serializer->reveal(), - $headersSerializer->reveal(), - ); - - $input = new ArrayInput([]); - $output = new BufferedOutput(); - - $exitCode = $command->run($input, $output); - - self::assertSame(0, $exitCode); - - $content = $output->fetch(); - - self::assertStringContainsString('"visitorId": "1"', $content); - } - - public function testSuccessfulWithLimit(): void - { - $event = new ProfileVisited(ProfileId::fromString('1')); - $message = Message::create($event) - ->withHeader(new AggregateHeader('profile', '1', 1, new DateTimeImmutable())); - - $store = $this->prophesize(OutboxStore::class); - $store->retrieveOutboxMessages(100)->willReturn([$message]); - - $serializer = $this->prophesize(EventSerializer::class); - $serializer->serialize($event, [Encoder::OPTION_PRETTY_PRINT => true])->willReturn( - new SerializedEvent( - 'profile.visited', - '{"visitorId": "1"}', - ), - ); - - $headersSerializer = $this->prophesize(HeadersSerializer::class); - $headersSerializer->serialize($message->headers(), [Encoder::OPTION_PRETTY_PRINT => true])->willReturn( - ['aggregate' => '{"aggregateName":"profile","aggregateId":"1","playhead":1,"recordedOn":"2020-01-01T20:00:00+01:00"}'], - ); - - $command = new OutboxInfoCommand( - $store->reveal(), - $serializer->reveal(), - $headersSerializer->reveal(), - ); - - $input = new ArrayInput(['--limit' => 100]); - $output = new BufferedOutput(); - - $exitCode = $command->run($input, $output); - - self::assertSame(0, $exitCode); - - $content = $output->fetch(); - - self::assertStringContainsString('"visitorId": "1"', $content); - } -} diff --git a/tests/Unit/EventBus/ChainEventBusTest.php b/tests/Unit/EventBus/ChainEventBusTest.php deleted file mode 100644 index e6bb572da..000000000 --- a/tests/Unit/EventBus/ChainEventBusTest.php +++ /dev/null @@ -1,46 +0,0 @@ -prophesize(EventBus::class); - $eventBus1->dispatch($message1, $message2)->shouldBeCalled(); - - $eventBus2 = $this->prophesize(EventBus::class); - $eventBus2->dispatch($message1, $message2)->shouldBeCalled(); - - $chainEventBus = new ChainEventBus([$eventBus1->reveal(), $eventBus2->reveal()]); - $chainEventBus->dispatch($message1, $message2); - } -} diff --git a/tests/Unit/EventBus/Serializer/EventSerializerMessageSerializerTest.php b/tests/Unit/EventBus/Serializer/EventSerializerMessageSerializerTest.php deleted file mode 100644 index 0fa205116..000000000 --- a/tests/Unit/EventBus/Serializer/EventSerializerMessageSerializerTest.php +++ /dev/null @@ -1,157 +0,0 @@ -withHeader(new AggregateHeader('profile', '1', 1, new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'))) - ->withHeader(new ArchivedHeader(false)); - - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer->serialize($event)->shouldBeCalledOnce()->willReturn(new SerializedEvent( - 'profile_visited', - '{id: foo}', - )); - - $headersSerializer = $this->prophesize(HeadersSerializer::class); - $headersSerializer->serialize($message->headers())->shouldBeCalledOnce()->willReturn( - [ - 'aggregate' => '{"aggregateName":"profile","aggregateId":"1","playhead":1,"recordedOn":"2020-01-01T20:00:00+01:00"}', - 'archived' => '{"archived":false}', - ], - ); - - $serializer = new EventSerializerMessageSerializer( - $eventSerializer->reveal(), - $headersSerializer->reveal(), - new JsonEncoder(), - ); - - $content = $serializer->serialize($message); - - self::assertEquals( - '{"serializedEvent":{"name":"profile_visited","payload":"{id: foo}"},"headers":{"aggregate":"{\"aggregateName\":\"profile\",\"aggregateId\":\"1\",\"playhead\":1,\"recordedOn\":\"2020-01-01T20:00:00+01:00\"}","archived":"{\"archived\":false}"}}', - $content, - ); - } - - public function testDeserialize(): void - { - $event = new ProfileVisited(ProfileId::fromString('foo')); - - $message = Message::create($event) - ->withHeader(new AggregateHeader('profile', '1', 1, new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'))) - ->withHeader(new ArchivedHeader(false)); - - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer->deserialize(new SerializedEvent( - 'profile_visited', - '{id: foo}', - ))->shouldBeCalledOnce()->willReturn($event); - - $headersSerializer = $this->prophesize(HeadersSerializer::class); - $headersSerializer->deserialize( - [ - 'aggregate' => '{"aggregateName":"profile","aggregateId":"1","playhead":1,"recordedOn":"2020-01-01T20:00:00+01:00"}', - 'archived' => '{"archived":false}', - ], - )->shouldBeCalledOnce()->willReturn($message->headers()); - - $serializer = new EventSerializerMessageSerializer( - $eventSerializer->reveal(), - $headersSerializer->reveal(), - new JsonEncoder(), - ); - - $deserializedMessage = $serializer->deserialize('{"serializedEvent":{"name":"profile_visited","payload":"{id: foo}"},"headers":{"aggregate":"{\"aggregateName\":\"profile\",\"aggregateId\":\"1\",\"playhead\":1,\"recordedOn\":\"2020-01-01T20:00:00+01:00\"}","archived":"{\"archived\":false}"}}'); - - self::assertEquals($message, $deserializedMessage); - } - - public function testDeserializeDecodeFailedInvalidData(): void - { - $this->expectException(DeserializeFailed::class); - - $eventSerializer = $this->prophesize(EventSerializer::class); - $headersSerializer = $this->prophesize(HeadersSerializer::class); - $serializer = new EventSerializerMessageSerializer( - $eventSerializer->reveal(), - $headersSerializer->reveal(), - new JsonEncoder(), - ); - - $serializer->deserialize('{}'); - } - - public function testEquals(): void - { - $event = new ProfileVisited( - ProfileId::fromString('foo'), - ); - - $message = Message::create($event) - ->withHeader(new AggregateHeader('profile', '1', 1, new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'))); - - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer->serialize($event)->shouldBeCalledOnce()->willReturn(new SerializedEvent( - 'profile_visited', - '{id: foo}', - )); - $eventSerializer->deserialize(new SerializedEvent( - 'profile_visited', - '{id: foo}', - ))->shouldBeCalledOnce()->willReturn($event); - - $headersSerializer = $this->prophesize(HeadersSerializer::class); - $headersSerializer->serialize($message->headers())->shouldBeCalledOnce()->willReturn( - [ - 'aggregate' => '{"aggregateName":"profile","aggregateId":"1","playhead":1,"recordedOn":"2020-01-01T20:00:00+01:00"}', - 'archived' => '{"archived":false}', - ], - ); - $headersSerializer->deserialize( - [ - 'aggregate' => '{"aggregateName":"profile","aggregateId":"1","playhead":1,"recordedOn":"2020-01-01T20:00:00+01:00"}', - 'archived' => '{"archived":false}', - ], - )->shouldBeCalledOnce()->willReturn($message->headers()); - - $serializer = new EventSerializerMessageSerializer( - $eventSerializer->reveal(), - $headersSerializer->reveal(), - new JsonEncoder(), - ); - - $content = $serializer->serialize($message); - $clonedMessage = $serializer->deserialize($content); - - self::assertEquals($message, $clonedMessage); - } -} diff --git a/tests/Unit/EventBus/Serializer/PhpNativeMessageSerializerTest.php b/tests/Unit/EventBus/Serializer/PhpNativeMessageSerializerTest.php deleted file mode 100644 index b8c4ba2b5..000000000 --- a/tests/Unit/EventBus/Serializer/PhpNativeMessageSerializerTest.php +++ /dev/null @@ -1,103 +0,0 @@ -withHeader(new AggregateHeader( - 'profile', - '1', - 1, - new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'), - )); - - $nativeSerializer = new PhpNativeMessageSerializer(); - - $content = $nativeSerializer->serialize($message); - - self::assertEquals('Tzo0MToiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UiOjI6e3M6NTA6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBoZWFkZXJzIjthOjE6e3M6NTA6IlBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xBZ2dyZWdhdGVcQWdncmVnYXRlSGVhZGVyIjtPOjUwOiJQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcQWdncmVnYXRlXEFnZ3JlZ2F0ZUhlYWRlciI6NDp7czoxMzoiYWdncmVnYXRlTmFtZSI7czo3OiJwcm9maWxlIjtzOjExOiJhZ2dyZWdhdGVJZCI7czoxOiIxIjtzOjg6InBsYXloZWFkIjtpOjE7czoxMDoicmVjb3JkZWRPbiI7TzoxNzoiRGF0ZVRpbWVJbW11dGFibGUiOjM6e3M6NDoiZGF0ZSI7czoyNjoiMjAyMC0wMS0wMSAyMDowMDowMC4wMDAwMDAiO3M6MTM6InRpbWV6b25lX3R5cGUiO2k6MTtzOjg6InRpbWV6b25lIjtzOjY6IiswMTowMCI7fX19czo0ODoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAGV2ZW50IjtPOjU4OiJQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVWaXNpdGVkIjoxOntzOjk6InZpc2l0b3JJZCI7Tzo1MzoiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXFRlc3RzXFVuaXRcRml4dHVyZVxQcm9maWxlSWQiOjE6e3M6NTc6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVJZABpZCI7czozOiJmb28iO319fQ==', $content); - } - - public function testDeserialize(): void - { - $event = new ProfileVisited(ProfileId::fromString('foo')); - $nativeSerializer = new PhpNativeMessageSerializer(); - - $message = $nativeSerializer->deserialize('Tzo0MToiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UiOjI6e3M6NTA6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBoZWFkZXJzIjthOjE6e3M6NTA6IlBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xBZ2dyZWdhdGVcQWdncmVnYXRlSGVhZGVyIjtPOjUwOiJQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcQWdncmVnYXRlXEFnZ3JlZ2F0ZUhlYWRlciI6NDp7czoxMzoiYWdncmVnYXRlTmFtZSI7czo3OiJwcm9maWxlIjtzOjExOiJhZ2dyZWdhdGVJZCI7czoxOiIxIjtzOjg6InBsYXloZWFkIjtpOjE7czoxMDoicmVjb3JkZWRPbiI7TzoxNzoiRGF0ZVRpbWVJbW11dGFibGUiOjM6e3M6NDoiZGF0ZSI7czoyNjoiMjAyMC0wMS0wMSAyMDowMDowMC4wMDAwMDAiO3M6MTM6InRpbWV6b25lX3R5cGUiO2k6MTtzOjg6InRpbWV6b25lIjtzOjY6IiswMTowMCI7fX19czo0ODoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAGV2ZW50IjtPOjU4OiJQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVWaXNpdGVkIjoxOntzOjk6InZpc2l0b3JJZCI7Tzo1MzoiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXFRlc3RzXFVuaXRcRml4dHVyZVxQcm9maWxlSWQiOjE6e3M6NTc6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVJZABpZCI7czozOiJmb28iO319fQ=='); - - self::assertEquals($event, $message->event()); - self::assertEquals( - [ - new AggregateHeader( - 'profile', - '1', - 1, - new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'), - ), - ], - $message->headers(), - ); - } - - public function testDeserializeDecodeFailed(): void - { - $this->expectException(DeserializeFailed::class); - - $nativeSerializer = new PhpNativeMessageSerializer(); - - $nativeSerializer->deserialize('!@#%$^&*()'); - } - - public function testDeserializeNotAMessage(): void - { - $this->expectException(DeserializeFailed::class); - - $nativeSerializer = new PhpNativeMessageSerializer(); - - $nativeSerializer->deserialize('Tzo4OiJzdGRDbGFzcyI6MDp7fQ=='); - } - - public function testEquals(): void - { - $event = new ProfileVisited( - ProfileId::fromString('foo'), - ); - - $message = Message::create($event) - ->withHeader(new AggregateHeader( - 'profile', - '1', - 1, - new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'), - )); - - $nativeSerializer = new PhpNativeMessageSerializer(); - - $content = $nativeSerializer->serialize($message); - $clonedMessage = $nativeSerializer->deserialize($content); - - self::assertEquals($message, $clonedMessage); - } -} diff --git a/tests/Unit/Outbox/DoctrineOutboxStoreTest.php b/tests/Unit/Outbox/DoctrineOutboxStoreTest.php deleted file mode 100644 index 8ca1dcfc0..000000000 --- a/tests/Unit/Outbox/DoctrineOutboxStoreTest.php +++ /dev/null @@ -1,258 +0,0 @@ -withHeader(new AggregateHeader('profile', '1', 1, $recordedOn)); - - $innerMockedConnection = $this->prophesize(Connection::class); - $innerMockedConnection->insert( - 'outbox', - ['message' => 'serialized'], - )->shouldBeCalledOnce(); - - $driver = $this->prophesize(Driver::class); - $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); - - $mockedConnection = $this->prophesize(Connection::class); - $mockedConnection->transactional(Argument::any())->will( - /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()) - ); - - $serializer = $this->prophesize(MessageSerializer::class); - $serializer->serialize($message)->shouldBeCalledOnce()->willReturn('serialized'); - - $doctrineOutboxStore = new DoctrineOutboxStore( - $mockedConnection->reveal(), - $serializer->reveal(), - ); - - $doctrineOutboxStore->saveOutboxMessage($message); - } - - public function testMarkOutboxMessageConsumed(): void - { - $recordedOn = new DateTimeImmutable(); - $message = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) - ->withHeader(new AggregateHeader('profile', '1', 1, $recordedOn)) - ->withHeader(new OutboxHeader(42)); - - $innerMockedConnection = $this->prophesize(Connection::class); - $innerMockedConnection->delete( - 'outbox', - ['id' => 42], - )->shouldBeCalledOnce(); - - $driver = $this->prophesize(Driver::class); - $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); - - $mockedConnection = $this->prophesize(Connection::class); - $mockedConnection->transactional(Argument::any())->will( - /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()) - ); - - $serializer = $this->prophesize(MessageSerializer::class); - - $doctrineOutboxStore = new DoctrineOutboxStore( - $mockedConnection->reveal(), - $serializer->reveal(), - ); - - $doctrineOutboxStore->markOutboxMessageConsumed($message); - } - - public function testMarkOutboxMessageConsumedHeaderMissing(): void - { - $recordedOn = new DateTimeImmutable(); - $message = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) - ->withHeader(new AggregateHeader('profile', '1', 1, $recordedOn)); - - $innerMockedConnection = $this->prophesize(Connection::class); - $innerMockedConnection->delete( - 'outbox', - ['id' => 42], - )->shouldNotBeCalled(); - - $driver = $this->prophesize(Driver::class); - $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); - - $mockedConnection = $this->prophesize(Connection::class); - $mockedConnection->transactional(Argument::any())->will( - /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()) - ); - - $serializer = $this->prophesize(MessageSerializer::class); - - $doctrineOutboxStore = new DoctrineOutboxStore( - $mockedConnection->reveal(), - $serializer->reveal(), - ); - - $this->expectException(OutboxHeaderIssue::class); - $this->expectExceptionMessage('missing header "outboxIdentifier"'); - $doctrineOutboxStore->markOutboxMessageConsumed($message); - } - - public function testCountOutboxMessages(): void - { - $queryBuilder = $this->prophesize(QueryBuilder::class); - $queryBuilder->select('COUNT(*)')->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $queryBuilder->from('outbox')->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $queryBuilder->getSQL()->shouldBeCalledOnce()->willReturn('this sql'); - - $connection = $this->prophesize(Connection::class); - $connection->createQueryBuilder()->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $connection->fetchOne('this sql')->shouldBeCalledOnce()->willReturn('1'); - - $serializer = $this->prophesize(MessageSerializer::class); - - $doctrineOutboxStore = new DoctrineOutboxStore( - $connection->reveal(), - $serializer->reveal(), - ); - - $result = $doctrineOutboxStore->countOutboxMessages(); - self::assertSame(1, $result); - } - - public function testCountOutboxMessagesFailure(): void - { - $queryBuilder = $this->prophesize(QueryBuilder::class); - $queryBuilder->select('COUNT(*)')->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $queryBuilder->from('outbox')->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $queryBuilder->getSQL()->shouldBeCalledOnce()->willReturn('this sql'); - - $connection = $this->prophesize(Connection::class); - $connection->createQueryBuilder()->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $connection->fetchOne('this sql')->shouldBeCalledOnce()->willReturn([]); - - $serializer = $this->prophesize(MessageSerializer::class); - - $doctrineOutboxStore = new DoctrineOutboxStore( - $connection->reveal(), - $serializer->reveal(), - ); - - $this->expectException(WrongQueryResult::class); - $doctrineOutboxStore->countOutboxMessages(); - } - - public function testRetrieveOutboxMessagesNoResult(): void - { - $queryBuilder = $this->prophesize(QueryBuilder::class); - $queryBuilder->select('*')->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $queryBuilder->from('outbox')->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $queryBuilder->setMaxResults(null)->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $queryBuilder->getSQL()->shouldBeCalledOnce()->willReturn('this sql'); - - $connection = $this->prophesize(Connection::class); - $connection->createQueryBuilder()->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $connection->fetchAllAssociative('this sql')->shouldBeCalledOnce()->willReturn([]); - - $serializer = $this->prophesize(MessageSerializer::class); - - $doctrineOutboxStore = new DoctrineOutboxStore( - $connection->reveal(), - $serializer->reveal(), - ); - - $messages = $doctrineOutboxStore->retrieveOutboxMessages(); - self::assertSame([], $messages); - } - - public function testRetrieveOutboxMessages(): void - { - $recordedOn = new DateTimeImmutable(); - $event = new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s')); - $message = Message::create($event) - ->withHeader(new AggregateHeader('profile', '1', 1, $recordedOn)) - ->withHeader(new OutboxHeader(42)); - - $queryBuilder = $this->prophesize(QueryBuilder::class); - $queryBuilder->select('*')->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $queryBuilder->from('outbox')->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $queryBuilder->setMaxResults(null)->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $queryBuilder->getSQL()->shouldBeCalledOnce()->willReturn('this sql'); - - $connection = $this->prophesize(Connection::class); - $connection->createQueryBuilder()->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); - $connection->fetchAllAssociative('this sql')->shouldBeCalledOnce()->willReturn([ - [ - 'id' => 42, - 'message' => 'serialized', - ], - ]); - - $serializer = $this->prophesize(MessageSerializer::class); - $serializer->deserialize('serialized')->shouldBeCalledOnce()->willReturn($message); - - $doctrineOutboxStore = new DoctrineOutboxStore( - $connection->reveal(), - $serializer->reveal(), - ); - - $messages = $doctrineOutboxStore->retrieveOutboxMessages(); - self::assertEquals([$message], $messages); - } - - public function testConfigureSchema(): void - { - $connection = $this->prophesize(Connection::class); - $serializer = $this->prophesize(MessageSerializer::class); - - $doctrineOutboxStore = new DoctrineOutboxStore( - $connection->reveal(), - $serializer->reveal(), - ); - - $expectedSchema = new Schema(); - $table = $expectedSchema->createTable('outbox'); - - $table->addColumn('id', Types::INTEGER) - ->setNotnull(true) - ->setAutoincrement(true); - $table->addColumn('message', Types::STRING) - ->setNotnull(true) - ->setLength(16_000); - - $table->setPrimaryKey(['id']); - - $schema = new Schema(); - $doctrineOutboxStore->configureSchema($schema, $connection->reveal()); - - $this->assertEquals($expectedSchema, $schema); - } -} diff --git a/tests/Unit/Outbox/EventBusPublisherTest.php b/tests/Unit/Outbox/EventBusPublisherTest.php deleted file mode 100644 index 13961e20f..000000000 --- a/tests/Unit/Outbox/EventBusPublisherTest.php +++ /dev/null @@ -1,36 +0,0 @@ -prophesize(Consumer::class); - $eventBus->consume($message)->shouldBeCalled(); - - $publisher = new EventBusPublisher($eventBus->reveal()); - $publisher->publish($message); - } -} diff --git a/tests/Unit/Outbox/OutboxEventBusTest.php b/tests/Unit/Outbox/OutboxEventBusTest.php deleted file mode 100644 index 60b342f8c..000000000 --- a/tests/Unit/Outbox/OutboxEventBusTest.php +++ /dev/null @@ -1,59 +0,0 @@ -prophesize(OutboxStore::class); - $store->saveOutboxMessage($message)->shouldBeCalled(); - - $eventBus = new OutboxEventBus($store->reveal()); - $eventBus->dispatch($message); - } - - public function testDispatchMultipleMessages(): void - { - $message1 = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $message2 = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $store = $this->prophesize(OutboxStore::class); - $store->saveOutboxMessage($message1, $message2)->shouldBeCalled(); - - $eventBus = new OutboxEventBus($store->reveal()); - $eventBus->dispatch($message1, $message2); - } -} diff --git a/tests/Unit/Outbox/OutboxHeaderIssueTest.php b/tests/Unit/Outbox/OutboxHeaderIssueTest.php deleted file mode 100644 index c9e4e2677..000000000 --- a/tests/Unit/Outbox/OutboxHeaderIssueTest.php +++ /dev/null @@ -1,28 +0,0 @@ -getMessage()); - self::assertSame(0, $error->getCode()); - } - - public function testInvalidHeaderType(): void - { - $error = OutboxHeaderIssue::invalidHeaderType('foo'); - - self::assertSame('Invalid header given: need type "int" got "string"', $error->getMessage()); - self::assertSame(0, $error->getCode()); - } -} diff --git a/tests/Unit/Outbox/OutboxHeaderTest.php b/tests/Unit/Outbox/OutboxHeaderTest.php deleted file mode 100644 index 69252e384..000000000 --- a/tests/Unit/Outbox/OutboxHeaderTest.php +++ /dev/null @@ -1,19 +0,0 @@ -id); - } -} diff --git a/tests/Unit/Outbox/StoreOutboxConsumerTest.php b/tests/Unit/Outbox/StoreOutboxConsumerTest.php deleted file mode 100644 index 70a3262b1..000000000 --- a/tests/Unit/Outbox/StoreOutboxConsumerTest.php +++ /dev/null @@ -1,61 +0,0 @@ -prophesize(OutboxStore::class); - $store->retrieveOutboxMessages(null)->willReturn([$message]); - $store->markOutboxMessageConsumed($message)->shouldBeCalled(); - - $eventBus = $this->prophesize(OutboxPublisher::class); - $eventBus->publish($message)->shouldBeCalled(); - - $consumer = new StoreOutboxProcessor($store->reveal(), $eventBus->reveal()); - $consumer->process(); - } - - public function testConsumeWithLimit(): void - { - $message = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $store = $this->prophesize(OutboxStore::class); - $store->retrieveOutboxMessages(100)->willReturn([$message]); - $store->markOutboxMessageConsumed($message)->shouldBeCalled(); - - $eventBus = $this->prophesize(OutboxPublisher::class); - $eventBus->publish($message)->shouldBeCalled(); - - $consumer = new StoreOutboxProcessor($store->reveal(), $eventBus->reveal()); - $consumer->process(100); - } -}