Skip to content

Commit

Permalink
Merge pull request #492 from patchlevel/chain-event-bus
Browse files Browse the repository at this point in the history
add chain event bus and rewrite projectionist event bus
  • Loading branch information
DavidBadura authored Feb 6, 2024
2 parents 066739e + 3e2f822 commit 6c6317f
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 71 deletions.
13 changes: 13 additions & 0 deletions docs/pages/event_bus.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,19 @@ $eventBus = new Psr14EventBus($psr14EventDispatcher);

You can't use the `Subscribe` attribute with the psr-14 event bus.

## Chain Event Bus

If you want to use multiple event buses, you can use the `ChainEventBus`.

```php
use Patchlevel\EventSourcing\EventBus\ChainEventBus;

$eventBus = new ChainEventBus([
$eventBus1,
$eventBus2,
]);
```

## Learn more

* [How to decorate messages](message_decorator.md)
Expand Down
9 changes: 5 additions & 4 deletions docs/pages/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,11 @@ After we have defined everything, we still have to plug the whole thing together

```php
use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus;
use Patchlevel\EventSourcing\Projection\Projector\SyncProjectorListener;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
Expand Down Expand Up @@ -320,12 +321,12 @@ $projectionist = new DefaultProjectionist(
$projectorRepository,
);

$eventBus = SyncProjectionistEventBusWrapper::createWithDefaultLockStrategy(
$eventBus = new ChainEventBus([
DefaultEventBus::create([
new SendCheckInEmailProcessor($mailer),
]),
$projectionist,
);
ProjectionistEventBus::createWithDefaultLockStrategy($projectionist)
]);

$repositoryManager = new DefaultRepositoryManager(
$aggregateRegistry,
Expand Down
21 changes: 21 additions & 0 deletions src/EventBus/ChainEventBus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus;

final class ChainEventBus implements EventBus
{
/** @param iterable<EventBus> $eventBuses */
public function __construct(
private readonly iterable $eventBuses,
) {
}

public function dispatch(Message ...$messages): void
{
foreach ($this->eventBuses as $eventBus) {
$eventBus->dispatch(...$messages);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\Store\FlockStore;

final class SyncProjectionistEventBusWrapper implements EventBus
final class ProjectionistEventBus implements EventBus
{
public function __construct(
private readonly EventBus $parentEventBus,
private readonly Projectionist $projectionist,
private readonly LockFactory $lockFactory,
private readonly bool $throwByError = true,
Expand All @@ -21,8 +20,6 @@ public function __construct(

public function dispatch(Message ...$messages): void
{
$this->parentEventBus->dispatch(...$messages);

$lock = $this->lockFactory->createLock('projectionist-run');

if (!$lock->acquire(true)) {
Expand All @@ -36,10 +33,9 @@ public function dispatch(Message ...$messages): void
}
}

public static function createWithDefaultLockStrategy(EventBus $parentEventBus, Projectionist $projectionist): self
public static function createWithDefaultLockStrategy(Projectionist $projectionist): self
{
return new self(
$parentEventBus,
$projectionist,
new LockFactory(
new FlockStore(),
Expand Down
19 changes: 10 additions & 9 deletions tests/Benchmark/SyncProjectionistBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

use Doctrine\DBAL\Driver\PDO\SQLite\Driver;
use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Lock\DoctrineDbalStoreSchemaAdapter;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\Repository;
Expand Down Expand Up @@ -71,18 +72,18 @@ public function setUp(): void
$projectionRepository,
);

$innerEventStream = DefaultEventBus::create([new SendEmailProcessor()]);

$lockStorage = new LockDoctrineDbalStore($connection);
$lockStorageAdapter = new DoctrineDbalStoreSchemaAdapter($lockStorage);

$this->bus = new SyncProjectionistEventBusWrapper(
$innerEventStream,
$projectionist,
new LockFactory(
new LockDoctrineDbalStore($connection),
$this->bus = new ChainEventBus([
DefaultEventBus::create([new SendEmailProcessor()]),
new ProjectionistEventBus(
$projectionist,
new LockFactory(
new LockDoctrineDbalStore($connection),
),
),
);
]);

$this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata());

Expand Down
21 changes: 12 additions & 9 deletions tests/Integration/BankAccountSplitStream/IntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream;

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\Decorator\ChainMessageDecorator;
use Patchlevel\EventSourcing\EventBus\Decorator\SplitStreamDecorator;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
Expand All @@ -13,7 +14,7 @@
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
Expand Down Expand Up @@ -63,18 +64,20 @@ public function testSuccessful(): void
$projectionRepository,
);

$eventStream = new SyncProjectionistEventBusWrapper(
$eventBus = new ChainEventBus([
DefaultEventBus::create(),
$projectionist,
new LockFactory(
new LockInMemoryStore(),
new ProjectionistEventBus(
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
),
);
]);

$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
$eventStream,
$eventBus,
null,
new ChainMessageDecorator([
new SplitStreamDecorator(new AttributeEventMetadataFactory()),
Expand Down Expand Up @@ -107,7 +110,7 @@ public function testSuccessful(): void
$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
$eventStream,
$eventBus,
null,
new ChainMessageDecorator([
new SplitStreamDecorator(new AttributeEventMetadataFactory()),
Expand Down Expand Up @@ -141,7 +144,7 @@ public function testSuccessful(): void
$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
$eventStream,
$eventBus,
null,
new ChainMessageDecorator([
new SplitStreamDecorator(new AttributeEventMetadataFactory()),
Expand Down
47 changes: 26 additions & 21 deletions tests/Integration/BasicImplementation/BasicIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation;

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
Expand Down Expand Up @@ -62,20 +63,22 @@ public function testSuccessful(): void
$projectorRepository,
);

$innerEventStream = DefaultEventBus::create([new SendEmailProcessor()]);

$eventStream = new SyncProjectionistEventBusWrapper(
$innerEventStream,
$projectionist,
new LockFactory(
new LockInMemoryStore(),
$eventBus = new ChainEventBus([
DefaultEventBus::create([
new SendEmailProcessor(),
]),
new ProjectionistEventBus(
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
),
);
]);

$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
$store,
$eventStream,
$eventBus,
null,
new FooMessageDecorator(),
);
Expand Down Expand Up @@ -103,7 +106,7 @@ public function testSuccessful(): void
$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
$store,
$eventStream,
$eventBus,
);
$repository = $manager->get(Profile::class);
$profile = $repository->load($profileId);
Expand Down Expand Up @@ -134,20 +137,22 @@ public function testSnapshot(): void
$projectorRepository,
);

$innerEventStream = DefaultEventBus::create([new SendEmailProcessor()]);

$eventStream = new SyncProjectionistEventBusWrapper(
$innerEventStream,
$projectionist,
new LockFactory(
new LockInMemoryStore(),
$eventBus = new ChainEventBus([
DefaultEventBus::create([
new SendEmailProcessor(),
]),
new ProjectionistEventBus(
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
),
);
]);

$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
$store,
$eventStream,
$eventBus,
new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]),
new FooMessageDecorator(),
);
Expand Down Expand Up @@ -175,7 +180,7 @@ public function testSnapshot(): void
$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
$store,
$eventStream,
$eventBus,
);
$repository = $manager->get(Profile::class);
$profile = $repository->load($profileId);
Expand Down
17 changes: 10 additions & 7 deletions tests/Integration/Outbox/OutboxTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Tests\Integration\Outbox;

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\DefaultConsumer;
use Patchlevel\EventSourcing\Outbox\DoctrineOutboxStore;
use Patchlevel\EventSourcing\Outbox\EventBusPublisher;
Expand All @@ -13,7 +14,7 @@
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Schema\ChainSchemaConfigurator;
Expand Down Expand Up @@ -76,15 +77,17 @@ public function testSuccessful(): void

$eventBusConsumer = DefaultConsumer::create([new SendEmailProcessor()]);

$eventStream = new SyncProjectionistEventBusWrapper(
$eventBus = new ChainEventBus([
$outboxEventBus,
$projectionist,
new LockFactory(
new LockInMemoryStore(),
new ProjectionistEventBus(
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
),
);
]);

$repository = new DefaultRepository($store, $eventStream, Profile::metadata());
$repository = new DefaultRepository($store, $eventBus, Profile::metadata());

$schemaDirector = new DoctrineSchemaDirector(
$this->connection,
Expand Down
13 changes: 8 additions & 5 deletions tests/Integration/Projectionist/ProjectionistTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
namespace Patchlevel\EventSourcing\Tests\Integration\Projectionist;

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Lock\DoctrineDbalStoreSchemaAdapter;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Schema\ChainSchemaConfigurator;
Expand Down Expand Up @@ -117,11 +118,13 @@ public function testSync(): void
$manager = new DefaultRepositoryManager(
$aggregateRegistry,
$store,
new SyncProjectionistEventBusWrapper(
new ChainEventBus([
DefaultEventBus::create(),
$projectionist,
new LockFactory($lockStore),
),
new ProjectionistEventBus(
$projectionist,
new LockFactory($lockStore),
),
]),
);

$repository = $manager->get(Profile::class);
Expand Down
Loading

0 comments on commit 6c6317f

Please sign in to comment.