Skip to content

Commit

Permalink
add failed status in subscription engine
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Dec 30, 2024
1 parent 36630de commit 7ab1094
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 7 deletions.
9 changes: 8 additions & 1 deletion src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategy;
use Patchlevel\EventSourcing\Subscription\RunMode;
use Patchlevel\EventSourcing\Subscription\Status;
Expand Down Expand Up @@ -667,6 +668,7 @@ public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Re
groups: $criteria->groups,
status: [
Status::Error,
Status::Failed,
Status::Detached,
Status::Paused,
Status::Finished,
Expand Down Expand Up @@ -984,7 +986,12 @@ private function discoverNewSubscriptions(): void

private function handleError(Subscription $subscription, Throwable $throwable): void
{
$subscription->error($throwable);
if ($this->retryStrategy instanceof ConditionalRetryStrategy && !$this->retryStrategy->canRetry($subscription)) {

Check failure on line 989 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Call to method canRetry() on an unknown class Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy.

Check failure on line 989 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Class Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy not found.
$subscription->failed($throwable);
} else {
$subscription->error($throwable);
}

$this->subscriptionManager->update($subscription);

if (!isset($this->batching[$subscription->id()])) {
Expand Down
9 changes: 7 additions & 2 deletions src/Subscription/RetryStrategy/ClockBasedRetryStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use function round;
use function sprintf;

final class ClockBasedRetryStrategy implements RetryStrategy
final class ClockBasedRetryStrategy implements ConditionalRetryStrategy

Check failure on line 15 in src/Subscription/RetryStrategy/ClockBasedRetryStrategy.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Class Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy implements unknown interface Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy.

Check failure on line 15 in src/Subscription/RetryStrategy/ClockBasedRetryStrategy.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

Roave\BetterReflection\Reflection\ReflectionClass "Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy" could not be found in the located source

Check failure on line 15 in src/Subscription/RetryStrategy/ClockBasedRetryStrategy.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

Roave\BetterReflection\Reflection\ReflectionClass "Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy" could not be found in the located source

Check failure on line 15 in src/Subscription/RetryStrategy/ClockBasedRetryStrategy.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

Roave\BetterReflection\Reflection\ReflectionClass "Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy" could not be found in the located source
{
public const DEFAULT_BASE_DELAY = 5;
public const DEFAULT_DELAY_FACTOR = 2;
Expand All @@ -30,9 +30,14 @@ public function __construct(
) {
}

public function canRetry(Subscription $subscription): bool
{
return $subscription->retryAttempt() < $this->maxAttempts;
}

public function shouldRetry(Subscription $subscription): bool
{
if ($subscription->retryAttempt() >= $this->maxAttempts) {
if ($this->canRetry($subscription) === false) {
return false;
}

Expand Down
7 changes: 6 additions & 1 deletion src/Subscription/RetryStrategy/NoRetryStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@

use Patchlevel\EventSourcing\Subscription\Subscription;

final class NoRetryStrategy implements RetryStrategy
final class NoRetryStrategy implements ConditionalRetryStrategy

Check failure on line 9 in src/Subscription/RetryStrategy/NoRetryStrategy.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Class Patchlevel\EventSourcing\Subscription\RetryStrategy\NoRetryStrategy implements unknown interface Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy.

Check failure on line 9 in src/Subscription/RetryStrategy/NoRetryStrategy.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

Roave\BetterReflection\Reflection\ReflectionClass "Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy" could not be found in the located source

Check failure on line 9 in src/Subscription/RetryStrategy/NoRetryStrategy.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

Roave\BetterReflection\Reflection\ReflectionClass "Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy" could not be found in the located source

Check failure on line 9 in src/Subscription/RetryStrategy/NoRetryStrategy.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

Roave\BetterReflection\Reflection\ReflectionClass "Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy" could not be found in the located source

Check failure on line 9 in src/Subscription/RetryStrategy/NoRetryStrategy.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

Roave\BetterReflection\Reflection\ReflectionClass "Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy" could not be found in the located source

Check failure on line 9 in src/Subscription/RetryStrategy/NoRetryStrategy.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

Roave\BetterReflection\Reflection\ReflectionClass "Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy" could not be found in the located source

Check failure on line 9 in src/Subscription/RetryStrategy/NoRetryStrategy.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

Roave\BetterReflection\Reflection\ReflectionClass "Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy" could not be found in the located source

Check failure on line 9 in src/Subscription/RetryStrategy/NoRetryStrategy.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

Roave\BetterReflection\Reflection\ReflectionClass "Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy" could not be found in the located source
{
public function shouldRetry(Subscription $subscription): bool
{
return false;
}

public function canRetry(Subscription $subscription): bool
{
return false;
}
}
1 change: 1 addition & 0 deletions src/Subscription/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ enum Status: string
case Finished = 'finished';
case Detached = 'detached';
case Error = 'error';
case Failed = 'failed';
}
19 changes: 19 additions & 0 deletions src/Subscription/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,25 @@ public function isError(): bool
return $this->status === Status::Error;
}

public function failed(Throwable|string $error): void
{
$previousStatus = $this->status;
$this->status = Status::Failed;

if ($error instanceof Throwable) {
$this->error = SubscriptionError::fromThrowable($previousStatus, $error);

return;
}

$this->error = new SubscriptionError($error, $previousStatus);
}

public function isFailed(): bool
{
return $this->status === Status::Failed;
}

public function retryAttempt(): int
{
return $this->retryAttempt;
Expand Down
33 changes: 30 additions & 3 deletions tests/Integration/Subscription/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ public function testErrorHandling(): void

$subscriber->subscribeError = true;

// first run, error

$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
Expand All @@ -269,6 +271,8 @@ public function testErrorHandling(): void
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(0, $subscription->retryAttempt());

// second run, time has not passed yet, no retry, no error

$result = $engine->run();

self::assertEquals(0, $result->processedMessages);
Expand All @@ -281,8 +285,9 @@ public function testErrorHandling(): void
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(0, $subscription->retryAttempt());

$clock->sleep(5);
// third run, time has passed, 1. retry, error again

$clock->sleep(5);
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
Expand All @@ -300,8 +305,9 @@ public function testErrorHandling(): void
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(1, $subscription->retryAttempt());

$clock->sleep(10);
// fourth run, time has passed, 2. retry, max retries reached, failed

$clock->sleep(10);
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
Expand All @@ -314,11 +320,28 @@ public function testErrorHandling(): void

$subscription = self::findSubscription($engine->subscriptions(), 'error_producer');

self::assertEquals(Status::Error, $subscription->status());
self::assertEquals(Status::Failed, $subscription->status());
self::assertEquals('subscribe error', $subscription->subscriptionError()?->errorMessage);
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(2, $subscription->retryAttempt());

// fifth run, time has passed, skip failed subscription

$clock->sleep(20);
$result = $engine->run();

self::assertEquals(0, $result->processedMessages);
self::assertEquals([], $result->errors);

$subscription = self::findSubscription($engine->subscriptions(), 'error_producer');

self::assertEquals(Status::Failed, $subscription->status());
self::assertEquals('subscribe error', $subscription->subscriptionError()?->errorMessage);
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(2, $subscription->retryAttempt());

// reactivated subscription

$engine->reactivate(new SubscriptionEngineCriteria(
ids: ['error_producer'],
));
Expand All @@ -329,6 +352,8 @@ public function testErrorHandling(): void
self::assertEquals(null, $subscription->subscriptionError());
self::assertEquals(0, $subscription->retryAttempt());

// sixth run, error again

$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
Expand All @@ -346,6 +371,8 @@ public function testErrorHandling(): void
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(0, $subscription->retryAttempt());

// seventh run, time has passed, error fixed, 1. retry, no error

$clock->sleep(5);
$subscriber->subscribeError = false;

Expand Down

0 comments on commit 7ab1094

Please sign in to comment.