Skip to content

Commit

Permalink
Merge pull request #537 from patchlevel/rename-outdated-into-detached
Browse files Browse the repository at this point in the history
rename outdated into detached in subscription engine
  • Loading branch information
DavidBadura authored Mar 12, 2024
2 parents 437be5d + 2921e78 commit bbe507a
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 39 deletions.
19 changes: 9 additions & 10 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -390,21 +390,20 @@ stateDiagram-v2
Booting --> Error
Active --> Paused
Active --> Finished
Active --> Outdated
Active --> Detached
Active --> Error
Paused --> New
Paused --> Booting
Paused --> Active
Paused --> Outdated
Paused --> Detached
Finished --> Active
Finished --> Outdated
Finished --> Detached
Error --> New
Error --> Booting
Error --> Active
Error --> Paused
Error --> [*]
Outdated --> Active
Outdated --> [*]
Detached --> Active
Detached --> [*]
```

### New
Expand Down Expand Up @@ -438,16 +437,16 @@ A subscription is finished if the subscriber has the mode `RunMode::Once`.
This means that the subscription is only run once and then set to finished if it reaches the end of the event stream.
You can also reactivate the subscription if you want so that it continues.

### Outdated
### Detached

If an active or finished subscription exists in the subscription store
that does not have a subscriber in the source code with a corresponding subscriber ID,
then this subscription is marked as outdated.
then this subscription is marked as detached.
This happens when either the subscriber has been deleted
or the subscriber ID of a subscriber has changed.
In the last case there should be a new subscription with the new subscriber ID.

An outdated subscription does not automatically become active again when the subscriber exists again.
A detached subscription does not automatically become active again when the subscriber exists again.
This happens, for example, when an old version was deployed again during a rollback.

There are two options to reactivate the subscription:
Expand Down Expand Up @@ -610,7 +609,7 @@ $subscriptionEngine->run($criteria);

### Teardown

If subscriptions are outdated, they can be cleaned up here.
If subscriptions are detached, they can be cleaned up here.
The subscription engine also tries to call the `teardown` method if available.

```php
Expand Down
14 changes: 7 additions & 7 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public function run(
$this->logger?->info('Subscription Engine: Start processing.');

$this->discoverNewSubscriptions();
$this->markOutdatedSubscriptions($criteria);
$this->markDetachedSubscriptions($criteria);
$this->retrySubscriptions($criteria);

$this->findForUpdate(
Expand Down Expand Up @@ -394,13 +394,13 @@ public function teardown(SubscriptionEngineCriteria|null $criteria = null): void

$this->discoverNewSubscriptions();

$this->logger?->info('Subscription Engine: Start teardown outdated subscriptions.');
$this->logger?->info('Subscription Engine: Start teardown detached subscriptions.');

$this->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
status: [Status::Outdated],
status: [Status::Detached],
),
function (array $subscriptions): void {
foreach ($subscriptions as $subscription) {
Expand Down Expand Up @@ -542,7 +542,7 @@ public function reactivate(SubscriptionEngineCriteria|null $criteria = null): vo
groups: $criteria->groups,
status: [
Status::Error,
Status::Outdated,
Status::Detached,
Status::Paused,
Status::Finished,
],
Expand Down Expand Up @@ -710,7 +710,7 @@ private function subscriber(string $subscriberId): SubscriberAccessor|null
return $this->subscriberRepository->get($subscriberId);
}

private function markOutdatedSubscriptions(SubscriptionEngineCriteria $criteria): void
private function markDetachedSubscriptions(SubscriptionEngineCriteria $criteria): void
{
$this->findForUpdate(
new SubscriptionCriteria(
Expand All @@ -726,12 +726,12 @@ function (array $subscriptions): void {
continue;
}

$subscription->outdated();
$subscription->detached();
$this->subscriptionStore->update($subscription);

$this->logger?->info(
sprintf(
'Subscription Engine: Subscriber for "%s" not found and has been marked as outdated.',
'Subscription Engine: Subscriber for "%s" not found and has been marked as detached.',
$subscription->id(),
),
);
Expand Down
2 changes: 1 addition & 1 deletion src/Subscription/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ enum Status: string
case Active = 'active';
case Paused = 'paused';
case Finished = 'finished';
case Outdated = 'outdated';
case Detached = 'detached';
case Error = 'error';
}
8 changes: 4 additions & 4 deletions src/Subscription/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ public function isFinished(): bool
return $this->status === Status::Finished;
}

public function outdated(): void
public function detached(): void
{
$this->status = Status::Outdated;
$this->status = Status::Detached;
}

public function isOutdated(): bool
public function isDetached(): bool
{
return $this->status === Status::Outdated;
return $this->status === Status::Detached;
}

public function error(Throwable|string $error): void
Expand Down
20 changes: 10 additions & 10 deletions tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ public function testRunningMarkOutdated(): void
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
0,
),
], $subscriptionStore->updatedSubscriptions);
Expand Down Expand Up @@ -1156,7 +1156,7 @@ class {
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
);

$subscriptionStore = new DummySubscriptionStore([$subscription]);
Expand Down Expand Up @@ -1194,7 +1194,7 @@ public function drop(): void
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
);

$subscriptionStore = new DummySubscriptionStore([$subscription]);
Expand Down Expand Up @@ -1234,7 +1234,7 @@ public function drop(): void
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
),
]);

Expand All @@ -1261,7 +1261,7 @@ public function testTeardownWithoutSubscriber(): void
$subscriberId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
),
]);

Expand Down Expand Up @@ -1325,7 +1325,7 @@ public function drop(): void
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
);
$subscriptionStore = new DummySubscriptionStore([$subscription]);

Expand Down Expand Up @@ -1355,7 +1355,7 @@ class {
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
);
$subscriptionStore = new DummySubscriptionStore([$subscription]);

Expand Down Expand Up @@ -1391,7 +1391,7 @@ public function drop(): void
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
);
$subscriptionStore = new DummySubscriptionStore([$subscription]);

Expand All @@ -1417,7 +1417,7 @@ public function testRemoveWithoutSubscriber(): void
$subscriberId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
);
$subscriptionStore = new DummySubscriptionStore([$subscription]);

Expand Down Expand Up @@ -1514,7 +1514,7 @@ class {
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Outdated,
Status::Detached,
),
]);

Expand Down
14 changes: 7 additions & 7 deletions tests/Unit/Subscription/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public function testCreate(): void
self::assertFalse($subscription->isBooting());
self::assertFalse($subscription->isActive());
self::assertFalse($subscription->isError());
self::assertFalse($subscription->isOutdated());
self::assertFalse($subscription->isDetached());
}

public function testBooting(): void
Expand All @@ -44,7 +44,7 @@ public function testBooting(): void
self::assertTrue($subscription->isBooting());
self::assertFalse($subscription->isActive());
self::assertFalse($subscription->isError());
self::assertFalse($subscription->isOutdated());
self::assertFalse($subscription->isDetached());
}

public function testActive(): void
Expand All @@ -60,7 +60,7 @@ public function testActive(): void
self::assertFalse($subscription->isBooting());
self::assertTrue($subscription->isActive());
self::assertFalse($subscription->isError());
self::assertFalse($subscription->isOutdated());
self::assertFalse($subscription->isDetached());
}

public function testError(): void
Expand All @@ -78,7 +78,7 @@ public function testError(): void
self::assertFalse($subscription->isBooting());
self::assertFalse($subscription->isActive());
self::assertTrue($subscription->isError());
self::assertFalse($subscription->isOutdated());
self::assertFalse($subscription->isDetached());
self::assertEquals(
new SubscriptionError(
'test',
Expand All @@ -95,14 +95,14 @@ public function testOutdated(): void
'test',
);

$subscription->outdated();
$subscription->detached();

self::assertEquals(Status::Outdated, $subscription->status());
self::assertEquals(Status::Detached, $subscription->status());
self::assertFalse($subscription->isNew());
self::assertFalse($subscription->isBooting());
self::assertFalse($subscription->isActive());
self::assertFalse($subscription->isError());
self::assertTrue($subscription->isOutdated());
self::assertTrue($subscription->isDetached());
}

public function testChangePosition(): void
Expand Down

0 comments on commit bbe507a

Please sign in to comment.