diff --git a/docs/pages/pipeline.md b/docs/pages/pipeline.md index 1aaaefd27..69ad8ce6a 100644 --- a/docs/pages/pipeline.md +++ b/docs/pages/pipeline.md @@ -1,59 +1,69 @@ # Pipeline / Anti Corruption Layer -Sobald man den Fall hat, dass man eine Menge an Events von A nach B fliesen lassen möchte, -und evtl sogar dabei noch einfluss auf die Events nehmen möchte in Form von Filtern oder Manipulationen, -dann kommt die Pipeline ins Spiel. +As soon as you have the case where you want to flow a lot of events from A to B, +and maybe even influence the events in the form of filters or manipulations, +then the pipeline comes into play. -Es gibt mehrere Situationen, in denen eine Pipeline sinnvoll ist: +There are several situations in which a pipeline makes sense: -* Migration vom event store und deren events. -* Als Anti Corruption Layer beim Publizieren von Events an andere Systeme -* Oder als Anti Corruption Layer beim Importieren von Events aus anderen Systemen +* Migration of the event store and its events. +* As an anti-corruption layer when publishing events to other systems. +* Or as an anti-corruption layer when importing events from other systems. -In diesem Beispiel wird eine Pipeline verwendet, -um einen neuen Event Store zu erstellen und dabei alte Events durch neue zu ersetzen. +## Pipe ```php use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware; use Patchlevel\EventSourcing\Pipeline\Pipeline; use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\Profile -$pipeline = new Pipeline( - new StoreTarget($newStore), - new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) { - return new NewVisited($oldVisited->profileId()); - }), +$pipe = new Pipe( + $oldStore->load(), + new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) { + return new NewVisited($oldVisited->profileId()); + }), ); -$pipeline->run($oldStore->load()); -``` +$unwarp = function (iterable $messages) { + foreach ($messages as $message) { + yield $message->event(); + } +}; -!!! danger +Profile::createFromEvents($unwarp($pipe)); +``` - Unter keinen Umständen darf der selbe Store als Target verwendet werden - wie der, der als Source verwendet wird. Ansonsten wird der Store danach kaputt sein! +## Pipeline -The pipeline can also be used to create or rebuild a projection: +The pipeline uses the pipe internally and is an abstraction layer on top of it. +The pipeline is used when it comes to moving a lot of events from A to B. +A `target` must be defined where the events should flow to. +You can also define whether buffering should take place or not. ```php +use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware; use Patchlevel\EventSourcing\Pipeline\Pipeline; -use Patchlevel\EventSourcing\Pipeline\Source\StoreSource; -use Patchlevel\EventSourcing\Pipeline\Target\ProjectionTarget; +use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget; $pipeline = new Pipeline( - new StoreSource($store), - new ProjectionTarget($projection) + new StoreTarget($newStore), + new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) { + return new NewVisited($oldVisited->profileId()); + }), ); + +$pipeline->run($oldStore->load()); ``` -The principle remains the same. -There is a source where the data comes from. -A target where the data should flow. -And any number of middlewares to do something with the data beforehand. +!!! danger + + Under no circumstances should the same store be used as target as the one used as source. + Otherwise the store will be broken afterwards! ## Target -After you have a source, you still need the destination of the pipeline. + ### Store @@ -75,27 +85,15 @@ $target = new StoreTarget($store); It does not matter whether the previous store was a SingleTable or a MultiTable. You can switch back and forth between both store types using the pipeline. -### Projection +### Event Bus A projection can also be used as a target. For example, to set up a new projection or to build a new projection. ```php -use Patchlevel\EventSourcing\Pipeline\Target\ProjectionTarget; - -$target = new ProjectionTarget($projection); -``` - -### Projection Handler - -If you want to build or create all projections from scratch, -then you can also use the ProjectionRepositoryTarget. -In this, the individual projections are iterated and the events are then passed on. - -```php -use Patchlevel\EventSourcing\Pipeline\Target\ProjectionHandlerTarget; +use Patchlevel\EventSourcing\Pipeline\Target\EventBusTarget; -$target = new ProjectionHandlerTarget($projectionHandler); +$target = new EventBusTarget($projection); ``` ### In Memory diff --git a/src/Pipeline/StateProcessor.php b/src/Pipeline/StateProcessor.php new file mode 100644 index 000000000..92b09ca3b --- /dev/null +++ b/src/Pipeline/StateProcessor.php @@ -0,0 +1,178 @@ + + * @template OUT of array = STATE + */ +final class StateProcessor +{ + /** + * @var STATE + */ + private array $state = []; + + /** + * @var array> + */ + private array $handlers = []; + + /** + * @var list + */ + private array $anyHandlers = []; + + /** + * @var (Closure(STATE): OUT)|null + */ + private Closure|null $finalizeHandler = null; + + /** + * @var list + */ + private array $middlewares = []; + + /** + * @param STATE $state + * + * @return $this + */ + public function initState(array $state): self + { + $this->state = $state; + + return $this; + } + + /** + * @template T1 of object + * + * @param class-string $event + * @param Closure(Message, STATE): STATE $closure + * + * @return $this + */ + public function when(string $event, Closure $closure): self + { + $this->handlers[$event][] = $closure; + + return $this; + } + + /** + * @param Closure(Message, STATE): STATE $closure + * + * @return $this + */ + public function any(Closure $closure): self + { + $this->anyHandlers[] = $closure; + + return $this; + } + + /** + * @param array $map + * + * @return $this + */ + public function match(array $map): self + { + foreach ($map as $event => $closure) { + $this->when($event, $closure); + } + + return $this; + } + + /** + * @param Closure(STATE): OUT $closure + * + * @return $this + */ + public function finalize(Closure $closure): self + { + $this->finalizeHandler = $closure; + + return $this; + } + + public function middleware(Middleware ...$middlewares): self + { + foreach ($middlewares as $middleware) { + $this->middlewares[] = $middleware; + } + + return $this; + } + + /** + * @param iterable $messages + * + * @return OUT + */ + public function process(iterable $messages): array + { + if ($this->middlewares !== []) { + $messages = new Pipe($messages, $this->middlewares); + } + + foreach ($messages as $message) { + $event = $message->event(); + + if (isset($this->handlers[$event::class])) { + foreach ($this->handlers[$event::class] as $handler) { + $this->state = $handler($message, $this->state); + } + } + + foreach ($this->anyHandlers as $handler) { + $this->state = $handler($message, $this->state); + } + } + + if ($this->finalizeHandler !== null) { + $this->state = ($this->finalizeHandler)($this->state); + } + + return $this->state; + } +} + +/** + * @var StateProcessor> $state + */ +$state = (new StateProcessor()) + ->when(ProfileCreated::class, function (Message $message, array $state): array { + $event = $message->event(); + + $state[$event->email->toString()] = true; + + return $state; + }) + ->finalize(function (array $state): array { + return array_keys($state); + }) + ->middleware(new ClosureMiddleware(static function (Message $message): array { + return [$message]; + })) + ->process([]); + + +$state = (new StateProcessor()) + ->initState(['foo' => 'bar']) + ->any(function (Message $message, array $state): array { + return $state; + }) + ->finalize(function (array $state): array { + return $state; + }) + ->process([]);