Skip to content

Commit

Permalink
use aggregate name in message to decouple components
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Feb 6, 2024
1 parent ec16bc7 commit ef1cc8a
Show file tree
Hide file tree
Showing 16 changed files with 72 additions and 120 deletions.
4 changes: 2 additions & 2 deletions src/EventBus/HeaderNotFound.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ private function __construct(
parent::__construct(sprintf('message header "%s" is not defined', $name));
}

public static function aggregateClass(): self
public static function aggregateName(): self
{
return new self(Message::HEADER_AGGREGATE_CLASS);
return new self(Message::HEADER_AGGREGATE_NAME);
}

public static function aggregateId(): self
Expand Down
35 changes: 14 additions & 21 deletions src/EventBus/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
namespace Patchlevel\EventSourcing\EventBus;

use DateTimeImmutable;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;

use function array_key_exists;

/**
* @template-covariant T of object
* @psalm-immutable
* @psalm-type Headers = array{
* aggregateClass?: class-string<AggregateRoot>,
* aggregateName?: string,
* aggregateId?: string,
* playhead?: positive-int,
* recordedOn?: DateTimeImmutable,
Expand All @@ -23,15 +22,14 @@
*/
final class Message
{
public const HEADER_AGGREGATE_CLASS = 'aggregateClass';
public const HEADER_AGGREGATE_NAME = 'aggregateName';
public const HEADER_AGGREGATE_ID = 'aggregateId';
public const HEADER_PLAYHEAD = 'playhead';
public const HEADER_RECORDED_ON = 'recordedOn';
public const HEADER_ARCHIVED = 'archived';
public const HEADER_NEW_STREAM_START = 'newStreamStart';

/** @var class-string<AggregateRoot>|null */
private string|null $aggregateClass = null;
private string|null $aggregateName = null;
private string|null $aggregateId = null;
/** @var positive-int|null */
private int|null $playhead = null;
Expand Down Expand Up @@ -66,27 +64,22 @@ public function event(): object
return $this->event;
}

/**
* @return class-string<AggregateRoot>
*
* @throws HeaderNotFound
*/
public function aggregateClass(): string
/** @throws HeaderNotFound */
public function aggregateName(): string
{
$value = $this->aggregateClass;
$value = $this->aggregateName;

if ($value === null) {
throw HeaderNotFound::aggregateClass();
throw HeaderNotFound::aggregateName();
}

return $value;
}

/** @param class-string<AggregateRoot> $value */
public function withAggregateClass(string $value): self
public function withAggregateName(string $value): self
{
$message = clone $this;
$message->aggregateClass = $value;
$message->aggregateName = $value;

return $message;
}
Expand Down Expand Up @@ -220,8 +213,8 @@ public function headers(): array
{
$headers = $this->customHeaders;

if ($this->aggregateClass !== null) {
$headers[self::HEADER_AGGREGATE_CLASS] = $this->aggregateClass;
if ($this->aggregateName !== null) {
$headers[self::HEADER_AGGREGATE_NAME] = $this->aggregateName;
}

if ($this->aggregateId !== null) {
Expand All @@ -247,8 +240,8 @@ public static function createWithHeaders(object $event, array $headers): self
{
$message = self::create($event);

if (array_key_exists(self::HEADER_AGGREGATE_CLASS, $headers)) {
$message = $message->withAggregateClass($headers[self::HEADER_AGGREGATE_CLASS]);
if (array_key_exists(self::HEADER_AGGREGATE_NAME, $headers)) {
$message = $message->withAggregateName($headers[self::HEADER_AGGREGATE_NAME]);
}

if (array_key_exists(self::HEADER_AGGREGATE_ID, $headers)) {
Expand All @@ -272,7 +265,7 @@ public static function createWithHeaders(object $event, array $headers): self
}

unset(
$headers[self::HEADER_AGGREGATE_CLASS],
$headers[self::HEADER_AGGREGATE_NAME],
$headers[self::HEADER_AGGREGATE_ID],
$headers[self::HEADER_PLAYHEAD],
$headers[self::HEADER_RECORDED_ON],
Expand Down
8 changes: 3 additions & 5 deletions src/Outbox/DoctrineOutboxStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Types\Types;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Schema\SchemaConfigurator;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Serializer\SerializedEvent;
Expand All @@ -24,7 +23,6 @@ final class DoctrineOutboxStore implements OutboxStore, SchemaConfigurator
public function __construct(
private readonly Connection $connection,
private readonly EventSerializer $serializer,
private readonly AggregateRootRegistry $aggregateRootRegistry,
private readonly string $outboxTable = 'outbox',
) {
}
Expand All @@ -41,7 +39,7 @@ function (Connection $connection) use ($messages): void {
$connection->insert(
$this->outboxTable,
[
'aggregate' => $this->aggregateRootRegistry->aggregateName($message->aggregateClass()),
'aggregate' => $message->aggregateName(),
'aggregate_id' => $message->aggregateId(),
'playhead' => $message->playhead(),
'event' => $data->name,
Expand Down Expand Up @@ -77,7 +75,7 @@ function (array $data) use ($platform) {
$event = $this->serializer->deserialize(new SerializedEvent($data['event'], $data['payload']));

return Message::create($event)
->withAggregateClass($this->aggregateRootRegistry->aggregateClass($data['aggregate']))
->withAggregateName($data['aggregate'])
->withAggregateId($data['aggregate_id'])
->withPlayhead(DoctrineHelper::normalizePlayhead($data['playhead'], $platform))
->withRecordedOn(DoctrineHelper::normalizeRecordedOn($data['recorded_on'], $platform))
Expand All @@ -95,7 +93,7 @@ function (Connection $connection) use ($messages): void {
$connection->delete(
$this->outboxTable,
[
'aggregate' => $this->aggregateRootRegistry->aggregateName($message->aggregateClass()),
'aggregate' => $message->aggregateName(),
'aggregate_id' => $message->aggregateId(),
'playhead' => $message->playhead(),
],
Expand Down
25 changes: 10 additions & 15 deletions src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@

namespace Patchlevel\EventSourcing\Pipeline\Middleware;

use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\EventBus\Message;

use function array_key_exists;

final class RecalculatePlayheadMiddleware implements Middleware
{
/** @var array<class-string<AggregateRoot>, array<string, positive-int>> */
/** @var array<string, array<string, positive-int>> */
private array $index = [];

/** @return list<Message> */
public function __invoke(Message $message): array
{
$playhead = $this->nextPlayhead($message->aggregateClass(), $message->aggregateId());
$playhead = $this->nextPlayhead($message->aggregateName(), $message->aggregateId());

if ($message->playhead() === $playhead) {
return [$message];
Expand All @@ -33,23 +32,19 @@ public function reset(): void
$this->index = [];
}

/**
* @param class-string<AggregateRoot> $aggregateClass
*
* @return positive-int
*/
private function nextPlayhead(string $aggregateClass, string $aggregateId): int
/** @return positive-int */
private function nextPlayhead(string $aggregateName, string $aggregateId): int
{
if (!array_key_exists($aggregateClass, $this->index)) {
$this->index[$aggregateClass] = [];
if (!array_key_exists($aggregateName, $this->index)) {
$this->index[$aggregateName] = [];
}

if (!array_key_exists($aggregateId, $this->index[$aggregateClass])) {
$this->index[$aggregateClass][$aggregateId] = 1;
if (!array_key_exists($aggregateId, $this->index[$aggregateName])) {
$this->index[$aggregateName][$aggregateId] = 1;
} else {
$this->index[$aggregateClass][$aggregateId]++;
$this->index[$aggregateName][$aggregateId]++;
}

return $this->index[$aggregateClass][$aggregateId];
return $this->index[$aggregateName][$aggregateId];
}
}
49 changes: 26 additions & 23 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function load(AggregateRootId $id): AggregateRoot
$this->logger->debug(
sprintf(
'Repository: Aggregate "%s" with the id "%s" loaded from snapshot.',
$this->metadata->className,
$this->metadata->name,
$id->toString(),
),
);
Expand All @@ -78,7 +78,7 @@ public function load(AggregateRootId $id): AggregateRoot
$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" could not be rebuild from snapshot.',
$this->metadata->className,
$this->metadata->name,
$id->toString(),
),
);
Expand All @@ -88,23 +88,23 @@ public function load(AggregateRootId $id): AggregateRoot
$this->logger->debug(
sprintf(
'Repository: Snapshot for aggregate "%s" with the id "%s" not found.',
$this->metadata->className,
$this->metadata->name,
$id->toString(),
),
);
} catch (SnapshotVersionInvalid) {
$this->logger->debug(
sprintf(
'Repository: Snapshot for aggregate "%s" with the id "%s" is invalid.',
$this->metadata->className,
$this->metadata->name,
$id->toString(),
),
);
}
}

$criteria = (new CriteriaBuilder())
->aggregateClass($this->metadata->className)
->aggregateName($this->metadata->name)
->aggregateId($id->toString())
->archived(false)
->build();
Expand All @@ -120,7 +120,7 @@ public function load(AggregateRootId $id): AggregateRoot
$this->logger->debug(
sprintf(
'Repository: Aggregate "%s" with the id "%s" not found.',
$this->metadata->className,
$this->metadata->name,
$id->toString(),
),
);
Expand All @@ -145,7 +145,7 @@ public function load(AggregateRootId $id): AggregateRoot
$this->logger->debug(
sprintf(
'Repository: Aggregate "%s" with the id "%s" loaded from store.',
$this->metadata->className,
$this->metadata->name,
$id->toString(),
),
);
Expand All @@ -156,7 +156,7 @@ public function load(AggregateRootId $id): AggregateRoot
public function has(AggregateRootId $id): bool
{
$criteria = (new CriteriaBuilder())
->aggregateClass($this->metadata->className)
->aggregateName($this->metadata->name)
->aggregateId($id->toString())
->build();

Expand All @@ -167,6 +167,7 @@ public function has(AggregateRootId $id): bool
public function save(AggregateRoot $aggregate): void
{
$this->assertValidAggregate($aggregate);
$aggregateId = $aggregate->aggregateRootId()->toString();

try {
$events = $aggregate->releaseEvents();
Expand All @@ -183,8 +184,8 @@ public function save(AggregateRoot $aggregate): void
$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" is unknown.',
$this->metadata->className,
$aggregate->aggregateRootId()->toString(),
$this->metadata->name,
$aggregateId,
),
);

Expand All @@ -195,8 +196,8 @@ public function save(AggregateRoot $aggregate): void
$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" has a playhead mismatch. Expected "%d" but got "%d".',
$this->metadata->className,
$aggregate->aggregateRootId()->toString(),
$this->metadata->name,
$aggregateId,
$aggregate->playhead(),
$eventCount,
),
Expand All @@ -213,11 +214,13 @@ public function save(AggregateRoot $aggregate): void
$messageDecorator = $this->messageDecorator;
$clock = $this->clock;

$aggregateName = $this->metadata->name;

$messages = array_map(
static function (object $event) use ($aggregate, &$playhead, $messageDecorator, $clock) {
static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $messageDecorator, $clock) {
$message = Message::create($event)
->withAggregateClass($aggregate::class)
->withAggregateId($aggregate->aggregateRootId()->toString())
->withAggregateName($aggregateName)
->withAggregateId($aggregateId)
->withPlayhead(++$playhead)
->withRecordedOn($clock->now());

Expand All @@ -230,7 +233,7 @@ static function (object $event) use ($aggregate, &$playhead, $messageDecorator,
$events,
);

$this->store->transactional(function () use ($messages, $aggregate, $newAggregate): void {
$this->store->transactional(function () use ($messages, $aggregate, $aggregateId, $newAggregate): void {
try {
$this->store->save(...$messages);
} catch (UniqueConstraintViolation) {
Expand All @@ -239,7 +242,7 @@ static function (object $event) use ($aggregate, &$playhead, $messageDecorator,
sprintf(
'Repository: Aggregate "%s" with the id "%s" already exists.',
$aggregate::class,
$aggregate->aggregateRootId()->toString(),
$aggregateId,
),
);

Expand All @@ -250,7 +253,7 @@ static function (object $event) use ($aggregate, &$playhead, $messageDecorator,
sprintf(
'Repository: Aggregate "%s" with the id "%s" is outdated.',
$aggregate::class,
$aggregate->aggregateRootId()->toString(),
$aggregateId,
),
);

Expand All @@ -266,8 +269,8 @@ static function (object $event) use ($aggregate, &$playhead, $messageDecorator,
$this->logger->debug(
sprintf(
'Repository: Aggregate "%s" with the id "%s" saved.',
$this->metadata->className,
$aggregate->aggregateRootId()->toString(),
$this->metadata->name,
$aggregateId,
),
);
} catch (Throwable $exception) {
Expand All @@ -289,7 +292,7 @@ private function loadFromSnapshot(string $aggregateClass, AggregateRootId $id):
$aggregate = $this->snapshotStore->load($aggregateClass, $id);

$criteria = (new CriteriaBuilder())
->aggregateClass($this->metadata->className)
->aggregateName($this->metadata->name)
->aggregateId($id->toString())
->fromPlayhead($aggregate->playhead())
->build();
Expand Down Expand Up @@ -380,15 +383,15 @@ private function archive(Message ...$messages): void
}

$this->store->archiveMessages(
$lastMessageWithNewStreamStart->aggregateClass(),
$lastMessageWithNewStreamStart->aggregateName(),
$lastMessageWithNewStreamStart->aggregateId(),
$lastMessageWithNewStreamStart->playhead(),
);

$this->logger->debug(
sprintf(
'Repository: Archive messages for aggregate "%s" with the id "%s" until playhead "%d".',
$lastMessageWithNewStreamStart->aggregateClass(),
$lastMessageWithNewStreamStart->aggregateName(),
$lastMessageWithNewStreamStart->aggregateId(),
$lastMessageWithNewStreamStart->playhead(),
),
Expand Down
Loading

0 comments on commit ef1cc8a

Please sign in to comment.