Skip to content

Commit

Permalink
add retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Dec 29, 2023
1 parent fdd83f4 commit 256b66c
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 49 deletions.
26 changes: 26 additions & 0 deletions src/Projection/Projection/Projection.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public function __construct(
private ProjectionStatus $status = ProjectionStatus::New,
private int $position = 0,
private ProjectionError|null $error = null,
private int $retry = 0,
) {
}

Expand Down Expand Up @@ -86,4 +87,29 @@ public function isError(): bool
{
return $this->status === ProjectionStatus::Error;
}

public function incrementRetry(): void
{
$this->retry++;
}

public function retry(): int
{
return $this->retry;
}

public function resetRetry(): void
{
$this->retry = 0;
}

public function disallowRetry(): void
{
$this->retry = -1;
}

public function isRetryDisallowed(): bool
{
return $this->retry === -1;
}
}
8 changes: 7 additions & 1 deletion src/Projection/Projection/Store/DoctrineStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
* position: int,
* status: string,
* error_message: string|null,
* error_object: string|null
* error_object: string|null,
* retry: int,
* }
*/
final class DoctrineStore implements ProjectionStore, SchemaConfigurator
Expand Down Expand Up @@ -84,6 +85,7 @@ private function createProjection(array $row): Projection
$row['error_message'],
ErrorSerializer::unserialize($row['error_object']),
) : null,
$row['retry'],
);
}

Expand All @@ -102,6 +104,7 @@ function (Connection $connection) use ($projections): void {
'status' => $projection->status()->value,
'error_message' => $projection->projectionError()?->errorMessage,
'error_object' => $errorObject,
'retry' => $projection->retry(),
],
[
'name' => $projection->id()->name(),
Expand All @@ -122,6 +125,7 @@ function (Connection $connection) use ($projections): void {
'status' => $projection->status()->value,
'error_message' => $projection->projectionError()?->errorMessage,
'error_object' => $errorObject,
'retry' => $projection->retry(),
],
);
}
Expand Down Expand Up @@ -160,6 +164,8 @@ public function configureSchema(Schema $schema, Connection $connection): void
->setNotnull(false);
$table->addColumn('error_object', Types::BLOB)
->setNotnull(false);
$table->addColumn('retry', Types::INTEGER)
->setNotnull(true);

$table->setPrimaryKey(['name', 'version']);
}
Expand Down
132 changes: 84 additions & 48 deletions src/Projection/Projectionist/DefaultProjectionist.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

final class DefaultProjectionist implements Projectionist
{
private const RETRY_LIMIT = 5;

/** @var array<string, Projector>|null */
private array|null $projectors = null;

Expand Down Expand Up @@ -89,16 +91,17 @@ public function boot(
$e->getMessage(),
));

$projection->error(ProjectionError::fromThrowable($e));
$this->projectionStore->save($projection);

if ($throwByError) {
throw new ProjectionistError(
$projector::class,
$projection->id(),
$e,
);
}

$projection->error(ProjectionError::fromThrowable($e));
$projection->disallowRetry();
$this->projectionStore->save($projection);
}
}

Expand Down Expand Up @@ -162,22 +165,10 @@ public function run(
int|null $limit = null,
bool $throwByError = false,
): void {
$projections = $this->projections()
->filterByProjectionStatus(ProjectionStatus::Active)
->filterByCriteria($criteria);

foreach ($projections as $projection) {
$projector = $this->projector($projection->id());

if ($projector) {
continue;
}

$projection->outdated();
$this->projectionStore->save($projection);
$projections = $this->projections()->filterByCriteria($criteria);

$this->logger?->info(sprintf('projection "%s" has been marked as outdated', $projection->id()->toString()));
}
$this->handleOutdatedProjections($projections);
$this->handleRetryProjections($projections);

$projections = $projections->filterByProjectionStatus(ProjectionStatus::Active);

Expand Down Expand Up @@ -379,44 +370,51 @@ private function handleMessage(Message $message, Projection $projection, bool $t

$subscribeMethod = $this->projectorResolver->resolveSubscribeMethod($projector, $message);

if ($subscribeMethod) {
try {
$subscribeMethod($message);
if (!$subscribeMethod) {
$projection->incrementPosition();
$this->projectionStore->save($projection);

$this->logger?->debug(
sprintf(
'projector "%s" for "%s" processed the event "%s"',
$projector::class,
$projection->id()->toString(),
$message->event()::class,
),
);
} catch (Throwable $e) {
$this->logger?->error(
sprintf(
'projector "%s" for "%s" could not process the event: %s',
$projector::class,
$projection->id()->toString(),
$e->getMessage(),
),
);
return;
}

$projection->error(ProjectionError::fromThrowable($e));
$this->projectionStore->save($projection);
try {
$subscribeMethod($message);

if ($throwByError) {
throw new ProjectionistError(
$projector::class,
$projection->id(),
$e,
);
}
$this->logger?->debug(
sprintf(
'projector "%s" for "%s" processed the event "%s"',
$projector::class,
$projection->id()->toString(),
$message->event()::class,
),
);
} catch (Throwable $e) {
$this->logger?->error(
sprintf(
'projector "%s" for "%s" could not process the event: %s',
$projector::class,
$projection->id()->toString(),
$e->getMessage(),
),
);

return;
if ($throwByError) {
throw new ProjectionistError(
$projector::class,
$projection->id(),
$e,
);
}

$projection->error(ProjectionError::fromThrowable($e));
$projection->incrementRetry();
$this->projectionStore->save($projection);

return;
}

$projection->incrementPosition();
$projection->resetRetry();
$this->projectionStore->save($projection);
}

Expand All @@ -442,4 +440,42 @@ private function projectors(): array

return $this->projectors;
}

private function handleOutdatedProjections(ProjectionCollection $projections): void
{
foreach ($projections as $projection) {
if ($projection->isRetryDisallowed()) {
continue;
}

if (!$projection->isActive() && !$projection->isError()) {
continue;
}

$projector = $this->projector($projection->id());

if ($projector) {
continue;
}

$projection->outdated();
$this->projectionStore->save($projection);

$this->logger?->info(sprintf('projection "%s" has been marked as outdated', $projection->id()->toString()));
}
}

private function handleRetryProjections(ProjectionCollection $projections): void
{
foreach ($projections->filterByProjectionStatus(ProjectionStatus::Error) as $projection) {
if ($projection->retry() >= self::RETRY_LIMIT) {
continue;
}

$projection->active();
$this->projectionStore->save($projection);

$this->logger?->info(sprintf('retry projection "%s"', $projection->id()->toString()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ public function create(): void
ProjectionStatus::Error,
0,
new ProjectionError('ERROR', $projector->exception),
-1,
),
],
$projectionStore->savedProjections,
Expand Down Expand Up @@ -468,6 +469,7 @@ public function handle(Message $message): void
ProjectionStatus::Error,
0,
new ProjectionError('ERROR', $projector->exception),
1,
),
],
$projectionStore->savedProjections,
Expand Down

0 comments on commit 256b66c

Please sign in to comment.