diff --git a/docs/pages/event_bus.md b/docs/pages/event_bus.md index f7b36def7..045ca59f4 100644 --- a/docs/pages/event_bus.md +++ b/docs/pages/event_bus.md @@ -209,6 +209,19 @@ $eventBus = new Psr14EventBus($psr14EventDispatcher); You can't use the `Subscribe` attribute with the psr-14 event bus. +## Chain Event Bus + +If you want to use multiple event buses, you can use the `ChainEventBus`. + +```php +use Patchlevel\EventSourcing\EventBus\ChainEventBus; + +$eventBus = new ChainEventBus([ + $eventBus1, + $eventBus2, +]); +``` + ## Learn more * [How to decorate messages](message_decorator.md) diff --git a/docs/pages/getting_started.md b/docs/pages/getting_started.md index 4ef9dadf8..4f3c22ea2 100644 --- a/docs/pages/getting_started.md +++ b/docs/pages/getting_started.md @@ -282,10 +282,11 @@ After we have defined everything, we still have to plug the whole thing together ```php use Doctrine\DBAL\DriverManager; +use Patchlevel\EventSourcing\EventBus\ChainEventBus; use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore; use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; -use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; +use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus; use Patchlevel\EventSourcing\Projection\Projector\SyncProjectorListener; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; @@ -320,12 +321,12 @@ $projectionist = new DefaultProjectionist( $projectorRepository, ); -$eventBus = SyncProjectionistEventBusWrapper::createWithDefaultLockStrategy( +$eventBus = new ChainEventBus([ DefaultEventBus::create([ new SendCheckInEmailProcessor($mailer), ]), - $projectionist, -); + ProjectionistEventBus::createWithDefaultLockStrategy($projectionist) +]); $repositoryManager = new DefaultRepositoryManager( $aggregateRegistry, diff --git a/src/EventBus/ChainEventBus.php b/src/EventBus/ChainEventBus.php new file mode 100644 index 000000000..5077293a1 --- /dev/null +++ b/src/EventBus/ChainEventBus.php @@ -0,0 +1,21 @@ + $eventBuses */ + public function __construct( + private readonly iterable $eventBuses, + ) { + } + + public function dispatch(Message ...$messages): void + { + foreach ($this->eventBuses as $eventBus) { + $eventBus->dispatch(...$messages); + } + } +} diff --git a/src/Projection/Projectionist/SyncProjectionistEventBusWrapper.php b/src/Projection/Projectionist/ProjectionistEventBus.php similarity index 75% rename from src/Projection/Projectionist/SyncProjectionistEventBusWrapper.php rename to src/Projection/Projectionist/ProjectionistEventBus.php index 66fd37282..36bd9a33a 100644 --- a/src/Projection/Projectionist/SyncProjectionistEventBusWrapper.php +++ b/src/Projection/Projectionist/ProjectionistEventBus.php @@ -9,10 +9,9 @@ use Symfony\Component\Lock\LockFactory; use Symfony\Component\Lock\Store\FlockStore; -final class SyncProjectionistEventBusWrapper implements EventBus +final class ProjectionistEventBus implements EventBus { public function __construct( - private readonly EventBus $parentEventBus, private readonly Projectionist $projectionist, private readonly LockFactory $lockFactory, private readonly bool $throwByError = true, @@ -21,8 +20,6 @@ public function __construct( public function dispatch(Message ...$messages): void { - $this->parentEventBus->dispatch(...$messages); - $lock = $this->lockFactory->createLock('projectionist-run'); if (!$lock->acquire(true)) { @@ -36,10 +33,9 @@ public function dispatch(Message ...$messages): void } } - public static function createWithDefaultLockStrategy(EventBus $parentEventBus, Projectionist $projectionist): self + public static function createWithDefaultLockStrategy(Projectionist $projectionist): self { return new self( - $parentEventBus, $projectionist, new LockFactory( new FlockStore(), diff --git a/tests/Benchmark/SyncProjectionistBench.php b/tests/Benchmark/SyncProjectionistBench.php index 535fe20f7..7e60da4ad 100644 --- a/tests/Benchmark/SyncProjectionistBench.php +++ b/tests/Benchmark/SyncProjectionistBench.php @@ -6,12 +6,13 @@ use Doctrine\DBAL\Driver\PDO\SQLite\Driver; use Doctrine\DBAL\DriverManager; +use Patchlevel\EventSourcing\EventBus\ChainEventBus; use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\EventBus\EventBus; use Patchlevel\EventSourcing\Lock\DoctrineDbalStoreSchemaAdapter; use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore; use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; -use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; +use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus; use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; @@ -71,18 +72,18 @@ public function setUp(): void $projectionRepository, ); - $innerEventStream = DefaultEventBus::create([new SendEmailProcessor()]); - $lockStorage = new LockDoctrineDbalStore($connection); $lockStorageAdapter = new DoctrineDbalStoreSchemaAdapter($lockStorage); - $this->bus = new SyncProjectionistEventBusWrapper( - $innerEventStream, - $projectionist, - new LockFactory( - new LockDoctrineDbalStore($connection), + $this->bus = new ChainEventBus([ + DefaultEventBus::create([new SendEmailProcessor()]), + new ProjectionistEventBus( + $projectionist, + new LockFactory( + new LockDoctrineDbalStore($connection), + ), ), - ); + ]); $this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata()); diff --git a/tests/Integration/BankAccountSplitStream/IntegrationTest.php b/tests/Integration/BankAccountSplitStream/IntegrationTest.php index 342c7059b..6e1034b35 100644 --- a/tests/Integration/BankAccountSplitStream/IntegrationTest.php +++ b/tests/Integration/BankAccountSplitStream/IntegrationTest.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream; use Doctrine\DBAL\Connection; +use Patchlevel\EventSourcing\EventBus\ChainEventBus; use Patchlevel\EventSourcing\EventBus\Decorator\ChainMessageDecorator; use Patchlevel\EventSourcing\EventBus\Decorator\SplitStreamDecorator; use Patchlevel\EventSourcing\EventBus\DefaultEventBus; @@ -13,7 +14,7 @@ use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria; use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore; use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; -use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; +use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus; use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; @@ -63,18 +64,20 @@ public function testSuccessful(): void $projectionRepository, ); - $eventStream = new SyncProjectionistEventBusWrapper( + $eventBus = new ChainEventBus([ DefaultEventBus::create(), - $projectionist, - new LockFactory( - new LockInMemoryStore(), + new ProjectionistEventBus( + $projectionist, + new LockFactory( + new LockInMemoryStore(), + ), ), - ); + ]); $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), $store, - $eventStream, + $eventBus, null, new ChainMessageDecorator([ new SplitStreamDecorator(new AttributeEventMetadataFactory()), @@ -107,7 +110,7 @@ public function testSuccessful(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), $store, - $eventStream, + $eventBus, null, new ChainMessageDecorator([ new SplitStreamDecorator(new AttributeEventMetadataFactory()), @@ -141,7 +144,7 @@ public function testSuccessful(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), $store, - $eventStream, + $eventBus, null, new ChainMessageDecorator([ new SplitStreamDecorator(new AttributeEventMetadataFactory()), diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php index 1b1d86bbd..b05ef8d03 100644 --- a/tests/Integration/BasicImplementation/BasicIntegrationTest.php +++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php @@ -5,12 +5,13 @@ namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation; use Doctrine\DBAL\Connection; +use Patchlevel\EventSourcing\EventBus\ChainEventBus; use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria; use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore; use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; -use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; +use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus; use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; @@ -62,20 +63,22 @@ public function testSuccessful(): void $projectorRepository, ); - $innerEventStream = DefaultEventBus::create([new SendEmailProcessor()]); - - $eventStream = new SyncProjectionistEventBusWrapper( - $innerEventStream, - $projectionist, - new LockFactory( - new LockInMemoryStore(), + $eventBus = new ChainEventBus([ + DefaultEventBus::create([ + new SendEmailProcessor(), + ]), + new ProjectionistEventBus( + $projectionist, + new LockFactory( + new LockInMemoryStore(), + ), ), - ); + ]); $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), $store, - $eventStream, + $eventBus, null, new FooMessageDecorator(), ); @@ -103,7 +106,7 @@ public function testSuccessful(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), $store, - $eventStream, + $eventBus, ); $repository = $manager->get(Profile::class); $profile = $repository->load($profileId); @@ -134,20 +137,22 @@ public function testSnapshot(): void $projectorRepository, ); - $innerEventStream = DefaultEventBus::create([new SendEmailProcessor()]); - - $eventStream = new SyncProjectionistEventBusWrapper( - $innerEventStream, - $projectionist, - new LockFactory( - new LockInMemoryStore(), + $eventBus = new ChainEventBus([ + DefaultEventBus::create([ + new SendEmailProcessor(), + ]), + new ProjectionistEventBus( + $projectionist, + new LockFactory( + new LockInMemoryStore(), + ), ), - ); + ]); $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), $store, - $eventStream, + $eventBus, new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), new FooMessageDecorator(), ); @@ -175,7 +180,7 @@ public function testSnapshot(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), $store, - $eventStream, + $eventBus, ); $repository = $manager->get(Profile::class); $profile = $repository->load($profileId); diff --git a/tests/Integration/Outbox/OutboxTest.php b/tests/Integration/Outbox/OutboxTest.php index 5781abcd6..6146901b0 100644 --- a/tests/Integration/Outbox/OutboxTest.php +++ b/tests/Integration/Outbox/OutboxTest.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Tests\Integration\Outbox; use Doctrine\DBAL\Connection; +use Patchlevel\EventSourcing\EventBus\ChainEventBus; use Patchlevel\EventSourcing\EventBus\DefaultConsumer; use Patchlevel\EventSourcing\Outbox\DoctrineOutboxStore; use Patchlevel\EventSourcing\Outbox\EventBusPublisher; @@ -13,7 +14,7 @@ use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria; use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore; use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; -use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; +use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus; use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Schema\ChainSchemaConfigurator; @@ -76,15 +77,17 @@ public function testSuccessful(): void $eventBusConsumer = DefaultConsumer::create([new SendEmailProcessor()]); - $eventStream = new SyncProjectionistEventBusWrapper( + $eventBus = new ChainEventBus([ $outboxEventBus, - $projectionist, - new LockFactory( - new LockInMemoryStore(), + new ProjectionistEventBus( + $projectionist, + new LockFactory( + new LockInMemoryStore(), + ), ), - ); + ]); - $repository = new DefaultRepository($store, $eventStream, Profile::metadata()); + $repository = new DefaultRepository($store, $eventBus, Profile::metadata()); $schemaDirector = new DoctrineSchemaDirector( $this->connection, diff --git a/tests/Integration/Projectionist/ProjectionistTest.php b/tests/Integration/Projectionist/ProjectionistTest.php index dca481b29..6dee3e073 100644 --- a/tests/Integration/Projectionist/ProjectionistTest.php +++ b/tests/Integration/Projectionist/ProjectionistTest.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Tests\Integration\Projectionist; use Doctrine\DBAL\Connection; +use Patchlevel\EventSourcing\EventBus\ChainEventBus; use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Lock\DoctrineDbalStoreSchemaAdapter; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; @@ -12,7 +13,7 @@ use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria; use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore; use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; -use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; +use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus; use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Schema\ChainSchemaConfigurator; @@ -117,11 +118,13 @@ public function testSync(): void $manager = new DefaultRepositoryManager( $aggregateRegistry, $store, - new SyncProjectionistEventBusWrapper( + new ChainEventBus([ DefaultEventBus::create(), - $projectionist, - new LockFactory($lockStore), - ), + new ProjectionistEventBus( + $projectionist, + new LockFactory($lockStore), + ), + ]), ); $repository = $manager->get(Profile::class); diff --git a/tests/Unit/EventBus/ChainEventBusTest.php b/tests/Unit/EventBus/ChainEventBusTest.php new file mode 100644 index 000000000..e6bb572da --- /dev/null +++ b/tests/Unit/EventBus/ChainEventBusTest.php @@ -0,0 +1,46 @@ +prophesize(EventBus::class); + $eventBus1->dispatch($message1, $message2)->shouldBeCalled(); + + $eventBus2 = $this->prophesize(EventBus::class); + $eventBus2->dispatch($message1, $message2)->shouldBeCalled(); + + $chainEventBus = new ChainEventBus([$eventBus1->reveal(), $eventBus2->reveal()]); + $chainEventBus->dispatch($message1, $message2); + } +} diff --git a/tests/Unit/Projection/Projectionist/SyncProjectionistEventBusWrapperTest.php b/tests/Unit/Projection/Projectionist/ProjectionistEventBusTest.php similarity index 64% rename from tests/Unit/Projection/Projectionist/SyncProjectionistEventBusWrapperTest.php rename to tests/Unit/Projection/Projectionist/ProjectionistEventBusTest.php index 99d3224ad..55537d2e7 100644 --- a/tests/Unit/Projection/Projectionist/SyncProjectionistEventBusWrapperTest.php +++ b/tests/Unit/Projection/Projectionist/ProjectionistEventBusTest.php @@ -4,17 +4,16 @@ namespace Patchlevel\EventSourcing\Tests\Unit\Projection\Projectionist; -use Patchlevel\EventSourcing\EventBus\EventBus; use Patchlevel\EventSourcing\EventBus\Message; use Patchlevel\EventSourcing\Projection\Projectionist\Projectionist; -use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; +use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use PHPUnit\Framework\TestCase; use Prophecy\PhpUnit\ProphecyTrait; -/** @covers \Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper */ -final class SyncProjectionistEventBusWrapperTest extends TestCase +/** @covers \Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistEventBus */ +final class ProjectionistEventBusTest extends TestCase { use ProphecyTrait; @@ -27,16 +26,11 @@ public function testDispatch(): void Message::create(new ProfileVisited(ProfileId::fromString('2'))), ]; - $parentEventBus = $this->prophesize(EventBus::class); - $parentEventBus->dispatch(...$messages)->shouldBeCalledOnce(); - $parentEventBus->reveal(); - $projectionist = $this->prophesize(Projectionist::class); $projectionist->run()->shouldBeCalledOnce(); $projectionist->reveal(); - $eventBus = new SyncProjectionistEventBusWrapper( - $parentEventBus->reveal(), + $eventBus = new ProjectionistEventBus( $projectionist->reveal(), );