Skip to content

Commit

Permalink
Merge pull request #540 from patchlevel/replace-pipeline
Browse files Browse the repository at this point in the history
replace pipeline with subscription engine
  • Loading branch information
DavidBadura authored Mar 25, 2024
2 parents 83eb814 + 31c81d9 commit 4b98344
Show file tree
Hide file tree
Showing 43 changed files with 140 additions and 1,088 deletions.
12 changes: 2 additions & 10 deletions deptrac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ deptrac:
value: MetadataMessage
- type: layer
value: MetadataSubscriber
- name: Pipeline
collectors:
- type: directory
value: src/Pipeline/.*
- name: Repository
collectors:
- type: directory
Expand Down Expand Up @@ -117,8 +113,10 @@ deptrac:
- Attribute
- Message
Message:
- Aggregate
- MetadataMessage
- Serializer
- Store
Metadata:
MetadataAggregate:
- Aggregate
Expand All @@ -137,12 +135,6 @@ deptrac:
- Attribute
- Metadata
- Subscription
Pipeline:
- Aggregate
- EventBus
- Message
- Store
- Subscription
Subscription:
- Attribute
- Clock
Expand Down
56 changes: 28 additions & 28 deletions docs/pages/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ whether the migration worked.
In this example the event `PrivacyAdded` is removed and the event `OldVisited` is replaced by `NewVisited`:

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware;
use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware;
use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;
use Patchlevel\EventSourcing\Message\Translator\ReplaceEventTranslator;
use Patchlevel\EventSourcing\Pipeline\Pipeline;
use Patchlevel\EventSourcing\Pipeline\Source\StoreSource;
use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget;
Expand All @@ -21,11 +21,11 @@ $pipeline = new Pipeline(
new StoreSource($oldStore),
new StoreTarget($newStore),
[
new ExcludeEventMiddleware([PrivacyAdded::class]),
new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) {
new ExcludeEventTranslator([PrivacyAdded::class]),
new ReplaceEventTranslator(OldVisited::class, static function (OldVisited $oldVisited) {
return new NewVisited($oldVisited->profileId());
}),
new RecalculatePlayheadMiddleware(),
new RecalculatePlayheadTranslator(),
],
);
```
Expand Down Expand Up @@ -197,9 +197,9 @@ Middelwares can be used to manipulate, delete or expand messages or events durin
With this middleware you can exclude certain events.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator;

$middleware = new ExcludeEventMiddleware([EmailChanged::class]);
$middleware = new ExcludeEventTranslator([EmailChanged::class]);
```
!!! warning

Expand All @@ -210,9 +210,9 @@ $middleware = new ExcludeEventMiddleware([EmailChanged::class]);
With this middleware you can only allow certain events.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\IncludeEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\IncludeEventTranslator;

$middleware = new IncludeEventMiddleware([ProfileCreated::class]);
$middleware = new IncludeEventTranslator([ProfileCreated::class]);
```
!!! warning

Expand All @@ -226,9 +226,9 @@ This middleware expects a callback that returns either true to allow events or f

```php
use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
use Patchlevel\EventSourcing\Pipeline\Middleware\FilterEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\FilterEventTranslator;

$middleware = new FilterEventMiddleware(static function (AggregateChanged $event) {
$middleware = new FilterEventTranslator(static function (AggregateChanged $event) {
if (!$event instanceof ProfileCreated) {
return true;
}
Expand All @@ -245,9 +245,9 @@ $middleware = new FilterEventMiddleware(static function (AggregateChanged $event
With this middleware you can exclude archived events.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeArchivedEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\ExcludeArchivedEventTranslator;

$middleware = new ExcludeArchivedEventMiddleware();
$middleware = new ExcludeArchivedEventTranslator();
```
!!! warning

Expand All @@ -258,9 +258,9 @@ $middleware = new ExcludeArchivedEventMiddleware();
With this middleware you can only allow archived events.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\OnlyArchivedEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\OnlyArchivedEventTranslator;

$middleware = new OnlyArchivedEventMiddleware();
$middleware = new OnlyArchivedEventTranslator();
```
!!! warning

Expand All @@ -273,9 +273,9 @@ The first parameter you have to define is the event class that you want to repla
And as a second parameter a callback, that the old event awaits and a new event returns.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\ReplaceEventTranslator;

$middleware = new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) {
$middleware = new ReplaceEventTranslator(OldVisited::class, static function (OldVisited $oldVisited) {
return new NewVisited($oldVisited->profileId());
});
```
Expand All @@ -302,9 +302,9 @@ The playhead must always be in ascending order so that the data is valid.
Some middleware can break this order and the middleware `RecalculatePlayheadMiddleware` can fix this problem.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware;
use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;

$middleware = new RecalculatePlayheadMiddleware();
$middleware = new RecalculatePlayheadTranslator();
```
!!! note

Expand All @@ -315,13 +315,13 @@ $middleware = new RecalculatePlayheadMiddleware();
If you want to group your middleware, you can use one or more `ChainMiddleware`.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\ChainMiddleware;
use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware;
use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware;
use Patchlevel\EventSourcing\Message\Translator\ChainTranslator;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;

$middleware = new ChainMiddleware([
new ExcludeEventMiddleware([EmailChanged::class]),
new RecalculatePlayheadMiddleware(),
$middleware = new ChainTranslator([
new ExcludeEventTranslator([EmailChanged::class]),
new RecalculatePlayheadTranslator(),
]);
```
### Custom middleware
Expand All @@ -341,9 +341,9 @@ which should replace the `ProfileCreated` event.

```php
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Pipeline\Middleware\Middleware;
use Patchlevel\EventSourcing\Message\Translator\Translator;

final class SplitProfileCreatedMiddleware implements Middleware
final class SplitProfileCreatedMiddleware implements Translator
{
public function __invoke(Message $message): array
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

use function array_values;

final class ChainMiddleware implements Middleware
final class ChainTranslator implements Translator
{
/** @param iterable<Middleware> $middlewares */
/** @param iterable<Translator> $translators */
public function __construct(
private readonly iterable $middlewares,
private readonly iterable $translators,
) {
}

Expand All @@ -21,8 +21,8 @@ public function __invoke(Message $message): array
{
$messages = [$message];

foreach ($this->middlewares as $middleware) {
$messages = $this->processMiddleware($middleware, $messages);
foreach ($this->translators as $middleware) {
$messages = $this->process($middleware, $messages);
}

return $messages;
Expand All @@ -33,12 +33,12 @@ public function __invoke(Message $message): array
*
* @return list<Message>
*/
private function processMiddleware(Middleware $middleware, array $messages): array
private function process(Translator $translator, array $messages): array
{
$result = [];

foreach ($messages as $message) {
$result += $middleware($message);
$result += $translator($message);
}

return array_values($result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\ArchivedHeader;

final class ExcludeArchivedEventMiddleware implements Middleware
final class ExcludeArchivedEventTranslator implements Translator
{
/** @return list<Message> */
public function __invoke(Message $message): array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

final class ExcludeEventMiddleware implements Middleware
final class ExcludeEventTranslator implements Translator
{
/** @param list<class-string> $classes */
public function __construct(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

final class FilterEventMiddleware implements Middleware
final class FilterEventTranslator implements Translator
{
/** @var callable(object $event):bool */
private $callable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

final class IncludeEventMiddleware implements Middleware
final class IncludeEventTranslator implements Translator
{
/** @param list<class-string> $classes */
public function __construct(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\ArchivedHeader;

final class OnlyArchivedEventMiddleware implements Middleware
final class OnlyArchivedEventTranslator implements Translator
{
/** @return list<Message> */
public function __invoke(Message $message): array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\Message;

use function array_key_exists;

final class RecalculatePlayheadMiddleware implements Middleware
final class RecalculatePlayheadTranslator implements Translator
{
/** @var array<string, array<string, positive-int>> */
private array $index = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

/** @template T of object */
final class ReplaceEventMiddleware implements Middleware
final class ReplaceEventTranslator implements Translator
{
/** @var callable(T $event):object */
private $callable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

interface Middleware
interface Translator
{
/** @return list<Message> */
public function __invoke(Message $message): array;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use DateTimeImmutable;
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\Message;

final class UntilEventMiddleware implements Middleware
final class UntilEventTranslator implements Translator
{
public function __construct(
private readonly DateTimeImmutable $until,
Expand Down
Loading

0 comments on commit 4b98344

Please sign in to comment.