From 6223042946a3fae35f63af54398ffdf7abf940b8 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 3 Mar 2024 12:31:08 +0100 Subject: [PATCH 1/2] save 10000 events --- tests/Integration/Store/StoreTest.php | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/Integration/Store/StoreTest.php b/tests/Integration/Store/StoreTest.php index 5fbc6541b..dc8b6e481 100644 --- a/tests/Integration/Store/StoreTest.php +++ b/tests/Integration/Store/StoreTest.php @@ -87,6 +87,26 @@ public function testSave(): void self::assertEquals(['profileId' => 'test', 'name' => 'test'], json_decode($result1['payload'], true)); } + public function testSave10000Messages(): void + { + $messages = []; + + for ($i = 1; $i <= 10000; $i++) { + $messages[] = Message::create(new ProfileCreated(ProfileId::fromString('test'), 'test')) + ->withAggregateName('profile') + ->withAggregateId('test') + ->withPlayhead($i) + ->withRecordedOn(new DateTimeImmutable('2020-01-01 00:00:00')); + } + + $this->store->save(...$messages); + + /** @var int $result */ + $result = $this->connection->fetchFirstColumn('SELECT COUNT(*) FROM eventstore')[0]; + + self::assertEquals(10000, $result); + } + public function testLoad(): void { $message = Message::create(new ProfileCreated(ProfileId::fromString('test'), 'test')) From 2671b3274d5a38a0be9338b3af55a7fe2bff3091 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 3 Mar 2024 12:39:58 +0100 Subject: [PATCH 2/2] fix max batch size --- src/Store/DoctrineDbalStore.php | 47 +++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/src/Store/DoctrineDbalStore.php b/src/Store/DoctrineDbalStore.php index c81bb3f93..1112ae09a 100644 --- a/src/Store/DoctrineDbalStore.php +++ b/src/Store/DoctrineDbalStore.php @@ -18,6 +18,7 @@ use function array_fill; use function count; +use function floor; use function implode; use function is_int; use function is_string; @@ -25,6 +26,11 @@ final class DoctrineDbalStore implements Store, ArchivableStore, SchemaConfigurator { + /** + * PostgreSQL has a limit of 65535 parameters in a single query. + */ + private const MAX_UNSIGNED_SMALL_INT = 65_535; + public function __construct( private readonly Connection $connection, private readonly EventSerializer $serializer, @@ -123,12 +129,6 @@ function (Connection $connection) use ($messages): void { $jsonType = Type::getType(Types::JSON); $dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE); - $parameters = []; - $placeholders = []; - - /** @var array, Type> $types */ - $types = []; - $columns = [ 'aggregate', 'aggregate_id', @@ -142,11 +142,17 @@ function (Connection $connection) use ($messages): void { ]; $columnsLength = count($columns); + $batchSize = (int)floor(self::MAX_UNSIGNED_SMALL_INT / $columnsLength); $placeholder = implode(', ', array_fill(0, $columnsLength, '?')); - foreach ($messages as $position => $message) { + $parameters = []; + $placeholders = []; + /** @var array, Type> $types */ + $types = []; + $position = 0; + foreach ($messages as $message) { /** @var int<0, max> $offset */ - $offset = (int)$position * $columnsLength; + $offset = $position * $columnsLength; $placeholders[] = $placeholder; $data = $this->serializer->serialize($message->event()); @@ -184,6 +190,31 @@ function (Connection $connection) use ($messages): void { $parameters[] = $message->customHeaders(); $types[$offset + 8] = $jsonType; + + $position++; + + if ($position !== $batchSize) { + continue; + } + + $query = sprintf( + "INSERT INTO %s (%s) VALUES\n(%s)", + $this->storeTableName, + implode(', ', $columns), + implode("),\n(", $placeholders), + ); + + $connection->executeStatement($query, $parameters, $types); + + $parameters = []; + $placeholders = []; + $types = []; + + $position = 0; + } + + if ($position === 0) { + return; } $query = sprintf(