diff --git a/src/Console/Command/SubscriptionBootCommand.php b/src/Console/Command/SubscriptionBootCommand.php index e63f864eb..57f087185 100644 --- a/src/Console/Command/SubscriptionBootCommand.php +++ b/src/Console/Command/SubscriptionBootCommand.php @@ -4,15 +4,19 @@ namespace Patchlevel\EventSourcing\Console\Command; +use Closure; use Patchlevel\EventSourcing\Console\InputHelper; +use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; +use Patchlevel\Worker\DefaultWorker; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Logger\ConsoleLogger; use Symfony\Component\Console\Output\OutputInterface; #[AsCommand( 'event-sourcing:subscription:boot', - 'Prepare new subscriptions and catch up with the event store', + 'Catch up with the event store.', )] final class SubscriptionBootCommand extends SubscriptionCommand { @@ -22,20 +26,97 @@ public function configure(): void $this ->addOption( - 'limit', + 'run-limit', + null, + InputOption::VALUE_REQUIRED, + 'The maximum number of runs this command should execute', + ) + ->addOption( + 'message-limit', null, InputOption::VALUE_REQUIRED, 'How many messages should be consumed in one run', + 1000, + ) + ->addOption( + 'memory-limit', + null, + InputOption::VALUE_REQUIRED, + 'How much memory consumption should the worker be terminated (e.g. 250MB)', + ) + ->addOption( + 'time-limit', + null, + InputOption::VALUE_REQUIRED, + 'What is the maximum time the worker can run in seconds', + ) + ->addOption( + 'sleep', + null, + InputOption::VALUE_REQUIRED, + 'How much time should elapse before the next job is executed in milliseconds', + ) + ->addOption( + 'setup', + null, + InputOption::VALUE_NONE, + 'Setup new subscriptions', ); } protected function execute(InputInterface $input, OutputInterface $output): int { - $limit = InputHelper::nullablePositiveInt($input->getOption('limit')); + $runLimit = InputHelper::nullablePositiveInt($input->getOption('run-limit')); + $messageLimit = InputHelper::nullablePositiveInt($input->getOption('message-limit')); + $memoryLimit = InputHelper::nullableString($input->getOption('memory-limit')); + $timeLimit = InputHelper::nullablePositiveInt($input->getOption('time-limit')); + $sleep = InputHelper::positiveIntOrZero($input->getOption('sleep')); + $setup = InputHelper::bool($input->getOption('setup')); $criteria = $this->subscriptionEngineCriteria($input); - $this->engine->boot($criteria, $limit); - return 0; + if ($setup) { + $this->engine->setup($criteria); + } + + $logger = new ConsoleLogger($output); + + $finished = false; + + $worker = DefaultWorker::create( + function (Closure $stop) use ($criteria, $messageLimit, &$finished): void { + $this->engine->boot($criteria, $messageLimit); + + if (!$this->isBootingFinished($criteria)) { + return; + } + + $finished = true; + $stop(); + }, + [ + 'runLimit' => $runLimit, + 'memoryLimit' => $memoryLimit, + 'timeLimit' => $timeLimit, + ], + $logger, + ); + + $worker->run($sleep); + + return $finished ? 0 : 1; + } + + private function isBootingFinished(SubscriptionEngineCriteria $criteria): bool + { + $subscriptions = $this->engine->subscriptions($criteria); + + foreach ($subscriptions as $subscription) { + if ($subscription->isBooting()) { + return false; + } + } + + return true; } } diff --git a/src/Console/Command/SubscriptionSetupCommand.php b/src/Console/Command/SubscriptionSetupCommand.php new file mode 100644 index 000000000..1c9f74897 --- /dev/null +++ b/src/Console/Command/SubscriptionSetupCommand.php @@ -0,0 +1,24 @@ +subscriptionEngineCriteria($input); + $this->engine->setup($criteria); + + return 0; + } +} diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index 13f66a91e..820121a8c 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -36,6 +36,94 @@ public function __construct( ) { } + public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): void + { + $criteria ??= new SubscriptionEngineCriteria(); + + $this->logger?->info( + 'Subscription Engine: Start to setup.', + ); + + $this->discoverNewSubscriptions(); + $this->retrySubscriptions($criteria); + + $this->findForUpdate( + new SubscriptionCriteria( + ids: $criteria->ids, + groups: $criteria->groups, + status: [Status::New], + ), + function (array $subscriptions) use ($skipBooting): void { + if (count($subscriptions) === 0) { + $this->logger?->info('Subscription Engine: No subscriptions to setup, finish setup.'); + + return; + } + + $latestIndex = $this->latestIndex(); + + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscriber($subscription->id()); + + if (!$subscriber) { + throw SubscriberNotFound::forSubscriptionId($subscription->id()); + } + + $setupMethod = $subscriber->setupMethod(); + + if (!$setupMethod) { + if ($subscription->runMode() === RunMode::FromNow) { + $subscription->changePosition($latestIndex); + $subscription->active(); + } else { + $skipBooting ? $subscription->active() : $subscription->booting(); + } + + $this->subscriptionStore->update($subscription); + + $this->logger?->debug(sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" has no setup method, set to %s.', + $subscriber::class, + $subscription->id(), + $subscription->runMode() === RunMode::FromNow || $skipBooting ? 'active' : 'booting', + )); + + continue; + } + + try { + $setupMethod(); + + if ($subscription->runMode() === RunMode::FromNow) { + $subscription->changePosition($latestIndex); + $subscription->active(); + } else { + $skipBooting ? $subscription->active() : $subscription->booting(); + } + + $this->subscriptionStore->update($subscription); + + $this->logger?->debug(sprintf( + 'Subscription Engine: For Subscriber "%s" for "%s" the setup method has been executed, set to %s.', + $subscriber::class, + $subscription->id(), + $subscription->runMode() === RunMode::FromNow || $skipBooting ? 'active' : 'booting', + )); + } catch (Throwable $e) { + $this->logger?->error(sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" has an error in the setup method: %s', + $subscriber::class, + $subscription->id(), + $e->getMessage(), + )); + + $this->handleError($subscription, $e); + } + } + }, + ); + } + public function boot( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, @@ -48,7 +136,6 @@ public function boot( $this->discoverNewSubscriptions(); $this->retrySubscriptions($criteria); - $this->setupNewSubscriptions($criteria); $this->findForUpdate( new SubscriptionCriteria( @@ -57,8 +144,6 @@ public function boot( status: [Status::Booting], ), function ($subscriptions) use ($limit): void { - $subscriptions = $this->fastForwardFromNowSubscriptions($subscriptions); - if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); @@ -131,6 +216,8 @@ function ($subscriptions) use ($limit): void { } } } finally { + $stream?->close(); + if ($messageCounter > 0) { foreach ($subscriptions as $subscription) { if (!$subscription->isBooting()) { @@ -140,8 +227,6 @@ function ($subscriptions) use ($limit): void { $this->subscriptionStore->update($subscription); } } - - $stream?->close(); } $this->logger?->debug('Subscription Engine: End of stream for booting has been reached.'); @@ -262,6 +347,8 @@ function (array $subscriptions) use ($limit): void { } } } finally { + $stream?->close(); + if ($messageCounter > 0) { foreach ($subscriptions as $subscription) { if (!$subscription->isActive()) { @@ -271,8 +358,24 @@ function (array $subscriptions) use ($limit): void { $this->subscriptionStore->update($subscription); } } + } - $stream?->close(); + foreach ($subscriptions as $subscription) { + if (!$subscription->isActive()) { + continue; + } + + if ($subscription->runMode() !== RunMode::Once) { + continue; + } + + $subscription->finished(); + $this->subscriptionStore->update($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + $subscription->id(), + )); } $this->logger?->info( @@ -684,108 +787,6 @@ function (array $subscriptions): void { ); } - /** - * @param list $subscriptions - * - * @return list - */ - private function fastForwardFromNowSubscriptions(array $subscriptions): array - { - $latestIndex = null; - $forwardedSubscriptions = []; - - foreach ($subscriptions as $subscription) { - $subscriber = $this->subscriber($subscription->id()); - - if (!$subscriber) { - $forwardedSubscriptions[] = $subscription; - - continue; - } - - if ($subscription->runMode() === RunMode::FromBeginning || $subscription->runMode() === RunMode::Once) { - $forwardedSubscriptions[] = $subscription; - - continue; - } - - if ($latestIndex === null) { - $latestIndex = $this->latestIndex(); - } - - $subscription->changePosition($latestIndex); - $subscription->active(); - $this->subscriptionStore->update($subscription); - - $this->logger?->info( - sprintf( - 'Subscription Engine: Subscriber "%s" for "%s" is in "from now" mode: skip past messages and set to active.', - $subscriber::class, - $subscription->id(), - ), - ); - } - - return $forwardedSubscriptions; - } - - private function setupNewSubscriptions(SubscriptionEngineCriteria $criteria): void - { - $this->findForUpdate( - new SubscriptionCriteria( - ids: $criteria->ids, - groups: $criteria->groups, - status: [Status::New], - ), - function (array $subscriptions): void { - foreach ($subscriptions as $subscription) { - $subscriber = $this->subscriber($subscription->id()); - - if (!$subscriber) { - throw SubscriberNotFound::forSubscriptionId($subscription->id()); - } - - $setupMethod = $subscriber->setupMethod(); - - if (!$setupMethod) { - $subscription->booting(); - $this->subscriptionStore->update($subscription); - - $this->logger?->debug(sprintf( - 'Subscription Engine: Subscriber "%s" for "%s" has no setup method, continue.', - $subscriber::class, - $subscription->id(), - )); - - continue; - } - - try { - $setupMethod(); - - $subscription->booting(); - $this->subscriptionStore->update($subscription); - - $this->logger?->debug(sprintf( - 'Subscription Engine: For Subscriber "%s" for "%s" the setup method has been executed and is now prepared for data.', - $subscriber::class, - $subscription->id(), - )); - } catch (Throwable $e) { - $this->logger?->error(sprintf( - 'Subscription Engine: Subscriber "%s" for "%s" has an error in the setup method: %s', - $subscriber::class, - $subscription->id(), - $e->getMessage(), - )); - - $this->handleError($subscription, $e); - } - } - }, - ); - } - private function discoverNewSubscriptions(): void { $this->findForUpdate( diff --git a/src/Subscription/Engine/SubscriptionEngine.php b/src/Subscription/Engine/SubscriptionEngine.php index 1b3c56a18..0e4229eaa 100644 --- a/src/Subscription/Engine/SubscriptionEngine.php +++ b/src/Subscription/Engine/SubscriptionEngine.php @@ -8,6 +8,8 @@ interface SubscriptionEngine { + public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): void; + /** * @param positive-int|null $limit * diff --git a/tests/Benchmark/SubscriptionEngineBench.php b/tests/Benchmark/SubscriptionEngineBench.php index ce6b09f57..dae8be233 100644 --- a/tests/Benchmark/SubscriptionEngineBench.php +++ b/tests/Benchmark/SubscriptionEngineBench.php @@ -96,6 +96,7 @@ public function setUp(): void #[Bench\Revs(10)] public function benchHandle10000Events(): void { + $this->subscriptionEngine->setup(); $this->subscriptionEngine->boot(); $this->subscriptionEngine->remove(); } diff --git a/tests/Integration/BankAccountSplitStream/IntegrationTest.php b/tests/Integration/BankAccountSplitStream/IntegrationTest.php index 9b9bd3c40..8db1b0c30 100644 --- a/tests/Integration/BankAccountSplitStream/IntegrationTest.php +++ b/tests/Integration/BankAccountSplitStream/IntegrationTest.php @@ -16,7 +16,6 @@ use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; -use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; use Patchlevel\EventSourcing\Subscription\Store\InMemorySubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Tests\DbalManager; @@ -83,7 +82,8 @@ public function testSuccessful(): void ); $schemaDirector->create(); - $engine->boot(new SubscriptionEngineCriteria()); + $engine->setup(); + $engine->boot(); $bankAccountId = AccountId::fromString('1'); $bankAccount = BankAccount::create($bankAccountId, 'John'); diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php index c1c0beaa5..e8db9686c 100644 --- a/tests/Integration/BasicImplementation/BasicIntegrationTest.php +++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php @@ -80,6 +80,7 @@ public function testSuccessful(): void ); $schemaDirector->create(); + $engine->setup(); $engine->boot(); $profileId = ProfileId::fromString('1'); @@ -148,6 +149,7 @@ public function testSnapshot(): void ); $schemaDirector->create(); + $engine->setup(); $engine->boot(); $profileId = ProfileId::fromString('1'); diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index b386a3f02..f3e09dff9 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -103,6 +103,7 @@ public function testHappyPath(): void $engine->subscriptions(), ); + $engine->setup(); $engine->boot(); self::assertEquals( @@ -216,6 +217,7 @@ public function testErrorHandling(): void ), ); + $engine->setup(); $engine->boot(); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -358,6 +360,7 @@ public function testProcessor(): void $engine->subscriptions(), ); + $engine->setup(); $engine->boot(); self::assertEquals( diff --git a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php index 67e85b3b1..ca9f01878 100644 --- a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php +++ b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php @@ -40,7 +40,7 @@ final class DefaultSubscriptionEngineTest extends TestCase { use ProphecyTrait; - public function testNothingToBoot(): void + public function testNothingToSetup(): void { $streamableStore = $this->prophesize(Store::class); $streamableStore->load($this->criteria())->shouldNotBeCalled(); @@ -53,21 +53,23 @@ public function testNothingToBoot(): void new MetadataSubscriberAccessorRepository([]), ); - $engine->boot(); + $engine->setup(); self::assertEquals([], $store->addedSubscriptions); self::assertEquals([], $store->updatedSubscriptions); } - public function testBootDiscoverNewSubscribers(): void + public function testSetupWithoutCreateMethod(): void { $subscriptionId = 'test'; $subscriber = new #[Subscriber('test')] class { }; + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + $streamableStore = $this->prophesize(Store::class); - $streamableStore->load($this->criteria())->willReturn(new ArrayStream([]))->shouldBeCalledOnce(); + $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); $subscriptionStore = new DummySubscriptionStore(); @@ -77,7 +79,7 @@ class { new MetadataSubscriberAccessorRepository([$subscriber]), ); - $engine->boot(); + $engine->setup(); self::assertEquals([ new Subscription( @@ -95,30 +97,84 @@ class { RunMode::FromBeginning, Status::Booting, ), + ], $subscriptionStore->updatedSubscriptions); + } + + public function testSetupWithCreateMethod(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test')] + class { + public bool $created = false; + + #[Setup] + public function create(): void + { + $this->created = true; + } + }; + + $subscriptionStore = new DummySubscriptionStore(); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + ); + + $engine->setup(); + + self::assertEquals([ new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, RunMode::FromBeginning, - Status::Active, + Status::New, + ), + ], $subscriptionStore->addedSubscriptions); + + self::assertEquals([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, ), ], $subscriptionStore->updatedSubscriptions); + + self::assertTrue($subscriber->created); } - public function testBootWithoutCreateMethod(): void + public function testSetupWithCreateError(): void { $subscriptionId = 'test'; $subscriber = new #[Subscriber('test')] class { + public function __construct( + public readonly RuntimeException $exception = new RuntimeException('ERROR'), + ) { + } + + #[Setup] + public function create(): void + { + throw $this->exception; + } }; $subscriptionStore = new DummySubscriptionStore([ new Subscription($subscriptionId), ]); - $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->prophesize(Store::class); - $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); $engine = new DefaultSubscriptionEngine( $streamableStore->reveal(), @@ -126,59 +182,174 @@ class { new MetadataSubscriberAccessorRepository([$subscriber]), ); - $engine->boot(); + $engine->setup(); - self::assertEquals([ + self::assertEquals( + [ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Error, + 0, + new SubscriptionError( + 'ERROR', + Status::New, + ThrowableToErrorContextTransformer::transform($subscriber->exception), + ), + ), + ], + $subscriptionStore->updatedSubscriptions, + ); + } + + public function testSetupWithSkipBooting(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', runMode: RunMode::FromBeginning)] + class { + }; + + $subscriptionStore = new DummySubscriptionStore([ new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, RunMode::FromBeginning, - Status::Booting, + Status::New, ), + ]); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + ); + + $engine->setup(null, true); + + self::assertEquals([ new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, RunMode::FromBeginning, - Status::Booting, - 1, + Status::Active, ), + ], $subscriptionStore->updatedSubscriptions); + } + + public function testSetupWithFromNow(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', runMode: RunMode::FromNow)] + class { + }; + + $subscriptionStore = new DummySubscriptionStore([ new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, - RunMode::FromBeginning, + RunMode::FromNow, + Status::New, + ), + ]); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + ); + + $engine->setup(); + + self::assertEquals([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromNow, Status::Active, 1, ), ], $subscriptionStore->updatedSubscriptions); } - public function testBootWithMethods(): void + public function testSetupWithFromNowWithEmtpyStream(): void { $subscriptionId = 'test'; - $subscriber = new #[Subscriber('test')] + $subscriber = new #[Subscriber('test', runMode: RunMode::FromNow)] class { - public Message|null $message = null; - public bool $created = false; + }; - #[Setup] - public function create(): void - { - $this->created = true; - } + $subscriptionStore = new DummySubscriptionStore([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromNow, + Status::New, + ), + ]); - #[Subscribe(ProfileVisited::class)] - public function handle(Message $message): void - { - $this->message = $message; - } - }; + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([]))->shouldBeCalledOnce(); - $subscriptionStore = new DummySubscriptionStore(); + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + ); - $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + $engine->setup(); + self::assertEquals([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromNow, + Status::Active, + 0, + ), + ], $subscriptionStore->updatedSubscriptions); + } + + public function testNothingToBoot(): void + { $streamableStore = $this->prophesize(Store::class); - $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + $streamableStore->load($this->criteria())->shouldNotBeCalled(); + + $store = new DummySubscriptionStore(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $store, + new MetadataSubscriberAccessorRepository([]), + ); + + $engine->boot(); + + self::assertEquals([], $store->addedSubscriptions); + self::assertEquals([], $store->updatedSubscriptions); + } + + public function testBootDiscoverNewSubscribers(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test')] + class { + }; + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->shouldNotBeCalled(); + + $subscriptionStore = new DummySubscriptionStore(); $engine = new DefaultSubscriptionEngine( $streamableStore->reveal(), @@ -197,13 +368,48 @@ public function handle(Message $message): void ), ], $subscriptionStore->addedSubscriptions); - self::assertEquals([ + self::assertEquals([], $subscriptionStore->updatedSubscriptions); + } + + public function testBootWithSubscriber(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test')] + class { + public Message|null $message = null; + + #[Subscribe(ProfileVisited::class)] + public function handle(Message $message): void + { + $this->message = $message; + } + }; + + $subscriptionStore = new DummySubscriptionStore([ new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, RunMode::FromBeginning, Status::Booting, ), + ]); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + ); + + $engine->boot(); + + self::assertEquals([], $subscriptionStore->addedSubscriptions); + + self::assertEquals([ new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, @@ -220,7 +426,6 @@ public function handle(Message $message): void ), ], $subscriptionStore->updatedSubscriptions); - self::assertTrue($subscriber->created); self::assertSame($message, $subscriber->message); } @@ -230,13 +435,6 @@ public function testBootWithLimit(): void $subscriber = new #[Subscriber('test')] class { public Message|null $message = null; - public bool $created = false; - - #[Setup] - public function create(): void - { - $this->created = true; - } #[Subscribe(ProfileVisited::class)] public function handle(Message $message): void @@ -245,7 +443,14 @@ public function handle(Message $message): void } }; - $subscriptionStore = new DummySubscriptionStore(); + $subscriptionStore = new DummySubscriptionStore([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); @@ -260,22 +465,9 @@ public function handle(Message $message): void $engine->boot(new SubscriptionEngineCriteria(), 1); - self::assertEquals([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromBeginning, - Status::New, - ), - ], $subscriptionStore->addedSubscriptions); + self::assertEquals([], $subscriptionStore->addedSubscriptions); self::assertEquals([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromBeginning, - Status::Booting, - ), new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, @@ -285,7 +477,6 @@ public function handle(Message $message): void ), ], $subscriptionStore->updatedSubscriptions); - self::assertTrue($subscriber->created); self::assertSame($message, $subscriber->message); } @@ -379,57 +570,6 @@ public function handle(Message $message): void self::assertNull($subscriber2->message); } - public function testBootWithCreateError(): void - { - $subscriptionId = 'test'; - $subscriber = new #[Subscriber('test')] - class { - public function __construct( - public readonly RuntimeException $exception = new RuntimeException('ERROR'), - ) { - } - - #[Setup] - public function create(): void - { - throw $this->exception; - } - }; - - $subscriptionStore = new DummySubscriptionStore([ - new Subscription($subscriptionId), - ]); - - $streamableStore = $this->prophesize(Store::class); - $streamableStore->load($this->criteria())->shouldNotBeCalled(); - - $engine = new DefaultSubscriptionEngine( - $streamableStore->reveal(), - $subscriptionStore, - new MetadataSubscriberAccessorRepository([$subscriber]), - ); - - $engine->boot(); - - self::assertEquals( - [ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromBeginning, - Status::Error, - 0, - new SubscriptionError( - 'ERROR', - Status::New, - ThrowableToErrorContextTransformer::transform($subscriber->exception), - ), - ), - ], - $subscriptionStore->updatedSubscriptions, - ); - } - public function testBootingWithGabInIndex(): void { $subscriptionId = 'test'; @@ -488,104 +628,6 @@ public function handle(Message $message): void self::assertSame([$message1, $message2], $subscriber->messages); } - public function testBootingWithFromNow(): void - { - $subscriptionId = 'test'; - $subscriber = new #[Subscriber('test', runMode: RunMode::FromNow)] - class { - public Message|null $message = null; - - #[Subscribe(ProfileVisited::class)] - public function handle(Message $message): void - { - $this->message = $message; - } - }; - - $subscriptionStore = new DummySubscriptionStore([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromNow, - Status::Booting, - ), - ]); - - $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); - - $streamableStore = $this->prophesize(Store::class); - $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); - - $engine = new DefaultSubscriptionEngine( - $streamableStore->reveal(), - $subscriptionStore, - new MetadataSubscriberAccessorRepository([$subscriber]), - ); - - $engine->boot(); - - self::assertEquals([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromNow, - Status::Active, - 1, - ), - ], $subscriptionStore->updatedSubscriptions); - - self::assertNull($subscriber->message); - } - - public function testBootingWithFromNowWithEmtpyStream(): void - { - $subscriptionId = 'test'; - $subscriber = new #[Subscriber('test', runMode: RunMode::FromNow)] - class { - public Message|null $message = null; - - #[Subscribe(ProfileVisited::class)] - public function handle(Message $message): void - { - $this->message = $message; - } - }; - - $subscriptionStore = new DummySubscriptionStore([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromNow, - Status::Booting, - ), - ]); - - $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); - - $streamableStore = $this->prophesize(Store::class); - $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([]))->shouldBeCalledOnce(); - - $engine = new DefaultSubscriptionEngine( - $streamableStore->reveal(), - $subscriptionStore, - new MetadataSubscriberAccessorRepository([$subscriber]), - ); - - $engine->boot(); - - self::assertEquals([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromNow, - Status::Active, - 0, - ), - ], $subscriptionStore->updatedSubscriptions); - - self::assertNull($subscriber->message); - } - public function testBootingWithOnlyOnce(): void { $subscriptionId = 'test'; @@ -1019,6 +1061,62 @@ public function handle(Message $message): void self::assertSame([$message1, $message2], $subscriber->messages); } + public function testRunnningWithOnlyOnce(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', runMode: RunMode::Once)] + class { + public Message|null $message = null; + + #[Subscribe(ProfileVisited::class)] + public function handle(Message $message): void + { + $this->message = $message; + } + }; + + $subscriptionStore = new DummySubscriptionStore([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::Once, + Status::Active, + ), + ]); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + ); + + $engine->run(); + + self::assertEquals([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::Once, + Status::Active, + 1, + ), + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::Once, + Status::Finished, + 1, + ), + ], $subscriptionStore->updatedSubscriptions); + + self::assertEquals($message1, $subscriber->message); + } + public function testTeardownDiscoverNewSubscribers(): void { $subscriptionId = 'test'; @@ -1858,6 +1956,7 @@ class { public static function methodProvider(): Generator { + yield 'setup' => ['setup']; yield 'boot' => ['boot']; yield 'run' => ['run']; yield 'teardown' => ['teardown'];