Skip to content

Commit

Permalink
rename outdated into detached & worker should not discover new subscr…
Browse files Browse the repository at this point in the history
…iptions in running process
  • Loading branch information
DavidBadura committed Mar 12, 2024
1 parent 437be5d commit 8dca190
Show file tree
Hide file tree
Showing 11 changed files with 512 additions and 41 deletions.
19 changes: 9 additions & 10 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -390,21 +390,20 @@ stateDiagram-v2
Booting --> Error
Active --> Paused
Active --> Finished
Active --> Outdated
Active --> Detached
Active --> Error
Paused --> New
Paused --> Booting
Paused --> Active
Paused --> Outdated
Paused --> Detached
Finished --> Active
Finished --> Outdated
Finished --> Detached
Error --> New
Error --> Booting
Error --> Active
Error --> Paused
Error --> [*]
Outdated --> Active
Outdated --> [*]
Detached --> Active
Detached --> [*]
```

### New
Expand Down Expand Up @@ -438,16 +437,16 @@ A subscription is finished if the subscriber has the mode `RunMode::Once`.
This means that the subscription is only run once and then set to finished if it reaches the end of the event stream.
You can also reactivate the subscription if you want so that it continues.

### Outdated
### Detached

If an active or finished subscription exists in the subscription store
that does not have a subscriber in the source code with a corresponding subscriber ID,
then this subscription is marked as outdated.
then this subscription is marked as detached.
This happens when either the subscriber has been deleted
or the subscriber ID of a subscriber has changed.
In the last case there should be a new subscription with the new subscriber ID.

An outdated subscription does not automatically become active again when the subscriber exists again.
A detached subscription does not automatically become active again when the subscriber exists again.
This happens, for example, when an old version was deployed again during a rollback.

There are two options to reactivate the subscription:
Expand Down Expand Up @@ -610,7 +609,7 @@ $subscriptionEngine->run($criteria);

### Teardown

If subscriptions are outdated, they can be cleaned up here.
If subscriptions are detached, they can be cleaned up here.
The subscription engine also tries to call the `teardown` method if available.

```php
Expand Down
2 changes: 1 addition & 1 deletion src/Console/Command/SubscriptionBootCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$sleep = InputHelper::positiveIntOrZero($input->getOption('sleep'));
$setup = InputHelper::bool($input->getOption('setup'));

$criteria = $this->subscriptionEngineCriteria($input);
$criteria = $this->staticSubscriptionEngineCriteria($input);

if ($setup) {
$this->engine->setup($criteria);
Expand Down
14 changes: 14 additions & 0 deletions src/Console/Command/SubscriptionCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,18 @@ protected function subscriptionEngineCriteria(InputInterface $input): Subscripti
InputHelper::nullableStringList($input->getOption('group')),
);
}

protected function staticSubscriptionEngineCriteria(InputInterface $input): SubscriptionEngineCriteria
{
$criteria = $this->subscriptionEngineCriteria($input);

$subscriptions = $this->engine->subscriptions($criteria);

return new SubscriptionEngineCriteria(
array_map(
static fn($subscription) => $subscription->id(),
$subscriptions
),
);
}
}
2 changes: 1 addition & 1 deletion src/Console/Command/SubscriptionRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$sleep = InputHelper::positiveIntOrZero($input->getOption('sleep'));
$rebuild = InputHelper::bool($input->getOption('rebuild'));

$criteria = $this->subscriptionEngineCriteria($input);
$criteria = $this->staticSubscriptionEngineCriteria($input);

$logger = new ConsoleLogger($output);

Expand Down
14 changes: 7 additions & 7 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public function run(
$this->logger?->info('Subscription Engine: Start processing.');

$this->discoverNewSubscriptions();
$this->markOutdatedSubscriptions($criteria);
$this->markDetachedSubscriptions($criteria);
$this->retrySubscriptions($criteria);

$this->findForUpdate(
Expand Down Expand Up @@ -394,13 +394,13 @@ public function teardown(SubscriptionEngineCriteria|null $criteria = null): void

$this->discoverNewSubscriptions();

$this->logger?->info('Subscription Engine: Start teardown outdated subscriptions.');
$this->logger?->info('Subscription Engine: Start teardown detached subscriptions.');

$this->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
status: [Status::Outdated],
status: [Status::Detached],
),
function (array $subscriptions): void {
foreach ($subscriptions as $subscription) {
Expand Down Expand Up @@ -542,7 +542,7 @@ public function reactivate(SubscriptionEngineCriteria|null $criteria = null): vo
groups: $criteria->groups,
status: [
Status::Error,
Status::Outdated,
Status::Detached,
Status::Paused,
Status::Finished,
],
Expand Down Expand Up @@ -710,7 +710,7 @@ private function subscriber(string $subscriberId): SubscriberAccessor|null
return $this->subscriberRepository->get($subscriberId);
}

private function markOutdatedSubscriptions(SubscriptionEngineCriteria $criteria): void
private function markDetachedSubscriptions(SubscriptionEngineCriteria $criteria): void
{
$this->findForUpdate(
new SubscriptionCriteria(
Expand All @@ -726,12 +726,12 @@ function (array $subscriptions): void {
continue;
}

$subscription->outdated();
$subscription->detached();
$this->subscriptionStore->update($subscription);

$this->logger?->info(
sprintf(
'Subscription Engine: Subscriber for "%s" not found and has been marked as outdated.',
'Subscription Engine: Subscriber for "%s" not found and has been marked as detached.',
$subscription->id(),
),
);
Expand Down
2 changes: 1 addition & 1 deletion src/Subscription/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ enum Status: string
case Active = 'active';
case Paused = 'paused';
case Finished = 'finished';
case Outdated = 'outdated';
case Detached = 'detached';
case Error = 'error';
}
8 changes: 4 additions & 4 deletions src/Subscription/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ public function isFinished(): bool
return $this->status === Status::Finished;
}

public function outdated(): void
public function detached(): void
{
$this->status = Status::Outdated;
$this->status = Status::Detached;
}

public function isOutdated(): bool
public function isDetached(): bool
{
return $this->status === Status::Outdated;
return $this->status === Status::Detached;
}

public function error(Throwable|string $error): void
Expand Down
66 changes: 66 additions & 0 deletions tests/Integration/Subscription/Subscriber/ProfileNewProjection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\Table;
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Attribute\Teardown;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberUtil;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\ProfileCreated;

use function assert;

#[Subscriber('profile_2')]

Check failure on line 19 in tests/Integration/Subscription/Subscriber/ProfileNewProjection.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

TooFewArguments

tests/Integration/Subscription/Subscriber/ProfileNewProjection.php:19:3: TooFewArguments: Too few arguments for Patchlevel\EventSourcing\Attribute\Subscriber::__construct - expecting runMode to be passed (see https://psalm.dev/025)
final class ProfileNewProjection
{
use SubscriberUtil;

public function __construct(
private Connection $connection,
) {
}

#[Setup]
public function create(): void
{
$table = new Table($this->tableName());
$table->addColumn('id', 'string')->setLength(36);
$table->addColumn('firstname', 'string')->setLength(255);
$table->setPrimaryKey(['id']);

$this->connection->createSchemaManager()->createTable($table);
}

#[Teardown]
public function drop(): void
{
$this->connection->createSchemaManager()->dropTable($this->tableName());
}

#[Subscribe(ProfileCreated::class)]
public function handleProfileCreated(Message $message): void
{
$profileCreated = $message->event();

assert($profileCreated instanceof ProfileCreated);

$this->connection->executeStatement(
'INSERT INTO ' . $this->tableName() . ' (id, firstname) VALUES(:id, :firstname);',
[
'id' => $profileCreated->profileId->toString(),
'firstname' => $profileCreated->name,
],
);
}

private function tableName(): string
{
return 'projection_' . $this->subscriberId();
}
}
Loading

0 comments on commit 8dca190

Please sign in to comment.