Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add chain event bus and rewrite projectionist event bus #492

Merged
merged 2 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/pages/event_bus.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,19 @@ $eventBus = new Psr14EventBus($psr14EventDispatcher);

You can't use the `Subscribe` attribute with the psr-14 event bus.

## Chain Event Bus

If you want to use multiple event buses, you can use the `ChainEventBus`.

```php
use Patchlevel\EventSourcing\EventBus\ChainEventBus;

$eventBus = new ChainEventBus([
$eventBus1,
$eventBus2,
]);
```

## Learn more

* [How to decorate messages](message_decorator.md)
Expand Down
9 changes: 5 additions & 4 deletions docs/pages/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,11 @@ After we have defined everything, we still have to plug the whole thing together

```php
use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus;
use Patchlevel\EventSourcing\Projection\Projector\SyncProjectorListener;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
Expand Down Expand Up @@ -320,12 +321,12 @@ $projectionist = new DefaultProjectionist(
$projectorRepository,
);

$eventBus = SyncProjectionistEventBusWrapper::createWithDefaultLockStrategy(
$eventBus = new ChainEventBus([
DefaultEventBus::create([
new SendCheckInEmailProcessor($mailer),
]),
$projectionist,
);
ProjectionistEventBus::createWithDefaultLockStrategy($projectionist)
]);

$repositoryManager = new DefaultRepositoryManager(
$aggregateRegistry,
Expand Down
21 changes: 21 additions & 0 deletions src/EventBus/ChainEventBus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus;

final class ChainEventBus implements EventBus
{
/** @param iterable<EventBus> $eventBuses */
public function __construct(
private readonly iterable $eventBuses,
) {
}

public function dispatch(Message ...$messages): void
{
foreach ($this->eventBuses as $eventBus) {
$eventBus->dispatch(...$messages);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\Store\FlockStore;

final class SyncProjectionistEventBusWrapper implements EventBus
final class ProjectionistEventBus implements EventBus
{
public function __construct(
private readonly EventBus $parentEventBus,
private readonly Projectionist $projectionist,
private readonly LockFactory $lockFactory,
private readonly bool $throwByError = true,
Expand All @@ -21,8 +20,6 @@ public function __construct(

public function dispatch(Message ...$messages): void
{
$this->parentEventBus->dispatch(...$messages);

$lock = $this->lockFactory->createLock('projectionist-run');

if (!$lock->acquire(true)) {
Expand All @@ -36,10 +33,9 @@ public function dispatch(Message ...$messages): void
}
}

public static function createWithDefaultLockStrategy(EventBus $parentEventBus, Projectionist $projectionist): self
public static function createWithDefaultLockStrategy(Projectionist $projectionist): self
{
return new self(
$parentEventBus,
$projectionist,
new LockFactory(
new FlockStore(),
Expand Down
19 changes: 10 additions & 9 deletions tests/Benchmark/SyncProjectionistBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

use Doctrine\DBAL\Driver\PDO\SQLite\Driver;
use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Lock\DoctrineDbalStoreSchemaAdapter;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\Repository;
Expand Down Expand Up @@ -71,18 +72,18 @@ public function setUp(): void
$projectionRepository,
);

$innerEventStream = DefaultEventBus::create([new SendEmailProcessor()]);

$lockStorage = new LockDoctrineDbalStore($connection);
$lockStorageAdapter = new DoctrineDbalStoreSchemaAdapter($lockStorage);

$this->bus = new SyncProjectionistEventBusWrapper(
$innerEventStream,
$projectionist,
new LockFactory(
new LockDoctrineDbalStore($connection),
$this->bus = new ChainEventBus([
DefaultEventBus::create([new SendEmailProcessor()]),
new ProjectionistEventBus(
$projectionist,
new LockFactory(
new LockDoctrineDbalStore($connection),
),
),
);
]);

$this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata());

Expand Down
21 changes: 12 additions & 9 deletions tests/Integration/BankAccountSplitStream/IntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream;

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\Decorator\ChainMessageDecorator;
use Patchlevel\EventSourcing\EventBus\Decorator\SplitStreamDecorator;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
Expand All @@ -13,7 +14,7 @@
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
Expand Down Expand Up @@ -63,18 +64,20 @@ public function testSuccessful(): void
$projectionRepository,
);

$eventStream = new SyncProjectionistEventBusWrapper(
$eventBus = new ChainEventBus([
DefaultEventBus::create(),
$projectionist,
new LockFactory(
new LockInMemoryStore(),
new ProjectionistEventBus(
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
),
);
]);

$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
$eventStream,
$eventBus,
null,
new ChainMessageDecorator([
new SplitStreamDecorator(new AttributeEventMetadataFactory()),
Expand Down Expand Up @@ -107,7 +110,7 @@ public function testSuccessful(): void
$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
$eventStream,
$eventBus,
null,
new ChainMessageDecorator([
new SplitStreamDecorator(new AttributeEventMetadataFactory()),
Expand Down Expand Up @@ -141,7 +144,7 @@ public function testSuccessful(): void
$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
$eventStream,
$eventBus,
null,
new ChainMessageDecorator([
new SplitStreamDecorator(new AttributeEventMetadataFactory()),
Expand Down
47 changes: 26 additions & 21 deletions tests/Integration/BasicImplementation/BasicIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation;

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
Expand Down Expand Up @@ -62,20 +63,22 @@ public function testSuccessful(): void
$projectorRepository,
);

$innerEventStream = DefaultEventBus::create([new SendEmailProcessor()]);

$eventStream = new SyncProjectionistEventBusWrapper(
$innerEventStream,
$projectionist,
new LockFactory(
new LockInMemoryStore(),
$eventBus = new ChainEventBus([
DefaultEventBus::create([
new SendEmailProcessor(),
]),
new ProjectionistEventBus(
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
),
);
]);

$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
$store,
$eventStream,
$eventBus,
null,
new FooMessageDecorator(),
);
Expand Down Expand Up @@ -103,7 +106,7 @@ public function testSuccessful(): void
$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
$store,
$eventStream,
$eventBus,
);
$repository = $manager->get(Profile::class);
$profile = $repository->load($profileId);
Expand Down Expand Up @@ -134,20 +137,22 @@ public function testSnapshot(): void
$projectorRepository,
);

$innerEventStream = DefaultEventBus::create([new SendEmailProcessor()]);

$eventStream = new SyncProjectionistEventBusWrapper(
$innerEventStream,
$projectionist,
new LockFactory(
new LockInMemoryStore(),
$eventBus = new ChainEventBus([
DefaultEventBus::create([
new SendEmailProcessor(),
]),
new ProjectionistEventBus(
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
),
);
]);

$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
$store,
$eventStream,
$eventBus,
new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]),
new FooMessageDecorator(),
);
Expand Down Expand Up @@ -175,7 +180,7 @@ public function testSnapshot(): void
$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
$store,
$eventStream,
$eventBus,
);
$repository = $manager->get(Profile::class);
$profile = $repository->load($profileId);
Expand Down
17 changes: 10 additions & 7 deletions tests/Integration/Outbox/OutboxTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Tests\Integration\Outbox;

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\DefaultConsumer;
use Patchlevel\EventSourcing\Outbox\DoctrineOutboxStore;
use Patchlevel\EventSourcing\Outbox\EventBusPublisher;
Expand All @@ -13,7 +14,7 @@
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Schema\ChainSchemaConfigurator;
Expand Down Expand Up @@ -76,15 +77,17 @@ public function testSuccessful(): void

$eventBusConsumer = DefaultConsumer::create([new SendEmailProcessor()]);

$eventStream = new SyncProjectionistEventBusWrapper(
$eventBus = new ChainEventBus([
$outboxEventBus,
$projectionist,
new LockFactory(
new LockInMemoryStore(),
new ProjectionistEventBus(
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
),
);
]);

$repository = new DefaultRepository($store, $eventStream, Profile::metadata());
$repository = new DefaultRepository($store, $eventBus, Profile::metadata());

$schemaDirector = new DoctrineSchemaDirector(
$this->connection,
Expand Down
13 changes: 8 additions & 5 deletions tests/Integration/Projectionist/ProjectionistTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
namespace Patchlevel\EventSourcing\Tests\Integration\Projectionist;

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Lock\DoctrineDbalStoreSchemaAdapter;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Schema\ChainSchemaConfigurator;
Expand Down Expand Up @@ -117,11 +118,13 @@ public function testSync(): void
$manager = new DefaultRepositoryManager(
$aggregateRegistry,
$store,
new SyncProjectionistEventBusWrapper(
new ChainEventBus([
DefaultEventBus::create(),
$projectionist,
new LockFactory($lockStore),
),
new ProjectionistEventBus(
$projectionist,
new LockFactory($lockStore),
),
]),
);

$repository = $manager->get(Profile::class);
Expand Down
Loading
Loading