Skip to content

Commit

Permalink
rewrite pipeline api
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Oct 18, 2024
1 parent af1e343 commit d601cc3
Show file tree
Hide file tree
Showing 21 changed files with 72 additions and 131 deletions.
4 changes: 1 addition & 3 deletions src/Message/Translator/ChainTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@

use function array_values;

/**
* @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\ChainMiddleware instead
*/
/** @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\ChainMiddleware instead */
final class ChainTranslator implements Translator

Check failure on line 12 in src/Message/Translator/ChainTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

DeprecatedInterface

src/Message/Translator/ChainTranslator.php:12:13: DeprecatedInterface: Patchlevel\EventSourcing\Message\Translator\Translator is marked deprecated (see https://psalm.dev/152)
{
/** @param iterable<Translator> $translators */

Check failure on line 14 in src/Message/Translator/ChainTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

DeprecatedInterface

src/Message/Translator/ChainTranslator.php:14:16: DeprecatedInterface: Interface Patchlevel\EventSourcing\Message\Translator\Translator is marked as deprecated (see https://psalm.dev/152)
Expand Down
4 changes: 1 addition & 3 deletions src/Message/Translator/ExcludeEventTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

use Patchlevel\EventSourcing\Message\Message;

/**
* @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware instead
*/
/** @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware instead */
final class ExcludeEventTranslator implements Translator

Check failure on line 10 in src/Message/Translator/ExcludeEventTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

DeprecatedInterface

src/Message/Translator/ExcludeEventTranslator.php:10:13: DeprecatedInterface: Patchlevel\EventSourcing\Message\Translator\Translator is marked deprecated (see https://psalm.dev/152)
{
/** @param list<class-string> $classes */
Expand Down
4 changes: 1 addition & 3 deletions src/Message/Translator/ExcludeEventWithHeaderTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

use Patchlevel\EventSourcing\Message\Message;

/**
* @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventWithHeaderMiddleware instead
*/
/** @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventWithHeaderMiddleware instead */
final class ExcludeEventWithHeaderTranslator implements Translator

Check failure on line 10 in src/Message/Translator/ExcludeEventWithHeaderTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

DeprecatedInterface

src/Message/Translator/ExcludeEventWithHeaderTranslator.php:10:13: DeprecatedInterface: Patchlevel\EventSourcing\Message\Translator\Translator is marked deprecated (see https://psalm.dev/152)
{
/** @param class-string $header */
Expand Down
4 changes: 1 addition & 3 deletions src/Message/Translator/FilterEventTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

use Patchlevel\EventSourcing\Message\Message;

/**
* @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\FilterEventMiddleware instead
*/
/** @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\FilterEventMiddleware instead */
final class FilterEventTranslator implements Translator

Check failure on line 10 in src/Message/Translator/FilterEventTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

DeprecatedInterface

src/Message/Translator/FilterEventTranslator.php:10:13: DeprecatedInterface: Patchlevel\EventSourcing\Message\Translator\Translator is marked deprecated (see https://psalm.dev/152)
{
/** @var callable(object $event):bool */
Expand Down
4 changes: 1 addition & 3 deletions src/Message/Translator/IncludeEventTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

use Patchlevel\EventSourcing\Message\Message;

/**
* @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\IncludeEventWithHeaderMiddleware instead
*/
/** @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\IncludeEventWithHeaderMiddleware instead */
final class IncludeEventTranslator implements Translator

Check failure on line 10 in src/Message/Translator/IncludeEventTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

DeprecatedInterface

src/Message/Translator/IncludeEventTranslator.php:10:13: DeprecatedInterface: Patchlevel\EventSourcing\Message\Translator\Translator is marked deprecated (see https://psalm.dev/152)
{
/** @param list<class-string> $classes */
Expand Down
4 changes: 1 addition & 3 deletions src/Message/Translator/IncludeEventWithHeaderTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

use Patchlevel\EventSourcing\Message\Message;

/**
* @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\IncludeEventWithHeaderMiddleware instead
*/
/** @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\IncludeEventWithHeaderMiddleware instead */
final class IncludeEventWithHeaderTranslator implements Translator

Check failure on line 10 in src/Message/Translator/IncludeEventWithHeaderTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

DeprecatedInterface

src/Message/Translator/IncludeEventWithHeaderTranslator.php:10:13: DeprecatedInterface: Patchlevel\EventSourcing\Message\Translator\Translator is marked deprecated (see https://psalm.dev/152)
{
/** @param class-string $header */
Expand Down
4 changes: 1 addition & 3 deletions src/Message/Translator/RecalculatePlayheadTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@

use function array_key_exists;

/**
* @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware instead
*/
/** @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware instead */
final class RecalculatePlayheadTranslator implements Translator

Check failure on line 15 in src/Message/Translator/RecalculatePlayheadTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

DeprecatedInterface

src/Message/Translator/RecalculatePlayheadTranslator.php:15:13: DeprecatedInterface: Patchlevel\EventSourcing\Message\Translator\Translator is marked deprecated (see https://psalm.dev/152)
{
/** @var array<string, positive-int> */
Expand Down
4 changes: 2 additions & 2 deletions src/Message/Translator/ReplaceEventTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
use Patchlevel\EventSourcing\Message\Message;

/**
* @template T of object
*
* @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware instead
*
* @template T of object
*/
final class ReplaceEventTranslator implements Translator

Check failure on line 14 in src/Message/Translator/ReplaceEventTranslator.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

DeprecatedInterface

src/Message/Translator/ReplaceEventTranslator.php:14:13: DeprecatedInterface: Patchlevel\EventSourcing\Message\Translator\Translator is marked deprecated (see https://psalm.dev/152)
{
Expand Down
4 changes: 1 addition & 3 deletions src/Message/Translator/Translator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

use Patchlevel\EventSourcing\Message\Message;

/**
* @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\Middleware instead
*/
/** @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\Middleware instead */
interface Translator
{
/** @return list<Message> */
Expand Down
4 changes: 1 addition & 3 deletions src/Message/Translator/UntilEventTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\StreamHeader;

/**
* @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\UntilEventMiddleware instead
*/
/** @deprecated use Patchlevel\EventSourcing\Pipeline\Middleware\UntilEventMiddleware instead */
final class UntilEventTranslator implements Translator
{
public function __construct(
Expand Down
6 changes: 2 additions & 4 deletions src/Pipeline/Middleware/AggregateToStreamHeaderMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\StreamHeader;

/**
* @experimental
*/
/** @experimental */
final class AggregateToStreamHeaderMiddleware implements Middleware
{
/** @return list<Message> */
Expand All @@ -29,7 +27,7 @@ public function __invoke(Message $message): array
$aggregateHeader->streamName(),
$aggregateHeader->playhead,
$aggregateHeader->recordedOn,
))
)),
];
}
}
39 changes: 18 additions & 21 deletions src/Pipeline/Pipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@

namespace Patchlevel\EventSourcing\Pipeline;

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Pipeline\Middleware\ChainMiddleware;
use Patchlevel\EventSourcing\Pipeline\Middleware\Middleware;
use Patchlevel\EventSourcing\Pipeline\Source\Source;
use Patchlevel\EventSourcing\Pipeline\Target\Target;

use function array_push;
use function count;
use function is_array;

final class Pipeline
{
private readonly Middleware $middleware;

/** @param list<Middleware>|Middleware $middlewares */
public function __construct(
private readonly Source $source,
private readonly Target $target,
array|Middleware $middlewares = [],
private readonly int $bufferSize = 1_000,
private readonly float|int $bufferSize = INF,
) {
if (is_array($middlewares)) {
$this->middleware = new ChainMiddleware($middlewares);
Expand All @@ -27,34 +30,28 @@ public function __construct(
}
}

public function run(): void
/** @param iterable<Message> $messages */
public function run(iterable $messages): void
{
$buffer = [];

foreach ($this->source->load() as $message) {
foreach ($messages as $message) {
$result = ($this->middleware)($message);

array_push($buffer, ...$result);

if (count($buffer) >= $this->bufferSize) {
$this->target->save(...$result);
$buffer = [];
if (count($buffer) < $this->bufferSize) {
continue;
}
}

if (count($buffer) > 0) {
$this->target->save(...$buffer);
$buffer = [];
}
}

public static function execute(
Source $source,
Target $target,
array|Middleware $middlewares = [],
$bufferSize = 1_000,
): void
{
$pipeline = new self($source, $target, $middlewares, $bufferSize);
$pipeline->run();
if ($buffer === []) {
return;
}

$this->target->save(...$buffer);
}
}
}
22 changes: 0 additions & 22 deletions src/Pipeline/Source/InMemorySource.php

This file was deleted.

13 changes: 0 additions & 13 deletions src/Pipeline/Source/Source.php

This file was deleted.

21 changes: 0 additions & 21 deletions src/Pipeline/Source/StoreSource.php

This file was deleted.

21 changes: 21 additions & 0 deletions src/Pipeline/Target/EventBusTarget.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Target;

use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Message\Message;

final class EventBusTarget implements Target
{
public function __construct(
private readonly EventBus $eventBus,
) {
}

public function save(Message ...$message): void
{
$this->eventBus->dispatch(...$message);
}
}
4 changes: 2 additions & 2 deletions src/Pipeline/Target/InMemoryTarget.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ final class InMemoryTarget implements Target
public function save(Message ...$message): void
{
foreach ($message as $m) {
$this->messages[] = $m;
$this->messages[] = $m;
}
}

Expand All @@ -28,4 +28,4 @@ public function clear(): void
{
$this->messages = [];
}
}
}
4 changes: 2 additions & 2 deletions src/Pipeline/Target/StoreTarget.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
final class StoreTarget implements Target
{
public function __construct(
private readonly Store $store
private readonly Store $store,
) {
}

public function save(Message ...$message): void
{
$this->store->save(...$message);
}
}
}
2 changes: 1 addition & 1 deletion src/Pipeline/Target/Target.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
interface Target
{
public function save(Message ...$message): void;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Pipeline\Middleware\AggregateToStreamHeaderMiddleware;
use Patchlevel\EventSourcing\Pipeline\Pipeline;
use Patchlevel\EventSourcing\Pipeline\Source\InMemorySource;
use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget;
use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
Expand All @@ -20,26 +19,32 @@
use Patchlevel\EventSourcing\Subscription\RunMode;
use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber;

use function count;

#[Subscriber('migrate', RunMode::Once)]
final class MigrateAggregateToStreamStoreSubscriber implements BatchableSubscriber
{
private const BATCH_SIZE = 10_000;

private readonly SchemaDirector $schemaDirector;

/**
* @var list<Message>
*/
/** @var list<Message> */
private array $messages = [];

private readonly Pipeline $pipeline;

public function __construct(
private readonly StreamDoctrineDbalStore $targetStore,
) {
$this->schemaDirector = new DoctrineSchemaDirector(
$targetStore->connection(),
new ChainDoctrineSchemaConfigurator([
$targetStore,
]),
new ChainDoctrineSchemaConfigurator([$targetStore]),
);

$this->pipeline = new Pipeline(
new StoreTarget($this->targetStore),
new AggregateToStreamHeaderMiddleware(),
self::BATCH_SIZE * 10, // make sure we have only one batch
);
}

Expand All @@ -56,15 +61,9 @@ public function beginBatch(): void

public function commitBatch(): void
{
$messages = $this->messages;
$this->messages = [];
$this->pipeline->run($this->messages);

Pipeline::execute(
new InMemorySource($messages),
new StoreTarget($this->targetStore),
new AggregateToStreamHeaderMiddleware(),
self::BATCH_SIZE * 10, // make sure we have only one batch
);
$this->messages = [];
}

public function rollbackBatch(): void
Expand Down
Loading

0 comments on commit d601cc3

Please sign in to comment.