Skip to content

Commit

Permalink
Merge pull request #493 from patchlevel/rewrite-outbox-store
Browse files Browse the repository at this point in the history
Update OutboxStore schema, extract MessageSerializer from WatchServer
  • Loading branch information
DavidBadura authored Feb 7, 2024
2 parents 6c6317f + b4c1ec8 commit 0b14347
Show file tree
Hide file tree
Showing 23 changed files with 547 additions and 374 deletions.
5 changes: 5 additions & 0 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
<code>Headers</code>
</InvalidReturnType>
</file>
<file src="src/EventBus/Serializer/EventSerializerMessageSerializer.php">
<MixedArgumentTypeCoercion>
<code><![CDATA[$data['headers']]]></code>
</MixedArgumentTypeCoercion>
</file>
<file src="src/Metadata/AggregateRoot/AggregateRootMetadataAwareMetadataFactory.php">
<InvalidReturnStatement>
<code>$aggregate::metadata()</code>
Expand Down
1 change: 1 addition & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
xsi:noNamespaceSchemaLocation="/vendor/phpunit/phpunit/phpunit.xsd"
cacheDirectory=".phpunit.cache"
requireCoverageMetadata="false"
displayDetailsOnTestsThatTriggerWarnings="true"
>
<testsuites>
<testsuite name="unit">
Expand Down
4 changes: 2 additions & 2 deletions src/Console/InvalidArgumentGiven.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use InvalidArgumentException;

use function gettype;
use function get_debug_type;
use function sprintf;

final class InvalidArgumentGiven extends InvalidArgumentException
Expand All @@ -19,7 +19,7 @@ public function __construct(
sprintf(
'Invalid argument given: need type "%s" got "%s"',
$need,
gettype($value),
get_debug_type($value),
),
);
}
Expand Down
43 changes: 27 additions & 16 deletions src/Console/OutputStyle.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,43 @@
use Symfony\Component\Console\Style\SymfonyStyle;
use Throwable;

use function array_keys;
use function array_values;
use function sprintf;

final class OutputStyle extends SymfonyStyle
{
public function message(EventSerializer $serializer, Message $message): void
{
$event = $message->event();
$data = $serializer->serialize($event, [Encoder::OPTION_PRETTY_PRINT => true]);

try {
$data = $serializer->serialize($event, [Encoder::OPTION_PRETTY_PRINT => true]);
} catch (Throwable $error) {
$this->error(
sprintf(
'Error while serializing event "%s": %s',
$message->event()::class,
$error->getMessage(),
),
);

if ($this->isVeryVerbose()) {
$this->throwable($error);
}

return;
}

$this->title($data->name);

$this->horizontalTable([
'eventClass',
'aggregateName',
'aggregateId',
'playhead',
'recordedOn',
], [
[
$event::class,
$message->aggregateName(),
$message->aggregateId(),
$message->playhead(),
$message->recordedOn()->format(DateTimeInterface::ATOM),
],
]);
$headers = $message->headers();

if (isset($headers['recordedOn']) && $headers['recordedOn'] instanceof DateTimeInterface) {
$headers['recordedOn'] = $headers['recordedOn']->format(DateTimeInterface::ATOM);
}

$this->horizontalTable(array_keys($headers), [array_values($headers)]);

$this->block($data->payload);
}
Expand Down
28 changes: 28 additions & 0 deletions src/EventBus/Serializer/DeserializeFailed.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus\Serializer;

use RuntimeException;

use function get_debug_type;
use function sprintf;

final class DeserializeFailed extends RuntimeException
{
public static function decodeFailed(): self
{
return new self('Error while decoding message');
}

public static function invalidData(mixed $value): self
{
return new self(
sprintf(
'Invalid data: %s',
get_debug_type($value),
),
);
}
}
71 changes: 71 additions & 0 deletions src/EventBus/Serializer/EventSerializerMessageSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus\Serializer;

use DateTimeImmutable;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Serializer\SerializedEvent;

use function base64_decode;
use function base64_encode;
use function is_array;
use function is_string;
use function serialize;
use function unserialize;

final class EventSerializerMessageSerializer implements MessageSerializer
{
public function __construct(
private readonly EventSerializer $eventSerializer,
) {
}

public function serialize(Message $message): string
{
$serializedEvent = $this->eventSerializer->serialize($message->event());

return base64_encode(
serialize(
[
'serializedEvent' => $serializedEvent,
'headers' => $message->headers(),
],
),
);
}

public function deserialize(string $content): Message
{
$decodedString = base64_decode($content, true);

if (!is_string($decodedString)) {
throw DeserializeFailed::decodeFailed();
}

$data = unserialize(
$decodedString,
[
'allowed_classes' => [
SerializedEvent::class,
DateTimeImmutable::class,
],
],
);

if (
!is_array($data)
|| !isset($data['serializedEvent'], $data['headers'])
|| !$data['serializedEvent'] instanceof SerializedEvent
|| !is_array($data['headers'])
) {
throw DeserializeFailed::invalidData($data);
}

$event = $this->eventSerializer->deserialize($data['serializedEvent']);

return Message::createWithHeaders($event, $data['headers']);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\WatchServer;
namespace Patchlevel\EventSourcing\EventBus\Serializer;

use Patchlevel\EventSourcing\EventBus\Message;

Expand Down
41 changes: 41 additions & 0 deletions src/EventBus/Serializer/PhpNativeMessageSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus\Serializer;

use Patchlevel\EventSourcing\EventBus\Message;

use function base64_decode;
use function base64_encode;
use function is_string;
use function serialize;
use function unserialize;

final class PhpNativeMessageSerializer implements MessageSerializer
{
public function serialize(Message $message): string
{
return base64_encode(serialize($message));
}

public function deserialize(string $content): Message
{
$decodedString = base64_decode($content, true);

if (!is_string($decodedString)) {
throw DeserializeFailed::decodeFailed();
}

$message = unserialize(
$decodedString,
['allowed_classes' => true],
);

if (!$message instanceof Message) {
throw DeserializeFailed::invalidData($message);
}

return $message;
}
}
93 changes: 36 additions & 57 deletions src/Outbox/DoctrineOutboxStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Types\Types;
use Patchlevel\EventSourcing\EventBus\HeaderNotFound;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\EventBus\Serializer\MessageSerializer;
use Patchlevel\EventSourcing\Schema\SchemaConfigurator;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Serializer\SerializedEvent;
use Patchlevel\EventSourcing\Store\DoctrineHelper;
use Patchlevel\EventSourcing\Store\WrongQueryResult;

use function array_map;
Expand All @@ -20,9 +19,11 @@

final class DoctrineOutboxStore implements OutboxStore, SchemaConfigurator
{
public const HEADER_OUTBOX_IDENTIFIER = 'outboxIdentifier';

public function __construct(
private readonly Connection $connection,
private readonly EventSerializer $serializer,
private readonly MessageSerializer $messageSerializer,
private readonly string $outboxTable = 'outbox',
) {
}
Expand All @@ -32,24 +33,10 @@ public function saveOutboxMessage(Message ...$messages): void
$this->connection->transactional(
function (Connection $connection) use ($messages): void {
foreach ($messages as $message) {
$event = $message->event();

$data = $this->serializer->serialize($event);

$connection->insert(
$this->outboxTable,
[
'aggregate' => $message->aggregateName(),
'aggregate_id' => $message->aggregateId(),
'playhead' => $message->playhead(),
'event' => $data->name,
'payload' => $data->payload,
'recorded_on' => $message->recordedOn(),
'custom_headers' => $message->customHeaders(),
],
[
'recorded_on' => Types::DATETIMETZ_IMMUTABLE,
'custom_headers' => Types::JSON,
'message' => $this->messageSerializer->serialize($message),
],
);
}
Expand All @@ -66,20 +53,14 @@ public function retrieveOutboxMessages(int|null $limit = null): array
->setMaxResults($limit)
->getSQL();

/** @var list<array{aggregate: string, aggregate_id: string, playhead: string|int, event: string, payload: string, recorded_on: string, custom_headers: string}> $result */
/** @var list<array{id: int, message: string}> $result */
$result = $this->connection->fetchAllAssociative($sql);
$platform = $this->connection->getDatabasePlatform();

return array_map(
function (array $data) use ($platform) {
$event = $this->serializer->deserialize(new SerializedEvent($data['event'], $data['payload']));

return Message::create($event)
->withAggregateName($data['aggregate'])
->withAggregateId($data['aggregate_id'])
->withPlayhead(DoctrineHelper::normalizePlayhead($data['playhead'], $platform))
->withRecordedOn(DoctrineHelper::normalizeRecordedOn($data['recorded_on'], $platform))
->withCustomHeaders(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform));
function (array $data) {
$message = $this->messageSerializer->deserialize($data['message']);

return $message->withCustomHeader(self::HEADER_OUTBOX_IDENTIFIER, $data['id']);
},
$result,
);
Expand All @@ -90,14 +71,8 @@ public function markOutboxMessageConsumed(Message ...$messages): void
$this->connection->transactional(
function (Connection $connection) use ($messages): void {
foreach ($messages as $message) {
$connection->delete(
$this->outboxTable,
[
'aggregate' => $message->aggregateName(),
'aggregate_id' => $message->aggregateId(),
'playhead' => $message->playhead(),
],
);
$id = $this->extractId($message);
$connection->delete($this->outboxTable, ['id' => $id]);
}
},
);
Expand All @@ -123,24 +98,28 @@ public function configureSchema(Schema $schema, Connection $connection): void
{
$table = $schema->createTable($this->outboxTable);

$table->addColumn('aggregate', Types::STRING)
->setLength(255)
->setNotnull(true);
$table->addColumn('aggregate_id', Types::STRING)
->setLength(32)
->setNotnull(true);
$table->addColumn('playhead', Types::INTEGER)
->setNotnull(true);
$table->addColumn('event', Types::STRING)
->setLength(255)
->setNotnull(true);
$table->addColumn('payload', Types::JSON)
->setNotnull(true);
$table->addColumn('recorded_on', Types::DATETIMETZ_IMMUTABLE)
->setNotnull(false);
$table->addColumn('custom_headers', Types::JSON)
->setNotnull(true);

$table->setPrimaryKey(['aggregate', 'aggregate_id', 'playhead']);
$table->addColumn('id', Types::INTEGER)
->setNotnull(true)
->setAutoincrement(true);
$table->addColumn('message', Types::STRING)
->setNotnull(true)
->setLength(16_000);

$table->setPrimaryKey(['id']);
}

private function extractId(Message $message): int
{
try {
$value = $message->customHeader(self::HEADER_OUTBOX_IDENTIFIER);
} catch (HeaderNotFound) {
throw OutboxHeaderIssue::missingHeader(self::HEADER_OUTBOX_IDENTIFIER);
}

if (!is_int($value)) {
throw OutboxHeaderIssue::invalidHeaderType($value);
}

return $value;
}
}
Loading

0 comments on commit 0b14347

Please sign in to comment.