Skip to content

Commit

Permalink
Merge pull request #643 from patchlevel/pipeline
Browse files Browse the repository at this point in the history
Add pipe & reducer
  • Loading branch information
DavidBadura authored Nov 7, 2024
2 parents ab35359 + be9af28 commit f289c56
Show file tree
Hide file tree
Showing 22 changed files with 1,310 additions and 11 deletions.
11 changes: 11 additions & 0 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@
<code><![CDATA[$method->getName()]]></code>
</MixedMethodCall>
</file>
<file src="src/Message/Pipe.php">
<LessSpecificImplementedReturnType>
<code><![CDATA[Traversable<Message>]]></code>
</LessSpecificImplementedReturnType>
</file>
<file src="src/Message/Reducer.php">
<InvalidPropertyAssignmentValue>
<code><![CDATA[$this->handlers]]></code>
<code><![CDATA[[]]]></code>
</InvalidPropertyAssignmentValue>
</file>
<file src="src/Message/Serializer/DefaultHeadersSerializer.php">
<MixedArgumentTypeCoercion>
<code><![CDATA[$headerPayload]]></code>
Expand Down
115 changes: 114 additions & 1 deletion docs/pages/message.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,31 @@ use Patchlevel\EventSourcing\Message\Message;
/** @var Message $message */
$message->header(ApplicationHeader::class);
```
## Pipe

The `Pipe` is a construct that allows you to chain multiple translators.
This can be used to manipulate, filter or expand messages or events.
This can be used for anti-corruption layers, data migration, or to fix errors in the event stream.

```php
use Patchlevel\EventSourcing\Message\Pipe;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;

$messages = new Pipe(
$messages,
new ExcludeEventTranslator([ProfileCreated::class]),
new RecalculatePlayheadTranslator(),
);

foreach ($messages as $message) {
// do something with the message
}
```
## Translator

Translator can be used to manipulate, filter or expand messages or events.
This can be used for anti-corruption layers, data migration, or to fix errors in the event stream.
Translators can also be seen as middlewares.

### Exclude

Expand Down Expand Up @@ -190,6 +211,11 @@ use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;

$translator = new RecalculatePlayheadTranslator();
```
!!! warning

The `RecalculatePlayheadTranslator` is and need to be stateful.
You can't reuse the translator for multiple streams.

!!! tip

If you migrate your event stream, you can use the `RecalculatePlayheadTranslator` to fix the playhead.
Expand Down Expand Up @@ -260,6 +286,93 @@ final class SplitProfileCreatedTranslator implements Translator
You don't have to migrate the store directly for every change,
but you can also use the [upcasting](upcasting.md) feature.

## Reducer

The `Reducer` is a construct that allows you to reduce messages to a state.
This can be used to build temporal projections or to create a read model.

### Initial state

The initial state is the state that is used at the beginning of the reduction.

```php
use Patchlevel\EventSourcing\Message\Reducer;

$state = (new Reducer())
->initialState(['count' => 0])
->reduce($messages); // state is ['count' => 0]
```
### When

The `when` method is used to define a function that is called when a specific event occurs.
It gets the message and the current state and returns the new state.

```php
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Message\Reducer;

$state = (new Reducer())
->initialState([
'names' => [],
])
->when(
ProfileCreated::class,
static function (Message $message, array $state): array {
$state['names'][] = $message->event()->name;

return $state;
},
)
->reduce($messages); // state is ['names' => ['foo', 'bar']]
```
### Match

You can also use the `match` method to define multiple events at once.

```php
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Message\Reducer;

$state = (new Reducer())
->match([
ProfileCreated::class => static function (Message $message, array $state): array {
return [...$state, $message];
},
])
->reduce($messages);
```
### Any

If you want to react to any event, you can use the `any` method.

```php
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Message\Reducer;

$state = (new Reducer())
->any(
static function (Message $message, array $state): array {
return [...$state, $message];
},
)
->reduce($messages);
```
### Finalize

If you want to do something with the state after the reduction, you can use the `finalize` method.
This method gets the state and returns the new state.

```php
use Patchlevel\EventSourcing\Message\Reducer;

$state = (new Reducer())
->finalize(
static function (array $state): array {
return ['count' => count($state['messages'])];
},
)
->reduce($messages); // state is ['count' => 2]
```
## Learn more

* [How to decorate messages](message_decorator.md)
Expand Down
2 changes: 1 addition & 1 deletion docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ At this step, you must process all the data.
The `rollbackBatch` method is called when an error occurs and the batching needs to be aborted.
Here, you can respond to the error and potentially perform a database rollback.

The method `forceCommit` is called after each handled event,
The method `forceCommit` is called after each handled event,
and you can decide whether the batch commit process should start now.
This helps to determine the batch size and thus avoid memory overflow.

Expand Down
9 changes: 9 additions & 0 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ public function withHeader(object $header): self
return $message;
}

/** @param class-string $name */
public function removeHeader(string $name): self
{
$message = clone $this;
unset($message->headers[$name]);

return $message;
}

/** @return list<object> */
public function headers(): array
{
Expand Down
59 changes: 59 additions & 0 deletions src/Message/Pipe.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Message;

use Generator;
use IteratorAggregate;
use Patchlevel\EventSourcing\Message\Translator\ChainTranslator;
use Patchlevel\EventSourcing\Message\Translator\Translator;
use Traversable;

use function array_values;
use function iterator_to_array;

/** @implements IteratorAggregate<int, Message> */
final class Pipe implements IteratorAggregate
{
private Translator $translator;

/** @param iterable<Message> $messages */
public function __construct(
private readonly iterable $messages,
Translator ...$translators,
) {
$this->translator = new ChainTranslator($translators);
}

/** @return Traversable<Message> */
public function getIterator(): Traversable
{
return $this->createGenerator(
$this->messages,
$this->translator,
);
}

/** @return list<Message> */
public function toArray(): array
{
return array_values(
iterator_to_array($this->getIterator()),
);
}

/**
* @param iterable<Message> $messages
*
* @return Generator<Message>
*/
private function createGenerator(iterable $messages, Translator $translator): Generator
{
foreach ($messages as $message) {
foreach ($translator($message) as $translatedMessage) {
yield $translatedMessage;
}
}
}
}
126 changes: 126 additions & 0 deletions src/Message/Reducer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Message;

use Closure;

/**
* @template STATE of array<array-key, mixed>
* @template OUT of array<array-key, mixed> = STATE
*/
final class Reducer
{
/** @var STATE */
private array $initState = [];

/** @var array<class-string, list<Closure(Message, STATE): STATE>> */
private array $handlers = [];

/** @var list<Closure(Message, STATE): STATE> */
private array $anyHandlers = [];

/** @var (Closure(STATE): OUT)|null */
private Closure|null $finalizeHandler = null;

/**
* @param STATE $initState
*
* @return $this
*/
public function initState(array $initState): self
{
$this->initState = $initState;

return $this;
}

/**
* @param class-string<T1> $event
* @param Closure(Message<T1>, STATE): STATE $closure
*
* @return $this
*
* @template T1 of object
*/
public function when(string $event, Closure $closure): self
{
if (!isset($this->handlers[$event])) {
$this->handlers[$event] = [];
}

$this->handlers[$event][] = $closure;

return $this;
}

/**
* @param Closure(Message, STATE): STATE $closure
*
* @return $this
*/
public function any(Closure $closure): self
{
$this->anyHandlers[] = $closure;

return $this;
}

/**
* @param array<class-string, Closure(Message, STATE): STATE> $map
*
* @return $this
*/
public function match(array $map): self
{
foreach ($map as $event => $closure) {
$this->when($event, $closure);
}

return $this;
}

/**
* @param Closure(STATE): OUT $closure
*
* @return $this
*/
public function finalize(Closure $closure): self
{
$this->finalizeHandler = $closure;

return $this;
}

/**
* @param iterable<Message> $messages
*
* @return OUT|STATE
* @psalm-return (OUT is STATE ? STATE : OUT)
*/
public function reduce(iterable $messages): array
{
$state = $this->initState;

foreach ($messages as $message) {
$event = $message->event();

if (isset($this->handlers[$event::class])) {
foreach ($this->handlers[$event::class] as $handler) {
$state = $handler($message, $state);
}
}

foreach ($this->anyHandlers as $handler) {
$state = $handler($message, $state);
}
}

if ($this->finalizeHandler !== null) {
$state = ($this->finalizeHandler)($state);
}

return $state;
}
}
Loading

0 comments on commit f289c56

Please sign in to comment.