Skip to content

Commit

Permalink
add state processor
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Oct 20, 2024
1 parent 2ce9348 commit d2128be
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 44 deletions.
86 changes: 42 additions & 44 deletions docs/pages/pipeline.md
Original file line number Diff line number Diff line change
@@ -1,59 +1,69 @@
# Pipeline / Anti Corruption Layer

Sobald man den Fall hat, dass man eine Menge an Events von A nach B fliesen lassen möchte,
und evtl sogar dabei noch einfluss auf die Events nehmen möchte in Form von Filtern oder Manipulationen,
dann kommt die Pipeline ins Spiel.
As soon as you have the case where you want to flow a lot of events from A to B,
and maybe even influence the events in the form of filters or manipulations,
then the pipeline comes into play.

Es gibt mehrere Situationen, in denen eine Pipeline sinnvoll ist:
There are several situations in which a pipeline makes sense:

* Migration vom event store und deren events.
* Als Anti Corruption Layer beim Publizieren von Events an andere Systeme
* Oder als Anti Corruption Layer beim Importieren von Events aus anderen Systemen
* Migration of the event store and its events.
* As an anti-corruption layer when publishing events to other systems.
* Or as an anti-corruption layer when importing events from other systems.

In diesem Beispiel wird eine Pipeline verwendet,
um einen neuen Event Store zu erstellen und dabei alte Events durch neue zu ersetzen.
## Pipe

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware;
use Patchlevel\EventSourcing\Pipeline\Pipeline;
use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\Profile

$pipeline = new Pipeline(
new StoreTarget($newStore),
new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) {
return new NewVisited($oldVisited->profileId());
}),
$pipe = new Pipe(
$oldStore->load(),
new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) {
return new NewVisited($oldVisited->profileId());
}),
);

$pipeline->run($oldStore->load());
```
$unwarp = function (iterable $messages) {
foreach ($messages as $message) {
yield $message->event();
}
};

!!! danger
Profile::createFromEvents($unwarp($pipe));
```

Unter keinen Umständen darf der selbe Store als Target verwendet werden
wie der, der als Source verwendet wird. Ansonsten wird der Store danach kaputt sein!
## Pipeline

The pipeline can also be used to create or rebuild a projection:
The pipeline uses the pipe internally and is an abstraction layer on top of it.
The pipeline is used when it comes to moving a lot of events from A to B.
A `target` must be defined where the events should flow to.
You can also define whether buffering should take place or not.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware;
use Patchlevel\EventSourcing\Pipeline\Pipeline;
use Patchlevel\EventSourcing\Pipeline\Source\StoreSource;
use Patchlevel\EventSourcing\Pipeline\Target\ProjectionTarget;
use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget;

$pipeline = new Pipeline(
new StoreSource($store),
new ProjectionTarget($projection)
new StoreTarget($newStore),
new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) {
return new NewVisited($oldVisited->profileId());
}),
);

$pipeline->run($oldStore->load());
```

The principle remains the same.
There is a source where the data comes from.
A target where the data should flow.
And any number of middlewares to do something with the data beforehand.
!!! danger

Under no circumstances should the same store be used as target as the one used as source.
Otherwise the store will be broken afterwards!

## Target

After you have a source, you still need the destination of the pipeline.


### Store

Expand All @@ -75,27 +85,15 @@ $target = new StoreTarget($store);
It does not matter whether the previous store was a SingleTable or a MultiTable.
You can switch back and forth between both store types using the pipeline.

### Projection
### Event Bus

A projection can also be used as a target.
For example, to set up a new projection or to build a new projection.

```php
use Patchlevel\EventSourcing\Pipeline\Target\ProjectionTarget;

$target = new ProjectionTarget($projection);
```

### Projection Handler

If you want to build or create all projections from scratch,
then you can also use the ProjectionRepositoryTarget.
In this, the individual projections are iterated and the events are then passed on.

```php
use Patchlevel\EventSourcing\Pipeline\Target\ProjectionHandlerTarget;
use Patchlevel\EventSourcing\Pipeline\Target\EventBusTarget;

$target = new ProjectionHandlerTarget($projectionHandler);
$target = new EventBusTarget($projection);
```

### In Memory
Expand Down
178 changes: 178 additions & 0 deletions src/Pipeline/StateProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
<?php

namespace Patchlevel\EventSourcing\Pipeline;

use Closure;
use Doctrine\Migrations\Version\State;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Pipeline\Middleware\ClosureMiddleware;
use Patchlevel\EventSourcing\Pipeline\Middleware\Middleware;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated;

/**
* @template STATE of array<array-key, mixed>
* @template OUT of array<array-key, mixed> = STATE
*/
final class StateProcessor
{
/**
* @var STATE
*/
private array $state = [];

Check failure on line 21 in src/Pipeline/StateProcessor.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

InvalidPropertyAssignmentValue

src/Pipeline/StateProcessor.php:21:28: InvalidPropertyAssignmentValue: $this->state with declared type 'STATE:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>' cannot be assigned type 'array<never, never>' (see https://psalm.dev/145)

/**
* @var array<class-string, list<Closure(Message, STATE): STATE|null>>
*/
private array $handlers = [];

/**
* @var list<Closure(Message, STATE): STATE>
*/
private array $anyHandlers = [];

/**
* @var (Closure(STATE): OUT)|null
*/
private Closure|null $finalizeHandler = null;

/**
* @var list<Middleware>
*/
private array $middlewares = [];

/**
* @param STATE $state
*
* @return $this
*/
public function initState(array $state): self
{
$this->state = $state;

return $this;
}

/**
* @template T1 of object
*
* @param class-string<T1> $event
* @param Closure(Message<T1>, STATE): STATE $closure
*
* @return $this
*/
public function when(string $event, Closure $closure): self
{
$this->handlers[$event][] = $closure;

Check failure on line 65 in src/Pipeline/StateProcessor.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

InvalidPropertyAssignmentValue

src/Pipeline/StateProcessor.php:65:9: InvalidPropertyAssignmentValue: $this->handlers with declared type 'array<class-string, list<Closure(Patchlevel\EventSourcing\Message\Message, STATE:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>):STATE:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>|null>>' cannot be assigned type 'non-empty-array<class-string|(class-string<T1:fn-patchlevel\eventsourcing\pipeline\stateprocessor::when as object>), list<Closure(Patchlevel\EventSourcing\Message\Message, STATE:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>):STATE:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>|Closure(Patchlevel\EventSourcing\Message\Message<T1:fn-patchlevel\eventsourcing\pipeline\stateprocessor::when as object>, STATE:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>):STATE:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>|null>>' (see https://psalm.dev/145)

return $this;
}

/**
* @param Closure(Message, STATE): STATE $closure
*
* @return $this
*/
public function any(Closure $closure): self
{
$this->anyHandlers[] = $closure;

return $this;
}

/**
* @param array<class-string, Closure(Message, STATE): STATE> $map
*
* @return $this
*/
public function match(array $map): self
{
foreach ($map as $event => $closure) {
$this->when($event, $closure);
}

return $this;
}

/**
* @param Closure(STATE): OUT $closure
*
* @return $this
*/
public function finalize(Closure $closure): self
{
$this->finalizeHandler = $closure;

return $this;
}

public function middleware(Middleware ...$middlewares): self
{
foreach ($middlewares as $middleware) {
$this->middlewares[] = $middleware;
}

return $this;
}

/**
* @param iterable<Message> $messages
*
* @return OUT

Check failure on line 120 in src/Pipeline/StateProcessor.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

InvalidReturnType

src/Pipeline/StateProcessor.php:120:16: InvalidReturnType: The declared return type 'OUT:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>' for Patchlevel\EventSourcing\Pipeline\StateProcessor::process is incorrect, got '(OUT:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>)|(STATE:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>)' (see https://psalm.dev/011)
*/
public function process(iterable $messages): array
{
if ($this->middlewares !== []) {
$messages = new Pipe($messages, $this->middlewares);
}

foreach ($messages as $message) {
$event = $message->event();

if (isset($this->handlers[$event::class])) {
foreach ($this->handlers[$event::class] as $handler) {
$this->state = $handler($message, $this->state);

Check failure on line 133 in src/Pipeline/StateProcessor.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

PossiblyNullFunctionCall

src/Pipeline/StateProcessor.php:133:36: PossiblyNullFunctionCall: Cannot call function on possibly null value (see https://psalm.dev/094)

Check failure on line 133 in src/Pipeline/StateProcessor.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Trying to invoke (Closure(Patchlevel\EventSourcing\Message\Message, STATE): STATE)|null but it might not be a callable.
}
}

foreach ($this->anyHandlers as $handler) {
$this->state = $handler($message, $this->state);
}
}

if ($this->finalizeHandler !== null) {
$this->state = ($this->finalizeHandler)($this->state);

Check failure on line 143 in src/Pipeline/StateProcessor.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

InvalidPropertyAssignmentValue

src/Pipeline/StateProcessor.php:143:28: InvalidPropertyAssignmentValue: $this->state with declared type 'STATE:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>' cannot be assigned type 'OUT:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>' (see https://psalm.dev/145)
}

return $this->state;

Check failure on line 146 in src/Pipeline/StateProcessor.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

InvalidReturnStatement

src/Pipeline/StateProcessor.php:146:16: InvalidReturnStatement: The inferred type '(OUT:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>)|(STATE:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>)' does not match the declared return type 'OUT:Patchlevel\EventSourcing\Pipeline\StateProcessor as array<array-key, mixed>' for Patchlevel\EventSourcing\Pipeline\StateProcessor::process (see https://psalm.dev/128)
}
}

/**
* @var StateProcessor<array{string, true}, list<string>> $state
*/
$state = (new StateProcessor())
->when(ProfileCreated::class, function (Message $message, array $state): array {
$event = $message->event();

$state[$event->email->toString()] = true;

return $state;
})
->finalize(function (array $state): array {
return array_keys($state);
})
->middleware(new ClosureMiddleware(static function (Message $message): array {
return [$message];
}))
->process([]);


$state = (new StateProcessor())
->initState(['foo' => 'bar'])
->any(function (Message $message, array $state): array {
return $state;
})
->finalize(function (array $state): array {
return $state;
})
->process([]);

0 comments on commit d2128be

Please sign in to comment.