Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipe & reducer #643

Merged
merged 12 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@
<code><![CDATA[$method->getName()]]></code>
</MixedMethodCall>
</file>
<file src="src/Message/Pipe.php">
<LessSpecificImplementedReturnType>
<code><![CDATA[Traversable<Message>]]></code>
</LessSpecificImplementedReturnType>
</file>
<file src="src/Message/Reducer.php">
<InvalidPropertyAssignmentValue>
<code><![CDATA[$this->handlers]]></code>
<code><![CDATA[[]]]></code>
</InvalidPropertyAssignmentValue>
</file>
<file src="src/Message/Serializer/DefaultHeadersSerializer.php">
<MixedArgumentTypeCoercion>
<code><![CDATA[$headerPayload]]></code>
Expand Down
60 changes: 60 additions & 0 deletions docs/pages/message.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,66 @@ use Patchlevel\EventSourcing\Message\Message;
/** @var Message $message */
$message->header(ApplicationHeader::class);
```
## Pipe

```php
$messages = new Pipe(
$messages,
new ExcludeEventTranslator([ProfileCreated::class]),
);

foreach ($messages as $message) {
// do something with the message
}
```
## Reducer

### Initial state

```php
$state = (new Reducer())
->initialState(['count' => 0])
->reduce($messages);

// state is ['count' => 0]
```
### When

```php
$state = (new Reducer())
->initialState([
'names' => [],
])
->when(
ProfileCreated::class,
static function (Message $message, array $state): array {
$state['names'][] = $message->event()->name;

return $state;
},
)
->reduce($messages);

// state is ['names' => ['foo', 'bar']]
```
### Match

```php
$state = (new Reducer())
->match([
ProfileCreated::class => static function (Message $message, array $state): array {
return [...$state, $message];
},
])
->reduce($messages);
```
### Any


### Finalize



## Translator

Translator can be used to manipulate, filter or expand messages or events.
Expand Down
2 changes: 1 addition & 1 deletion docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ At this step, you must process all the data.
The `rollbackBatch` method is called when an error occurs and the batching needs to be aborted.
Here, you can respond to the error and potentially perform a database rollback.

The method `forceCommit` is called after each handled event,
The method `forceCommit` is called after each handled event,
and you can decide whether the batch commit process should start now.
This helps to determine the batch size and thus avoid memory overflow.

Expand Down
9 changes: 9 additions & 0 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ public function withHeader(object $header): self
return $message;
}

/** @param class-string $name */
public function removeHeader(string $name): self
{
$message = clone $this;
unset($message->headers[$name]);

return $message;
}

/** @return list<object> */
public function headers(): array
{
Expand Down
66 changes: 66 additions & 0 deletions src/Message/Pipe.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Message;

use Generator;
use IteratorAggregate;
use Patchlevel\EventSourcing\Message\Translator\ChainTranslator;
use Patchlevel\EventSourcing\Message\Translator\Translator;
use Traversable;

use function array_values;
use function iterator_to_array;

/** @implements IteratorAggregate<int, Message> */
final class Pipe implements IteratorAggregate
{
private Translator $translator;

/**
* @param iterable<Message> $messages
* @param list<Translator>|Translator $translators
*/
public function __construct(
private readonly iterable $messages,
array|Translator $translators = [],
) {
$this->translator = $translators instanceof Translator
? $translators
: new ChainTranslator($translators);
}

/** @return Traversable<Message> */
public function getIterator(): Traversable
{
return $this->createGenerator(
$this->messages,
$this->translator,
);
}

/** @return list<Message> */
public function toArray(): array
{
return array_values(
iterator_to_array($this->getIterator()),
);
}

/**
* @param iterable<Message> $messages
*
* @return Generator<Message>
*/
private function createGenerator(iterable $messages, Translator $translator): Generator
{
foreach ($messages as $message) {
$result = $translator($message);

foreach ($result as $m) {
yield $m;
}
DavidBadura marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
126 changes: 126 additions & 0 deletions src/Message/Reducer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Message;

use Closure;

/**
* @template STATE of array<array-key, mixed>
* @template OUT of array<array-key, mixed> = STATE
*/
final class Reducer
{
/** @var STATE */
private array $initState = [];

/** @var array<class-string, list<Closure(Message, STATE): STATE>> */
private array $handlers = [];

/** @var list<Closure(Message, STATE): STATE> */
private array $anyHandlers = [];

/** @var (Closure(STATE): OUT)|null */
private Closure|null $finalizeHandler = null;

/**
* @param STATE $initState
*
* @return $this
*/
public function initState(array $initState): self
{
$this->initState = $initState;

return $this;
}

/**
* @param class-string<T1> $event
* @param Closure(Message<T1>, STATE): STATE $closure
*
* @return $this
*
* @template T1 of object
*/
public function when(string $event, Closure $closure): self
{
if (!isset($this->handlers[$event])) {
$this->handlers[$event] = [];
}

$this->handlers[$event][] = $closure;

return $this;
}

/**
* @param Closure(Message, STATE): STATE $closure
*
* @return $this
*/
public function any(Closure $closure): self
{
$this->anyHandlers[] = $closure;

return $this;
}

/**
* @param array<class-string, Closure(Message, STATE): STATE> $map
*
* @return $this
*/
public function match(array $map): self
{
foreach ($map as $event => $closure) {
$this->when($event, $closure);
}

return $this;
}

/**
* @param Closure(STATE): OUT $closure
*
* @return $this
*/
public function finalize(Closure $closure): self
{
$this->finalizeHandler = $closure;

return $this;
}

/**
* @param iterable<Message> $messages
*
* @return OUT|STATE
* @psalm-return (OUT is STATE ? STATE : OUT)
*/
public function reduce(iterable $messages): array
{
$state = $this->initState;

foreach ($messages as $message) {
$event = $message->event();

if (isset($this->handlers[$event::class])) {
foreach ($this->handlers[$event::class] as $handler) {
$state = $handler($message, $state);
}
}

foreach ($this->anyHandlers as $handler) {
$state = $handler($message, $state);
}
}

if ($this->finalizeHandler !== null) {
$state = ($this->finalizeHandler)($state);
}

return $state;
}
}
33 changes: 33 additions & 0 deletions src/Message/Translator/AggregateToStreamHeaderTranslator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\StreamHeader;

/** @experimental */
final class AggregateToStreamHeaderTranslator implements Translator
{
/** @return list<Message> */
public function __invoke(Message $message): array
{
if (!$message->hasHeader(AggregateHeader::class)) {
return [$message];
}

$aggregateHeader = $message->header(AggregateHeader::class);

return [
$message
->removeHeader(AggregateHeader::class)
->withHeader(new StreamHeader(
$aggregateHeader->streamName(),
$aggregateHeader->playhead,
$aggregateHeader->recordedOn,
)),
];
}
}
23 changes: 23 additions & 0 deletions src/Message/Translator/ClosureMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Message\Translator;

use Closure;
use Patchlevel\EventSourcing\Message\Message;

final class ClosureMiddleware implements Translator
{
/** @param Closure(Message): list<Message> $callable */
public function __construct(
private readonly Closure $callable,
) {
}

/** @return list<Message> */
public function __invoke(Message $message): array
{
return ($this->callable)($message);
}
}
5 changes: 5 additions & 0 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,11 @@ public function setupSubscription(): void
));
}

public function connection(): Connection
{
return $this->connection;
}

private function createTriggerFunctionName(): string
{
$tableConfig = explode('.', $this->config['table_name']);
Expand Down
5 changes: 5 additions & 0 deletions src/Store/StreamDoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,11 @@ public function setupSubscription(): void
));
}

public function connection(): Connection
{
return $this->connection;
}

private function createTriggerFunctionName(): string
{
$tableConfig = explode('.', $this->config['table_name']);
Expand Down
Loading
Loading