Skip to content

Commit

Permalink
Merge pull request #531 from patchlevel/add-setup-method
Browse files Browse the repository at this point in the history
split boot method into boot and setup in subscription engine
  • Loading branch information
DavidBadura authored Mar 12, 2024
2 parents 8f56e5f + 26be8cf commit 437be5d
Show file tree
Hide file tree
Showing 17 changed files with 783 additions and 546 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"doctrine/dbal": "^3.8.1|^4.0.0",
"doctrine/migrations": "^3.3.2",
"patchlevel/hydrator": "^1.2.0",
"patchlevel/worker": "^1.1.1",
"patchlevel/worker": "^1.2.0",
"psr/cache": "^2.0.0|^3.0.0",
"psr/clock": "^1.0",
"psr/event-dispatcher": "^1.0",
Expand Down
410 changes: 209 additions & 201 deletions composer.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions docs/pages/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ To manage your subscriptions there are the following cli commands.
* SubscriptionReactiveCommand: `event-sourcing:subscription:reactive`
* SubscriptionRemoveCommand: `event-sourcing:subscription:remove`
* SubscriptionRunCommand: `event-sourcing:subscription:run`
* SubscriptionSetupCommand: `event-sourcing:subscription:setup`
* SubscriptionStatusCommand: `event-sourcing:subscription:status`
* SubscriptionTeardownCommand: `event-sourcing:subscription:teardown`

Expand Down Expand Up @@ -80,6 +81,7 @@ $cli->addCommands(array(
new Command\SubscriptionRemoveCommand($projectionist),
new Command\SubscriptionReactivateCommand($projectionist),
new Command\SubscriptionRebuildCommand($projectionist),
new Command\SubscriptionSetupCommand($projectionist),
new Command\SubscriptionStatusCommand($projectionist),
new Command\SchemaCreateCommand($store, $schemaManager),
new Command\SchemaDropCommand($store, $schemaManager),
Expand Down
2 changes: 1 addition & 1 deletion docs/pages/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ $schemaDirector = new DoctrineSchemaDirector(
);

$schemaDirector->create();
$projectionist->boot();
$projectionist->setup(skipBooting: true);
```

!!! note
Expand Down
32 changes: 23 additions & 9 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ final class ProfileSubscriber

### Run Mode

The run mode determines how the subscriber should behave when it is booted.
The run mode determines how the subscriber should behave.
There are three different modes:

#### From Beginning
Expand Down Expand Up @@ -382,6 +382,7 @@ stateDiagram-v2
direction LR
[*] --> New
New --> Booting
New --> Active
New --> Error
Booting --> Active
Booting --> Paused
Expand All @@ -395,7 +396,6 @@ stateDiagram-v2
Paused --> Booting
Paused --> Active
Paused --> Outdated
Paused --> [*]
Finished --> Active
Finished --> Outdated
Error --> New
Expand All @@ -412,12 +412,13 @@ stateDiagram-v2
A subscription is created and "new" if a subscriber exists with an ID that is not yet tracked.
This can happen when either a new subscriber has been added, the subscriber ID has changed
or the subscription has been manually deleted from the subscription store.
You can then set up the subscription so that it is booting or active.
In this step, the subscription engine also tries to call the `setup` method if available.

### Booting

Booting status is reached when the boot process is invoked.
In this step, the "setup" method is called on the subscription, if available.
And the subscription is brought up to date, depending on the mode.
Booting status is reached when the setup process is finished.
In this step the subscription engine tries to catch up to the current event stream.
When the process is finished, the subscription is set to active or finished.

### Active
Expand Down Expand Up @@ -575,12 +576,25 @@ $criteria = new SubscriptionEngineCriteria(

An `OR` check is made for the respective criteria and all criteria are checked with an `AND`.

### Setup

New subscriptions need to be set up before they can be used.
In this step, the subscription engine also tries to call the `setup` method if available.
After the setup process, the subscription is set to booting or active.

```php
$subscriptionEngine->setup($criteria);
```

!!! tip

You can skip the booting step with the second boolean parameter named `skipBooting`.

### Boot

So that the subscription engine can manage the subscriptions, they must be booted.
In this step, the `setup` will be called if available.
Then the subscriptions then catch up with the current position of the event stream.
When the subscriptions are finished, they switch to the active or finished state.
You can boot the subscriptions with the `boot` method.
All booting subscriptions will catch up to the current event stream.
After the boot process, the subscription is set to active or finished.

```php
$subscriptionEngine->boot($criteria);
Expand Down
91 changes: 86 additions & 5 deletions src/Console/Command/SubscriptionBootCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@

namespace Patchlevel\EventSourcing\Console\Command;

use Closure;
use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Logger\ConsoleLogger;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(
'event-sourcing:subscription:boot',
'Prepare new subscriptions and catch up with the event store',
'Catch up with the event store.',
)]
final class SubscriptionBootCommand extends SubscriptionCommand
{
Expand All @@ -22,20 +26,97 @@ public function configure(): void

$this
->addOption(
'limit',
'run-limit',
null,
InputOption::VALUE_REQUIRED,
'The maximum number of runs this command should execute',
)
->addOption(
'message-limit',
null,
InputOption::VALUE_REQUIRED,
'How many messages should be consumed in one run',
1000,
)
->addOption(
'memory-limit',
null,
InputOption::VALUE_REQUIRED,
'How much memory consumption should the worker be terminated (e.g. 250MB)',
)
->addOption(
'time-limit',
null,
InputOption::VALUE_REQUIRED,
'What is the maximum time the worker can run in seconds',
)
->addOption(
'sleep',
null,
InputOption::VALUE_REQUIRED,
'How much time should elapse before the next job is executed in milliseconds',
)
->addOption(
'setup',
null,
InputOption::VALUE_NONE,
'Setup new subscriptions',
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$limit = InputHelper::nullablePositiveInt($input->getOption('limit'));
$runLimit = InputHelper::nullablePositiveInt($input->getOption('run-limit'));
$messageLimit = InputHelper::nullablePositiveInt($input->getOption('message-limit'));
$memoryLimit = InputHelper::nullableString($input->getOption('memory-limit'));
$timeLimit = InputHelper::nullablePositiveInt($input->getOption('time-limit'));
$sleep = InputHelper::positiveIntOrZero($input->getOption('sleep'));
$setup = InputHelper::bool($input->getOption('setup'));

$criteria = $this->subscriptionEngineCriteria($input);
$this->engine->boot($criteria, $limit);

return 0;
if ($setup) {
$this->engine->setup($criteria);
}

$logger = new ConsoleLogger($output);

$finished = false;

$worker = DefaultWorker::create(
function (Closure $stop) use ($criteria, $messageLimit, &$finished): void {
$this->engine->boot($criteria, $messageLimit);

if (!$this->isBootingFinished($criteria)) {
return;
}

$finished = true;
$stop();
},
[
'runLimit' => $runLimit,
'memoryLimit' => $memoryLimit,
'timeLimit' => $timeLimit,
],
$logger,
);

$worker->run($sleep);

return $finished ? 0 : 1;
}

private function isBootingFinished(SubscriptionEngineCriteria $criteria): bool
{
$subscriptions = $this->engine->subscriptions($criteria);

foreach ($subscriptions as $subscription) {
if ($subscription->isBooting()) {
return false;
}
}

return true;
}
}
24 changes: 24 additions & 0 deletions src/Console/Command/SubscriptionSetupCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console\Command;

use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(
'event-sourcing:subscription:setup',
'Setup new subscriptions',
)]
final class SubscriptionSetupCommand extends SubscriptionCommand
{
protected function execute(InputInterface $input, OutputInterface $output): int
{
$criteria = $this->subscriptionEngineCriteria($input);
$this->engine->setup($criteria);

return 0;
}
}
2 changes: 1 addition & 1 deletion src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ private function getCustomHeaders(Message $message): array
return array_values(
array_filter(
$message->headers(),
static fn (object $header) => !in_array($header::class, $filteredHeaders, true)
static fn (object $header) => !in_array($header::class, $filteredHeaders, true),
),
);
}
Expand Down
Loading

0 comments on commit 437be5d

Please sign in to comment.