From cb506a0d073ffd2949239465b4ab7db5528be59c Mon Sep 17 00:00:00 2001 From: Daniel Badura Date: Wed, 27 Mar 2024 15:00:34 +0100 Subject: [PATCH 1/9] Add more tests, fix some other issues, remove unneeded code --- src/Store/DoctrineDbalStore.php | 41 +- .../BasicImplementation/Header/BazHeader.php | 2 +- .../BasicImplementation/Header/FooHeader.php | 2 +- .../MessageDecorator/FooMessageDecorator.php | 4 +- tests/Unit/Fixture/Header/BazHeader.php | 17 + tests/Unit/Fixture/Header/FooHeader.php | 17 + tests/Unit/Fixture/ProfileEmailChanged.php | 20 + tests/Unit/Store/ArrayStreamTest.php | 25 + tests/Unit/Store/DoctrineDbalStoreTest.php | 737 +++++++++++++++++- 9 files changed, 829 insertions(+), 36 deletions(-) create mode 100644 tests/Unit/Fixture/Header/BazHeader.php create mode 100644 tests/Unit/Fixture/Header/FooHeader.php create mode 100644 tests/Unit/Fixture/ProfileEmailChanged.php diff --git a/src/Store/DoctrineDbalStore.php b/src/Store/DoctrineDbalStore.php index 601076bf4..92954270b 100644 --- a/src/Store/DoctrineDbalStore.php +++ b/src/Store/DoctrineDbalStore.php @@ -214,14 +214,7 @@ function (Connection $connection) use ($messages): void { continue; } - $query = sprintf( - "INSERT INTO %s (%s) VALUES\n(%s)", - $this->storeTableName, - implode(', ', $columns), - implode("),\n(", $placeholders), - ); - - $connection->executeStatement($query, $parameters, $types); + $this->executeSave($columns, $placeholders, $parameters, $types, $connection); $parameters = []; $placeholders = []; @@ -234,18 +227,7 @@ function (Connection $connection) use ($messages): void { return; } - $query = sprintf( - "INSERT INTO %s (%s) VALUES\n(%s)", - $this->storeTableName, - implode(', ', $columns), - implode("),\n(", $placeholders), - ); - - try { - $connection->executeStatement($query, $parameters, $types); - } catch (UniqueConstraintViolationException $e) { - throw new UniqueConstraintViolation($e); - } + $this->executeSave($columns, $placeholders, $parameters, $types, $connection); }, ); } @@ -288,8 +270,7 @@ public function configureSchema(Schema $schema, Connection $connection): void $table = $schema->createTable($this->storeTableName); $table->addColumn('id', Types::BIGINT) - ->setAutoincrement(true) - ->setNotnull(true); + ->setAutoincrement(true); $table->addColumn('aggregate', Types::STRING) ->setLength(255) ->setNotnull(true); @@ -390,4 +371,20 @@ private function createTriggerFunctionName(): string return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]); } + + private function executeSave(array $columns, array $placeholders, array $parameters, array $types, Connection $connection): void + { + $query = sprintf( + "INSERT INTO %s (%s) VALUES\n(%s)", + $this->storeTableName, + implode(', ', $columns), + implode("),\n(", $placeholders), + ); + + try { + $connection->executeStatement($query, $parameters, $types); + } catch (UniqueConstraintViolationException $e) { + throw new UniqueConstraintViolation($e); + } + } } diff --git a/tests/Integration/BasicImplementation/Header/BazHeader.php b/tests/Integration/BasicImplementation/Header/BazHeader.php index 18a9941f3..c62394195 100644 --- a/tests/Integration/BasicImplementation/Header/BazHeader.php +++ b/tests/Integration/BasicImplementation/Header/BazHeader.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Integration\BasicImplementation\Header; +namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Header; use Patchlevel\EventSourcing\Attribute\Header; diff --git a/tests/Integration/BasicImplementation/Header/FooHeader.php b/tests/Integration/BasicImplementation/Header/FooHeader.php index 6f8206b65..03264b84f 100644 --- a/tests/Integration/BasicImplementation/Header/FooHeader.php +++ b/tests/Integration/BasicImplementation/Header/FooHeader.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Integration\BasicImplementation\Header; +namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Header; use Patchlevel\EventSourcing\Attribute\Header; diff --git a/tests/Integration/BasicImplementation/MessageDecorator/FooMessageDecorator.php b/tests/Integration/BasicImplementation/MessageDecorator/FooMessageDecorator.php index d1b26f48a..44e304c16 100644 --- a/tests/Integration/BasicImplementation/MessageDecorator/FooMessageDecorator.php +++ b/tests/Integration/BasicImplementation/MessageDecorator/FooMessageDecorator.php @@ -4,10 +4,10 @@ namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\MessageDecorator; -use Integration\BasicImplementation\Header\BazHeader; -use Integration\BasicImplementation\Header\FooHeader; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Repository\MessageDecorator\MessageDecorator; +use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Header\BazHeader; +use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Header\FooHeader; final class FooMessageDecorator implements MessageDecorator { diff --git a/tests/Unit/Fixture/Header/BazHeader.php b/tests/Unit/Fixture/Header/BazHeader.php new file mode 100644 index 000000000..fb8e6947a --- /dev/null +++ b/tests/Unit/Fixture/Header/BazHeader.php @@ -0,0 +1,17 @@ +close(); $stream->index(); } + + public function testPositionEmpty(): void + { + $stream = new ArrayStream([]); + $position = $stream->position(); + + self::assertNull($position); + } + + public function testPosition(): void + { + $message = Message::create( + new ProfileCreated( + ProfileId::fromString('foo'), + Email::fromString('info@patchlevel.de'), + ), + ); + + $messages = [$message]; + + $stream = new ArrayStream($messages); + $position = $stream->position(); + + self::assertSame(0, $position); + } } diff --git a/tests/Unit/Store/DoctrineDbalStoreTest.php b/tests/Unit/Store/DoctrineDbalStoreTest.php index b03b980a9..aa61c062d 100644 --- a/tests/Unit/Store/DoctrineDbalStoreTest.php +++ b/tests/Unit/Store/DoctrineDbalStoreTest.php @@ -8,10 +8,16 @@ use DateTimeImmutable; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Driver; +use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use Doctrine\DBAL\Platforms\AbstractPlatform; +use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Query\SelectQuery; use Doctrine\DBAL\Result; +use Doctrine\DBAL\Schema\Column; +use Doctrine\DBAL\Schema\Schema; +use Doctrine\DBAL\Schema\Table; +use Doctrine\DBAL\SQL\Builder\DefaultSelectSQLBuilder; use Doctrine\DBAL\SQL\Builder\SelectSQLBuilder; use Doctrine\DBAL\Statement; use Doctrine\DBAL\Types\Type; @@ -24,9 +30,17 @@ use Patchlevel\EventSourcing\Serializer\SerializedEvent; use Patchlevel\EventSourcing\Store\CriteriaBuilder; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; +use Patchlevel\EventSourcing\Store\MissingDataForStorage; +use Patchlevel\EventSourcing\Store\UniqueConstraintViolation; +use Patchlevel\EventSourcing\Store\WrongQueryResult; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\EmailChanged; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\Header\BazHeader; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\Header\FooHeader; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileEmailChanged; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; +use PDO; use PHPUnit\Framework\Attributes\RequiresPhp; use PHPUnit\Framework\TestCase; use Prophecy\Argument; @@ -57,13 +71,55 @@ public function testLoadWithNoEvents(): void )->willReturn($result->reveal()); $abstractPlatform = $this->prophesize(AbstractPlatform::class); - $selectSqlBuilder = $this->prophesize(SelectSQLBuilder::class); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); - $selectSqlBuilder->buildSQL(Argument::type(SelectQuery::class)) - ->willReturn('SELECT * FROM eventstore WHERE (aggregate = :aggregate) AND (aggregate_id = :id) AND (playhead > :playhead) AND (archived = :archived) ORDER BY id ASC'); + $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); + $queryBuilder = new QueryBuilder($connection->reveal()); + $connection->createQueryBuilder()->willReturn($queryBuilder); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $headersSerializer = $this->prophesize(HeadersSerializer::class); + + $doctrineDbalStore = new DoctrineDbalStore( + $connection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal(), + 'eventstore', + ); + + $stream = $doctrineDbalStore->load( + (new CriteriaBuilder()) + ->aggregateName('profile') + ->aggregateId('1') + ->fromPlayhead(0) + ->archived(false) + ->build(), + ); + + self::assertSame(null, $stream->index()); + self::assertSame(null, $stream->position()); + } + + public function testLoadWithLimitAndOffsetAndIndex(): void + { + $connection = $this->prophesize(Connection::class); + $result = $this->prophesize(Result::class); + $result->iterateAssociative()->willReturn(new EmptyIterator()); - $abstractPlatform->createSelectSQLBuilder() - ->willReturn($selectSqlBuilder->reveal()); + $connection->executeQuery( + 'SELECT * FROM eventstore WHERE (aggregate = :aggregate) AND (aggregate_id = :id) AND (playhead > :playhead) AND (archived = :archived) AND (id > :index) ORDER BY id ASC LIMIT 10 OFFSET 5', + [ + 'aggregate' => 'profile', + 'id' => '1', + 'playhead' => 0, + 'archived' => false, + 'index' => 1, + ], + Argument::type('array'), + )->willReturn($result->reveal()); + + $abstractPlatform = $this->prophesize(AbstractPlatform::class); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); $queryBuilder = new QueryBuilder($connection->reveal()); @@ -85,7 +141,10 @@ public function testLoadWithNoEvents(): void ->aggregateId('1') ->fromPlayhead(0) ->archived(false) + ->fromIndex(1) ->build(), + 10, + 5 ); self::assertSame(null, $stream->index()); @@ -125,12 +184,8 @@ public function testLoadWithOneEvent(): void )->willReturn($result->reveal()); $abstractPlatform = $this->prophesize(AbstractPlatform::class); - $selectSqlBuilder = $this->prophesize(SelectSQLBuilder::class); - - $selectSqlBuilder->buildSQL(Argument::type(SelectQuery::class)) - ->willReturn('SELECT * FROM eventstore WHERE (aggregate = :aggregate) AND (aggregate_id = :id) AND (playhead > :playhead) AND (archived = :archived) ORDER BY id ASC'); - $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn($selectSqlBuilder->reveal()); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); $abstractPlatform->getDateTimeTzFormatString()->shouldBeCalledOnce()->willReturn('Y-m-d H:i:s'); $abstractPlatform->convertFromBoolean('0')->shouldBeCalledTimes(2)->willReturn(false); @@ -183,6 +238,114 @@ public function testLoadWithOneEvent(): void self::assertSame(0, $stream->position()); } + public function testLoadWithTwoEvents(): void + { + $connection = $this->prophesize(Connection::class); + $result = $this->prophesize(Result::class); + $result->iterateAssociative()->willReturn(new ArrayIterator( + [ + [ + 'id' => 1, + 'aggregate' => 'profile', + 'aggregate_id' => '1', + 'playhead' => '1', + 'event' => 'profile.created', + 'payload' => '{"profileId": "1", "email": "s"}', + 'recorded_on' => '2021-02-17 10:00:00', + 'archived' => '0', + 'new_stream_start' => '0', + 'custom_headers' => '[]', + ], + [ + 'id' => 2, + 'aggregate' => 'profile', + 'aggregate_id' => '1', + 'playhead' => '2', + 'event' => 'profile.email_changed', + 'payload' => '{"profileId": "1", "email": "d"}', + 'recorded_on' => '2021-02-17 11:00:00', + 'archived' => '0', + 'new_stream_start' => '0', + 'custom_headers' => '[]', + ], + ], + )); + + $connection->executeQuery( + 'SELECT * FROM eventstore WHERE (aggregate = :aggregate) AND (aggregate_id = :id) AND (playhead > :playhead) AND (archived = :archived) ORDER BY id ASC', + [ + 'aggregate' => 'profile', + 'id' => '1', + 'playhead' => 0, + 'archived' => false, + ], + Argument::type('array'), + )->willReturn($result->reveal()); + + $abstractPlatform = $this->prophesize(AbstractPlatform::class); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + $abstractPlatform->getDateTimeTzFormatString()->shouldBeCalledTimes(2)->willReturn('Y-m-d H:i:s'); + $abstractPlatform->convertFromBoolean('0')->shouldBeCalledTimes(4)->willReturn(false); + + $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); + + $queryBuilder = new QueryBuilder($connection->reveal()); + $connection->createQueryBuilder()->willReturn($queryBuilder); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $eventSerializer->deserialize( + new SerializedEvent('profile.created', '{"profileId": "1", "email": "s"}'), + )->willReturn(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))); + $eventSerializer->deserialize( + new SerializedEvent('profile.email_changed', '{"profileId": "1", "email": "d"}'), + )->willReturn(new ProfileEmailChanged(ProfileId::fromString('1'), Email::fromString('d'))); + + $headersSerializer = $this->prophesize(HeadersSerializer::class); + $headersSerializer->deserialize([])->willReturn([]); + + $doctrineDbalStore = new DoctrineDbalStore( + $connection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal(), + 'eventstore', + ); + + $stream = $doctrineDbalStore->load( + (new CriteriaBuilder()) + ->aggregateName('profile') + ->aggregateId('1') + ->fromPlayhead(0) + ->archived(false) + ->build(), + ); + + self::assertSame(1, $stream->index()); + self::assertSame(0, $stream->position()); + + $message = $stream->current(); + + self::assertSame(1, $stream->index()); + self::assertSame(0, $stream->position()); + + self::assertInstanceOf(Message::class, $message); + self::assertInstanceOf(ProfileCreated::class, $message->event()); + self::assertSame('1', $message->header(AggregateHeader::class)->aggregateId); + self::assertSame(1, $message->header(AggregateHeader::class)->playhead); + self::assertEquals(new DateTimeImmutable('2021-02-17 10:00:00'), $message->header(AggregateHeader::class)->recordedOn); + + $stream->next(); + $message = $stream->current(); + + self::assertSame(2, $stream->index()); + self::assertSame(1, $stream->position()); + + self::assertInstanceOf(Message::class, $message); + self::assertInstanceOf(ProfileEmailChanged::class, $message->event()); + self::assertSame('1', $message->header(AggregateHeader::class)->aggregateId); + self::assertSame(2, $message->header(AggregateHeader::class)->playhead); + self::assertEquals(new DateTimeImmutable('2021-02-17 11:00:00'), $message->header(AggregateHeader::class)->recordedOn); + } + public function testTransactional(): void { $callback = static function (): void { @@ -252,6 +415,560 @@ public function testSaveWithOneEvent(): void $singleTableStore->save($message); } + public function testSaveWithoutAggregateHeader(): void + { + $recordedOn = new DateTimeImmutable(); + $message = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))); + + $innerMockedConnection = $this->prophesize(Connection::class); + + $innerMockedConnection->executeStatement( + "INSERT INTO eventstore (aggregate, aggregate_id, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", + ['profile', '1', 1, 'profile_created', '', $recordedOn, false, false, []], + [ + 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 6 => Type::getType(Types::BOOLEAN), + 7 => Type::getType(Types::BOOLEAN), + 8 => Type::getType(Types::JSON), + ], + )->shouldNotBeCalled(); + + $driver = $this->prophesize(Driver::class); + $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $eventSerializer->serialize($message->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent('profile_created', '')); + + $headersSerializer = $this->prophesize(HeadersSerializer::class); + $headersSerializer->serialize([])->willReturn([]); + + $mockedConnection = $this->prophesize(Connection::class); + $mockedConnection->transactional(Argument::any())->will( + /** @param array{0: callable} $args */ + static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()), + ); + + $singleTableStore = new DoctrineDbalStore( + $mockedConnection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal(), + 'eventstore', + ); + + $this->expectException(MissingDataForStorage::class); + $singleTableStore->save($message); + } + + public function testSaveWithTwoEvents(): void + { + $recordedOn = new DateTimeImmutable(); + $message1 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) + ->withHeader(new AggregateHeader( + 'profile', + '1', + 1, + $recordedOn, + )); + $message2 = Message::create(new ProfileEmailChanged(ProfileId::fromString('1'), Email::fromString('d'))) + ->withHeader(new AggregateHeader( + 'profile', + '1', + 2, + $recordedOn, + )); + + $innerMockedConnection = $this->prophesize(Connection::class); + + $innerMockedConnection->executeStatement( + "INSERT INTO eventstore (aggregate, aggregate_id, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", + [ + 'profile', '1', 1, 'profile_created', '', $recordedOn, false, false, [], + 'profile', '1', 2, 'profile_email_changed', '', $recordedOn, false, false, [], + ], + [ + 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 6 => Type::getType(Types::BOOLEAN), + 7 => Type::getType(Types::BOOLEAN), + 8 => Type::getType(Types::JSON), + 14 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 15 => Type::getType(Types::BOOLEAN), + 16 => Type::getType(Types::BOOLEAN), + 17 => Type::getType(Types::JSON), + ], + )->shouldBeCalledOnce(); + + $driver = $this->prophesize(Driver::class); + $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $eventSerializer->serialize($message1->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent('profile_created', '')); + $eventSerializer->serialize($message2->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent('profile_email_changed', '')); + + $headersSerializer = $this->prophesize(HeadersSerializer::class); + $headersSerializer->serialize([])->willReturn([]); + + $mockedConnection = $this->prophesize(Connection::class); + $mockedConnection->transactional(Argument::any())->will( + /** @param array{0: callable} $args */ + static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()), + ); + + $singleTableStore = new DoctrineDbalStore( + $mockedConnection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal(), + ); + $singleTableStore->save($message1, $message2); + } + + public function testSaveWithUniqueConstraintViolation(): void + { + $recordedOn = new DateTimeImmutable(); + $message1 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) + ->withHeader(new AggregateHeader( + 'profile', + '1', + 1, + $recordedOn, + )); + $message2 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) + ->withHeader(new AggregateHeader( + 'profile', + '1', + 1, + $recordedOn, + )); + + $innerMockedConnection = $this->prophesize(Connection::class); + + $innerMockedConnection->executeStatement( + "INSERT INTO eventstore (aggregate, aggregate_id, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", + [ + 'profile', '1', 1, 'profile_created', '', $recordedOn, false, false, [], + 'profile', '1', 1, 'profile_created', '', $recordedOn, false, false, [], + ], + [ + 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 6 => Type::getType(Types::BOOLEAN), + 7 => Type::getType(Types::BOOLEAN), + 8 => Type::getType(Types::JSON), + 14 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 15 => Type::getType(Types::BOOLEAN), + 16 => Type::getType(Types::BOOLEAN), + 17 => Type::getType(Types::JSON), + ], + )->shouldBeCalledOnce()->willThrow(UniqueConstraintViolationException::class); + + $driver = $this->prophesize(Driver::class); + $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $eventSerializer->serialize($message1->event())->shouldBeCalledTimes(2)->willReturn(new SerializedEvent('profile_created', '')); + + $headersSerializer = $this->prophesize(HeadersSerializer::class); + $headersSerializer->serialize([])->willReturn([]); + + $mockedConnection = $this->prophesize(Connection::class); + $mockedConnection->transactional(Argument::any())->will( + /** @param array{0: callable} $args */ + static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()), + ); + + $singleTableStore = new DoctrineDbalStore( + $mockedConnection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal(), + ); + + $this->expectException(UniqueConstraintViolation::class); + $singleTableStore->save($message1, $message2); + } + + public function testSaveWithThousandEvents(): void + { + $recordedOn = new DateTimeImmutable(); + + $messages = []; + for ($i = 1; $i <= 10000; $i++) { + $messages[] = Message::create(new ProfileEmailChanged(ProfileId::fromString('1'), Email::fromString('s'))) + ->withHeader(new AggregateHeader( + 'profile', + '1', + $i, + $recordedOn, + )); + } + + $innerMockedConnection = $this->prophesize(Connection::class); + + $innerMockedConnection->executeStatement(Argument::any(), Argument::any(), Argument::any()) + ->shouldBeCalledTimes(2); + + $driver = $this->prophesize(Driver::class); + $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $eventSerializer->serialize($messages[0]->event())->shouldBeCalledTimes(10000)->willReturn(new SerializedEvent('profile_email_changed', '')); + + $headersSerializer = $this->prophesize(HeadersSerializer::class); + $headersSerializer->serialize([])->willReturn([]); + + $mockedConnection = $this->prophesize(Connection::class); + $mockedConnection->transactional(Argument::any())->will( + /** @param array{0: callable} $args */ + static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()), + ); + + $singleTableStore = new DoctrineDbalStore( + $mockedConnection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal(), + ); + $singleTableStore->save(...$messages); + } + + public function testSaveWithCustomHeaders(): void + { + $customHeaders = [ + new FooHeader('foo'), + new BazHeader('baz') + ]; + + $recordedOn = new DateTimeImmutable(); + $message = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) + ->withHeader(new AggregateHeader( + 'profile', + '1', + 1, + $recordedOn, + )) + ->withHeaders($customHeaders) + ; + + $innerMockedConnection = $this->prophesize(Connection::class); + + $innerMockedConnection->executeStatement( + "INSERT INTO eventstore (aggregate, aggregate_id, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", + ['profile', '1', 1, 'profile_created', '', $recordedOn, false, false, []], + [ + 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 6 => Type::getType(Types::BOOLEAN), + 7 => Type::getType(Types::BOOLEAN), + 8 => Type::getType(Types::JSON), + ], + )->shouldBeCalledOnce(); + + $driver = $this->prophesize(Driver::class); + $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $eventSerializer->serialize($message->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent('profile_created', '')); + + $headersSerializer = $this->prophesize(HeadersSerializer::class); + $headersSerializer->serialize($customHeaders)->willReturn([]); + + $mockedConnection = $this->prophesize(Connection::class); + $mockedConnection->transactional(Argument::any())->will( + /** @param array{0: callable} $args */ + static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()), + ); + + $singleTableStore = new DoctrineDbalStore( + $mockedConnection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal(), + 'eventstore', + ); + $singleTableStore->save($message); + } + + public function testCount(): void + { + $connection = $this->prophesize(Connection::class); + $connection->fetchOne( + 'SELECT COUNT(*) FROM eventstore WHERE (aggregate = :aggregate) AND (aggregate_id = :id) AND (playhead > :playhead) AND (archived = :archived)', + [ + 'aggregate' => 'profile', + 'id' => '1', + 'playhead' => 0, + 'archived' => false, + ], + Argument::type('array'), + )->willReturn('1'); + + $abstractPlatform = $this->prophesize(AbstractPlatform::class); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); + + $queryBuilder = new QueryBuilder($connection->reveal()); + $connection->createQueryBuilder()->willReturn($queryBuilder); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $headersSerializer = $this->prophesize(HeadersSerializer::class); + + $doctrineDbalStore = new DoctrineDbalStore( + $connection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal() + ); + + $count = $doctrineDbalStore->count( + (new CriteriaBuilder()) + ->aggregateName('profile') + ->aggregateId('1') + ->fromPlayhead(0) + ->archived(false) + ->build(), + ); + + self::assertSame(1, $count); + } + + public function testCountWrongResult(): void + { + $connection = $this->prophesize(Connection::class); + $connection->fetchOne( + 'SELECT COUNT(*) FROM eventstore WHERE (aggregate = :aggregate) AND (aggregate_id = :id) AND (playhead > :playhead) AND (archived = :archived)', + [ + 'aggregate' => 'profile', + 'id' => '1', + 'playhead' => 0, + 'archived' => false, + ], + Argument::type('array'), + )->willReturn([]); + + $abstractPlatform = $this->prophesize(AbstractPlatform::class); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); + + $queryBuilder = new QueryBuilder($connection->reveal()); + $connection->createQueryBuilder()->willReturn($queryBuilder); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $headersSerializer = $this->prophesize(HeadersSerializer::class); + + $doctrineDbalStore = new DoctrineDbalStore( + $connection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal() + ); + + + $this->expectException(WrongQueryResult::class); + $doctrineDbalStore->count( + (new CriteriaBuilder()) + ->aggregateName('profile') + ->aggregateId('1') + ->fromPlayhead(0) + ->archived(false) + ->build(), + ); + } + + public function testSetupSubscription(): void + { + $connection = $this->prophesize(Connection::class); + $connection->executeStatement( + <<<'SQL' + CREATE OR REPLACE FUNCTION notify_eventstore() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('eventstore', 'update'); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + SQL, + )->shouldBeCalledOnce()->willReturn(1); + $connection->executeStatement("DROP TRIGGER IF EXISTS notify_trigger ON eventstore;") + ->shouldBeCalledOnce()->willReturn(1); + $connection->executeStatement("CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON eventstore FOR EACH ROW EXECUTE PROCEDURE notify_eventstore();") + ->shouldBeCalledOnce()->willReturn(1); + + $abstractPlatform = $this->prophesize(PostgreSQLPlatform::class); + $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $headersSerializer = $this->prophesize(HeadersSerializer::class); + + $doctrineDbalStore = new DoctrineDbalStore( + $connection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal() + ); + $doctrineDbalStore->setupSubscription(); + + } + + public function testSetupSubscriptionWithOtherStoreTableName(): void + { + $connection = $this->prophesize(Connection::class); + $connection->executeStatement( + <<<'SQL' + CREATE OR REPLACE FUNCTION new.notify_eventstore() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('new.eventstore', 'update'); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + SQL, + )->shouldBeCalledOnce()->willReturn(1); + $connection->executeStatement("DROP TRIGGER IF EXISTS notify_trigger ON new.eventstore;") + ->shouldBeCalledOnce()->willReturn(1); + $connection->executeStatement("CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON new.eventstore FOR EACH ROW EXECUTE PROCEDURE new.notify_eventstore();") + ->shouldBeCalledOnce()->willReturn(1); + + $abstractPlatform = $this->prophesize(PostgreSQLPlatform::class); + $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $headersSerializer = $this->prophesize(HeadersSerializer::class); + + $doctrineDbalStore = new DoctrineDbalStore( + $connection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal(), + 'new.eventstore' + ); + $doctrineDbalStore->setupSubscription(); + + } + public function testSetupSubscriptionNotPostgres(): void + { + $connection = $this->prophesize(Connection::class); + $connection->executeStatement( + <<<'SQL' + CREATE OR REPLACE FUNCTION notify_eventstore() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('eventstore'); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + SQL, + )->shouldNotBeCalled(); + $connection->executeStatement("DROP TRIGGER IF EXISTS notify_trigger ON eventstore;") + ->shouldNotBeCalled(); + $connection->executeStatement("CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON eventstore FOR EACH ROW EXECUTE PROCEDURE notify_eventstore();") + ->shouldNotBeCalled(); + + $abstractPlatform = $this->prophesize(AbstractPlatform::class); + $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $headersSerializer = $this->prophesize(HeadersSerializer::class); + + $doctrineDbalStore = new DoctrineDbalStore( + $connection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal() + ); + $doctrineDbalStore->setupSubscription(); + + } + + public function testWait(): void + { + $nativeConnection = $this->getMockBuilder(PDO::class) + ->disableOriginalConstructor() + ->addMethods(['pgsqlGetNotify']) + ->getMock(); + $nativeConnection + ->expects($this->once()) + ->method('pgsqlGetNotify') + ->with(PDO::FETCH_ASSOC, 100) + ->willReturn([]); + + $connection = $this->prophesize(Connection::class); + $connection->executeStatement('LISTEN "eventstore"') + ->shouldBeCalledOnce() + ->willReturn(1); + $connection->getNativeConnection() + ->shouldBeCalledOnce() + ->willReturn($nativeConnection); + + $abstractPlatform = $this->prophesize(PostgreSQLPlatform::class); + $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $headersSerializer = $this->prophesize(HeadersSerializer::class); + + $doctrineDbalStore = new DoctrineDbalStore( + $connection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal() + ); + $doctrineDbalStore->wait(100); + } + + public function testConfigureSchemaWithDifferentConnections(): void + { + $connection = $this->prophesize(Connection::class); + $eventSerializer = $this->prophesize(EventSerializer::class); + $headersSerializer = $this->prophesize(HeadersSerializer::class); + + $doctrineDbalStore = new DoctrineDbalStore( + $connection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal() + ); + $schema = new Schema(); + $doctrineDbalStore->configureSchema($schema, $this->prophesize(Connection::class)->reveal()); + + self::assertEquals(new Schema(), $schema); + } + + public function testConfigureSchema(): void + { + $connection = $this->prophesize(Connection::class); + $eventSerializer = $this->prophesize(EventSerializer::class); + $headersSerializer = $this->prophesize(HeadersSerializer::class); + + $doctrineDbalStore = new DoctrineDbalStore( + $connection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal() + ); + + $expectedSchema = new Schema(); + $table = $expectedSchema->createTable('eventstore'); + $table->addColumn('id', Types::BIGINT) + ->setAutoincrement(true) + ->setNotnull(true); + $table->addColumn('aggregate', Types::STRING) + ->setLength(255) + ->setNotnull(true); + $table->addColumn('aggregate_id', Types::STRING) + ->setLength(36) + ->setNotnull(true); + $table->addColumn('playhead', Types::INTEGER) + ->setNotnull(true); + $table->addColumn('event', Types::STRING) + ->setLength(255) + ->setNotnull(true); + $table->addColumn('payload', Types::JSON) + ->setNotnull(true); + $table->addColumn('recorded_on', Types::DATETIMETZ_IMMUTABLE) + ->setNotnull(true); + $table->addColumn('new_stream_start', Types::BOOLEAN) + ->setNotnull(true) + ->setDefault(false); + $table->addColumn('archived', Types::BOOLEAN) + ->setNotnull(true) + ->setDefault(false); + $table->addColumn('custom_headers', Types::JSON) + ->setNotnull(true); + + $table->setPrimaryKey(['id']); + $table->addUniqueIndex(['aggregate', 'aggregate_id', 'playhead']); + $table->addIndex(['aggregate', 'aggregate_id', 'playhead', 'archived']); + + + $schema = new Schema(); + $doctrineDbalStore->configureSchema($schema, $connection->reveal()); + + self::assertEquals($expectedSchema, $schema); + } + + #[RequiresPhp('>= 8.2')] public function testArchiveMessages(): void { From f8d45fd2faf5602477195ccb644a3aa109501c08 Mon Sep 17 00:00:00 2001 From: Daniel Badura Date: Wed, 27 Mar 2024 15:00:53 +0100 Subject: [PATCH 2/9] Update makefile to use psalm plugin, add thread execution, update config --- Makefile | 2 +- infection.json.dist | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 63cd8e000..6c362741b 100644 --- a/Makefile +++ b/Makefile @@ -49,7 +49,7 @@ phpunit-unit: vendor ## run phpu .PHONY: infection infection: vendor ## run infection - vendor/bin/infection + php -d memory_limit=312M vendor/bin/roave-infection-static-analysis-plugin --threads=max .PHONY: deptrac deptrac: vendor-tools ## run deptrac diff --git a/infection.json.dist b/infection.json.dist index 101d5c0fc..afd5cc1d7 100644 --- a/infection.json.dist +++ b/infection.json.dist @@ -6,14 +6,15 @@ }, "logs": { "text": "infection.log", + "html": "infection.html", "stryker": { - "report": "2.0.x" + "report": "3.0.x" } }, "mutators": { "@default": true }, - "minMsi": 55, - "minCoveredMsi": 90, + "minMsi": 62, + "minCoveredMsi": 92, "testFrameworkOptions": "--testsuite=unit" } From bd1467d2dd37b4c89475160e463b8aa147eb4a92 Mon Sep 17 00:00:00 2001 From: Daniel Badura Date: Wed, 27 Mar 2024 15:01:52 +0100 Subject: [PATCH 3/9] Add thread execution to ci --- .github/workflows/mutation-tests-diff.yml | 2 +- .github/workflows/mutation-tests.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/mutation-tests-diff.yml b/.github/workflows/mutation-tests-diff.yml index 1d8b9d0b5..f7dd2e962 100644 --- a/.github/workflows/mutation-tests-diff.yml +++ b/.github/workflows/mutation-tests-diff.yml @@ -39,4 +39,4 @@ jobs: dependency-versions: ${{ matrix.dependencies }} - name: "Infection" - run: "vendor/bin/roave-infection-static-analysis-plugin --git-diff-lines --git-diff-base=origin/$GITHUB_BASE_REF --ignore-msi-with-no-mutations --only-covered --min-msi=80 --min-covered-msi=95" + run: "vendor/bin/roave-infection-static-analysis-plugin --threads=max --git-diff-lines --git-diff-base=origin/$GITHUB_BASE_REF --ignore-msi-with-no-mutations --only-covered --min-msi=80 --min-covered-msi=95" diff --git a/.github/workflows/mutation-tests.yml b/.github/workflows/mutation-tests.yml index 9598f2a73..93c8b5617 100644 --- a/.github/workflows/mutation-tests.yml +++ b/.github/workflows/mutation-tests.yml @@ -41,6 +41,6 @@ jobs: dependency-versions: ${{ matrix.dependencies }} - name: "Infection" - run: "vendor/bin/roave-infection-static-analysis-plugin" + run: "vendor/bin/roave-infection-static-analysis-plugin --threads=max" env: STRYKER_DASHBOARD_API_KEY: ${{ secrets.STRYKER_DASHBOARD_API_KEY }} From f11a30fb098b0c8c449cb425f7aaacc4b9ba81a8 Mon Sep 17 00:00:00 2001 From: Daniel Badura Date: Wed, 27 Mar 2024 15:13:45 +0100 Subject: [PATCH 4/9] Fix SA --- baseline.xml | 13 ++++ src/Store/DoctrineDbalStore.php | 6 ++ tests/Unit/Store/DoctrineDbalStoreTest.php | 87 ++++++++++++++-------- 3 files changed, 73 insertions(+), 33 deletions(-) diff --git a/baseline.xml b/baseline.xml index 7adc6fbc3..6ab928c76 100644 --- a/baseline.xml +++ b/baseline.xml @@ -286,6 +286,19 @@ + + + + + + reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> + reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> + reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> + reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> + reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> + reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> + + diff --git a/src/Store/DoctrineDbalStore.php b/src/Store/DoctrineDbalStore.php index 92954270b..437d0ca27 100644 --- a/src/Store/DoctrineDbalStore.php +++ b/src/Store/DoctrineDbalStore.php @@ -372,6 +372,12 @@ private function createTriggerFunctionName(): string return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]); } + /** + * @param array $columns + * @param array $placeholders + * @param list $parameters + * @param array<0|positive-int, Type> $types + */ private function executeSave(array $columns, array $placeholders, array $parameters, array $types, Connection $connection): void { $query = sprintf( diff --git a/tests/Unit/Store/DoctrineDbalStoreTest.php b/tests/Unit/Store/DoctrineDbalStoreTest.php index aa61c062d..500a52b19 100644 --- a/tests/Unit/Store/DoctrineDbalStoreTest.php +++ b/tests/Unit/Store/DoctrineDbalStoreTest.php @@ -12,13 +12,9 @@ use Doctrine\DBAL\Platforms\AbstractPlatform; use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Query\QueryBuilder; -use Doctrine\DBAL\Query\SelectQuery; use Doctrine\DBAL\Result; -use Doctrine\DBAL\Schema\Column; use Doctrine\DBAL\Schema\Schema; -use Doctrine\DBAL\Schema\Table; use Doctrine\DBAL\SQL\Builder\DefaultSelectSQLBuilder; -use Doctrine\DBAL\SQL\Builder\SelectSQLBuilder; use Doctrine\DBAL\Statement; use Doctrine\DBAL\Types\Type; use Doctrine\DBAL\Types\Types; @@ -34,7 +30,6 @@ use Patchlevel\EventSourcing\Store\UniqueConstraintViolation; use Patchlevel\EventSourcing\Store\WrongQueryResult; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email; -use Patchlevel\EventSourcing\Tests\Unit\Fixture\EmailChanged; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Header\BazHeader; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Header\FooHeader; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; @@ -144,7 +139,7 @@ public function testLoadWithLimitAndOffsetAndIndex(): void ->fromIndex(1) ->build(), 10, - 5 + 5, ); self::assertSame(null, $stream->index()); @@ -482,8 +477,24 @@ public function testSaveWithTwoEvents(): void $innerMockedConnection->executeStatement( "INSERT INTO eventstore (aggregate, aggregate_id, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", [ - 'profile', '1', 1, 'profile_created', '', $recordedOn, false, false, [], - 'profile', '1', 2, 'profile_email_changed', '', $recordedOn, false, false, [], + 'profile', + '1', + 1, + 'profile_created', + '', + $recordedOn, + false, + false, + [], + 'profile', + '1', + 2, + 'profile_email_changed', + '', + $recordedOn, + false, + false, + [], ], [ 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), @@ -544,8 +555,24 @@ public function testSaveWithUniqueConstraintViolation(): void $innerMockedConnection->executeStatement( "INSERT INTO eventstore (aggregate, aggregate_id, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", [ - 'profile', '1', 1, 'profile_created', '', $recordedOn, false, false, [], - 'profile', '1', 1, 'profile_created', '', $recordedOn, false, false, [], + 'profile', + '1', + 1, + 'profile_created', + '', + $recordedOn, + false, + false, + [], + 'profile', + '1', + 1, + 'profile_created', + '', + $recordedOn, + false, + false, + [], ], [ 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), @@ -631,7 +658,7 @@ public function testSaveWithCustomHeaders(): void { $customHeaders = [ new FooHeader('foo'), - new BazHeader('baz') + new BazHeader('baz'), ]; $recordedOn = new DateTimeImmutable(); @@ -642,8 +669,7 @@ public function testSaveWithCustomHeaders(): void 1, $recordedOn, )) - ->withHeaders($customHeaders) - ; + ->withHeaders($customHeaders); $innerMockedConnection = $this->prophesize(Connection::class); @@ -709,7 +735,7 @@ public function testCount(): void $doctrineDbalStore = new DoctrineDbalStore( $connection->reveal(), $eventSerializer->reveal(), - $headersSerializer->reveal() + $headersSerializer->reveal(), ); $count = $doctrineDbalStore->count( @@ -751,10 +777,9 @@ public function testCountWrongResult(): void $doctrineDbalStore = new DoctrineDbalStore( $connection->reveal(), $eventSerializer->reveal(), - $headersSerializer->reveal() + $headersSerializer->reveal(), ); - $this->expectException(WrongQueryResult::class); $doctrineDbalStore->count( (new CriteriaBuilder()) @@ -779,9 +804,9 @@ public function testSetupSubscription(): void $$ LANGUAGE plpgsql; SQL, )->shouldBeCalledOnce()->willReturn(1); - $connection->executeStatement("DROP TRIGGER IF EXISTS notify_trigger ON eventstore;") + $connection->executeStatement('DROP TRIGGER IF EXISTS notify_trigger ON eventstore;') ->shouldBeCalledOnce()->willReturn(1); - $connection->executeStatement("CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON eventstore FOR EACH ROW EXECUTE PROCEDURE notify_eventstore();") + $connection->executeStatement('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON eventstore FOR EACH ROW EXECUTE PROCEDURE notify_eventstore();') ->shouldBeCalledOnce()->willReturn(1); $abstractPlatform = $this->prophesize(PostgreSQLPlatform::class); @@ -793,10 +818,9 @@ public function testSetupSubscription(): void $doctrineDbalStore = new DoctrineDbalStore( $connection->reveal(), $eventSerializer->reveal(), - $headersSerializer->reveal() + $headersSerializer->reveal(), ); $doctrineDbalStore->setupSubscription(); - } public function testSetupSubscriptionWithOtherStoreTableName(): void @@ -812,9 +836,9 @@ public function testSetupSubscriptionWithOtherStoreTableName(): void $$ LANGUAGE plpgsql; SQL, )->shouldBeCalledOnce()->willReturn(1); - $connection->executeStatement("DROP TRIGGER IF EXISTS notify_trigger ON new.eventstore;") + $connection->executeStatement('DROP TRIGGER IF EXISTS notify_trigger ON new.eventstore;') ->shouldBeCalledOnce()->willReturn(1); - $connection->executeStatement("CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON new.eventstore FOR EACH ROW EXECUTE PROCEDURE new.notify_eventstore();") + $connection->executeStatement('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON new.eventstore FOR EACH ROW EXECUTE PROCEDURE new.notify_eventstore();') ->shouldBeCalledOnce()->willReturn(1); $abstractPlatform = $this->prophesize(PostgreSQLPlatform::class); @@ -827,11 +851,11 @@ public function testSetupSubscriptionWithOtherStoreTableName(): void $connection->reveal(), $eventSerializer->reveal(), $headersSerializer->reveal(), - 'new.eventstore' + 'new.eventstore', ); $doctrineDbalStore->setupSubscription(); - } + public function testSetupSubscriptionNotPostgres(): void { $connection = $this->prophesize(Connection::class); @@ -845,9 +869,9 @@ public function testSetupSubscriptionNotPostgres(): void $$ LANGUAGE plpgsql; SQL, )->shouldNotBeCalled(); - $connection->executeStatement("DROP TRIGGER IF EXISTS notify_trigger ON eventstore;") + $connection->executeStatement('DROP TRIGGER IF EXISTS notify_trigger ON eventstore;') ->shouldNotBeCalled(); - $connection->executeStatement("CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON eventstore FOR EACH ROW EXECUTE PROCEDURE notify_eventstore();") + $connection->executeStatement('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON eventstore FOR EACH ROW EXECUTE PROCEDURE notify_eventstore();') ->shouldNotBeCalled(); $abstractPlatform = $this->prophesize(AbstractPlatform::class); @@ -859,10 +883,9 @@ public function testSetupSubscriptionNotPostgres(): void $doctrineDbalStore = new DoctrineDbalStore( $connection->reveal(), $eventSerializer->reveal(), - $headersSerializer->reveal() + $headersSerializer->reveal(), ); $doctrineDbalStore->setupSubscription(); - } public function testWait(): void @@ -894,7 +917,7 @@ public function testWait(): void $doctrineDbalStore = new DoctrineDbalStore( $connection->reveal(), $eventSerializer->reveal(), - $headersSerializer->reveal() + $headersSerializer->reveal(), ); $doctrineDbalStore->wait(100); } @@ -908,7 +931,7 @@ public function testConfigureSchemaWithDifferentConnections(): void $doctrineDbalStore = new DoctrineDbalStore( $connection->reveal(), $eventSerializer->reveal(), - $headersSerializer->reveal() + $headersSerializer->reveal(), ); $schema = new Schema(); $doctrineDbalStore->configureSchema($schema, $this->prophesize(Connection::class)->reveal()); @@ -925,7 +948,7 @@ public function testConfigureSchema(): void $doctrineDbalStore = new DoctrineDbalStore( $connection->reveal(), $eventSerializer->reveal(), - $headersSerializer->reveal() + $headersSerializer->reveal(), ); $expectedSchema = new Schema(); @@ -961,14 +984,12 @@ public function testConfigureSchema(): void $table->addUniqueIndex(['aggregate', 'aggregate_id', 'playhead']); $table->addIndex(['aggregate', 'aggregate_id', 'playhead', 'archived']); - $schema = new Schema(); $doctrineDbalStore->configureSchema($schema, $connection->reveal()); self::assertEquals($expectedSchema, $schema); } - #[RequiresPhp('>= 8.2')] public function testArchiveMessages(): void { From 612d721b73e5ca024ef81846128b02568f05dbb8 Mon Sep 17 00:00:00 2001 From: Daniel Badura Date: Wed, 27 Mar 2024 15:18:22 +0100 Subject: [PATCH 5/9] Split up the test --- tests/Unit/Store/DoctrineDbalStoreTest.php | 104 ++++++++++++++++++++- 1 file changed, 99 insertions(+), 5 deletions(-) diff --git a/tests/Unit/Store/DoctrineDbalStoreTest.php b/tests/Unit/Store/DoctrineDbalStoreTest.php index 500a52b19..2770f6d75 100644 --- a/tests/Unit/Store/DoctrineDbalStoreTest.php +++ b/tests/Unit/Store/DoctrineDbalStoreTest.php @@ -95,20 +95,19 @@ public function testLoadWithNoEvents(): void self::assertSame(null, $stream->position()); } - public function testLoadWithLimitAndOffsetAndIndex(): void + public function testLoadWithLimit(): void { $connection = $this->prophesize(Connection::class); $result = $this->prophesize(Result::class); $result->iterateAssociative()->willReturn(new EmptyIterator()); $connection->executeQuery( - 'SELECT * FROM eventstore WHERE (aggregate = :aggregate) AND (aggregate_id = :id) AND (playhead > :playhead) AND (archived = :archived) AND (id > :index) ORDER BY id ASC LIMIT 10 OFFSET 5', + 'SELECT * FROM eventstore WHERE (aggregate = :aggregate) AND (aggregate_id = :id) AND (playhead > :playhead) AND (archived = :archived) ORDER BY id ASC LIMIT 10', [ 'aggregate' => 'profile', 'id' => '1', 'playhead' => 0, 'archived' => false, - 'index' => 1, ], Argument::type('array'), )->willReturn($result->reveal()); @@ -136,10 +135,105 @@ public function testLoadWithLimitAndOffsetAndIndex(): void ->aggregateId('1') ->fromPlayhead(0) ->archived(false) - ->fromIndex(1) ->build(), 10, - 5, + ); + + self::assertSame(null, $stream->index()); + self::assertSame(null, $stream->position()); + } + + public function testLoadWithOffset(): void + { + $connection = $this->prophesize(Connection::class); + $result = $this->prophesize(Result::class); + $result->iterateAssociative()->willReturn(new EmptyIterator()); + + $connection->executeQuery( + 'SELECT * FROM eventstore WHERE (aggregate = :aggregate) AND (aggregate_id = :id) AND (playhead > :playhead) AND (archived = :archived) ORDER BY id ASC OFFSET 5', + [ + 'aggregate' => 'profile', + 'id' => '1', + 'playhead' => 0, + 'archived' => false, + ], + Argument::type('array'), + )->willReturn($result->reveal()); + + $abstractPlatform = $this->prophesize(AbstractPlatform::class); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + + $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); + $queryBuilder = new QueryBuilder($connection->reveal()); + $connection->createQueryBuilder()->willReturn($queryBuilder); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $headersSerializer = $this->prophesize(HeadersSerializer::class); + + $doctrineDbalStore = new DoctrineDbalStore( + $connection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal(), + 'eventstore', + ); + + $stream = $doctrineDbalStore->load( + (new CriteriaBuilder()) + ->aggregateName('profile') + ->aggregateId('1') + ->fromPlayhead(0) + ->archived(false) + ->build(), + offset: 5, + ); + + self::assertSame(null, $stream->index()); + self::assertSame(null, $stream->position()); + } + + public function testLoadWithIndex(): void + { + $connection = $this->prophesize(Connection::class); + $result = $this->prophesize(Result::class); + $result->iterateAssociative()->willReturn(new EmptyIterator()); + + $connection->executeQuery( + 'SELECT * FROM eventstore WHERE (aggregate = :aggregate) AND (aggregate_id = :id) AND (playhead > :playhead) AND (archived = :archived) AND (id > :index) ORDER BY id ASC', + [ + 'aggregate' => 'profile', + 'id' => '1', + 'playhead' => 0, + 'archived' => false, + 'index' => 1, + ], + Argument::type('array'), + )->willReturn($result->reveal()); + + $abstractPlatform = $this->prophesize(AbstractPlatform::class); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + + $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); + $queryBuilder = new QueryBuilder($connection->reveal()); + $connection->createQueryBuilder()->willReturn($queryBuilder); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $headersSerializer = $this->prophesize(HeadersSerializer::class); + + $doctrineDbalStore = new DoctrineDbalStore( + $connection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal(), + 'eventstore', + ); + + $stream = $doctrineDbalStore->load( + (new CriteriaBuilder()) + ->aggregateName('profile') + ->aggregateId('1') + ->fromPlayhead(0) + ->archived(false) + ->fromIndex(1) + ->build() ); self::assertSame(null, $stream->index()); From 459d0258633d4825de1319d48e94bb39e851cf92 Mon Sep 17 00:00:00 2001 From: Daniel Badura Date: Wed, 27 Mar 2024 15:28:18 +0100 Subject: [PATCH 6/9] Update baseline --- baseline.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/baseline.xml b/baseline.xml index 6ab928c76..776a81ea3 100644 --- a/baseline.xml +++ b/baseline.xml @@ -297,6 +297,8 @@ reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> + reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> + reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> From 02bbf4b6b34dfbd4c19836cbec7527b550cb6815 Mon Sep 17 00:00:00 2001 From: Daniel Badura Date: Wed, 27 Mar 2024 15:30:41 +0100 Subject: [PATCH 7/9] Skip test for lower dbal versions --- tests/Unit/Store/DoctrineDbalStoreTest.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/Unit/Store/DoctrineDbalStoreTest.php b/tests/Unit/Store/DoctrineDbalStoreTest.php index 2770f6d75..b5bea3676 100644 --- a/tests/Unit/Store/DoctrineDbalStoreTest.php +++ b/tests/Unit/Store/DoctrineDbalStoreTest.php @@ -36,6 +36,7 @@ use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileEmailChanged; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use PDO; +use PHPUnit\Framework\Attributes\RequiresMethod; use PHPUnit\Framework\Attributes\RequiresPhp; use PHPUnit\Framework\TestCase; use Prophecy\Argument; @@ -143,6 +144,7 @@ public function testLoadWithLimit(): void self::assertSame(null, $stream->position()); } + #[RequiresMethod(AbstractPlatform::class, 'supportsLimitOffset')] public function testLoadWithOffset(): void { $connection = $this->prophesize(Connection::class); @@ -233,7 +235,7 @@ public function testLoadWithIndex(): void ->fromPlayhead(0) ->archived(false) ->fromIndex(1) - ->build() + ->build(), ); self::assertSame(null, $stream->index()); From 03beffc7abcdaba5cc931fd7c5c0fb5db1943efc Mon Sep 17 00:00:00 2001 From: Daniel Badura Date: Wed, 27 Mar 2024 15:32:40 +0100 Subject: [PATCH 8/9] Remove html generation --- infection.json.dist | 1 - 1 file changed, 1 deletion(-) diff --git a/infection.json.dist b/infection.json.dist index afd5cc1d7..eef4a6bef 100644 --- a/infection.json.dist +++ b/infection.json.dist @@ -6,7 +6,6 @@ }, "logs": { "text": "infection.log", - "html": "infection.html", "stryker": { "report": "3.0.x" } From 300b010c6e99daf5e3bb715bc33ab6fdd9ef9000 Mon Sep 17 00:00:00 2001 From: Daniel Badura Date: Thu, 28 Mar 2024 13:01:03 +0100 Subject: [PATCH 9/9] Skip tests for lower dbal versions --- tests/Unit/Store/DoctrineDbalStoreTest.php | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/Unit/Store/DoctrineDbalStoreTest.php b/tests/Unit/Store/DoctrineDbalStoreTest.php index b5bea3676..19848e68c 100644 --- a/tests/Unit/Store/DoctrineDbalStoreTest.php +++ b/tests/Unit/Store/DoctrineDbalStoreTest.php @@ -36,13 +36,13 @@ use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileEmailChanged; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use PDO; -use PHPUnit\Framework\Attributes\RequiresMethod; use PHPUnit\Framework\Attributes\RequiresPhp; use PHPUnit\Framework\TestCase; use Prophecy\Argument; use Prophecy\PhpUnit\ProphecyTrait; use function iterator_to_array; +use function method_exists; /** @covers \Patchlevel\EventSourcing\Store\DoctrineDbalStore */ final class DoctrineDbalStoreTest extends TestCase @@ -144,9 +144,12 @@ public function testLoadWithLimit(): void self::assertSame(null, $stream->position()); } - #[RequiresMethod(AbstractPlatform::class, 'supportsLimitOffset')] public function testLoadWithOffset(): void { + if (method_exists(AbstractPlatform::class, 'supportsLimitOffset')) { + $this->markTestSkipped('In older DBAL versions platforms did not need to support this'); + } + $connection = $this->prophesize(Connection::class); $result = $this->prophesize(Result::class); $result->iterateAssociative()->willReturn(new EmptyIterator());