Skip to content

Commit

Permalink
introduce event id in stream store & fix split stream logic
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Dec 13, 2024
1 parent da59e4c commit f9729b3
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 108 deletions.
2 changes: 2 additions & 0 deletions src/Metadata/Message/MessageHeaderRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Store\ArchivedHeader;
use Patchlevel\EventSourcing\Store\Header\EventIdHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
Expand Down Expand Up @@ -81,6 +82,7 @@ public static function createWithInternalHeaders(array $headerNameToClassMap = [
'aggregate' => AggregateHeader::class,
'archived' => ArchivedHeader::class,
'newStreamStart' => StreamStartHeader::class,
'eventId' => EventIdHeader::class,
];

return new self($headerNameToClassMap + $internalHeaders);
Expand Down
17 changes: 17 additions & 0 deletions src/Store/Header/EventIdHeader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Header;

/**
* @psalm-immutable
* @experimental
*/
final class EventIdHeader
{
public function __construct(
public readonly string $eventId,
) {
}
}
52 changes: 33 additions & 19 deletions src/Store/StreamDoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
use Patchlevel\EventSourcing\Store\Header\EventIdHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use PDO;
use Psr\Clock\ClockInterface;
use Ramsey\Uuid\Uuid;

use function array_fill;
use function array_filter;
Expand Down Expand Up @@ -192,17 +194,18 @@ public function save(Message ...$messages): void

$this->transactional(
function () use ($messages): void {
/** @var array<string, int> $achievedUntilPlayhead */
$achievedUntilPlayhead = [];
/** @var array<string, int> $achievedUntilEventId */
$achievedUntilEventId = [];

$booleanType = Type::getType(Types::BOOLEAN);
$dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE);

$columns = [
'stream',
'playhead',
'event',
'payload',
'event_id',
'event_name',
'event_payload',
'recorded_on',
'new_stream_start',
'archived',
Expand Down Expand Up @@ -232,13 +235,19 @@ function () use ($messages): void {
throw new MissingDataForStorage($e->name, $e);
}

$playhead = null;

if ($message->hasHeader(PlayheadHeader::class)) {
$playhead = $message->header(PlayheadHeader::class)->playhead;
$parameters[] = $message->header(PlayheadHeader::class)->playhead;
} else {
$parameters[] = null;
}

if ($message->hasHeader(EventIdHeader::class)) {
$eventId = $message->header(EventIdHeader::class)->eventId;
} else {
$eventId = Uuid::uuid7()->toString();
}

$parameters[] = $playhead;
$parameters[] = $eventId;
$parameters[] = $data->name;
$parameters[] = $data->payload;

Expand All @@ -248,19 +257,19 @@ function () use ($messages): void {
$parameters[] = $this->clock->now();
}

$types[$offset + 4] = $dateTimeType;
$types[$offset + 5] = $dateTimeType;

$streamStart = $message->hasHeader(StreamStartHeader::class);

if ($streamStart && $playhead) {
$achievedUntilPlayhead[$streamName] = $playhead;
if ($streamStart) {
$achievedUntilEventId[$streamName] = $eventId;
}

$parameters[] = $streamStart;
$types[$offset + 5] = $booleanType;
$types[$offset + 6] = $booleanType;

$parameters[] = $message->hasHeader(ArchivedHeader::class);
$types[$offset + 6] = $booleanType;
$types[$offset + 7] = $booleanType;

$parameters[] = $this->headersSerializer->serialize($this->getCustomHeaders($message));

Expand All @@ -283,21 +292,21 @@ function () use ($messages): void {
$this->executeSave($columns, $placeholders, $parameters, $types, $this->connection);
}

foreach ($achievedUntilPlayhead as $stream => $playhead) {
foreach ($achievedUntilEventId as $stream => $eventId) {
$this->connection->executeStatement(
sprintf(
<<<'SQL'
UPDATE %s
UPDATE %1$s
SET archived = true
WHERE stream = :stream
AND playhead < :playhead
AND id < (SELECT id FROM %1$s WHERE event_id = :event_id)
AND archived = false
SQL,
$this->config['table_name'],
),
[
'stream' => $stream,
'playhead' => $playhead,
'event_id' => $eventId,
],
);
}
Expand Down Expand Up @@ -366,10 +375,13 @@ public function configureSchema(Schema $schema, Connection $connection): void
->setNotnull(true);
$table->addColumn('playhead', Types::INTEGER)
->setNotnull(false);
$table->addColumn('event', Types::STRING)
$table->addColumn('event_id', Types::STRING)
->setLength(255)
->setNotnull(true);
$table->addColumn('event_name', Types::STRING)
->setLength(255)
->setNotnull(true);
$table->addColumn('payload', Types::JSON)
$table->addColumn('event_payload', Types::JSON)
->setNotnull(true);
$table->addColumn('recorded_on', Types::DATETIMETZ_IMMUTABLE)
->setNotnull(true);
Expand All @@ -383,6 +395,7 @@ public function configureSchema(Schema $schema, Connection $connection): void
->setNotnull(true);

$table->setPrimaryKey(['id']);
$table->addUniqueIndex(['event_id']);
$table->addUniqueIndex(['stream', 'playhead']);
$table->addIndex(['stream', 'playhead', 'archived']);
}
Expand All @@ -392,6 +405,7 @@ private function getCustomHeaders(Message $message): array
{
$filteredHeaders = [
StreamNameHeader::class,
EventIdHeader::class,
PlayheadHeader::class,
RecordedOnHeader::class,
StreamStartHeader::class,
Expand Down
8 changes: 5 additions & 3 deletions src/Store/StreamDoctrineDbalStoreStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Serializer\SerializedEvent;
use Patchlevel\EventSourcing\Store\Header\EventIdHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
Expand Down Expand Up @@ -120,7 +121,7 @@ private function buildGenerator(
/** @var DateTimeTzImmutableType $dateTimeType */
$dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE);

/** @var array{id: positive-int, stream: string, playhead: int|string|null, event: string, payload: string, recorded_on: string, archived: int|string, new_stream_start: int|string, custom_headers: string} $data */
/** @var array{id: positive-int, stream: string, playhead: int|string|null, event_id: string, event_name: string, event_payload: string, recorded_on: string, archived: int|string, new_stream_start: int|string, custom_headers: string} $data */
foreach ($result->iterateAssociative() as $data) {
if ($this->position === null) {
$this->position = 0;
Expand All @@ -129,11 +130,12 @@ private function buildGenerator(
}

$this->index = $data['id'];
$event = $eventSerializer->deserialize(new SerializedEvent($data['event'], $data['payload']));
$event = $eventSerializer->deserialize(new SerializedEvent($data['event_name'], $data['event_payload']));

$message = Message::create($event)
->withHeader(new StreamNameHeader($data['stream']))
->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform)));
->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform)))
->withHeader(new EventIdHeader($data['event_id']));

if ($data['playhead'] !== null) {
$message = $message->withHeader(new PlayheadHeader((int)$data['playhead']));
Expand Down
104 changes: 90 additions & 14 deletions tests/Integration/Store/StreamDoctrineDbalStoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
use Patchlevel\EventSourcing\Store\Header\EventIdHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
use Patchlevel\EventSourcing\Store\StreamStartHeader;
use Patchlevel\EventSourcing\Store\StreamStore;
use Patchlevel\EventSourcing\Store\UniqueConstraintViolation;
use Patchlevel\EventSourcing\Tests\DbalManager;
Expand Down Expand Up @@ -85,21 +87,21 @@ public function testSave(): void
self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result1['stream']);
self::assertEquals('1', $result1['playhead']);
self::assertStringContainsString('2020-01-01 00:00:00', $result1['recorded_on']);
self::assertEquals('profile.created', $result1['event']);
self::assertEquals('profile.created', $result1['event_name']);
self::assertEquals(
['profileId' => $profileId->toString(), 'name' => 'test'],
json_decode($result1['payload'], true),
json_decode($result1['event_payload'], true),
);

$result2 = $result[1];

self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result2['stream']);
self::assertEquals('2', $result2['playhead']);
self::assertStringContainsString('2020-01-02 00:00:00', $result2['recorded_on']);
self::assertEquals('profile.created', $result2['event']);
self::assertEquals('profile.created', $result2['event_name']);
self::assertEquals(
['profileId' => $profileId->toString(), 'name' => 'test'],
json_decode($result2['payload'], true),
json_decode($result2['event_payload'], true),
);
}

Expand All @@ -124,21 +126,21 @@ public function testSaveWithOnlyStreamName(): void
self::assertEquals('extern', $result1['stream']);
self::assertEquals(null, $result1['playhead']);
self::assertStringContainsString('2020-01-01 00:00:00', $result1['recorded_on']);
self::assertEquals('extern', $result1['event']);
self::assertEquals('extern', $result1['event_name']);
self::assertEquals(
['message' => 'test 1'],
json_decode($result1['payload'], true),
json_decode($result1['event_payload'], true),
);

$result2 = $result[1];

self::assertEquals('extern', $result2['stream']);
self::assertEquals(null, $result2['playhead']);
self::assertStringContainsString('2020-01-01 00:00:00', $result2['recorded_on']);
self::assertEquals('extern', $result2['event']);
self::assertEquals('extern', $result2['event_name']);
self::assertEquals(
['message' => 'test 2'],
json_decode($result2['payload'], true),
json_decode($result2['event_payload'], true),
);
}

Expand Down Expand Up @@ -171,25 +173,79 @@ public function testSaveWithTransactional(): void
self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result1['stream']);
self::assertEquals('1', $result1['playhead']);
self::assertStringContainsString('2020-01-01 00:00:00', $result1['recorded_on']);
self::assertEquals('profile.created', $result1['event']);
self::assertEquals('profile.created', $result1['event_name']);
self::assertEquals(
['profileId' => $profileId->toString(), 'name' => 'test'],
json_decode($result1['payload'], true),
json_decode($result1['event_payload'], true),
);

$result2 = $result[1];

self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result2['stream']);
self::assertEquals('2', $result2['playhead']);
self::assertStringContainsString('2020-01-02 00:00:00', $result2['recorded_on']);
self::assertEquals('profile.created', $result2['event']);
self::assertEquals('profile.created', $result2['event_name']);
self::assertEquals(
['profileId' => $profileId->toString(), 'name' => 'test'],
json_decode($result2['payload'], true),
json_decode($result2['event_payload'], true),
);
}

public function testUniqueConstraint(): void
public function testSplitStream(): void
{
$profileId = ProfileId::generate();

$messages = [
Message::create(new ProfileCreated($profileId, 'test'))
->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString())))
->withHeader(new PlayheadHeader(1))
->withHeader(new EventIdHeader('1'))
->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))),
Message::create(new ProfileCreated($profileId, 'test'))
->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString())))
->withHeader(new PlayheadHeader(2))
->withHeader(new EventIdHeader('2'))
->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-02 00:00:00')))
->withHeader(new StreamStartHeader()),
];

$this->store->save(...$messages);

/** @var list<array<string, string>> $result */
$result = $this->connection->fetchAllAssociative('SELECT * FROM event_store ORDER BY id');

self::assertCount(2, $result);

$result1 = $result[0];

self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result1['stream']);
self::assertEquals('1', $result1['playhead']);
self::assertStringContainsString('2020-01-01 00:00:00', $result1['recorded_on']);
self::assertEquals('profile.created', $result1['event_name']);
self::assertEquals(
['profileId' => $profileId->toString(), 'name' => 'test'],
json_decode($result1['event_payload'], true),
);

self::assertEquals('1', $result1['archived']);
self::assertEquals('0', $result1['new_stream_start']);

$result2 = $result[1];

self::assertEquals(sprintf('profile-%s', $profileId->toString()), $result2['stream']);
self::assertEquals('2', $result2['playhead']);
self::assertStringContainsString('2020-01-02 00:00:00', $result2['recorded_on']);
self::assertEquals('profile.created', $result2['event_name']);
self::assertEquals(
['profileId' => $profileId->toString(), 'name' => 'test'],
json_decode($result2['event_payload'], true),
);

self::assertEquals('0', $result2['archived']);
self::assertEquals('1', $result2['new_stream_start']);
}

public function testUniqueStreamNameAndPlayheadConstraint(): void
{
$this->expectException(UniqueConstraintViolation::class);

Expand All @@ -209,6 +265,26 @@ public function testUniqueConstraint(): void
$this->store->save(...$messages);
}

public function testUniqueEventIdConstraint(): void
{
$this->expectException(UniqueConstraintViolation::class);

$profileId = ProfileId::generate();

$messages = [
Message::create(new ProfileCreated($profileId, 'test'))
->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString())))
->withHeader(new EventIdHeader('1'))
->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))),
Message::create(new ProfileCreated($profileId, 'test'))
->withHeader(new StreamNameHeader(sprintf('profile-%s', $profileId->toString())))
->withHeader(new EventIdHeader('1'))
->withHeader(new RecordedOnHeader(new DateTimeImmutable('2020-01-01 00:00:00'))),
];

$this->store->save(...$messages);
}

public function testSave10000Messages(): void
{
$profileId = ProfileId::generate();
Expand Down Expand Up @@ -331,7 +407,7 @@ public function testStreams(): void
], $streams);
}

public function testRemote(): void
public function testRemove(): void
{
$profileId = ProfileId::fromString('0190e47e-77e9-7b90-bf62-08bbf0ab9b4b');

Expand Down
Loading

0 comments on commit f9729b3

Please sign in to comment.