From 00a3299ac6e4de1afcd5c7dba572629e5fea9735 Mon Sep 17 00:00:00 2001 From: David Badura Date: Thu, 11 Jan 2024 16:23:35 +0100 Subject: [PATCH] refactor event bus & drop symfony event bus --- composer.json | 3 +- composer.lock | 142 +++++++-------- docs/pages/event_bus.md | 28 +-- docs/pages/getting_started.md | 17 +- src/EventBus/AttributeListenerProvider.php | 50 +++++ src/EventBus/DefaultEventBus.php | 45 ++++- src/EventBus/DuplicateSubscribeMethod.php | 27 --- src/EventBus/Listener.php | 10 - src/EventBus/ListenerDescriptor.php | 43 +++++ src/EventBus/ListenerProvider.php | 11 ++ src/EventBus/Subscriber.php | 59 ------ src/EventBus/SymfonyEventBus.php | 43 ----- src/WatchServer/WatchEventBusWrapper.php | 30 +++ src/WatchServer/WatchListener.php | 25 --- .../Processor/SendEmailProcessor.php | 3 +- tests/Benchmark/SimpleSetupBench.php | 2 +- tests/Benchmark/SnapshotsBench.php | 2 +- tests/Benchmark/SplitStreamBench.php | 2 +- tests/Benchmark/SyncProjectionistBench.php | 3 +- tests/Benchmark/blackfire.php | 2 +- .../IntegrationTest.php | 2 +- .../BasicIntegrationTest.php | 84 +-------- .../Processor/SendEmailProcessor.php | 3 +- tests/Integration/Outbox/OutboxTest.php | 10 +- .../Outbox/Processor/SendEmailProcessor.php | 9 +- .../Pipeline/PipelineChangeStoreTest.php | 4 +- .../Projectionist/ProjectionistTest.php | 4 +- .../AttributeListenerProviderTest.php | 115 ++++++++++++ tests/Unit/EventBus/DefaultEventBusTest.php | 100 +++++----- .../Unit/EventBus/ListenerDescriptorTest.php | 48 +++++ tests/Unit/EventBus/SubscriberTest.php | 172 ------------------ tests/Unit/EventBus/SymfonyEventBusTest.php | 112 ------------ tests/Unit/Fixture/DummyListener.php | 12 ++ ...rTest.php => WatchEventBusWrapperTest.php} | 31 +++- 34 files changed, 525 insertions(+), 728 deletions(-) create mode 100644 src/EventBus/AttributeListenerProvider.php delete mode 100644 src/EventBus/DuplicateSubscribeMethod.php delete mode 100644 src/EventBus/Listener.php create mode 100644 src/EventBus/ListenerDescriptor.php create mode 100644 src/EventBus/ListenerProvider.php delete mode 100644 src/EventBus/Subscriber.php delete mode 100644 src/EventBus/SymfonyEventBus.php create mode 100644 src/WatchServer/WatchEventBusWrapper.php delete mode 100644 src/WatchServer/WatchListener.php create mode 100644 tests/Unit/EventBus/AttributeListenerProviderTest.php create mode 100644 tests/Unit/EventBus/ListenerDescriptorTest.php delete mode 100644 tests/Unit/EventBus/SubscriberTest.php delete mode 100644 tests/Unit/EventBus/SymfonyEventBusTest.php create mode 100644 tests/Unit/Fixture/DummyListener.php rename tests/Unit/WatchServer/{WatchListenerTest.php => WatchEventBusWrapperTest.php} (54%) diff --git a/composer.json b/composer.json index 0b48a17f5..dcc1b0f1c 100644 --- a/composer.json +++ b/composer.json @@ -55,8 +55,7 @@ "symfony/messenger": "for an advanced event bus" }, "conflict": { - "doctrine/migrations": "<3.3", - "symfony/messenger": "<5.4" + "doctrine/migrations": "<3.3" }, "config": { "preferred-install": { diff --git a/composer.lock b/composer.lock index 3a6cde5af..48ca55f06 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "6d8b289f331bdf12d6cd2dca8ef7ff36", + "content-hash": "fd722c74270b3c9e87c812ffd7853620", "packages": [ { "name": "brick/math", @@ -1016,16 +1016,16 @@ }, { "name": "symfony/console", - "version": "v7.0.1", + "version": "v7.0.2", "source": { "type": "git", "url": "https://github.com/symfony/console.git", - "reference": "cdce5c684b2f920bb1343deecdfba356ffad83d5" + "reference": "f8587c4cdc5acad67af71c37db34ef03af91e59c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/console/zipball/cdce5c684b2f920bb1343deecdfba356ffad83d5", - "reference": "cdce5c684b2f920bb1343deecdfba356ffad83d5", + "url": "https://api.github.com/repos/symfony/console/zipball/f8587c4cdc5acad67af71c37db34ef03af91e59c", + "reference": "f8587c4cdc5acad67af71c37db34ef03af91e59c", "shasum": "" }, "require": { @@ -1089,7 +1089,7 @@ "terminal" ], "support": { - "source": "https://github.com/symfony/console/tree/v7.0.1" + "source": "https://github.com/symfony/console/tree/v7.0.2" }, "funding": [ { @@ -1105,20 +1105,20 @@ "type": "tidelift" } ], - "time": "2023-12-01T15:10:06+00:00" + "time": "2023-12-10T16:54:46+00:00" }, { "name": "symfony/event-dispatcher", - "version": "v7.0.0", + "version": "v7.0.2", "source": { "type": "git", "url": "https://github.com/symfony/event-dispatcher.git", - "reference": "c459b40ffe67c49af6fd392aac374c9edf8a027e" + "reference": "098b62ae81fdd6cbf941f355059f617db28f4f9a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/event-dispatcher/zipball/c459b40ffe67c49af6fd392aac374c9edf8a027e", - "reference": "c459b40ffe67c49af6fd392aac374c9edf8a027e", + "url": "https://api.github.com/repos/symfony/event-dispatcher/zipball/098b62ae81fdd6cbf941f355059f617db28f4f9a", + "reference": "098b62ae81fdd6cbf941f355059f617db28f4f9a", "shasum": "" }, "require": { @@ -1169,7 +1169,7 @@ "description": "Provides tools that allow your application components to communicate with each other by dispatching events and listening to them", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/event-dispatcher/tree/v7.0.0" + "source": "https://github.com/symfony/event-dispatcher/tree/v7.0.2" }, "funding": [ { @@ -1185,7 +1185,7 @@ "type": "tidelift" } ], - "time": "2023-07-27T16:29:09+00:00" + "time": "2023-12-27T22:24:19+00:00" }, { "name": "symfony/event-dispatcher-contracts", @@ -1329,16 +1329,16 @@ }, { "name": "symfony/lock", - "version": "v7.0.0", + "version": "v7.0.2", "source": { "type": "git", "url": "https://github.com/symfony/lock.git", - "reference": "68fec7f9e06bfad1a927380655937da79b799186" + "reference": "0d11500147d50f045b5878c0a235f6e8e8a740d4" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/lock/zipball/68fec7f9e06bfad1a927380655937da79b799186", - "reference": "68fec7f9e06bfad1a927380655937da79b799186", + "url": "https://api.github.com/repos/symfony/lock/zipball/0d11500147d50f045b5878c0a235f6e8e8a740d4", + "reference": "0d11500147d50f045b5878c0a235f6e8e8a740d4", "shasum": "" }, "require": { @@ -1387,7 +1387,7 @@ "semaphore" ], "support": { - "source": "https://github.com/symfony/lock/tree/v7.0.0" + "source": "https://github.com/symfony/lock/tree/v7.0.2" }, "funding": [ { @@ -1403,7 +1403,7 @@ "type": "tidelift" } ], - "time": "2023-11-21T15:08:38+00:00" + "time": "2023-12-19T11:23:03+00:00" }, { "name": "symfony/polyfill-ctype", @@ -1819,16 +1819,16 @@ }, { "name": "symfony/string", - "version": "v7.0.0", + "version": "v7.0.2", "source": { "type": "git", "url": "https://github.com/symfony/string.git", - "reference": "92bd2bfbba476d4a1838e5e12168bef2fd1e6620" + "reference": "cc78f14f91f5e53b42044d0620961c48028ff9f5" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/string/zipball/92bd2bfbba476d4a1838e5e12168bef2fd1e6620", - "reference": "92bd2bfbba476d4a1838e5e12168bef2fd1e6620", + "url": "https://api.github.com/repos/symfony/string/zipball/cc78f14f91f5e53b42044d0620961c48028ff9f5", + "reference": "cc78f14f91f5e53b42044d0620961c48028ff9f5", "shasum": "" }, "require": { @@ -1885,7 +1885,7 @@ "utf8" ], "support": { - "source": "https://github.com/symfony/string/tree/v7.0.0" + "source": "https://github.com/symfony/string/tree/v7.0.2" }, "funding": [ { @@ -1901,7 +1901,7 @@ "type": "tidelift" } ], - "time": "2023-11-29T08:40:23+00:00" + "time": "2023-12-10T16:54:46+00:00" } ], "packages-dev": [ @@ -4732,16 +4732,16 @@ }, { "name": "phpdocumentor/type-resolver", - "version": "1.7.3", + "version": "1.8.0", "source": { "type": "git", "url": "https://github.com/phpDocumentor/TypeResolver.git", - "reference": "3219c6ee25c9ea71e3d9bbaf39c67c9ebd499419" + "reference": "fad452781b3d774e3337b0c0b245dd8e5a4455fc" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpDocumentor/TypeResolver/zipball/3219c6ee25c9ea71e3d9bbaf39c67c9ebd499419", - "reference": "3219c6ee25c9ea71e3d9bbaf39c67c9ebd499419", + "url": "https://api.github.com/repos/phpDocumentor/TypeResolver/zipball/fad452781b3d774e3337b0c0b245dd8e5a4455fc", + "reference": "fad452781b3d774e3337b0c0b245dd8e5a4455fc", "shasum": "" }, "require": { @@ -4784,9 +4784,9 @@ "description": "A PSR-5 based resolver of Class names, Types and Structural Element Names", "support": { "issues": "https://github.com/phpDocumentor/TypeResolver/issues", - "source": "https://github.com/phpDocumentor/TypeResolver/tree/1.7.3" + "source": "https://github.com/phpDocumentor/TypeResolver/tree/1.8.0" }, - "time": "2023-08-12T11:01:26+00:00" + "time": "2024-01-11T11:49:22+00:00" }, { "name": "phpspec/prophecy", @@ -4911,16 +4911,16 @@ }, { "name": "phpstan/phpdoc-parser", - "version": "1.24.5", + "version": "1.25.0", "source": { "type": "git", "url": "https://github.com/phpstan/phpdoc-parser.git", - "reference": "fedf211ff14ec8381c9bf5714e33a7a552dd1acc" + "reference": "bd84b629c8de41aa2ae82c067c955e06f1b00240" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpdoc-parser/zipball/fedf211ff14ec8381c9bf5714e33a7a552dd1acc", - "reference": "fedf211ff14ec8381c9bf5714e33a7a552dd1acc", + "url": "https://api.github.com/repos/phpstan/phpdoc-parser/zipball/bd84b629c8de41aa2ae82c067c955e06f1b00240", + "reference": "bd84b629c8de41aa2ae82c067c955e06f1b00240", "shasum": "" }, "require": { @@ -4952,22 +4952,22 @@ "description": "PHPDoc parser with support for nullable, intersection and generic types", "support": { "issues": "https://github.com/phpstan/phpdoc-parser/issues", - "source": "https://github.com/phpstan/phpdoc-parser/tree/1.24.5" + "source": "https://github.com/phpstan/phpdoc-parser/tree/1.25.0" }, - "time": "2023-12-16T09:33:33+00:00" + "time": "2024-01-04T17:06:16+00:00" }, { "name": "phpstan/phpstan", - "version": "1.10.50", + "version": "1.10.55", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "06a98513ac72c03e8366b5a0cb00750b487032e4" + "reference": "9a88f9d18ddf4cf54c922fbeac16c4cb164c5949" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/06a98513ac72c03e8366b5a0cb00750b487032e4", - "reference": "06a98513ac72c03e8366b5a0cb00750b487032e4", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/9a88f9d18ddf4cf54c922fbeac16c4cb164c5949", + "reference": "9a88f9d18ddf4cf54c922fbeac16c4cb164c5949", "shasum": "" }, "require": { @@ -5016,7 +5016,7 @@ "type": "tidelift" } ], - "time": "2023-12-13T10:59:42+00:00" + "time": "2024-01-08T12:32:40+00:00" }, { "name": "phpunit/php-code-coverage", @@ -6869,16 +6869,16 @@ }, { "name": "symfony/clock", - "version": "v7.0.0", + "version": "v7.0.2", "source": { "type": "git", "url": "https://github.com/symfony/clock.git", - "reference": "c696b075befdd4bcffe5ef2eab9a32a1a9c0d29d" + "reference": "67c5ae749ebabe7d8c84c3cab2544331eee7d2cf" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/clock/zipball/c696b075befdd4bcffe5ef2eab9a32a1a9c0d29d", - "reference": "c696b075befdd4bcffe5ef2eab9a32a1a9c0d29d", + "url": "https://api.github.com/repos/symfony/clock/zipball/67c5ae749ebabe7d8c84c3cab2544331eee7d2cf", + "reference": "67c5ae749ebabe7d8c84c3cab2544331eee7d2cf", "shasum": "" }, "require": { @@ -6923,7 +6923,7 @@ "time" ], "support": { - "source": "https://github.com/symfony/clock/tree/v7.0.0" + "source": "https://github.com/symfony/clock/tree/v7.0.2" }, "funding": [ { @@ -6939,7 +6939,7 @@ "type": "tidelift" } ], - "time": "2023-11-25T20:15:12+00:00" + "time": "2023-12-27T08:42:13+00:00" }, { "name": "symfony/deprecation-contracts", @@ -7465,16 +7465,16 @@ }, { "name": "symfony/process", - "version": "v7.0.0", + "version": "v7.0.2", "source": { "type": "git", "url": "https://github.com/symfony/process.git", - "reference": "13bdb1670c7f510494e04fcb2bfa29af63db9c0d" + "reference": "acd3eb5cb02382c1cb0287ba29b2908cc6ffa83a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/process/zipball/13bdb1670c7f510494e04fcb2bfa29af63db9c0d", - "reference": "13bdb1670c7f510494e04fcb2bfa29af63db9c0d", + "url": "https://api.github.com/repos/symfony/process/zipball/acd3eb5cb02382c1cb0287ba29b2908cc6ffa83a", + "reference": "acd3eb5cb02382c1cb0287ba29b2908cc6ffa83a", "shasum": "" }, "require": { @@ -7506,7 +7506,7 @@ "description": "Executes commands in sub-processes", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/process/tree/v7.0.0" + "source": "https://github.com/symfony/process/tree/v7.0.2" }, "funding": [ { @@ -7522,7 +7522,7 @@ "type": "tidelift" } ], - "time": "2023-11-20T16:43:42+00:00" + "time": "2023-12-24T09:15:37+00:00" }, { "name": "symfony/stopwatch", @@ -7588,16 +7588,16 @@ }, { "name": "symfony/var-dumper", - "version": "v7.0.0", + "version": "v7.0.2", "source": { "type": "git", "url": "https://github.com/symfony/var-dumper.git", - "reference": "cf0220fc7607476fd0d001ab3ed9e830d1fdda56" + "reference": "5f6f1a527002068f6d40fda068399220eabebf71" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/var-dumper/zipball/cf0220fc7607476fd0d001ab3ed9e830d1fdda56", - "reference": "cf0220fc7607476fd0d001ab3ed9e830d1fdda56", + "url": "https://api.github.com/repos/symfony/var-dumper/zipball/5f6f1a527002068f6d40fda068399220eabebf71", + "reference": "5f6f1a527002068f6d40fda068399220eabebf71", "shasum": "" }, "require": { @@ -7651,7 +7651,7 @@ "dump" ], "support": { - "source": "https://github.com/symfony/var-dumper/tree/v7.0.0" + "source": "https://github.com/symfony/var-dumper/tree/v7.0.2" }, "funding": [ { @@ -7667,20 +7667,20 @@ "type": "tidelift" } ], - "time": "2023-11-27T12:39:18+00:00" + "time": "2023-12-28T19:18:20+00:00" }, { "name": "symfony/var-exporter", - "version": "v7.0.1", + "version": "v7.0.2", "source": { "type": "git", "url": "https://github.com/symfony/var-exporter.git", - "reference": "a3d7c877414fcd59ab7075ecdc3b8f9c00f7bcc3" + "reference": "345c62fefe92243c3a06fc0cc65f2ec1a47e0764" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/var-exporter/zipball/a3d7c877414fcd59ab7075ecdc3b8f9c00f7bcc3", - "reference": "a3d7c877414fcd59ab7075ecdc3b8f9c00f7bcc3", + "url": "https://api.github.com/repos/symfony/var-exporter/zipball/345c62fefe92243c3a06fc0cc65f2ec1a47e0764", + "reference": "345c62fefe92243c3a06fc0cc65f2ec1a47e0764", "shasum": "" }, "require": { @@ -7725,7 +7725,7 @@ "serialize" ], "support": { - "source": "https://github.com/symfony/var-exporter/tree/v7.0.1" + "source": "https://github.com/symfony/var-exporter/tree/v7.0.2" }, "funding": [ { @@ -7741,7 +7741,7 @@ "type": "tidelift" } ], - "time": "2023-11-30T11:38:21+00:00" + "time": "2023-12-27T08:42:13+00:00" }, { "name": "thecodingmachine/safe", @@ -7934,16 +7934,16 @@ }, { "name": "vimeo/psalm", - "version": "5.18.0", + "version": "5.19.0", "source": { "type": "git", "url": "https://github.com/vimeo/psalm.git", - "reference": "b113f3ed0259fd6e212d87c3df80eec95a6abf19" + "reference": "06b71be009a6bd6d81b9811855d6629b9fe90e1b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/vimeo/psalm/zipball/b113f3ed0259fd6e212d87c3df80eec95a6abf19", - "reference": "b113f3ed0259fd6e212d87c3df80eec95a6abf19", + "url": "https://api.github.com/repos/vimeo/psalm/zipball/06b71be009a6bd6d81b9811855d6629b9fe90e1b", + "reference": "06b71be009a6bd6d81b9811855d6629b9fe90e1b", "shasum": "" }, "require": { @@ -8040,7 +8040,7 @@ "issues": "https://github.com/vimeo/psalm/issues", "source": "https://github.com/vimeo/psalm" }, - "time": "2023-12-16T09:37:35+00:00" + "time": "2024-01-09T21:02:43+00:00" }, { "name": "webmozart/assert", diff --git a/docs/pages/event_bus.md b/docs/pages/event_bus.md index ed856a903..fe2d2af7b 100644 --- a/docs/pages/event_bus.md +++ b/docs/pages/event_bus.md @@ -137,40 +137,16 @@ $eventBus = new SymfonyEventBus($symfonyMessenger); ## Listener -A listener must implement the `Listener` interface and define the `__invoke` method. +You can listen for specific events with the attribute `Subscribe`. This listener is then called for all saved events / messages. -```php -use Patchlevel\EventSourcing\EventBus\Listener; -use Patchlevel\EventSourcing\EventBus\Message; - -final class WelcomeListener implements Listener -{ - public function __invoke(Message $message): void - { - if ($message->event() instanceof ProfileCreated) { - echo 'Welcome!'; - } - } -} -``` - -!!! warning - - If you only want to listen to certain messages, - then you have to check it in the `__invoke` method or use the subscriber. - -## Subscriber - -A `Subscriber` is a listener, except that it has implemented the invoke method itself. -Instead, you can define your own and multiple methods and listen for specific events with the attribute `Subscribe`. ```php use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\EventBus\Listener; use Patchlevel\EventSourcing\EventBus\Message; -final class WelcomeSubscriber extends Subscriber +final class WelcomeSubscriber { #[Subscribe(ProfileCreated::class)] public function onProfileCreated(Message $message): void diff --git a/docs/pages/getting_started.md b/docs/pages/getting_started.md index 86597dc24..63aa6b980 100644 --- a/docs/pages/getting_started.md +++ b/docs/pages/getting_started.md @@ -251,7 +251,7 @@ use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\EventBus\Message; use Patchlevel\EventSourcing\EventBus\Subscriber; -final class SendCheckInEmailProcessor extends Subscriber +final class SendCheckInEmailProcessor { public function __construct( private readonly Mailer $mailer @@ -283,6 +283,7 @@ use Doctrine\DBAL\DriverManager; use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore; use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; +use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; use Patchlevel\EventSourcing\Projection\Projector\SyncProjectorListener; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; @@ -294,9 +295,6 @@ $connection = DriverManager::getConnection([ $mailer = /* your own mailer */; -$eventBus = new DefaultEventBus(); -$eventBus->addListener(new SendCheckInEmailProcessor($mailer)); - $serializer = DefaultEventSerializer::createFromPaths(['src/Domain/Hotel/Event']); $aggregateRegistry = (new AttributeAggregateRootRegistryFactory)->create(['src/Domain/Hotel']); @@ -320,6 +318,17 @@ $projectionist = new DefaultProjectionist( $projectorRepository ); +$eventBus = SyncProjectionistEventBusWrapper::createWithDefaultLockStrategy( + DefaultEventBus::create([ + new SyncProjectorListener($projectionist), + ]), + $projectionist +); + +$eventBus = DefaultEventBus::create([ + new SendCheckInEmailProcessor($mailer), +]); + $repositoryManager = new DefaultRepositoryManager( $aggregateRegistry, $store, diff --git a/src/EventBus/AttributeListenerProvider.php b/src/EventBus/AttributeListenerProvider.php new file mode 100644 index 000000000..06a732baa --- /dev/null +++ b/src/EventBus/AttributeListenerProvider.php @@ -0,0 +1,50 @@ +>|null */ + private array|null $subscribeMethods = null; + + /** @param iterable $listeners */ + public function __construct( + private readonly iterable $listeners, + ) { + } + + /** @return iterable */ + public function listenersForEvent(object $event): iterable + { + if ($this->subscribeMethods !== null) { + return $this->subscribeMethods[$event::class] ?? []; + } + + $this->subscribeMethods = []; + + foreach ($this->listeners as $listener) { + $reflection = new ReflectionClass($listener); + $methods = $reflection->getMethods(); + + foreach ($methods as $method) { + $attributes = $method->getAttributes(Subscribe::class); + + foreach ($attributes as $attribute) { + $instance = $attribute->newInstance(); + $eventClass = $instance->eventClass; + + $this->subscribeMethods[$eventClass][] = new ListenerDescriptor( + $listener->{$method->getName()}(...), + ); + } + } + } + + return $this->subscribeMethods[$event::class] ?? []; + } +} diff --git a/src/EventBus/DefaultEventBus.php b/src/EventBus/DefaultEventBus.php index 7168e38ca..1e0aa8d37 100644 --- a/src/EventBus/DefaultEventBus.php +++ b/src/EventBus/DefaultEventBus.php @@ -4,8 +4,10 @@ namespace Patchlevel\EventSourcing\EventBus; -use function array_merge; +use Psr\Log\LoggerInterface; + use function array_shift; +use function sprintf; final class DefaultEventBus implements EventBus { @@ -13,9 +15,9 @@ final class DefaultEventBus implements EventBus private array $queue; private bool $processing; - /** @param list $listeners */ public function __construct( - private array $listeners = [], + private readonly ListenerProvider $listenerProvider, + private readonly LoggerInterface|null $logger = null, ) { $this->queue = []; $this->processing = false; @@ -23,25 +25,52 @@ public function __construct( public function dispatch(Message ...$messages): void { - $this->queue = array_merge($this->queue, $messages); + foreach ($messages as $message) { + $this->logger?->debug(sprintf( + 'EventBus: Add message "%s" to queue.', + $message->event()::class, + )); + + $this->queue[] = $message; + } if ($this->processing) { + $this->logger?->debug('EventBus: Is already processing, dont start new processing.'); + return; } $this->processing = true; + $this->logger?->debug('EventBus: Start processing queue.'); + while ($message = array_shift($this->queue)) { - foreach ($this->listeners as $listener) { - $listener($message); + $this->logger?->debug(sprintf( + 'EventBus: Dispatch message "%s" to listeners.', + $message->event()::class, + )); + + $listeners = $this->listenerProvider->listenersForEvent($message->event()); + + foreach ($listeners as $listener) { + $this->logger?->info(sprintf( + 'EventBus: Dispatch message with event "%s" to listener "%s".', + $message->event()::class, + $listener->name(), + )); + + ($listener->callable())($message); } } $this->processing = false; + + $this->logger?->debug('EventBus: Finished processing queue.'); } - public function addListener(Listener $listener): void + /** @param iterable $listeners */ + public static function create(iterable $listeners = []): self { - $this->listeners[] = $listener; + return new self(new AttributeListenerProvider($listeners)); } } diff --git a/src/EventBus/DuplicateSubscribeMethod.php b/src/EventBus/DuplicateSubscribeMethod.php deleted file mode 100644 index a2daaa0fc..000000000 --- a/src/EventBus/DuplicateSubscribeMethod.php +++ /dev/null @@ -1,27 +0,0 @@ - $subscriber - * @param class-string $event - */ - public function __construct(string $subscriber, string $event, string $fistMethod, string $secondMethod) - { - parent::__construct( - sprintf( - 'Two methods "%s" and "%s" on the subscriber "%s" want to subscribe the same event "%s". Only one method can subscribe an event.', - $fistMethod, - $secondMethod, - $subscriber, - $event, - ), - ); - } -} diff --git a/src/EventBus/Listener.php b/src/EventBus/Listener.php deleted file mode 100644 index 2bc72b03e..000000000 --- a/src/EventBus/Listener.php +++ /dev/null @@ -1,10 +0,0 @@ -callable = $callable; + + $r = new ReflectionFunction($callable); + + if ($r->isAnonymous()) { + $this->name = 'Closure'; + } elseif (!$callable = $r->getClosureThis()) { + $class = $r->getClosureCalledClass(); + + $this->name = ($class ? $class->name . '::' : '') . $r->name; + } else { + $this->name = $callable::class . '::' . $r->name; + } + } + + public function name(): string + { + return $this->name; + } + + public function callable(): callable + { + return $this->callable; + } +} diff --git a/src/EventBus/ListenerProvider.php b/src/EventBus/ListenerProvider.php new file mode 100644 index 000000000..2bf4155ab --- /dev/null +++ b/src/EventBus/ListenerProvider.php @@ -0,0 +1,11 @@ + */ + public function listenersForEvent(object $event): iterable; +} diff --git a/src/EventBus/Subscriber.php b/src/EventBus/Subscriber.php deleted file mode 100644 index 994bdfacf..000000000 --- a/src/EventBus/Subscriber.php +++ /dev/null @@ -1,59 +0,0 @@ -|null */ - private array|null $subscribeMethods = null; - - final public function __invoke(Message $message): void - { - if ($this->subscribeMethods === null) { - $this->init(); - } - - $method = $this->subscribeMethods[$message->event()::class] ?? null; - - if (!$method) { - return; - } - - $this->$method($message); - } - - private function init(): void - { - $reflection = new ReflectionClass(static::class); - $methods = $reflection->getMethods(); - - $this->subscribeMethods = []; - - foreach ($methods as $method) { - $attributes = $method->getAttributes(Subscribe::class); - - foreach ($attributes as $attribute) { - $instance = $attribute->newInstance(); - $eventClass = $instance->eventClass; - - if (array_key_exists($eventClass, $this->subscribeMethods)) { - throw new DuplicateSubscribeMethod( - static::class, - $eventClass, - $this->subscribeMethods[$eventClass], - $method->getName(), - ); - } - - $this->subscribeMethods[$eventClass] = $method->getName(); - } - } - } -} diff --git a/src/EventBus/SymfonyEventBus.php b/src/EventBus/SymfonyEventBus.php deleted file mode 100644 index da9705280..000000000 --- a/src/EventBus/SymfonyEventBus.php +++ /dev/null @@ -1,43 +0,0 @@ -with(new DispatchAfterCurrentBusStamp()); - - $this->bus->dispatch($envelope); - } - } - - /** @param list $listeners */ - public static function create(array $listeners = []): static - { - $bus = new MessageBus([ - new HandleMessageMiddleware( - new HandlersLocator([Message::class => $listeners]), - true, - ), - ]); - - return new static($bus); - } -} diff --git a/src/WatchServer/WatchEventBusWrapper.php b/src/WatchServer/WatchEventBusWrapper.php new file mode 100644 index 000000000..025787f9a --- /dev/null +++ b/src/WatchServer/WatchEventBusWrapper.php @@ -0,0 +1,30 @@ +client->send($message); + } + } catch (SendingFailed) { + // to nothing + } + + $this->eventBus->dispatch(...$messages); + } +} diff --git a/src/WatchServer/WatchListener.php b/src/WatchServer/WatchListener.php deleted file mode 100644 index 5c1cf7d1b..000000000 --- a/src/WatchServer/WatchListener.php +++ /dev/null @@ -1,25 +0,0 @@ -client->send($message); - } catch (SendingFailed) { - // to nothing - } - } -} diff --git a/tests/Benchmark/BasicImplementation/Processor/SendEmailProcessor.php b/tests/Benchmark/BasicImplementation/Processor/SendEmailProcessor.php index 9a83040c5..d4ec19a64 100644 --- a/tests/Benchmark/BasicImplementation/Processor/SendEmailProcessor.php +++ b/tests/Benchmark/BasicImplementation/Processor/SendEmailProcessor.php @@ -6,11 +6,10 @@ use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\EventBus\Message; -use Patchlevel\EventSourcing\EventBus\Subscriber; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Events\ProfileCreated; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\SendEmailMock; -final class SendEmailProcessor extends Subscriber +final class SendEmailProcessor { #[Subscribe(ProfileCreated::class)] public function onProfileCreated(Message $message): void diff --git a/tests/Benchmark/SimpleSetupBench.php b/tests/Benchmark/SimpleSetupBench.php index a3db07219..c529aa527 100644 --- a/tests/Benchmark/SimpleSetupBench.php +++ b/tests/Benchmark/SimpleSetupBench.php @@ -45,7 +45,7 @@ public function setUp(): void 'path' => self::DB_PATH, ]); - $this->bus = new DefaultEventBus(); + $this->bus = DefaultEventBus::create(); $this->store = new DoctrineDbalStore( $connection, diff --git a/tests/Benchmark/SnapshotsBench.php b/tests/Benchmark/SnapshotsBench.php index 411989cec..4ccefc3f6 100644 --- a/tests/Benchmark/SnapshotsBench.php +++ b/tests/Benchmark/SnapshotsBench.php @@ -51,7 +51,7 @@ public function setUp(): void 'path' => self::DB_PATH, ]); - $this->bus = new DefaultEventBus(); + $this->bus = DefaultEventBus::create(); $this->store = new DoctrineDbalStore( $connection, diff --git a/tests/Benchmark/SplitStreamBench.php b/tests/Benchmark/SplitStreamBench.php index db3413f68..7af6ddf48 100644 --- a/tests/Benchmark/SplitStreamBench.php +++ b/tests/Benchmark/SplitStreamBench.php @@ -50,7 +50,7 @@ public function setUp(): void 'path' => self::DB_PATH, ]); - $this->bus = new DefaultEventBus(); + $this->bus = DefaultEventBus::create(); $this->store = new DoctrineDbalStore( $connection, diff --git a/tests/Benchmark/SyncProjectionistBench.php b/tests/Benchmark/SyncProjectionistBench.php index 667f36b24..64b723d64 100644 --- a/tests/Benchmark/SyncProjectionistBench.php +++ b/tests/Benchmark/SyncProjectionistBench.php @@ -73,8 +73,7 @@ public function setUp(): void $projectionRepository, ); - $innerEventStream = new DefaultEventBus(); - $innerEventStream->addListener(new SendEmailProcessor()); + $innerEventStream = DefaultEventBus::create([new SendEmailProcessor()]); $lockStorage = new LockDoctrineDbalStore($connection); $lockStorageAdapter = new DoctrineDbalStoreSchemaAdapter($lockStorage); diff --git a/tests/Benchmark/blackfire.php b/tests/Benchmark/blackfire.php index fea73a725..da71ebf9b 100644 --- a/tests/Benchmark/blackfire.php +++ b/tests/Benchmark/blackfire.php @@ -26,7 +26,7 @@ 'path' => DB_PATH, ]); -$bus = new DefaultEventBus(); +$bus = DefaultEventBus::create(); $store = new DoctrineDbalStore( $connection, diff --git a/tests/Integration/BankAccountSplitStream/IntegrationTest.php b/tests/Integration/BankAccountSplitStream/IntegrationTest.php index 9d97ddc42..b8b58a7e4 100644 --- a/tests/Integration/BankAccountSplitStream/IntegrationTest.php +++ b/tests/Integration/BankAccountSplitStream/IntegrationTest.php @@ -65,7 +65,7 @@ public function testSuccessful(): void ); $eventStream = new SyncProjectionistEventBusWrapper( - new DefaultEventBus(), + DefaultEventBus::create(), $projectionist, new LockFactory( new LockInMemoryStore(), diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php index 0073e53f1..dbf5c6b6b 100644 --- a/tests/Integration/BasicImplementation/BasicIntegrationTest.php +++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php @@ -6,7 +6,6 @@ use Doctrine\DBAL\Connection; use Patchlevel\EventSourcing\EventBus\DefaultEventBus; -use Patchlevel\EventSourcing\EventBus\SymfonyEventBus; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory; use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore; @@ -64,8 +63,7 @@ public function testSuccessful(): void $projectorRepository, ); - $innerEventStream = new DefaultEventBus(); - $innerEventStream->addListener(new SendEmailProcessor()); + $innerEventStream = DefaultEventBus::create([new SendEmailProcessor()]); $eventStream = new SyncProjectionistEventBusWrapper( $innerEventStream, @@ -118,83 +116,6 @@ public function testSuccessful(): void self::assertSame(1, SendEmailMock::count()); } - public function testWithSymfonySuccessful(): void - { - $store = new DoctrineDbalStore( - $this->connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), - (new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']), - 'eventstore', - ); - - $profileProjection = new ProfileProjector($this->connection); - $projectorRepository = new InMemoryProjectorRepository( - [$profileProjection], - ); - - $projectionist = new DefaultProjectionist( - $store, - new InMemoryStore(), - $projectorRepository, - ); - - $innerEventStream = SymfonyEventBus::create([ - new SendEmailProcessor(), - ]); - - $eventStream = new SyncProjectionistEventBusWrapper( - $innerEventStream, - $projectionist, - new LockFactory( - new LockInMemoryStore(), - ), - ); - - $manager = new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), - $store, - $eventStream, - ); - - $repository = $manager->get(Profile::class); - - $schemaDirector = new DoctrineSchemaDirector( - $this->connection, - $store, - ); - - $schemaDirector->create(); - $projectionist->boot(); - - $profileId = ProfileId::fromString('1'); - $profile = Profile::create($profileId, 'John'); - $repository->save($profile); - - $result = $this->connection->fetchAssociative('SELECT * FROM projection_profile WHERE id = ?', ['1']); - - self::assertIsArray($result); - self::assertArrayHasKey('id', $result); - self::assertSame('1', $result['id']); - self::assertSame('John', $result['name']); - - $manager = new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), - $store, - $eventStream, - null, - new FooMessageDecorator(), - ); - $repository = $manager->get(Profile::class); - - $profile = $repository->load($profileId); - - self::assertInstanceOf(Profile::class, $profile); - self::assertEquals($profileId, $profile->aggregateRootId()); - self::assertSame(1, $profile->playhead()); - self::assertSame('John', $profile->name()); - self::assertSame(1, SendEmailMock::count()); - } - public function testSnapshot(): void { $store = new DoctrineDbalStore( @@ -215,8 +136,7 @@ public function testSnapshot(): void $projectorRepository, ); - $innerEventStream = new DefaultEventBus(); - $innerEventStream->addListener(new SendEmailProcessor()); + $innerEventStream = DefaultEventBus::create([new SendEmailProcessor()]); $eventStream = new SyncProjectionistEventBusWrapper( $innerEventStream, diff --git a/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php b/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php index 911ec7d85..2e9c26072 100644 --- a/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php +++ b/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php @@ -6,11 +6,10 @@ use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\EventBus\Message; -use Patchlevel\EventSourcing\EventBus\Subscriber; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Events\ProfileCreated; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\SendEmailMock; -final class SendEmailProcessor extends Subscriber +final class SendEmailProcessor { #[Subscribe(ProfileCreated::class)] public function onProfileCreated(Message $message): void diff --git a/tests/Integration/Outbox/OutboxTest.php b/tests/Integration/Outbox/OutboxTest.php index 466ba7f02..025e6467c 100644 --- a/tests/Integration/Outbox/OutboxTest.php +++ b/tests/Integration/Outbox/OutboxTest.php @@ -63,8 +63,9 @@ public function testSuccessful(): void 'outbox', ); - $realEventBus = new DefaultEventBus(); - $realEventBus->addListener(new SendEmailProcessor()); + $realEventBus = DefaultEventBus::create([ + new SendEmailProcessor(), + ]); $outboxEventBus = new OutboxEventBus($outboxStore); @@ -79,8 +80,9 @@ public function testSuccessful(): void $projectorRepository, ); - $realEventBus = new DefaultEventBus(); - $realEventBus->addListener(new SendEmailProcessor()); + $realEventBus = DefaultEventBus::create([ + new SendEmailProcessor(), + ]); $eventStream = new SyncProjectionistEventBusWrapper( $outboxEventBus, diff --git a/tests/Integration/Outbox/Processor/SendEmailProcessor.php b/tests/Integration/Outbox/Processor/SendEmailProcessor.php index e269a3f0a..381ea256a 100644 --- a/tests/Integration/Outbox/Processor/SendEmailProcessor.php +++ b/tests/Integration/Outbox/Processor/SendEmailProcessor.php @@ -4,19 +4,16 @@ namespace Patchlevel\EventSourcing\Tests\Integration\Outbox\Processor; -use Patchlevel\EventSourcing\EventBus\Listener; +use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\EventBus\Message; use Patchlevel\EventSourcing\Tests\Integration\Outbox\Events\ProfileCreated; use Patchlevel\EventSourcing\Tests\Integration\Outbox\SendEmailMock; -final class SendEmailProcessor implements Listener +final class SendEmailProcessor { + #[Subscribe(ProfileCreated::class)] public function __invoke(Message $message): void { - if (!$message->event() instanceof ProfileCreated) { - return; - } - SendEmailMock::send(); } } diff --git a/tests/Integration/Pipeline/PipelineChangeStoreTest.php b/tests/Integration/Pipeline/PipelineChangeStoreTest.php index df3b0b993..e8ff19beb 100644 --- a/tests/Integration/Pipeline/PipelineChangeStoreTest.php +++ b/tests/Integration/Pipeline/PipelineChangeStoreTest.php @@ -75,8 +75,8 @@ public function testSuccessful(): void $newSchemaDirector->create(); - $oldRepository = new DefaultRepository($oldStore, new DefaultEventBus(), Profile::metadata()); - $newRepository = new DefaultRepository($newStore, new DefaultEventBus(), Profile::metadata()); + $oldRepository = new DefaultRepository($oldStore, DefaultEventBus::create(), Profile::metadata()); + $newRepository = new DefaultRepository($newStore, DefaultEventBus::create(), Profile::metadata()); $profileId = ProfileId::fromString('1'); $profile = Profile::create($profileId); diff --git a/tests/Integration/Projectionist/ProjectionistTest.php b/tests/Integration/Projectionist/ProjectionistTest.php index a1f84dec3..013791a04 100644 --- a/tests/Integration/Projectionist/ProjectionistTest.php +++ b/tests/Integration/Projectionist/ProjectionistTest.php @@ -54,7 +54,7 @@ public function testAsync(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), $store, - new DefaultEventBus(), + DefaultEventBus::create(), ); $repository = $manager->get(Profile::class); @@ -119,7 +119,7 @@ public function testSync(): void $aggregateRegistry, $store, new SyncProjectionistEventBusWrapper( - new DefaultEventBus(), + DefaultEventBus::create(), $projectionist, new LockFactory($lockStore), ), diff --git a/tests/Unit/EventBus/AttributeListenerProviderTest.php b/tests/Unit/EventBus/AttributeListenerProviderTest.php new file mode 100644 index 000000000..3138f63c2 --- /dev/null +++ b/tests/Unit/EventBus/AttributeListenerProviderTest.php @@ -0,0 +1,115 @@ +listenersForEvent($event); + + self::assertSame([], $listeners); + } + + public function testProvideMatchOneListener(): void + { + $listener = new class { + #[Subscribe(ProfileCreated::class)] + public function foo(Message $message): void + { + } + + #[Subscribe(ProfileVisited::class)] + public function bar(Message $message): void + { + } + }; + + $event = new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('info@patchlevel.de'), + ); + + $eventBus = new AttributeListenerProvider([$listener]); + $listeners = $eventBus->listenersForEvent($event); + + self::assertEquals([new ListenerDescriptor($listener->foo(...))], $listeners); + } + + public function testFindMultipleMethods(): void + { + $listener = new class { + #[Subscribe(ProfileCreated::class)] + public function foo(Message $message): void + { + } + + #[Subscribe(ProfileCreated::class)] + public function bar(Message $message): void + { + } + }; + + $event = new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('info@patchlevel.de'), + ); + + $eventBus = new AttributeListenerProvider([$listener]); + $listeners = $eventBus->listenersForEvent($event); + + self::assertEquals([ + new ListenerDescriptor($listener->foo(...)), + new ListenerDescriptor($listener->bar(...)), + ], $listeners); + } + + public function testMultipleListener(): void + { + $listener1 = new class { + #[Subscribe(ProfileCreated::class)] + public function __invoke(Message $message): void + { + } + }; + + $listener2 = new class { + #[Subscribe(ProfileCreated::class)] + public function __invoke(Message $message): void + { + } + }; + + $event = new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('info@patchlevel.de'), + ); + + $eventBus = new AttributeListenerProvider([$listener1, $listener2]); + $listeners = $eventBus->listenersForEvent($event); + + self::assertEquals([ + new ListenerDescriptor($listener1->__invoke(...)), + new ListenerDescriptor($listener2->__invoke(...)), + ], $listeners); + } +} diff --git a/tests/Unit/EventBus/DefaultEventBusTest.php b/tests/Unit/EventBus/DefaultEventBusTest.php index 4f82d5b9a..51dcf5084 100644 --- a/tests/Unit/EventBus/DefaultEventBusTest.php +++ b/tests/Unit/EventBus/DefaultEventBusTest.php @@ -5,7 +5,8 @@ namespace Patchlevel\EventSourcing\Tests\Unit\EventBus; use Patchlevel\EventSourcing\EventBus\DefaultEventBus; -use Patchlevel\EventSourcing\EventBus\Listener; +use Patchlevel\EventSourcing\EventBus\ListenerDescriptor; +use Patchlevel\EventSourcing\EventBus\ListenerProvider; use Patchlevel\EventSourcing\EventBus\Message; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email; use Patchlevel\EventSourcing\Tests\Unit\Fixture\NameChanged; @@ -13,15 +14,18 @@ use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use PHPUnit\Framework\TestCase; +use Prophecy\PhpUnit\ProphecyTrait; use function microtime; /** @covers \Patchlevel\EventSourcing\EventBus\DefaultEventBus */ final class DefaultEventBusTest extends TestCase { + use ProphecyTrait; + public function testDispatchEvent(): void { - $listener = new class implements Listener { + $listener = new class { public Message|null $message = null; public function __invoke(Message $message): void @@ -37,7 +41,10 @@ public function __invoke(Message $message): void ), ); - $eventBus = new DefaultEventBus([$listener]); + $provider = $this->prophesize(ListenerProvider::class); + $provider->listenersForEvent($message->event())->willReturn([new ListenerDescriptor($listener->__invoke(...))]); + + $eventBus = new DefaultEventBus($provider->reveal()); $eventBus->dispatch($message); self::assertSame($message, $listener->message); @@ -45,7 +52,7 @@ public function __invoke(Message $message): void public function testDispatchMultipleMessages(): void { - $listener = new class implements Listener { + $listener = new class { /** @var list */ public array $message = []; @@ -69,7 +76,11 @@ public function __invoke(Message $message): void ), ); - $eventBus = new DefaultEventBus([$listener]); + $provider = $this->prophesize(ListenerProvider::class); + $provider->listenersForEvent($message1->event())->willReturn([new ListenerDescriptor($listener->__invoke(...))]); + $provider->listenersForEvent($message2->event())->willReturn([new ListenerDescriptor($listener->__invoke(...))]); + + $eventBus = new DefaultEventBus($provider->reveal()); $eventBus->dispatch($message1, $message2); self::assertCount(2, $listener->message); @@ -77,47 +88,30 @@ public function __invoke(Message $message): void self::assertSame($message2, $listener->message[1]); } - public function testDynamicListener(): void + public function testSynchroneEvents(): void { - $listener = new class implements Listener { - public Message|null $message = null; - - public function __invoke(Message $message): void - { - $this->message = $message; - } - }; - - $message = new Message( + $messageA = new Message( new ProfileCreated( ProfileId::fromString('1'), Email::fromString('info@patchlevel.de'), ), ); - $eventBus = new DefaultEventBus(); - $eventBus->addListener($listener); - $eventBus->dispatch($message); - - self::assertSame($message, $listener->message); - } - - public function testSynchroneEvents(): void - { - $messageA = new Message( - new ProfileCreated( + $messageB = new Message( + new ProfileVisited( ProfileId::fromString('1'), - Email::fromString('info@patchlevel.de'), ), ); - $eventBus = new DefaultEventBus(); + $provider = $this->prophesize(ListenerProvider::class); + $eventBus = new DefaultEventBus($provider->reveal()); - $listenerA = new class ($eventBus) implements Listener { + $listenerA = new class ($eventBus, $messageB) { public float|null $time = null; public function __construct( private DefaultEventBus $bus, + private Message $message, ) { } @@ -127,19 +121,13 @@ public function __invoke(Message $message): void return; } - $messageB = new Message( - new ProfileVisited( - ProfileId::fromString('1'), - ), - ); - - $this->bus->dispatch($messageB); + $this->bus->dispatch($this->message); $this->time = microtime(true); } }; - $listenerB = new class implements Listener { + $listenerB = new class { public float|null $time = null; public function __invoke(Message $message): void @@ -152,8 +140,8 @@ public function __invoke(Message $message): void } }; - $eventBus->addListener($listenerA); - $eventBus->addListener($listenerB); + $provider->listenersForEvent($messageA->event())->willReturn([new ListenerDescriptor($listenerA->__invoke(...))]); + $provider->listenersForEvent($messageB->event())->willReturn([new ListenerDescriptor($listenerB->__invoke(...))]); $eventBus->dispatch($messageA); @@ -178,13 +166,22 @@ public function testMultipleMessagesAddingNewEventInListener(): void ), ); - $eventBus = new DefaultEventBus(); + $messageC = new Message( + new NameChanged( + 'name', + ), + ); - $listenerA = new class ($eventBus) implements Listener { + $provider = $this->prophesize(ListenerProvider::class); + $eventBus = new DefaultEventBus($provider->reveal()); + + $listenerA = new class ($eventBus, $messageC) { public float|null $time = null; - public function __construct(private DefaultEventBus $bus) - { + public function __construct( + private DefaultEventBus $bus, + private Message $message, + ) { } public function __invoke(Message $message): void @@ -193,19 +190,13 @@ public function __invoke(Message $message): void return; } - $messageB = new Message( - new NameChanged( - 'name', - ), - ); - - $this->bus->dispatch($messageB); + $this->bus->dispatch($this->message); $this->time = microtime(true); } }; - $listenerB = new class implements Listener { + $listenerB = new class { public float|null $time = null; public function __invoke(Message $message): void @@ -218,8 +209,9 @@ public function __invoke(Message $message): void } }; - $eventBus->addListener($listenerA); - $eventBus->addListener($listenerB); + $provider->listenersForEvent($messageA->event())->willReturn([new ListenerDescriptor($listenerA->__invoke(...))]); + $provider->listenersForEvent($messageB->event())->willReturn([]); + $provider->listenersForEvent($messageC->event())->willReturn([new ListenerDescriptor($listenerB->__invoke(...))]); $eventBus->dispatch($messageA, $messageB); diff --git a/tests/Unit/EventBus/ListenerDescriptorTest.php b/tests/Unit/EventBus/ListenerDescriptorTest.php new file mode 100644 index 000000000..361d4665c --- /dev/null +++ b/tests/Unit/EventBus/ListenerDescriptorTest.php @@ -0,0 +1,48 @@ +__invoke(...)); + + self::assertEquals($listener->__invoke(...), $descriptor->callable()); + self::assertEquals('Patchlevel\EventSourcing\Tests\Unit\Fixture\DummyListener::__invoke', $descriptor->name()); + } + + public function testAnonymousFunction(): void + { + $listener = static function (): void { + }; + + $descriptor = new ListenerDescriptor($listener(...)); + + self::assertEquals($listener(...), $descriptor->callable()); + self::assertEquals('Closure', $descriptor->name()); + } + + public function testAnonymousClass(): void + { + $listener = new class { + public function __invoke(): void + { + } + }; + + $descriptor = new ListenerDescriptor($listener->__invoke(...)); + + self::assertEquals($listener->__invoke(...), $descriptor->callable()); + self::assertStringContainsString('class@anonymous', $descriptor->name()); + self::assertStringContainsString(__FILE__, $descriptor->name()); + } +} diff --git a/tests/Unit/EventBus/SubscriberTest.php b/tests/Unit/EventBus/SubscriberTest.php deleted file mode 100644 index 52131b6ce..000000000 --- a/tests/Unit/EventBus/SubscriberTest.php +++ /dev/null @@ -1,172 +0,0 @@ -message = $message; - } - }; - - $message = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $eventBus = new DefaultEventBus([$subscriber]); - $eventBus->dispatch($message); - - self::assertSame($message, $subscriber->message); - } - - public function testSubscribeWrongEvent(): void - { - $subscriber = new class extends Subscriber { - public Message|null $message = null; - - #[Subscribe(ProfileVisited::class)] - public function handle(Message $message): void - { - $this->message = $message; - } - }; - - $message = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $eventBus = new DefaultEventBus([$subscriber]); - $eventBus->dispatch($message); - - self::assertNull($subscriber->message); - } - - public function testSubscribeMultipleEvents(): void - { - $subscriber = new class extends Subscriber { - public Message|null $a = null; - public Message|null $b = null; - - #[Subscribe(ProfileCreated::class)] - public function handleA(Message $message): void - { - $this->a = $message; - } - - #[Subscribe(ProfileVisited::class)] - public function handleB(Message $message): void - { - $this->b = $message; - } - }; - - $message1 = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $message2 = new Message( - new ProfileVisited( - ProfileId::fromString('1'), - ), - ); - - $eventBus = new DefaultEventBus([$subscriber]); - $eventBus->dispatch($message1, $message2); - - self::assertSame($message1, $subscriber->a); - self::assertSame($message2, $subscriber->b); - } - - public function testSubscribeMultipleEventsOnSameMethod(): void - { - $subscriber = new class extends Subscriber { - /** @var list */ - public array $messages = []; - - #[Subscribe(ProfileCreated::class)] - #[Subscribe(ProfileVisited::class)] - public function handle(Message $message): void - { - $this->messages[] = $message; - } - }; - - $message1 = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $message2 = new Message( - new ProfileVisited( - ProfileId::fromString('1'), - ), - ); - - $eventBus = new DefaultEventBus([$subscriber]); - $eventBus->dispatch($message1, $message2); - - self::assertCount(2, $subscriber->messages); - self::assertSame($message1, $subscriber->messages[0]); - self::assertSame($message2, $subscriber->messages[1]); - } - - public function testDuplicatedEvents(): void - { - $this->expectException(DuplicateSubscribeMethod::class); - - $subscriber = new class extends Subscriber { - #[Subscribe(ProfileCreated::class)] - public function handleA(Message $message): void - { - } - - #[Subscribe(ProfileCreated::class)] - public function handleB(Message $message): void - { - } - }; - - $message = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $eventBus = new DefaultEventBus(); - $eventBus->addListener($subscriber); - $eventBus->dispatch($message); - } -} diff --git a/tests/Unit/EventBus/SymfonyEventBusTest.php b/tests/Unit/EventBus/SymfonyEventBusTest.php deleted file mode 100644 index 8ccff282b..000000000 --- a/tests/Unit/EventBus/SymfonyEventBusTest.php +++ /dev/null @@ -1,112 +0,0 @@ -prophesize(MessageBusInterface::class); - $symfony->dispatch(Argument::that(static function ($envelope) use ($message) { - if (!$envelope instanceof Envelope) { - return false; - } - - return $envelope->getMessage() === $message; - }))->willReturn($envelope)->shouldBeCalled(); - - $eventBus = new SymfonyEventBus($symfony->reveal()); - $eventBus->dispatch($message); - } - - public function testDispatchMultipleMessages(): void - { - $message1 = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $message2 = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $envelope1 = new Envelope($message1); - - $symfony = $this->prophesize(MessageBusInterface::class); - $symfony->dispatch(Argument::that(static function ($envelope1) use ($message1) { - if (!$envelope1 instanceof Envelope) { - return false; - } - - return $envelope1->getMessage() === $message1; - }))->willReturn($envelope1)->shouldBeCalled(); - - $envelope2 = new Envelope($message2); - - $symfony->dispatch(Argument::that(static function ($envelope2) use ($message2) { - if (!$envelope2 instanceof Envelope) { - return false; - } - - return $envelope2->getMessage() === $message2; - }))->willReturn($envelope2)->shouldBeCalled(); - - $eventBus = new SymfonyEventBus($symfony->reveal()); - $eventBus->dispatch($message1, $message2); - } - - public function testDefaultEventBus(): void - { - $listener = new class implements Listener { - public Message|null $message = null; - - public function __invoke(Message $message): void - { - $this->message = $message; - } - }; - - $message = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('info@patchlevel.de'), - ), - ); - - $eventBus = SymfonyEventBus::create([$listener]); - $eventBus->dispatch($message); - - self::assertSame($message, $listener->message); - } -} diff --git a/tests/Unit/Fixture/DummyListener.php b/tests/Unit/Fixture/DummyListener.php new file mode 100644 index 000000000..f496044c4 --- /dev/null +++ b/tests/Unit/Fixture/DummyListener.php @@ -0,0 +1,12 @@ +prophesize(EventBus::class); + $eventBus->dispatch($message)->shouldBeCalled(); + $client = $this->prophesize(WatchServerClient::class); $client->send($message)->shouldBeCalled(); - $listener = new WatchListener($client->reveal()); - $listener->__invoke($message); + $wrapper = new WatchEventBusWrapper( + $eventBus->reveal(), + $client->reveal(), + ); + + $wrapper->dispatch($message); } public function testIgnoreErrors(): void { $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); + $eventBus = $this->prophesize(EventBus::class); + $eventBus->dispatch($message)->shouldBeCalled(); + $client = $this->prophesize(WatchServerClient::class); $client->send($message)->shouldBeCalled()->willThrow(SendingFailed::class); - $listener = new WatchListener($client->reveal()); - $listener->__invoke($message); + $wrapper = new WatchEventBusWrapper( + $eventBus->reveal(), + $client->reveal(), + ); + + $wrapper->dispatch($message); } }