Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance by inserting in a bulk #444

Merged
merged 4 commits into from
Dec 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
FROM php:8.3

ARG EXTENSIONS="pcntl zip intl bcmath"

RUN apt-get update && apt-get install -y \
git \
zip \
unzip \
curl \
&& rm -rf /var/lib/apt/lists/*

COPY --from=composer /usr/bin/composer /usr/bin/composer

ADD https://github.com/mlocati/docker-php-extension-installer/releases/latest/download/install-php-extensions /usr/local/bin/

RUN chmod +x /usr/local/bin/install-php-extensions && \
install-php-extensions $EXTENSIONS

RUN mkdir -p /tmp/blackfire \
&& architecture=$(uname -m) \
&& curl -A "Docker" -L https://blackfire.io/api/v1/releases/cli/linux/$architecture | tar zxp -C /tmp/blackfire \
&& mv /tmp/blackfire/blackfire /usr/bin/blackfire \
&& rm -Rf /tmp/blackfire
85 changes: 64 additions & 21 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\EventBus\HeaderNotFound;
Expand All @@ -16,6 +17,9 @@
use Patchlevel\EventSourcing\Schema\SchemaConfigurator;
use Patchlevel\EventSourcing\Serializer\EventSerializer;

use function array_fill;
use function count;
use function implode;
use function is_int;
use function is_string;
use function sprintf;
Expand Down Expand Up @@ -112,36 +116,74 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void

public function save(Message ...$messages): void
{
if ($messages === []) {
return;
}

$this->connection->transactional(
function (Connection $connection) use ($messages): void {
foreach ($messages as $message) {
$booleanType = Type::getType(Types::BOOLEAN);
$jsonType = Type::getType(Types::JSON);
$dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE);

$parameters = [];
$placeholders = [];

/** @var array<int, Type> $types */
$types = [];

$columns = [
'aggregate',
'aggregate_id',
'playhead',
'event',
'payload',
'recorded_on',
'new_stream_start',
'archived',
'custom_headers',
];

$columnsLength = count($columns);
$placeholder = implode(', ', array_fill(0, $columnsLength, '?'));

foreach ($messages as $position => $message) {
$offset = (int)$position * $columnsLength;
$placeholders[] = $placeholder;

$data = $this->serializer->serialize($message->event());

try {
$connection->insert(
$this->storeTableName,
[
'aggregate' => $this->aggregateRootRegistry->aggregateName($message->aggregateClass()),
'aggregate_id' => $message->aggregateId(),
'playhead' => $message->playhead(),
'event' => $data->name,
'payload' => $data->payload,
'recorded_on' => $message->recordedOn(),
'new_stream_start' => $message->newStreamStart(),
'archived' => $message->archived(),
'custom_headers' => $message->customHeaders(),
],
[
'recorded_on' => Types::DATETIMETZ_IMMUTABLE,
'custom_headers' => Types::JSON,
'new_stream_start' => Types::BOOLEAN,
'archived' => Types::BOOLEAN,
],
);
$parameters[] = $this->aggregateRootRegistry->aggregateName($message->aggregateClass());
$parameters[] = $message->aggregateId();
$parameters[] = $message->playhead();
$parameters[] = $data->name;
$parameters[] = $data->payload;

$parameters[] = $message->recordedOn();
$types[$offset + 5] = $dateTimeType;

$parameters[] = $message->newStreamStart();
$types[$offset + 6] = $booleanType;

$parameters[] = $message->archived();
$types[$offset + 7] = $booleanType;

$parameters[] = $message->customHeaders();
$types[$offset + 8] = $jsonType;
} catch (HeaderNotFound $e) {
throw new MissingDataForStorage($e->name, $e);
}
}

$query = sprintf(
"INSERT INTO %s (%s) VALUES\n(%s)",
$this->storeTableName,
implode(', ', $columns),
implode("),\n(", $placeholders),
);

$connection->executeStatement($query, $parameters, $types);
},
);
}
Expand Down Expand Up @@ -170,6 +212,7 @@ public function archiveMessages(string $aggregate, string $id, int $untilPlayhea
AND archived = false',
$this->storeTableName,
));

$statement->bindValue('aggregate', $aggregateName);
$statement->bindValue('aggregate_id', $id);
$statement->bindValue('playhead', $untilPlayhead);
Expand Down
54 changes: 54 additions & 0 deletions tests/Benchmark/blackfire.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?php

declare(strict_types=1);

use Doctrine\DBAL\Driver\PDO\SQLite\Driver;
use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Aggregate\Profile;
use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\ProfileId;

require_once __DIR__ . '/../../vendor/autoload.php';

const DB_PATH = __DIR__ . '/BasicImplementation/data/db.sqlite3';

if (file_exists(DB_PATH)) {
unlink(DB_PATH);
}

$connection = DriverManager::getConnection([
'driverClass' => Driver::class,
'path' => DB_PATH,
]);

$bus = new DefaultEventBus();

$store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
(new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/BasicImplementation/Aggregate']),
'eventstore',
);

$repository = new DefaultRepository($store, $bus, Profile::metadata());

$schemaDirector = new DoctrineSchemaDirector(
$connection,
$store,
);

$schemaDirector->create();

$id = ProfileId::generate();
$profile = Profile::create($id, 'Peter');

for ($i = 0; $i < 10_000; $i++) {
$profile->changeName('Peter ' . $i);
}

$repository->save($profile);
28 changes: 8 additions & 20 deletions tests/Unit/Store/DoctrineDbalStoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Statement;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
use EmptyIterator;
use Patchlevel\EventSourcing\EventBus\Message;
Expand Down Expand Up @@ -191,27 +192,14 @@ public function testSaveWithOneEvent(): void

$innerMockedConnection = $this->prophesize(Connection::class);

$platform = $this->prophesize(AbstractPlatform::class);
$innerMockedConnection->getDatabasePlatform()->willReturn($platform->reveal());

$innerMockedConnection->insert(
'eventstore',
[
'aggregate' => 'profile',
'aggregate_id' => '1',
'playhead' => 1,
'event' => 'profile_created',
'payload' => '',
'recorded_on' => $recordedOn,
'new_stream_start' => false,
'archived' => false,
'custom_headers' => [],
],
$innerMockedConnection->executeStatement(
"INSERT INTO eventstore (aggregate, aggregate_id, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?)",
['profile', '1', 1, 'profile_created', '', $recordedOn, false, false, []],
[
'recorded_on' => 'datetimetz_immutable',
'custom_headers' => 'json',
'archived' => Types::BOOLEAN,
'new_stream_start' => Types::BOOLEAN,
5 => Type::getType(Types::DATETIMETZ_IMMUTABLE),
6 => Type::getType(Types::BOOLEAN),
7 => Type::getType(Types::BOOLEAN),
8 => Type::getType(Types::JSON),
],
)->shouldBeCalledOnce();

Expand Down
Loading