From 230e6d9651f42542a6a1f28aaf746ce3229a52ec Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 6 Oct 2024 10:13:14 +0200 Subject: [PATCH 1/6] filter events in subscription engine --- src/Store/Criteria/EventsCriterion.php | 14 ++++ src/Store/DoctrineDbalStore.php | 6 ++ src/Store/StreamDoctrineDbalStore.php | 6 ++ .../Engine/DefaultSubscriptionEngine.php | 64 ++++++++++++++++++- .../Subscriber/MetadataSubscriberAccessor.php | 7 ++ tests/Benchmark/SubscriptionEngineBench.php | 3 + .../Subscription/SubscriptionTest.php | 2 + 7 files changed, 99 insertions(+), 3 deletions(-) create mode 100644 src/Store/Criteria/EventsCriterion.php diff --git a/src/Store/Criteria/EventsCriterion.php b/src/Store/Criteria/EventsCriterion.php new file mode 100644 index 000000000..310d74614 --- /dev/null +++ b/src/Store/Criteria/EventsCriterion.php @@ -0,0 +1,14 @@ + */ + public readonly array $events, + ) { + } +} diff --git a/src/Store/DoctrineDbalStore.php b/src/Store/DoctrineDbalStore.php index 1916e025a..28a9a977b 100644 --- a/src/Store/DoctrineDbalStore.php +++ b/src/Store/DoctrineDbalStore.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Store; use Closure; +use Doctrine\DBAL\ArrayParameterType; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use Doctrine\DBAL\Platforms\MariaDBPlatform; @@ -26,6 +27,7 @@ use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion; use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; +use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; use PDO; @@ -156,6 +158,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void $builder->andWhere('id > :index'); $builder->setParameter('index', $criterion->fromIndex, Types::INTEGER); break; + case EventsCriterion::class: + $builder->andWhere('event IN (:events)'); + $builder->setParameter('events', $criterion->events, ArrayParameterType::STRING); + break; default: throw new UnsupportedCriterion($criterion::class); } diff --git a/src/Store/StreamDoctrineDbalStore.php b/src/Store/StreamDoctrineDbalStore.php index 703c561fb..04b0ff704 100644 --- a/src/Store/StreamDoctrineDbalStore.php +++ b/src/Store/StreamDoctrineDbalStore.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Store; use Closure; +use Doctrine\DBAL\ArrayParameterType; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use Doctrine\DBAL\Platforms\MariaDBPlatform; @@ -24,6 +25,7 @@ use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; +use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; @@ -169,6 +171,10 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void $builder->andWhere('id > :index'); $builder->setParameter('index', $criterion->fromIndex, Types::INTEGER); break; + case EventsCriterion::class: + $builder->andWhere('event IN (:events)'); + $builder->setParameter('events', $criterion->events, ArrayParameterType::STRING); + break; default: throw new UnsupportedCriterion($criterion::class); } diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index cdb4b6b2c..bc43edff5 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -5,7 +5,9 @@ namespace Patchlevel\EventSourcing\Subscription\Engine; use Patchlevel\EventSourcing\Message\Message; +use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory; use Patchlevel\EventSourcing\Store\Criteria\Criteria; +use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy; @@ -15,6 +17,7 @@ use Patchlevel\EventSourcing\Subscription\Store\SubscriptionCriteria; use Patchlevel\EventSourcing\Subscription\Store\SubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; +use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\RealSubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; @@ -22,6 +25,7 @@ use Psr\Log\LoggerInterface; use Throwable; +use function array_keys; use function count; use function in_array; use function sprintf; @@ -41,6 +45,7 @@ public function __construct( private readonly SubscriberAccessorRepository $subscriberRepository, private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(), private readonly LoggerInterface|null $logger = null, + private readonly EventMetadataFactory|null $eventMetadataFactory = null, ) { $this->subscriptionManager = new SubscriptionManager($subscriptionStore); } @@ -193,9 +198,15 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { $messageCounter = 0; try { - $stream = $this->messageStore->load( - new Criteria(new FromIndexCriterion($startIndex)), - ); + $criteria = new Criteria(new FromIndexCriterion($startIndex)); + + $events = $this->events($subscriptions); + + if ($events) { + $criteria = $criteria->add(new EventsCriterion($events)); + } + + $stream = $this->messageStore->load($criteria); foreach ($stream as $message) { $messageCounter++; @@ -367,6 +378,13 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { try { $criteria = new Criteria(new FromIndexCriterion($startIndex)); + + $events = $this->events($subscriptions); + + if ($events) { + $criteria = $criteria->add(new EventsCriterion($events)); + } + $stream = $this->messageStore->load($criteria); foreach ($stream as $message) { @@ -1113,4 +1131,44 @@ private function shouldCommitBatch(Subscription $subscription): bool return $this->batching[$subscription->id()]->forceCommit(); } + + /** + * @param list $subscriptions + * + * @return list + */ + private function events(array $subscriptions): array + { + if ($this->eventMetadataFactory === null) { + return []; + } + + $eventNames = []; + + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscriber($subscription->id()); + + if (!$subscriber instanceof MetadataSubscriberAccessor) { + return []; + } + + $events = $subscriber->events(); + + foreach ($events as $event) { + if ($event === '*') { + return []; + } + + $metadata = $this->eventMetadataFactory->metadata($event); + + $eventNames[$metadata->name] = true; + + foreach ($metadata->aliases as $alias) { + $eventNames[$alias] = true; + } + } + } + + return array_keys($eventNames); + } } diff --git a/src/Subscription/Subscriber/MetadataSubscriberAccessor.php b/src/Subscription/Subscriber/MetadataSubscriberAccessor.php index ab1ac328d..42336abee 100644 --- a/src/Subscription/Subscriber/MetadataSubscriberAccessor.php +++ b/src/Subscription/Subscriber/MetadataSubscriberAccessor.php @@ -13,6 +13,7 @@ use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\ArgumentResolver; use function array_key_exists; +use function array_keys; use function array_map; use function array_merge; @@ -84,6 +85,12 @@ public function teardownMethod(): Closure|null return $this->subscriber->$method(...); } + /** @return list */ + public function events(): array + { + return array_keys($this->metadata->subscribeMethods); + } + /** * @param class-string $eventClass * diff --git a/tests/Benchmark/SubscriptionEngineBench.php b/tests/Benchmark/SubscriptionEngineBench.php index fe5fdfde2..2dc0631de 100644 --- a/tests/Benchmark/SubscriptionEngineBench.php +++ b/tests/Benchmark/SubscriptionEngineBench.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Tests\Benchmark; use Patchlevel\EventSourcing\Aggregate\AggregateRootId; +use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; @@ -63,6 +64,7 @@ public function setUp(): void $profile = Profile::create($this->id, 'Peter'); for ($i = 1; $i < 10_000; $i++) { + $profile->changeEmail('peter' . $i . '@example.com'); $profile->changeName('Peter ' . $i); } @@ -77,6 +79,7 @@ public function setUp(): void new SendEmailProcessor(), ], ), + eventMetadataFactory: new AttributeEventMetadataFactory(), ); } diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index ede3f43b2..98f81c8b5 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -13,6 +13,7 @@ use Patchlevel\EventSourcing\Clock\FrozenClock; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; +use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; @@ -96,6 +97,7 @@ public function testHappyPath(): void $store, $subscriptionStore, new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]), + eventMetadataFactory: new AttributeEventMetadataFactory(), ); self::assertEquals( From acf42881363dc2314ea0c88cb5f297cd7a11066c Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 6 Oct 2024 15:55:54 +0200 Subject: [PATCH 2/6] introduce message loader in subscription engine --- .../Engine/DefaultMessageLoader.php | 32 +++++++ .../Engine/DefaultSubscriptionEngine.php | 88 +++---------------- .../Engine/EventFilteredMessageLoader.php | 84 ++++++++++++++++++ src/Subscription/Engine/MessageLoader.php | 16 ++++ tests/Benchmark/SubscriptionEngineBench.php | 22 +++-- .../Subscription/SubscriptionTest.php | 8 +- 6 files changed, 164 insertions(+), 86 deletions(-) create mode 100644 src/Subscription/Engine/DefaultMessageLoader.php create mode 100644 src/Subscription/Engine/EventFilteredMessageLoader.php create mode 100644 src/Subscription/Engine/MessageLoader.php diff --git a/src/Subscription/Engine/DefaultMessageLoader.php b/src/Subscription/Engine/DefaultMessageLoader.php new file mode 100644 index 000000000..87c539786 --- /dev/null +++ b/src/Subscription/Engine/DefaultMessageLoader.php @@ -0,0 +1,32 @@ + $subscriptions */ + public function load(int $startIndex, array $subscriptions): Stream + { + return $this->store->load(new Criteria(new FromIndexCriterion($startIndex))); + } + + public function lastIndex(): int + { + $stream = $this->store->load(null, 1, null, true); + + return $stream->index() ?: 0; + } +} diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index bc43edff5..4e8487d9e 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -5,10 +5,6 @@ namespace Patchlevel\EventSourcing\Subscription\Engine; use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory; -use Patchlevel\EventSourcing\Store\Criteria\Criteria; -use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; -use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy; use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategy; @@ -17,7 +13,6 @@ use Patchlevel\EventSourcing\Subscription\Store\SubscriptionCriteria; use Patchlevel\EventSourcing\Subscription\Store\SubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; -use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\RealSubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; @@ -25,7 +20,6 @@ use Psr\Log\LoggerInterface; use Throwable; -use function array_keys; use function count; use function in_array; use function sprintf; @@ -39,14 +33,21 @@ final class DefaultSubscriptionEngine implements SubscriptionEngine /** @var array */ private array $batching = []; + private readonly MessageLoader $messageLoader; + public function __construct( - private readonly Store $messageStore, + Store|MessageLoader $messageStore, SubscriptionStore $subscriptionStore, private readonly SubscriberAccessorRepository $subscriberRepository, private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(), private readonly LoggerInterface|null $logger = null, - private readonly EventMetadataFactory|null $eventMetadataFactory = null, ) { + if ($messageStore instanceof MessageLoader) { + $this->messageLoader = $messageStore; + } else { + $this->messageLoader = new DefaultMessageLoader($messageStore); + } + $this->subscriptionManager = new SubscriptionManager($subscriptionStore); } @@ -77,7 +78,7 @@ function (SubscriptionCollection $subscriptions) use ($skipBooting): Result { /** @var list $errors */ $errors = []; - $latestIndex = $this->latestIndex(); + $latestIndex = $this->messageLoader->lastIndex(); foreach ($subscriptions as $subscription) { $subscriber = $this->subscriber($subscription->id()); @@ -198,15 +199,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { $messageCounter = 0; try { - $criteria = new Criteria(new FromIndexCriterion($startIndex)); - - $events = $this->events($subscriptions); - - if ($events) { - $criteria = $criteria->add(new EventsCriterion($events)); - } - - $stream = $this->messageStore->load($criteria); + $stream = $this->messageLoader->load($startIndex, $subscriptions); foreach ($stream as $message) { $messageCounter++; @@ -377,15 +370,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { $messageCounter = 0; try { - $criteria = new Criteria(new FromIndexCriterion($startIndex)); - - $events = $this->events($subscriptions); - - if ($events) { - $criteria = $criteria->add(new EventsCriterion($events)); - } - - $stream = $this->messageStore->load($criteria); + $stream = $this->messageLoader->load($startIndex, $subscriptions); foreach ($stream as $message) { $messageCounter++; @@ -984,7 +969,7 @@ private function discoverNewSubscriptions(): void if ($subscriber->setupMethod() === null && $subscriber->runMode() === RunMode::FromNow) { if ($latestIndex === null) { - $latestIndex = $this->latestIndex(); + $latestIndex = $this->messageLoader->lastIndex(); } $subscription->changePosition($latestIndex); @@ -1004,13 +989,6 @@ private function discoverNewSubscriptions(): void $this->subscriptionManager->flush(); } - private function latestIndex(): int - { - $stream = $this->messageStore->load(null, 1, null, true); - - return $stream->index() ?: 0; - } - private function handleError(Subscription $subscription, Throwable $throwable): void { $subscription->error($throwable); @@ -1131,44 +1109,4 @@ private function shouldCommitBatch(Subscription $subscription): bool return $this->batching[$subscription->id()]->forceCommit(); } - - /** - * @param list $subscriptions - * - * @return list - */ - private function events(array $subscriptions): array - { - if ($this->eventMetadataFactory === null) { - return []; - } - - $eventNames = []; - - foreach ($subscriptions as $subscription) { - $subscriber = $this->subscriber($subscription->id()); - - if (!$subscriber instanceof MetadataSubscriberAccessor) { - return []; - } - - $events = $subscriber->events(); - - foreach ($events as $event) { - if ($event === '*') { - return []; - } - - $metadata = $this->eventMetadataFactory->metadata($event); - - $eventNames[$metadata->name] = true; - - foreach ($metadata->aliases as $alias) { - $eventNames[$alias] = true; - } - } - } - - return array_keys($eventNames); - } } diff --git a/src/Subscription/Engine/EventFilteredMessageLoader.php b/src/Subscription/Engine/EventFilteredMessageLoader.php new file mode 100644 index 000000000..9d1370589 --- /dev/null +++ b/src/Subscription/Engine/EventFilteredMessageLoader.php @@ -0,0 +1,84 @@ + $subscriptions */ + public function load(int $startIndex, array $subscriptions): Stream + { + $criteria = new Criteria(new FromIndexCriterion($startIndex)); + + $events = $this->events($subscriptions); + + if ($events !== []) { + $criteria = $criteria->add(new EventsCriterion($events)); + } + + return $this->store->load($criteria); + } + + /** + * @param list $subscriptions + * + * @return list + */ + private function events(array $subscriptions): array + { + $eventNames = []; + + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscriberRepository->get($subscription->id()); + + if (!$subscriber instanceof MetadataSubscriberAccessor) { + return []; + } + + $events = $subscriber->events(); + + foreach ($events as $event) { + if ($event === '*') { + return []; + } + + $metadata = $this->eventMetadataFactory->metadata($event); + + $eventNames[$metadata->name] = true; + + foreach ($metadata->aliases as $alias) { + $eventNames[$alias] = true; + } + } + } + + return array_keys($eventNames); + } + + public function lastIndex(): int + { + $stream = $this->store->load(null, 1, null, true); + + return $stream->index() ?: 0; + } +} diff --git a/src/Subscription/Engine/MessageLoader.php b/src/Subscription/Engine/MessageLoader.php new file mode 100644 index 000000000..1bdf5a655 --- /dev/null +++ b/src/Subscription/Engine/MessageLoader.php @@ -0,0 +1,16 @@ + $subscriptions */ + public function load(int $startIndex, array $subscriptions): Stream; + + public function lastIndex(): int; +} diff --git a/tests/Benchmark/SubscriptionEngineBench.php b/tests/Benchmark/SubscriptionEngineBench.php index 2dc0631de..4f41d0008 100644 --- a/tests/Benchmark/SubscriptionEngineBench.php +++ b/tests/Benchmark/SubscriptionEngineBench.php @@ -14,6 +14,7 @@ use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; @@ -70,16 +71,21 @@ public function setUp(): void $this->repository->save($profile); + $subscriberAccessorRepository = new MetadataSubscriberAccessorRepository( + [ + new ProfileProjector($connection), + new SendEmailProcessor(), + ], + ); + $this->subscriptionEngine = new DefaultSubscriptionEngine( - $this->store, - $subscriptionStore, - new MetadataSubscriberAccessorRepository( - [ - new ProfileProjector($connection), - new SendEmailProcessor(), - ], + new EventFilteredMessageLoader( + $this->store, + new AttributeEventMetadataFactory(), + $subscriberAccessorRepository, ), - eventMetadataFactory: new AttributeEventMetadataFactory(), + $subscriptionStore, + $subscriberAccessorRepository, ); } diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 98f81c8b5..e5b9200c7 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -22,6 +22,7 @@ use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy; use Patchlevel\EventSourcing\Subscription\RunMode; @@ -93,11 +94,12 @@ public function testHappyPath(): void $schemaDirector->create(); + $subscriberRepository = new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]); + $engine = new DefaultSubscriptionEngine( - $store, + new EventFilteredMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository), $subscriptionStore, - new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]), - eventMetadataFactory: new AttributeEventMetadataFactory(), + $subscriberRepository, ); self::assertEquals( From da148588f345da74abbc6540c6c2e45d82dad7b2 Mon Sep 17 00:00:00 2001 From: David Badura Date: Thu, 5 Dec 2024 16:26:05 +0100 Subject: [PATCH 3/6] rename message loader classes --- src/Subscription/Engine/DefaultSubscriptionEngine.php | 2 +- ...dMessageLoader.php => EventFilteredStoreMessageLoader.php} | 2 +- .../{DefaultMessageLoader.php => StoreMessageLoader.php} | 2 +- tests/Benchmark/SubscriptionEngineBench.php | 4 ++-- tests/Integration/Subscription/SubscriptionTest.php | 4 ++-- 5 files changed, 7 insertions(+), 7 deletions(-) rename src/Subscription/Engine/{EventFilteredMessageLoader.php => EventFilteredStoreMessageLoader.php} (97%) rename src/Subscription/Engine/{DefaultMessageLoader.php => StoreMessageLoader.php} (93%) diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index 4e8487d9e..986c852b1 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -45,7 +45,7 @@ public function __construct( if ($messageStore instanceof MessageLoader) { $this->messageLoader = $messageStore; } else { - $this->messageLoader = new DefaultMessageLoader($messageStore); + $this->messageLoader = new StoreMessageLoader($messageStore); } $this->subscriptionManager = new SubscriptionManager($subscriptionStore); diff --git a/src/Subscription/Engine/EventFilteredMessageLoader.php b/src/Subscription/Engine/EventFilteredStoreMessageLoader.php similarity index 97% rename from src/Subscription/Engine/EventFilteredMessageLoader.php rename to src/Subscription/Engine/EventFilteredStoreMessageLoader.php index 9d1370589..217588d40 100644 --- a/src/Subscription/Engine/EventFilteredMessageLoader.php +++ b/src/Subscription/Engine/EventFilteredStoreMessageLoader.php @@ -16,7 +16,7 @@ use function array_keys; -final class EventFilteredMessageLoader implements MessageLoader +final class EventFilteredStoreMessageLoader implements MessageLoader { public function __construct( private readonly Store $store, diff --git a/src/Subscription/Engine/DefaultMessageLoader.php b/src/Subscription/Engine/StoreMessageLoader.php similarity index 93% rename from src/Subscription/Engine/DefaultMessageLoader.php rename to src/Subscription/Engine/StoreMessageLoader.php index 87c539786..eae336dd1 100644 --- a/src/Subscription/Engine/DefaultMessageLoader.php +++ b/src/Subscription/Engine/StoreMessageLoader.php @@ -10,7 +10,7 @@ use Patchlevel\EventSourcing\Store\Stream; use Patchlevel\EventSourcing\Subscription\Subscription; -final class DefaultMessageLoader implements MessageLoader +final class StoreMessageLoader implements MessageLoader { public function __construct( private readonly Store $store, diff --git a/tests/Benchmark/SubscriptionEngineBench.php b/tests/Benchmark/SubscriptionEngineBench.php index 4f41d0008..37d2d7638 100644 --- a/tests/Benchmark/SubscriptionEngineBench.php +++ b/tests/Benchmark/SubscriptionEngineBench.php @@ -14,7 +14,7 @@ use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; -use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredMessageLoader; +use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; @@ -79,7 +79,7 @@ public function setUp(): void ); $this->subscriptionEngine = new DefaultSubscriptionEngine( - new EventFilteredMessageLoader( + new EventFilteredStoreMessageLoader( $this->store, new AttributeEventMetadataFactory(), $subscriberAccessorRepository, diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index e5b9200c7..088f7ddf1 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -22,7 +22,7 @@ use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; -use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredMessageLoader; +use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy; use Patchlevel\EventSourcing\Subscription\RunMode; @@ -97,7 +97,7 @@ public function testHappyPath(): void $subscriberRepository = new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]); $engine = new DefaultSubscriptionEngine( - new EventFilteredMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository), + new EventFilteredStoreMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository), $subscriptionStore, $subscriberRepository, ); From 6a235c5e3bd43bc2b72a17048a5f5f9500ee03f8 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 8 Dec 2024 12:56:00 +0100 Subject: [PATCH 4/6] fix deptrac --- deptrac.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/deptrac.yaml b/deptrac.yaml index b405ba863..b793814f6 100644 --- a/deptrac.yaml +++ b/deptrac.yaml @@ -154,6 +154,7 @@ deptrac: - Attribute - Clock - Message + - MetadataEvent - MetadataSubscriber - Repository - Schema From ba4958e1170160971a5908a68e338654373d08b6 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 8 Dec 2024 13:04:05 +0100 Subject: [PATCH 5/6] fix merge conflict --- src/Subscription/Engine/DefaultSubscriptionEngine.php | 4 ++-- src/Subscription/Engine/SubscriptionCollection.php | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index 986c852b1..aa73984ed 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -199,7 +199,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { $messageCounter = 0; try { - $stream = $this->messageLoader->load($startIndex, $subscriptions); + $stream = $this->messageLoader->load($startIndex, $subscriptions->toArray()); foreach ($stream as $message) { $messageCounter++; @@ -370,7 +370,7 @@ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult { $messageCounter = 0; try { - $stream = $this->messageLoader->load($startIndex, $subscriptions); + $stream = $this->messageLoader->load($startIndex, $subscriptions->toArray()); foreach ($stream as $message) { $messageCounter++; diff --git a/src/Subscription/Engine/SubscriptionCollection.php b/src/Subscription/Engine/SubscriptionCollection.php index f1ed60b55..a8fba233c 100644 --- a/src/Subscription/Engine/SubscriptionCollection.php +++ b/src/Subscription/Engine/SubscriptionCollection.php @@ -64,4 +64,10 @@ public function lowestPosition(): int return $min; } + + /** @return list */ + public function toArray(): array + { + return $this->subscriptions; + } } From be1020bf51a3ee512f5d7bf0d02cab211c522731 Mon Sep 17 00:00:00 2001 From: David Badura Date: Thu, 12 Dec 2024 20:40:48 +0100 Subject: [PATCH 6/6] add docs & missing events function in criteria builder --- .../our-backward-compatibility-promise.md | 1 + docs/pages/store.md | 3 ++ docs/pages/subscription.md | 46 +++++++++++++++++-- src/Store/Criteria/CriteriaBuilder.php | 15 ++++++ .../Store/Crtieria/CriteriaBuilderTest.php | 3 ++ 5 files changed, 64 insertions(+), 4 deletions(-) diff --git a/docs/pages/our-backward-compatibility-promise.md b/docs/pages/our-backward-compatibility-promise.md index 70c3ffe9d..70f7f1ffd 100644 --- a/docs/pages/our-backward-compatibility-promise.md +++ b/docs/pages/our-backward-compatibility-promise.md @@ -44,3 +44,4 @@ In our docs the features are marked like this: This feature is still experimental and may change in the future. Use it with caution. + \ No newline at end of file diff --git a/docs/pages/store.md b/docs/pages/store.md index 040cc8fcd..205f54263 100644 --- a/docs/pages/store.md +++ b/docs/pages/store.md @@ -321,6 +321,7 @@ use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion; use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion; use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; +use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; $criteria = new Criteria( @@ -329,6 +330,7 @@ $criteria = new Criteria( new FromPlayheadCriterion(2), new FromIndexCriterion(100), new ArchivedCriterion(true), + new EventsCriterion(['profile.created', 'profile.name_changed']), ); ``` Or you can the criteria builder to create the criteria. @@ -342,6 +344,7 @@ $criteria = (new CriteriaBuilder()) ->fromPlayhead(2) ->fromIndex(100) ->archived(true) + ->events(['profile.created', 'profile.name_changed']) ->build(); ``` #### Stream diff --git a/docs/pages/subscription.md b/docs/pages/subscription.md index b7e1335dc..52f8731b8 100644 --- a/docs/pages/subscription.md +++ b/docs/pages/subscription.md @@ -645,6 +645,44 @@ There are two options here: In order for the subscription engine to be able to do its work, you have to assemble it beforehand. +### Message Loader + +The subscription engine needs a message loader to load the messages. +We provide two implementations by default. +Which one has a better performance depends on the use case. + +#### Store Message Loader + +The store message loader loads all the messages from the event store. + +```php +use Patchlevel\EventSourcing\Store\Store; +use Patchlevel\EventSourcing\Subscription\Engine\StoreMessageLoader; + +/** @var Store $store */ +$messageLoader = new StoreMessageLoader($store); +``` +#### Event Filtered Store Message Loader + +The event filtered store message loader loads only the messages that are relevant for the subscribers. +It looks before loading the messages which subscribers are interested in the events. +Then it loads with a filter only the relevant messages. + +```php +use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory; +use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader; + +/** + * @var Store $store + * @var EventMetadataFactory $eventMetadataFactory + * @var SubscriberRepository $subscriberRepository + */ +$messageLoader = new EventFilteredStoreMessageLoader( + $store, + $eventMetadataFactory, + $subscriberRepository, +); +``` ### Subscription Store The Subscription Engine uses a subscription store to store the status of each subscription. @@ -730,24 +768,24 @@ $subscriberAccessorRepository = new MetadataSubscriberAccessorRepository([ ### Subscription Engine Now we can create the subscription engine and plug together the necessary services. -The event store is needed to load the events, the Subscription Store to store the subscription state +The message loader is needed to load the messages, the Subscription Store to store the subscription state and we need the subscriber accessor repository. Optionally, we can also pass a retry strategy. ```php -use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader; use Patchlevel\EventSourcing\Subscription\RetryStrategy\NoRetryStrategy; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; /** - * @var Store $eventStore + * @var MessageLoader $messageLoader * @var DoctrineSubscriptionStore $subscriptionStore * @var MetadataSubscriberAccessorRepository $subscriberAccessorRepository * @var NoRetryStrategy $retryStrategy */ $subscriptionEngine = new DefaultSubscriptionEngine( - $eventStore, + $messageLoader, $subscriptionStore, $subscriberAccessorRepository, $retryStrategy, diff --git a/src/Store/Criteria/CriteriaBuilder.php b/src/Store/Criteria/CriteriaBuilder.php index 1a45f1e74..850789040 100644 --- a/src/Store/Criteria/CriteriaBuilder.php +++ b/src/Store/Criteria/CriteriaBuilder.php @@ -13,6 +13,9 @@ final class CriteriaBuilder private int|null $fromPlayhead = null; private bool|null $archived = null; + /** @var list|null */ + private array|null $events = null; + /** @experimental */ public function streamName(string|null $streamName): self { @@ -56,6 +59,14 @@ public function archived(bool|null $archived): self return $this; } + /** @param list|null $events */ + public function events(array|null $events): self + { + $this->events = $events; + + return $this; + } + public function build(): Criteria { $criteria = []; @@ -84,6 +95,10 @@ public function build(): Criteria $criteria[] = new ArchivedCriterion($this->archived); } + if ($this->events !== null) { + $criteria[] = new EventsCriterion($this->events); + } + return new Criteria(...$criteria); } } diff --git a/tests/Unit/Store/Crtieria/CriteriaBuilderTest.php b/tests/Unit/Store/Crtieria/CriteriaBuilderTest.php index 6876fdd77..83f6d16b2 100644 --- a/tests/Unit/Store/Crtieria/CriteriaBuilderTest.php +++ b/tests/Unit/Store/Crtieria/CriteriaBuilderTest.php @@ -9,6 +9,7 @@ use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder; +use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; use PHPUnit\Framework\TestCase; @@ -33,6 +34,7 @@ public function testFull(): void ->fromIndex(1) ->fromPlayhead(1) ->archived(true) + ->events(['foo', 'bar']) ->build(); self::assertEquals( @@ -42,6 +44,7 @@ public function testFull(): void new FromIndexCriterion(1), new FromPlayheadCriterion(1), new ArchivedCriterion(true), + new EventsCriterion(['foo', 'bar']), ), $criteria, );