From 71689b0d092e9798c49257fd4a818bf6e617d209 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 17 Dec 2023 11:16:01 +0100 Subject: [PATCH] bulk insert --- Dockerfile | 23 +++++++++ src/Store/DoctrineDbalStore.php | 85 +++++++++++++++++++++++++-------- tests/Benchmark/blackfire.php | 54 +++++++++++++++++++++ 3 files changed, 142 insertions(+), 20 deletions(-) create mode 100644 Dockerfile create mode 100644 tests/Benchmark/blackfire.php diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..1b956b370 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,23 @@ +FROM php:8.3 + +ARG EXTENSIONS="pcntl zip intl bcmath" + +RUN apt-get update && apt-get install -y \ + git \ + zip \ + unzip \ + curl \ + && rm -rf /var/lib/apt/lists/* + +COPY --from=composer /usr/bin/composer /usr/bin/composer + +ADD https://github.com/mlocati/docker-php-extension-installer/releases/latest/download/install-php-extensions /usr/local/bin/ + +RUN chmod +x /usr/local/bin/install-php-extensions && \ + install-php-extensions $EXTENSIONS + +RUN mkdir -p /tmp/blackfire \ + && architecture=$(uname -m) \ + && curl -A "Docker" -L https://blackfire.io/api/v1/releases/cli/linux/$architecture | tar zxp -C /tmp/blackfire \ + && mv /tmp/blackfire/blackfire /usr/bin/blackfire \ + && rm -Rf /tmp/blackfire \ No newline at end of file diff --git a/src/Store/DoctrineDbalStore.php b/src/Store/DoctrineDbalStore.php index 2deb641dc..e91a22c4b 100644 --- a/src/Store/DoctrineDbalStore.php +++ b/src/Store/DoctrineDbalStore.php @@ -8,6 +8,7 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Schema\Schema; +use Doctrine\DBAL\Types\Type; use Doctrine\DBAL\Types\Types; use Patchlevel\EventSourcing\Aggregate\AggregateRoot; use Patchlevel\EventSourcing\EventBus\HeaderNotFound; @@ -16,6 +17,9 @@ use Patchlevel\EventSourcing\Schema\SchemaConfigurator; use Patchlevel\EventSourcing\Serializer\EventSerializer; +use function array_fill; +use function count; +use function implode; use function is_int; use function is_string; use function sprintf; @@ -27,6 +31,7 @@ public function __construct( private readonly EventSerializer $serializer, private readonly AggregateRootRegistry $aggregateRootRegistry, private readonly string $storeTableName = 'eventstore', + private readonly int $batch = 1000, ) { } @@ -114,38 +119,78 @@ public function save(Message ...$messages): void { $this->connection->transactional( function (Connection $connection) use ($messages): void { + $jsonType = Type::getType(Types::JSON); + $booleanType = Type::getType(Types::BOOLEAN); + $dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE); + + $placeholders = []; + + $columns = [ + 'aggregate', + 'aggregate_id', + 'playhead', + 'event', + 'payload', + 'recorded_on', + 'new_stream_start', + 'archived', + 'custom_headers', + ]; + + $placeholder = implode(', ', array_fill(0, count($columns), '?')); + foreach ($messages as $message) { $data = $this->serializer->serialize($message->event()); try { - $connection->insert( - $this->storeTableName, - [ - 'aggregate' => $this->aggregateRootRegistry->aggregateName($message->aggregateClass()), - 'aggregate_id' => $message->aggregateId(), - 'playhead' => $message->playhead(), - 'event' => $data->name, - 'payload' => $data->payload, - 'recorded_on' => $message->recordedOn(), - 'new_stream_start' => $message->newStreamStart(), - 'archived' => $message->archived(), - 'custom_headers' => $message->customHeaders(), - ], - [ - 'recorded_on' => Types::DATETIMETZ_IMMUTABLE, - 'custom_headers' => Types::JSON, - 'new_stream_start' => Types::BOOLEAN, - 'archived' => Types::BOOLEAN, - ], - ); + $parameters[] = $this->aggregateRootRegistry->aggregateName($message->aggregateClass()); + $parameters[] = $message->aggregateId(); + $parameters[] = $message->playhead(); + $parameters[] = $data->name; + $parameters[] = $data->payload; + $parameters[] = $dateTimeType->convertToDatabaseValue($message->recordedOn(), $connection->getDatabasePlatform()); + $parameters[] = $booleanType->convertToDatabaseValue($message->newStreamStart(), $connection->getDatabasePlatform()); + $parameters[] = $booleanType->convertToDatabaseValue($message->archived(), $connection->getDatabasePlatform()); + $parameters[] = $jsonType->convertToDatabaseValue($message->customHeaders(), $connection->getDatabasePlatform()); } catch (HeaderNotFound $e) { throw new MissingDataForStorage($e->name, $e); } + + $placeholders[] = $placeholder; } + + if ($parameters === []) { + return; + } + + $query = sprintf( + "INSERT INTO %s (%s) VALUES\n(%s)", + $this->storeTableName, + implode(', ', $columns), + implode("),\n(", $placeholders), + ); + + $connection->executeStatement($query, $parameters); }, ); } + /** + * @param string[] $fields + * + * @return string[] + */ + private function placeholder(array $fields): array + { + $placeholders = []; + + foreach ($fields as $field) { + $placeholders[] = ':' . $field; + } + + return $placeholders; + } + /** * @param Closure():ClosureReturn $function * diff --git a/tests/Benchmark/blackfire.php b/tests/Benchmark/blackfire.php new file mode 100644 index 000000000..db4216d9a --- /dev/null +++ b/tests/Benchmark/blackfire.php @@ -0,0 +1,54 @@ + Driver::class, + 'path' => DB_PATH, +]); + +$bus = new DefaultEventBus(); + +$store = new DoctrineDbalStore( + $connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), + (new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/BasicImplementation/Aggregate']), + 'eventstore', +); + +$repository = new DefaultRepository($store, $bus, Profile::metadata()); + +$schemaDirector = new DoctrineSchemaDirector( + $connection, + $store, +); + +$schemaDirector->create(); + +$id = ProfileId::generate(); +$profile = Profile::create($id, 'Peter'); + +for ($i = 0; $i < 10_000; $i++) { + $profile->changeName('Peter ' . $i); +} + +$repository->save($profile);