Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

better projection error handling #449

Merged
merged 3 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions phpcs.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@

<rule ref="PatchlevelCodingStandard">
<exclude name="Generic.Files.LineLength.TooLong"/>
<exclude name="SlevomatCodingStandard.PHP.RequireExplicitAssertion.RequiredExplicitAssertion"/>
</rule>
</ruleset>
6 changes: 3 additions & 3 deletions src/Console/Command/ProjectionStatusCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$projection->id()->version(),
$projection->position(),
$projection->status()->value,
$projection->errorMessage(),
$projection->projectionError()?->errorMessage,
],
[...$projections],
),
Expand All @@ -84,12 +84,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$projection->id()->version(),
$projection->position(),
$projection->status()->value,
$projection->errorMessage(),
$projection->projectionError()?->errorMessage,
],
],
);

$errorObject = $projection->errorObject();
$errorObject = $projection->projectionError()?->errorObject;

if ($errorObject instanceof Throwable) {
$io->throwable($errorObject);
Expand Down
51 changes: 33 additions & 18 deletions src/Projection/Projection/Projection.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@

namespace Patchlevel\EventSourcing\Projection\Projection;

use Throwable;

final class Projection
{
public function __construct(
private readonly ProjectionId $id,
private ProjectionStatus $status = ProjectionStatus::New,
private int $position = 0,
private string|null $errorMessage = null,
private Throwable|null $errorObject = null,
private ProjectionError|null $error = null,
private int $retry = 0,
) {
}

Expand All @@ -32,14 +30,9 @@ public function position(): int
return $this->position;
}

public function errorMessage(): string|null
public function projectionError(): ProjectionError|null
{
return $this->errorMessage;
}

public function errorObject(): Throwable|null
{
return $this->errorObject;
return $this->error;
}

public function incrementPosition(): void
Expand All @@ -55,8 +48,7 @@ public function isNew(): bool
public function booting(): void
{
$this->status = ProjectionStatus::Booting;
$this->errorMessage = null;
$this->errorObject = null;
$this->error = null;
}

public function isBooting(): bool
Expand All @@ -67,8 +59,7 @@ public function isBooting(): bool
public function active(): void
{
$this->status = ProjectionStatus::Active;
$this->errorMessage = null;
$this->errorObject = null;
$this->error = null;
}

public function isActive(): bool
Expand All @@ -86,15 +77,39 @@ public function isOutdated(): bool
return $this->status === ProjectionStatus::Outdated;
}

public function error(Throwable|string|null $error = null): void
public function error(ProjectionError|null $error = null): void
{
$this->status = ProjectionStatus::Error;
$this->errorMessage = $error instanceof Throwable ? $error->getMessage() : $error;
$this->errorObject = $error instanceof Throwable ? $error : null;
$this->error = $error;
}

public function isError(): bool
{
return $this->status === ProjectionStatus::Error;
}

public function incrementRetry(): void
{
$this->retry++;
}

public function retry(): int
{
return $this->retry;
}

public function resetRetry(): void
{
$this->retry = 0;
}

public function disallowRetry(): void
{
$this->retry = -1;
}

public function isRetryDisallowed(): bool
{
return $this->retry === -1;
}
}
21 changes: 21 additions & 0 deletions src/Projection/Projection/ProjectionError.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Projection\Projection;

use Throwable;

final class ProjectionError
{
public function __construct(
public readonly string $errorMessage,
public readonly Throwable|null $errorObject = null,
) {
}

public static function fromThrowable(Throwable $error): self
{
return new self($error->getMessage(), $error);
}
}
57 changes: 37 additions & 20 deletions src/Projection/Projection/Store/DoctrineStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,24 @@
use Doctrine\DBAL\Types\Types;
use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCollection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionError;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionId;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionNotFound;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus;
use Patchlevel\EventSourcing\Schema\SchemaConfigurator;

use function array_map;

/** @psalm-type Data = array{
* name: string,
* version: int,
* position: int,
* status: string,
* error_message: string|null,
* error_object: string|null,
* retry: int,
* }
*/
final class DoctrineStore implements ProjectionStore, SchemaConfigurator
{
public function __construct(
Expand All @@ -32,7 +43,7 @@ public function get(ProjectionId $projectionId): Projection
->where('name = :name AND version = :version')
->getSQL();

/** @var array{name: string, version: int, position: int, status: string, error_message: string|null}|false $result */
/** @var Data|false $result */
$result = $this->connection->fetchAssociative($sql, [
'name' => $projectionId->name(),
'version' => $projectionId->version(),
Expand All @@ -42,12 +53,7 @@ public function get(ProjectionId $projectionId): Projection
throw new ProjectionNotFound($projectionId);
}

return new Projection(
$projectionId,
ProjectionStatus::from($result['status']),
$result['position'],
$result['error_message'],
);
return $this->createProjection($result);
}

public function all(): ProjectionCollection
Expand All @@ -57,40 +63,48 @@ public function all(): ProjectionCollection
->from($this->projectionTable)
->getSQL();

/** @var list<array{name: string, version: int, position: int, status: string, error_message: string|null, error_object: string|null}> $result */
/** @var list<Data> $result */
$result = $this->connection->fetchAllAssociative($sql);

return new ProjectionCollection(
array_map(
static function (array $data) {
return new Projection(
new ProjectionId($data['name'], $data['version']),
ProjectionStatus::from($data['status']),
$data['position'],
$data['error_message'],
ErrorSerializer::unserialize($data['error_object']),
);
},
fn (array $data) => $this->createProjection($data),
$result,
),
);
}

/** @param Data $row */
private function createProjection(array $row): Projection
{
return new Projection(
new ProjectionId($row['name'], $row['version']),
ProjectionStatus::from($row['status']),
$row['position'],
$row['error_message'] ? new ProjectionError(
$row['error_message'],
ErrorSerializer::unserialize($row['error_object']),
) : null,
$row['retry'],
);
}

public function save(Projection ...$projections): void
{
$this->connection->transactional(
function (Connection $connection) use ($projections): void {
foreach ($projections as $projection) {
$errorObject = ErrorSerializer::serialize($projection->errorObject());
$errorObject = ErrorSerializer::serialize($projection->projectionError()?->errorObject);

try {
$effectedRows = (int)$connection->update(
$this->projectionTable,
[
'position' => $projection->position(),
'status' => $projection->status()->value,
'error_message' => $projection->errorMessage(),
'error_message' => $projection->projectionError()?->errorMessage,
'error_object' => $errorObject,
'retry' => $projection->retry(),
],
[
'name' => $projection->id()->name(),
Expand All @@ -109,8 +123,9 @@ function (Connection $connection) use ($projections): void {
'version' => $projection->id()->version(),
'position' => $projection->position(),
'status' => $projection->status()->value,
'error_message' => $projection->errorMessage(),
'error_message' => $projection->projectionError()?->errorMessage,
'error_object' => $errorObject,
'retry' => $projection->retry(),
],
);
}
Expand Down Expand Up @@ -149,6 +164,8 @@ public function configureSchema(Schema $schema, Connection $connection): void
->setNotnull(false);
$table->addColumn('error_object', Types::BLOB)
->setNotnull(false);
$table->addColumn('retry', Types::INTEGER)
->setNotnull(true);

$table->setPrimaryKey(['name', 'version']);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Projection/Projection/Store/ErrorSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static function serialize(Throwable|null $error): string|null
}

try {
return serialize($error);
return @serialize($error);
} catch (Throwable) {
return null;
}
Expand Down
Loading
Loading