Skip to content

Commit

Permalink
Merge pull request #656 from patchlevel/split-stream-header
Browse files Browse the repository at this point in the history
[Experimental] split stream header into 3 separate headers
  • Loading branch information
DavidBadura authored Dec 16, 2024
2 parents 413683a + c4de8b6 commit 7e1d0d6
Show file tree
Hide file tree
Showing 22 changed files with 353 additions and 315 deletions.
3 changes: 1 addition & 2 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
</file>
<file src="src/Store/StreamDoctrineDbalStoreStream.php">
<ArgumentTypeCoercion>
<code><![CDATA[$data['playhead'] === null ? null : (int)$data['playhead']]]></code>
<code><![CDATA[(int)$data['playhead']]]></code>
</ArgumentTypeCoercion>
</file>
<file src="src/Subscription/Engine/DefaultSubscriptionEngine.php">
Expand Down Expand Up @@ -371,7 +371,6 @@
</file>
<file src="tests/Unit/Subscription/Engine/SubscriptionManagerTest.php">
<InvalidArgument>
<code><![CDATA[$result]]></code>
<code><![CDATA[$subscriptions]]></code>
<code><![CDATA[$subscriptions]]></code>
</InvalidArgument>
Expand Down
2 changes: 1 addition & 1 deletion phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ parameters:
path: src/Store/InMemoryStore.php

-
message: "#^Parameter \\#2 \\$playhead of class Patchlevel\\\\EventSourcing\\\\Store\\\\StreamHeader constructor expects int\\<1, max\\>\\|null, int\\|null given\\.$#"
message: "#^Parameter \\#1 \\$playhead of class Patchlevel\\\\EventSourcing\\\\Store\\\\Header\\\\PlayheadHeader constructor expects int\\<1, max\\>, int given\\.$#"
count: 1
path: src/Store/StreamDoctrineDbalStoreStream.php

Expand Down
49 changes: 33 additions & 16 deletions src/Console/OutputStyle.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
namespace Patchlevel\EventSourcing\Console;

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer;
use Patchlevel\EventSourcing\Serializer\Encoder\Encoder;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\ArchivedHeader;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use Patchlevel\EventSourcing\Store\StreamStartHeader;
use Symfony\Component\Console\Style\SymfonyStyle;
use Throwable;
Expand Down Expand Up @@ -49,13 +50,38 @@ public function message(

$customHeaders = array_filter(
$message->headers(),
static fn ($header) => !$header instanceof StreamHeader
static fn ($header) => !$header instanceof StreamNameHeader
&& !$header instanceof PlayheadHeader
&& !$header instanceof RecordedOnHeader
&& !$header instanceof AggregateHeader
&& !$header instanceof ArchivedHeader
&& !$header instanceof StreamStartHeader,
);

$metaHeader = $this->metaHeader($message);
$streamName = null;
$playhead = null;
$recordedOn = null;

if ($message->hasHeader(AggregateHeader::class)) {
$header = $message->header(AggregateHeader::class);

$streamName = $header->streamName();
$playhead = $header->playhead;
$recordedOn = $header->recordedOn;
}

if ($message->hasHeader(StreamNameHeader::class)) {
$streamName = $message->header(StreamNameHeader::class)->streamName;
}

if ($message->hasHeader(PlayheadHeader::class)) {
$playhead = $message->header(PlayheadHeader::class)->playhead;
}

if ($message->hasHeader(RecordedOnHeader::class)) {
$recordedOn = $message->header(RecordedOnHeader::class)->recordedOn;
}

$streamStart = $message->hasHeader(StreamStartHeader::class);
$achieved = $message->hasHeader(ArchivedHeader::class);

Expand All @@ -70,9 +96,9 @@ public function message(
],
[
[
$metaHeader instanceof AggregateHeader ? $metaHeader->streamName() : $metaHeader->streamName,
$metaHeader->playhead,
$metaHeader->recordedOn?->format('Y-m-d H:i:s'),
$streamName,
$playhead,
$recordedOn?->format('Y-m-d H:i:s'),
$streamStart ? 'yes' : 'no',
$achieved ? 'yes' : 'no',
],
Expand All @@ -98,13 +124,4 @@ public function throwable(Throwable $error): void
$error = $error->getPrevious();
} while ($error !== null);
}

private function metaHeader(Message $message): AggregateHeader|StreamHeader
{
try {
return $message->header(AggregateHeader::class);
} catch (HeaderNotFound) {
return $message->header(StreamHeader::class);
}
}
}
12 changes: 6 additions & 6 deletions src/Message/Translator/AggregateToStreamHeaderTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;

/** @experimental */
final class AggregateToStreamHeaderTranslator implements Translator
Expand All @@ -23,11 +25,9 @@ public function __invoke(Message $message): array
return [
$message
->removeHeader(AggregateHeader::class)
->withHeader(new StreamHeader(
$aggregateHeader->streamName(),
$aggregateHeader->playhead,
$aggregateHeader->recordedOn,
)),
->withHeader(new StreamNameHeader($aggregateHeader->streamName()))
->withHeader(new PlayheadHeader($aggregateHeader->playhead))
->withHeader(new RecordedOnHeader($aggregateHeader->recordedOn)),
];
}
}
44 changes: 16 additions & 28 deletions src/Message/Translator/RecalculatePlayheadTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;

use function array_key_exists;

Expand All @@ -19,42 +19,30 @@ final class RecalculatePlayheadTranslator implements Translator
/** @return list<Message> */
public function __invoke(Message $message): array
{
try {
$header = $message->header(AggregateHeader::class);
} catch (HeaderNotFound) {
try {
$header = $message->header(StreamHeader::class);
} catch (HeaderNotFound) {
return [$message];
}
}
if ($message->hasHeader(StreamNameHeader::class) && $message->hasHeader(PlayheadHeader::class)) {
$streamName = $message->header(StreamNameHeader::class)->streamName;

$stream = $header instanceof StreamHeader ? $header->streamName : $header->streamName();
$playhead = $this->nextPlayhead($streamName);

$playhead = $this->nextPlayhead($stream);

if ($header->playhead === $playhead) {
return [$message];
return [
$message->withHeader(new PlayheadHeader($playhead)),
];
}

if ($header instanceof StreamHeader) {
if ($message->hasHeader(AggregateHeader::class)) {
$header = $message->header(AggregateHeader::class);

return [
$message->withHeader(new StreamHeader(
$header->streamName,
$playhead,
$message->withHeader(new AggregateHeader(
$header->aggregateName,
$header->aggregateId,
$this->nextPlayhead($header->streamName()),
$header->recordedOn,
)),
];
}

return [
$message->withHeader(new AggregateHeader(
$header->aggregateName,
$header->aggregateId,
$playhead,
$header->recordedOn,
)),
];
return [$message];
}

public function reset(): void
Expand Down
25 changes: 14 additions & 11 deletions src/Message/Translator/UntilEventTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@

use DateTimeImmutable;
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;

final class UntilEventTranslator implements Translator
{
Expand All @@ -20,22 +19,26 @@ public function __construct(
/** @return list<Message> */
public function __invoke(Message $message): array
{
try {
if ($message->hasHeader(AggregateHeader::class)) {
$header = $message->header(AggregateHeader::class);
} catch (HeaderNotFound) {
try {
$header = $message->header(StreamHeader::class);
} catch (HeaderNotFound) {

if ($header->recordedOn < $this->until) {
return [$message];
}

return [];
}

$recordedOn = $header->recordedOn;
if ($message->hasHeader(RecordedOnHeader::class)) {
$header = $message->header(RecordedOnHeader::class);

if ($header->recordedOn < $this->until) {
return [$message];
}

if ($recordedOn < $this->until) {
return [$message];
return [];
}

return [];
return [$message];
}
}
8 changes: 6 additions & 2 deletions src/Metadata/Message/MessageHeaderRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Store\ArchivedHeader;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use Patchlevel\EventSourcing\Store\StreamStartHeader;

use function array_flip;
Expand Down Expand Up @@ -73,7 +75,9 @@ public function headerNames(): array
public static function createWithInternalHeaders(array $headerNameToClassMap = []): self
{
$internalHeaders = [
'stream' => StreamHeader::class,
'streamName' => StreamNameHeader::class,
'playhead' => PlayheadHeader::class,
'recordedOn' => RecordedOnHeader::class,
'aggregate' => AggregateHeader::class,
'archived' => ArchivedHeader::class,
'newStreamStart' => StreamStartHeader::class,
Expand Down
48 changes: 27 additions & 21 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
use Patchlevel\EventSourcing\Snapshot\SnapshotStore;
use Patchlevel\EventSourcing\Snapshot\SnapshotVersionInvalid;
use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Store\StreamHeader;
use Patchlevel\EventSourcing\Store\StreamStore;
use Patchlevel\EventSourcing\Store\UniqueConstraintViolation;
use Psr\Clock\ClockInterface;
Expand Down Expand Up @@ -142,11 +144,7 @@ public function load(AggregateRootId $id): AggregateRoot
}

if ($this->useStreamHeader) {
$playhead = $firstMessage->header(StreamHeader::class)->playhead;

if ($playhead === null) {
throw new AggregateNotFound($this->metadata->className, $id);
}
$playhead = $firstMessage->header(PlayheadHeader::class)->playhead;
} else {
$playhead = $firstMessage->header(AggregateHeader::class)->playhead;
}
Expand Down Expand Up @@ -245,27 +243,35 @@ public function save(AggregateRoot $aggregate): void

$aggregateName = $this->metadata->name;

$useStreamHeader = $this->useStreamHeader;
$streamName = $this->useStreamHeader ? StreamNameTranslator::streamName($aggregateName, $aggregateId) : null;

$messages = array_map(
static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $messageDecorator, $clock, $useStreamHeader) {
if ($useStreamHeader) {
$header = new StreamHeader(
StreamNameTranslator::streamName($aggregateName, $aggregateId),
++$playhead,
$clock->now(),
);
static function (object $event) use (
$aggregateName,
$aggregateId,
&$playhead,
$messageDecorator,
$clock,
$streamName,
) {
$message = Message::create($event);

if ($streamName !== null) {
$message = $message
->withHeader(new StreamNameHeader($streamName))
->withHeader(new PlayheadHeader(++$playhead))
->withHeader(new RecordedOnHeader($clock->now()));
} else {
$header = new AggregateHeader(
$aggregateName,
$aggregateId,
++$playhead,
$clock->now(),
$message = $message->withHeader(
new AggregateHeader(
$aggregateName,
$aggregateId,
++$playhead,
$clock->now(),
),
);
}

$message = Message::create($event)->withHeader($header);

if ($messageDecorator) {
return $messageDecorator($message);
}
Expand Down
18 changes: 18 additions & 0 deletions src/Store/Header/PlayheadHeader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Header;

/**
* @psalm-immutable
* @experimental
*/
final class PlayheadHeader
{
/** @param positive-int $playhead */
public function __construct(
public readonly int $playhead,
) {
}
}
19 changes: 19 additions & 0 deletions src/Store/Header/RecordedOnHeader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Header;

use DateTimeImmutable;

/**
* @psalm-immutable
* @experimental
*/
final class RecordedOnHeader
{
public function __construct(
public readonly DateTimeImmutable $recordedOn,
) {
}
}
Loading

0 comments on commit 7e1d0d6

Please sign in to comment.