Skip to content

Commit

Permalink
split boot method into boot and setup in subscription engine
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Mar 9, 2024
1 parent 29833b0 commit 43687f9
Show file tree
Hide file tree
Showing 9 changed files with 535 additions and 322 deletions.
91 changes: 86 additions & 5 deletions src/Console/Command/SubscriptionBootCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@

namespace Patchlevel\EventSourcing\Console\Command;

use Closure;
use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Logger\ConsoleLogger;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(
'event-sourcing:subscription:boot',
'Prepare new subscriptions and catch up with the event store',
'Catch up with the event store.',
)]
final class SubscriptionBootCommand extends SubscriptionCommand
{
Expand All @@ -22,20 +26,97 @@ public function configure(): void

$this
->addOption(
'limit',
'run-limit',
null,
InputOption::VALUE_REQUIRED,
'The maximum number of runs this command should execute',
)
->addOption(
'message-limit',
null,
InputOption::VALUE_REQUIRED,
'How many messages should be consumed in one run',
1000,
)
->addOption(
'memory-limit',
null,
InputOption::VALUE_REQUIRED,
'How much memory consumption should the worker be terminated (e.g. 250MB)',
)
->addOption(
'time-limit',
null,
InputOption::VALUE_REQUIRED,
'What is the maximum time the worker can run in seconds',
)
->addOption(
'sleep',
null,
InputOption::VALUE_REQUIRED,
'How much time should elapse before the next job is executed in milliseconds',
)
->addOption(
'setup',
null,
InputOption::VALUE_NONE,
'Setup new subscriptions',
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$limit = InputHelper::nullablePositiveInt($input->getOption('limit'));
$runLimit = InputHelper::nullablePositiveInt($input->getOption('run-limit'));
$messageLimit = InputHelper::nullablePositiveInt($input->getOption('message-limit'));
$memoryLimit = InputHelper::nullableString($input->getOption('memory-limit'));
$timeLimit = InputHelper::nullablePositiveInt($input->getOption('time-limit'));
$sleep = InputHelper::positiveIntOrZero($input->getOption('sleep'));
$setup = InputHelper::bool($input->getOption('setup'));

$criteria = $this->subscriptionEngineCriteria($input);
$this->engine->boot($criteria, $limit);

return 0;
if ($setup) {
$this->engine->setup($criteria);
}

$logger = new ConsoleLogger($output);

$finished = false;

$worker = DefaultWorker::create(
function (Closure $stop) use ($criteria, $messageLimit, &$finished): void {
$this->engine->boot($criteria, $messageLimit);

if (!$this->isBootingFinished($criteria)) {
return;
}

$finished = true;
$stop();
},
[
'runLimit' => $runLimit,
'memoryLimit' => $memoryLimit,
'timeLimit' => $timeLimit,
],
$logger,
);

$worker->run($sleep);

return $finished ? 0 : 1;
}

private function isBootingFinished(SubscriptionEngineCriteria $criteria): bool
{
$subscriptions = $this->engine->subscriptions($criteria);

foreach ($subscriptions as $subscription) {
if ($subscription->isBooting()) {
return false;
}
}

return true;
}
}
24 changes: 24 additions & 0 deletions src/Console/Command/SubscriptionSetupCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console\Command;

use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(
'event-sourcing:subscription:setup',
'Setup new subscriptions',
)]
final class SubscriptionSetupCommand extends SubscriptionCommand
{
protected function execute(InputInterface $input, OutputInterface $output): int
{
$criteria = $this->subscriptionEngineCriteria($input);
$this->engine->setup($criteria);

return 0;
}
}
Loading

0 comments on commit 43687f9

Please sign in to comment.