Skip to content

Commit

Permalink
Update Message with headers
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielBadura committed Feb 7, 2024
1 parent 0b14347 commit 848c9c1
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 149 deletions.
2 changes: 1 addition & 1 deletion docs/pages/event_bus.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use Patchlevel\EventSourcing\EventBus\Message;

$message = Message::create(new NameChanged('foo'))
// ...
->withCustomHeader('application-id', 'app');
->withHeader('application-id', 'app');
```

!!! note
Expand Down
2 changes: 1 addition & 1 deletion docs/pages/message_decorator.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ final class OnSystemRecordedDecorator implements MessageDecorator
{
public function __invoke(Message $message): Message
{
return $message->withCustomHeader('system', 'accounting_system');
return $message->withHeader('system', 'accounting_system');
}
}
```
Expand Down
6 changes: 1 addition & 5 deletions src/EventBus/Decorator/SplitStreamDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ public function __invoke(Message $message): Message
$event = $message->event();
$metadata = $this->eventMetadataFactory->metadata($event::class);

if ($metadata->splitStream) {
return $message->withNewStreamStart(true);
}

return $message;
return $message->withNewStreamStart($metadata->splitStream);
}
}
10 changes: 10 additions & 0 deletions src/EventBus/HeaderNotFound.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ public static function recordedOn(): self
return new self(Message::HEADER_RECORDED_ON);
}

public static function newStreamStart(): self
{
return new self(Message::HEADER_NEW_STREAM_START);
}

public static function archived(): self
{
return new self(Message::HEADER_ARCHIVED);
}

public static function custom(string $name): self
{
return new self($name);
Expand Down
158 changes: 50 additions & 108 deletions src/EventBus/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
* playhead?: positive-int,
* recordedOn?: DateTimeImmutable,
* newStreamStart?: bool,
* archived?: bool
* archived?: bool,
* ...<string, mixed>
* }
*/
final class Message
Expand All @@ -29,16 +30,8 @@ final class Message
public const HEADER_ARCHIVED = 'archived';
public const HEADER_NEW_STREAM_START = 'newStreamStart';

private string|null $aggregateName = null;
private string|null $aggregateId = null;
/** @var positive-int|null */
private int|null $playhead = null;
private DateTimeImmutable|null $recordedOn = null;
private bool $newStreamStart = false;
private bool $archived = false;

/** @var array<string, mixed> */
private array $customHeaders = [];
/** @var Headers */
private array $headers = [];

Check failure on line 34 in src/EventBus/Message.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

PHPDoc tag @var for property Patchlevel\EventSourcing\EventBus\Message::$headers with type mixed is not subtype of native type array.

Check failure on line 34 in src/EventBus/Message.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Property Patchlevel\EventSourcing\EventBus\Message::$headers type has no value type specified in iterable type array.

/** @param T $event */
public function __construct(
Expand All @@ -58,6 +51,12 @@ public static function create(object $event): self
return new self($event);
}

/** @param Headers $headers */
public static function createWithHeaders(object $event, array $headers): self

Check failure on line 55 in src/EventBus/Message.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Method Patchlevel\EventSourcing\EventBus\Message::createWithHeaders() has parameter $headers with no value type specified in iterable type array.

Check failure on line 55 in src/EventBus/Message.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

PHPDoc tag @param for parameter $headers with type mixed is not subtype of native type array.
{
return self::create($event)->withHeaders($headers);
}

/** @return T */
public function event(): object
{
Expand All @@ -67,7 +66,7 @@ public function event(): object
/** @throws HeaderNotFound */
public function aggregateName(): string
{
$value = $this->aggregateName;
$value = $this->headers[self::HEADER_AGGREGATE_NAME] ?? null;

if ($value === null) {
throw HeaderNotFound::aggregateName();
Expand All @@ -78,16 +77,13 @@ public function aggregateName(): string

public function withAggregateName(string $value): self
{
$message = clone $this;
$message->aggregateName = $value;

return $message;
return $this->withHeader(self::HEADER_AGGREGATE_NAME, $value);
}

/** @throws HeaderNotFound */
public function aggregateId(): string
{
$value = $this->aggregateId;
$value = $this->headers[self::HEADER_AGGREGATE_ID] ?? null;

if ($value === null) {
throw HeaderNotFound::aggregateId();
Expand All @@ -98,10 +94,7 @@ public function aggregateId(): string

public function withAggregateId(string $value): self
{
$message = clone $this;
$message->aggregateId = $value;

return $message;
return $this->withHeader(self::HEADER_AGGREGATE_ID, $value);
}

/**
Expand All @@ -111,7 +104,7 @@ public function withAggregateId(string $value): self
*/
public function playhead(): int
{
$value = $this->playhead;
$value = $this->headers[self::HEADER_PLAYHEAD] ?? null;

if ($value === null) {
throw HeaderNotFound::playhead();
Expand All @@ -123,16 +116,13 @@ public function playhead(): int
/** @param positive-int $value */
public function withPlayhead(int $value): self
{
$message = clone $this;
$message->playhead = $value;

return $message;
return $this->withHeader(self::HEADER_PLAYHEAD, $value);
}

/** @throws HeaderNotFound */
public function recordedOn(): DateTimeImmutable
{
$value = $this->recordedOn;
$value = $this->headers[self::HEADER_RECORDED_ON] ?? null;

if ($value === null) {
throw HeaderNotFound::recordedOn();
Expand All @@ -143,136 +133,88 @@ public function recordedOn(): DateTimeImmutable

public function withRecordedOn(DateTimeImmutable $value): self
{
$message = clone $this;
$message->recordedOn = $value;

return $message;
return $this->withHeader(self::HEADER_RECORDED_ON, $value);
}

public function newStreamStart(): bool
{
return $this->newStreamStart;
$value = $this->headers[self::HEADER_NEW_STREAM_START] ?? null;

if ($value === null) {
throw HeaderNotFound::newStreamStart();
}

return $value;
}

public function withNewStreamStart(bool $value): self
{
$message = clone $this;
$message->newStreamStart = $value;

return $message;
return $this->withHeader(self::HEADER_NEW_STREAM_START, $value);
}

public function archived(): bool
{
return $this->archived;
$value = $this->headers[self::HEADER_ARCHIVED] ?? null;

if ($value === null) {
throw HeaderNotFound::archived();
}

return $value;
}

public function withArchived(bool $value): self
{
$message = clone $this;
$message->archived = $value;

return $message;
return $this->withHeader(self::HEADER_ARCHIVED, $value);
}

/** @throws HeaderNotFound */
public function customHeader(string $name): mixed
public function header(string $name): mixed
{
if (!array_key_exists($name, $this->customHeaders)) {
if (!array_key_exists($name, $this->headers)) {
throw HeaderNotFound::custom($name);
}

return $this->customHeaders[$name];
return $this->headers[$name];
}

public function withCustomHeader(string $name, mixed $value): self
public function withHeader(string $name, mixed $value): self
{
$message = clone $this;
$message->customHeaders[$name] = $value;
$message->headers[$name] = $value;

return $message;
}

/** @return array<string, mixed> */
public function customHeaders(): array
/** @return Headers */
public function headers(): array

Check failure on line 190 in src/EventBus/Message.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Method Patchlevel\EventSourcing\EventBus\Message::headers() return type has no value type specified in iterable type array.

Check failure on line 190 in src/EventBus/Message.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

PHPDoc tag @return with type mixed is not subtype of native type array.
{
return $this->customHeaders;
return $this->headers;
}

/** @param array<string, mixed> $headers */
public function withCustomHeaders(array $headers): self
/** @param Headers $headers */
public function withHeaders(array $headers): self

Check failure on line 196 in src/EventBus/Message.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Method Patchlevel\EventSourcing\EventBus\Message::withHeaders() has parameter $headers with no value type specified in iterable type array.

Check failure on line 196 in src/EventBus/Message.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

PHPDoc tag @param for parameter $headers with type mixed is not subtype of native type array.
{
$message = clone $this;
$message->customHeaders += $headers;
$message->headers += $headers;

return $message;
}

/** @return Headers */
public function headers(): array
{
$headers = $this->customHeaders;

if ($this->aggregateName !== null) {
$headers[self::HEADER_AGGREGATE_NAME] = $this->aggregateName;
}

if ($this->aggregateId !== null) {
$headers[self::HEADER_AGGREGATE_ID] = $this->aggregateId;
}

if ($this->playhead !== null) {
$headers[self::HEADER_PLAYHEAD] = $this->playhead;
}

if ($this->recordedOn !== null) {
$headers[self::HEADER_RECORDED_ON] = $this->recordedOn;
}

$headers[self::HEADER_NEW_STREAM_START] = $this->newStreamStart;
$headers[self::HEADER_ARCHIVED] = $this->archived;

return $headers;
}

/** @param Headers $headers */
public static function createWithHeaders(object $event, array $headers): self
/** @return array<string, mixed> */
public function customHeaders(): array
{
$message = self::create($event);

if (array_key_exists(self::HEADER_AGGREGATE_NAME, $headers)) {
$message = $message->withAggregateName($headers[self::HEADER_AGGREGATE_NAME]);
}

if (array_key_exists(self::HEADER_AGGREGATE_ID, $headers)) {
$message = $message->withAggregateId($headers[self::HEADER_AGGREGATE_ID]);
}

if (array_key_exists(self::HEADER_PLAYHEAD, $headers)) {
$message = $message->withPlayhead($headers[self::HEADER_PLAYHEAD]);
}

if (array_key_exists(self::HEADER_RECORDED_ON, $headers)) {
$message = $message->withRecordedOn($headers[self::HEADER_RECORDED_ON]);
}

if (array_key_exists(self::HEADER_NEW_STREAM_START, $headers)) {
$message = $message->withNewStreamStart($headers[self::HEADER_NEW_STREAM_START]);
}

if (array_key_exists(self::HEADER_ARCHIVED, $headers)) {
$message = $message->withArchived($headers[self::HEADER_ARCHIVED]);
}
$headers = $this->headers;

unset(
$headers[self::HEADER_AGGREGATE_NAME],
$headers[self::HEADER_AGGREGATE_ID],
$headers[self::HEADER_PLAYHEAD],
$headers[self::HEADER_RECORDED_ON],
$headers[self::HEADER_NEW_STREAM_START],
$headers[self::HEADER_ARCHIVED],
$headers[self::HEADER_NEW_STREAM_START],
);

return $message->withCustomHeaders($headers);
return $headers;
}
}
2 changes: 1 addition & 1 deletion src/Outbox/DoctrineOutboxStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public function retrieveOutboxMessages(int|null $limit = null): array
function (array $data) {
$message = $this->messageSerializer->deserialize($data['message']);

return $message->withCustomHeader(self::HEADER_OUTBOX_IDENTIFIER, $data['id']);
return $message->withHeader(self::HEADER_OUTBOX_IDENTIFIER, $data['id']);
},
$result,
);
Expand Down
13 changes: 11 additions & 2 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Patchlevel\EventSourcing\Clock\SystemClock;
use Patchlevel\EventSourcing\EventBus\Decorator\MessageDecorator;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\EventBus\HeaderNotFound;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata;
use Patchlevel\EventSourcing\Snapshot\SnapshotNotFound;
Expand Down Expand Up @@ -222,7 +223,9 @@ static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $
->withAggregateName($aggregateName)
->withAggregateId($aggregateId)
->withPlayhead(++$playhead)
->withRecordedOn($clock->now());
->withRecordedOn($clock->now())
->withArchived(false)
->withNewStreamStart(false);

if ($messageDecorator) {
return $messageDecorator($message);
Expand Down Expand Up @@ -371,7 +374,13 @@ private function archive(Message ...$messages): void
$lastMessageWithNewStreamStart = null;

foreach ($messages as $message) {
if (!$message->newStreamStart()) {
try {
$newStreamStart = $message->newStreamStart();
} catch (HeaderNotFound) {
continue;
}

if (!$newStreamStart) {
continue;
}

Expand Down
4 changes: 3 additions & 1 deletion src/Store/DoctrineDbalStoreStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ private function buildGenerator(
->withAggregateId($data['aggregate_id'])
->withPlayhead(DoctrineHelper::normalizePlayhead($data['playhead'], $platform))
->withRecordedOn(DoctrineHelper::normalizeRecordedOn($data['recorded_on'], $platform))
->withCustomHeaders(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform));
->withArchived(DoctrineHelper::normalizeArchived($data['archived'], $platform))

Check failure on line 106 in src/Store/DoctrineDbalStoreStream.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

InvalidArrayOffset

src/Store/DoctrineDbalStoreStream.php:106:66: InvalidArrayOffset: Cannot access value on variable $data using offset value of 'archived', expecting 'id', 'aggregate', 'aggregate_id', 'playhead', 'event', 'payload', 'recorded_on' or 'custom_headers' (see https://psalm.dev/115)
->withNewStreamStart(DoctrineHelper::normalizeNewStreamStart($data['new_stream_start'], $platform))

Check failure on line 107 in src/Store/DoctrineDbalStoreStream.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

InvalidArrayOffset

src/Store/DoctrineDbalStoreStream.php:107:78: InvalidArrayOffset: Cannot access value on variable $data using offset value of 'new_stream_start', expecting 'id', 'aggregate', 'aggregate_id', 'playhead', 'event', 'payload', 'recorded_on' or 'custom_headers' (see https://psalm.dev/115)
->withHeaders(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform));

Check failure on line 108 in src/Store/DoctrineDbalStoreStream.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

ArgumentTypeCoercion

src/Store/DoctrineDbalStoreStream.php:108:31: ArgumentTypeCoercion: Argument 1 of Patchlevel\EventSourcing\EventBus\Message::withHeaders expects array{aggregateId?: string, aggregateName?: string, archived?: bool, newStreamStart?: bool, playhead?: int<1, max>, recordedOn?: DateTimeImmutable, ...<string, mixed>}, but parent type array<string, mixed> provided (see https://psalm.dev/193)
}
}
}
Loading

0 comments on commit 848c9c1

Please sign in to comment.