Skip to content

Commit

Permalink
improve api
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Feb 22, 2024
1 parent 5609182 commit c582ede
Show file tree
Hide file tree
Showing 16 changed files with 249 additions and 320 deletions.
4 changes: 2 additions & 2 deletions docs/pages/projection.md
Original file line number Diff line number Diff line change
Expand Up @@ -442,9 +442,9 @@ Our default strategy can be configured with the following parameters:
* `maxAttempts` - The maximum number of attempts.

```php
use Patchlevel\EventSourcing\Projection\RetryStrategy\DefaultRetryStrategy;
use Patchlevel\EventSourcing\Projection\RetryStrategy\ClockBasedRetryStrategy;

$retryStrategy = new DefaultRetryStrategy(
$retryStrategy = new ClockBasedRetryStrategy(
baseDelay: 5,
delayFactor: 2,
maxAttempts: 5,
Expand Down
15 changes: 15 additions & 0 deletions src/Projection/Projection/NoErrorToRetry.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Projection\Projection;

use RuntimeException;

final class NoErrorToRetry extends RuntimeException
{
public function __construct()
{
parent::__construct('No error to retry');
}
}
29 changes: 18 additions & 11 deletions src/Projection/Projection/Projection.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Patchlevel\EventSourcing\Projection\Projection;

use Patchlevel\EventSourcing\Projection\RetryStrategy\Retry;
use DateTimeImmutable;
use Throwable;

final class Projection
Expand All @@ -18,7 +18,8 @@ public function __construct(
private ProjectionStatus $status = ProjectionStatus::New,
private int $position = 0,
private ProjectionError|null $error = null,
private Retry|null $retry = null,
private int $retryAttempt = 0,
private DateTimeImmutable|null $lastSavedAt = null,
) {
}

Expand Down Expand Up @@ -130,28 +131,34 @@ public function isError(): bool
return $this->status === ProjectionStatus::Error;
}

public function updateRetry(Retry|null $retry): void
public function retryAttempt(): int
{
$this->retry = $retry;
}

public function retry(): Retry|null
{
return $this->retry;
return $this->retryAttempt;
}

public function doRetry(): void
{
if ($this->error === null) {
return;
throw new NoErrorToRetry();
}

$this->retryAttempt++;
$this->status = $this->error->previousStatus;
$this->error = null;
}

public function resetRetry(): void
{
$this->retry = null;
$this->retryAttempt = 0;
}

public function lastSavedAt(): DateTimeImmutable|null
{
return $this->lastSavedAt;
}

public function updateLastSavedAt(DateTimeImmutable $lastSavedAt): void
{
$this->lastSavedAt = $lastSavedAt;
}
}
38 changes: 20 additions & 18 deletions src/Projection/Projection/Store/DoctrineStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
use Patchlevel\EventSourcing\Clock\SystemClock;
use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionError;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionNotFound;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus;
use Patchlevel\EventSourcing\Projection\Projection\RunMode;
use Patchlevel\EventSourcing\Projection\RetryStrategy\Retry;
use Patchlevel\EventSourcing\Schema\SchemaConfigurator;
use Psr\Clock\ClockInterface;

use function array_map;
use function assert;
Expand All @@ -39,14 +40,15 @@
* error_message: string|null,
* error_previous_status: string|null,
* error_context: string|null,
* retry_attempt: int|null,
* retry_next: string|null,
* retry_attempt: int,
* last_saved_at: string,
* }
*/
final class DoctrineStore implements LockableProjectionStore, SchemaConfigurator
{
public function __construct(
private readonly Connection $connection,
private readonly ClockInterface $clock = new SystemClock(),
private readonly string $projectionTable = 'projections',
) {
}
Expand Down Expand Up @@ -121,7 +123,8 @@ public function find(ProjectionCriteria|null $criteria = null): array
public function add(Projection $projection): void
{
$projectionError = $projection->projectionError();
$projectionRetry = $projection->retry();

$projection->updateLastSavedAt($this->clock->now());

$this->connection->insert(
$this->projectionTable,
Expand All @@ -134,19 +137,20 @@ public function add(Projection $projection): void
'error_message' => $projectionError?->errorMessage,
'error_previous_status' => $projectionError?->previousStatus?->value,
'error_context' => $projectionError?->errorContext !== null ? json_encode($projectionError->errorContext, JSON_THROW_ON_ERROR) : null,
'retry_attempt' => $projectionRetry?->attempt,
'retry_next' => $projectionRetry?->nextRetry,
'retry_attempt' => $projection->retryAttempt(),
'last_saved_at' => $projection->lastSavedAt(),
],
[
'retry_next' => Types::DATETIME_IMMUTABLE,
'last_saved_at' => Types::DATETIME_IMMUTABLE,
],
);
}

public function update(Projection $projection): void
{
$projectionError = $projection->projectionError();
$projectionRetry = $projection->retry();

$projection->updateLastSavedAt($this->clock->now());

$effectedRows = $this->connection->update(
$this->projectionTable,
Expand All @@ -158,14 +162,14 @@ public function update(Projection $projection): void
'error_message' => $projectionError?->errorMessage,
'error_previous_status' => $projectionError?->previousStatus?->value,
'error_context' => $projectionError?->errorContext !== null ? json_encode($projectionError->errorContext, JSON_THROW_ON_ERROR) : null,
'retry_attempt' => $projectionRetry?->attempt,
'retry_next' => $projectionRetry?->nextRetry,
'retry_attempt' => $projection->retryAttempt(),
'last_saved_at' => $projection->lastSavedAt(),
],
[
'id' => $projection->id(),
],
[
'retry_next' => Types::DATETIME_IMMUTABLE,
'last_saved_at' => Types::DATETIME_IMMUTABLE,
],
);

Expand Down Expand Up @@ -221,9 +225,9 @@ public function configureSchema(Schema $schema, Connection $connection): void
$table->addColumn('error_context', Types::JSON)
->setNotnull(false);
$table->addColumn('retry_attempt', Types::INTEGER)
->setNotnull(false);
$table->addColumn('retry_next', Types::DATETIMETZ_IMMUTABLE)
->setNotnull(false);
->setNotnull(true);
$table->addColumn('last_saved_at', Types::DATETIMETZ_IMMUTABLE)
->setNotnull(true);

$table->setPrimaryKey(['id']);
$table->addIndex(['group_name']);
Expand All @@ -247,10 +251,8 @@ private function createProjection(array $row): Projection
$row['error_previous_status'] !== null ? ProjectionStatus::from($row['error_previous_status']) : ProjectionStatus::New,
$context,
) : null,
$row['retry_attempt'] !== null ? new Retry(
$row['retry_attempt'],
self::normalizeDateTime($row['retry_next'], $this->connection->getDatabasePlatform()),
) : null,
$row['retry_attempt'],
self::normalizeDateTime($row['last_saved_at'], $this->connection->getDatabasePlatform()),
);
}

Expand Down
36 changes: 16 additions & 20 deletions src/Projection/Projectionist/DefaultProjectionist.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use Patchlevel\EventSourcing\Projection\Projection\RunMode;
use Patchlevel\EventSourcing\Projection\Projection\Store\LockableProjectionStore;
use Patchlevel\EventSourcing\Projection\Projection\Store\ProjectionStore;
use Patchlevel\EventSourcing\Projection\RetryStrategy\DefaultRetryStrategy;
use Patchlevel\EventSourcing\Projection\RetryStrategy\ClockBasedRetryStrategy;
use Patchlevel\EventSourcing\Projection\RetryStrategy\RetryStrategy;
use Patchlevel\EventSourcing\Store\Criteria;
use Patchlevel\EventSourcing\Store\Store;
Expand All @@ -39,7 +39,7 @@ public function __construct(
private readonly Store $streamableMessageStore,
private readonly ProjectionStore $projectionStore,
private readonly iterable $projectors,
private readonly RetryStrategy $retryStrategy = new DefaultRetryStrategy(),
private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(),
private readonly ProjectorMetadataFactory $metadataFactory = new AttributeProjectorMetadataFactory(),
private readonly LoggerInterface|null $logger = null,
) {
Expand Down Expand Up @@ -604,25 +604,34 @@ private function handleRetryProjections(ProjectionistCriteria $criteria): void
function (array $projections): void {
/** @var Projection $projection */
foreach ($projections as $projection) {
$retry = $projection->retry();
$error = $projection->projectionError();

if (!$retry) {
if ($error === null) {
continue;
}

if (!$this->retryStrategy->shouldRetry($retry)) {
$retryable = in_array(
$error->previousStatus,
[ProjectionStatus::New, ProjectionStatus::Booting, ProjectionStatus::Active],
true,
);

if (!$retryable) {
continue;
}

$projection->doRetry();
if (!$this->retryStrategy->shouldRetry($projection)) {
continue;
}

$projection->doRetry();
$this->projectionStore->update($projection);

$this->logger?->info(
sprintf(
'Projectionist: Retry projection "%s" (%d) and set back to %s.',
$projection->id(),
$retry->attempt,
$projection->retryAttempt(),
$projection->status()->value,
),
);
Expand Down Expand Up @@ -855,20 +864,7 @@ private function findForUpdate(ProjectionCriteria $criteria, Closure $closure):

private function handleError(Projection $projection, Throwable $throwable): void
{
$retryable = in_array(
$projection->status(),
[ProjectionStatus::New, ProjectionStatus::Booting, ProjectionStatus::Active],
true,
);

$projection->error($throwable);

if ($retryable) {
$projection->updateRetry(
$this->retryStrategy->nextAttempt($projection->retry()),
);
}

$this->projectionStore->update($projection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

use DateTimeImmutable;
use Patchlevel\EventSourcing\Clock\SystemClock;
use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Psr\Clock\ClockInterface;
use RuntimeException;

use function round;
use function sprintf;

final class DefaultRetryStrategy implements RetryStrategy
final class ClockBasedRetryStrategy implements RetryStrategy
{
public const DEFAULT_BASE_DELAY = 5;
public const DEFAULT_DELAY_FACTOR = 2;
Expand All @@ -30,46 +30,29 @@ public function __construct(
) {
}

public function nextAttempt(Retry|null $retry): Retry|null
public function shouldRetry(Projection $projection): bool
{
if ($retry === null) {
return new Retry(
1,
$this->calculateNextDate(1),
);
}

if ($retry->attempt() >= $this->maxAttempts) {
return null;
}

return new Retry(
$retry->attempt() + 1,
$this->calculateNextDate($retry->attempt()),
);
}

public function shouldRetry(Retry|null $retry): bool
{
if ($retry === null) {
if ($projection->retryAttempt() >= $this->maxAttempts) {
return false;
}

$nextRetry = $retry->nextRetry();
$lastSavedAt = $projection->lastSavedAt();

if ($nextRetry === null) {
if ($lastSavedAt === null) {
return false;
}

return $nextRetry <= $this->clock->now();
$nextRetryDate = $this->calculateNextRetryDate($lastSavedAt, $projection->retryAttempt());

return $nextRetryDate <= $this->clock->now();
}

private function calculateNextDate(int $attempt): DateTimeImmutable
private function calculateNextRetryDate(DateTimeImmutable $lastDate, int $attempt): DateTimeImmutable
{
$nextDate = $this->clock->now()->modify(sprintf('+%d seconds', $this->calculateDelay($attempt)));
$nextDate = $lastDate->modify(sprintf('+%d seconds', $this->calculateDelay($attempt)));

if ($nextDate === false) {
throw new RuntimeException('Could not calculate next date');
throw new UnexpectedError('Could not calculate next date');
}

return $nextDate;
Expand Down
9 changes: 3 additions & 6 deletions src/Projection/RetryStrategy/NoRetryStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@

namespace Patchlevel\EventSourcing\Projection\RetryStrategy;

use Patchlevel\EventSourcing\Projection\Projection\Projection;

final class NoRetryStrategy implements RetryStrategy
{
public function nextAttempt(Retry|null $retry): Retry|null
{
return null;
}

public function shouldRetry(Retry|null $retry): bool
public function shouldRetry(Projection $projection): bool
{
return false;
}
Expand Down
Loading

0 comments on commit c582ede

Please sign in to comment.