Skip to content

Commit

Permalink
Merge pull request #449 from patchlevel/better-projection-error-handling
Browse files Browse the repository at this point in the history
better projection error handling
  • Loading branch information
DavidBadura authored Jan 2, 2024
2 parents 7c60a1c + cc91df8 commit 4b966ea
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 97 deletions.
1 change: 1 addition & 0 deletions phpcs.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@

<rule ref="PatchlevelCodingStandard">
<exclude name="Generic.Files.LineLength.TooLong"/>
<exclude name="SlevomatCodingStandard.PHP.RequireExplicitAssertion.RequiredExplicitAssertion"/>
</rule>
</ruleset>
6 changes: 3 additions & 3 deletions src/Console/Command/ProjectionStatusCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$projection->id()->version(),
$projection->position(),
$projection->status()->value,
$projection->errorMessage(),
$projection->projectionError()?->errorMessage,
],
[...$projections],
),
Expand All @@ -84,12 +84,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$projection->id()->version(),
$projection->position(),
$projection->status()->value,
$projection->errorMessage(),
$projection->projectionError()?->errorMessage,
],
],
);

$errorObject = $projection->errorObject();
$errorObject = $projection->projectionError()?->errorObject;

if ($errorObject instanceof Throwable) {
$io->throwable($errorObject);
Expand Down
51 changes: 33 additions & 18 deletions src/Projection/Projection/Projection.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@

namespace Patchlevel\EventSourcing\Projection\Projection;

use Throwable;

final class Projection
{
public function __construct(
private readonly ProjectionId $id,
private ProjectionStatus $status = ProjectionStatus::New,
private int $position = 0,
private string|null $errorMessage = null,
private Throwable|null $errorObject = null,
private ProjectionError|null $error = null,
private int $retry = 0,
) {
}

Expand All @@ -32,14 +30,9 @@ public function position(): int
return $this->position;
}

public function errorMessage(): string|null
public function projectionError(): ProjectionError|null
{
return $this->errorMessage;
}

public function errorObject(): Throwable|null
{
return $this->errorObject;
return $this->error;
}

public function incrementPosition(): void
Expand All @@ -55,8 +48,7 @@ public function isNew(): bool
public function booting(): void
{
$this->status = ProjectionStatus::Booting;
$this->errorMessage = null;
$this->errorObject = null;
$this->error = null;
}

public function isBooting(): bool
Expand All @@ -67,8 +59,7 @@ public function isBooting(): bool
public function active(): void
{
$this->status = ProjectionStatus::Active;
$this->errorMessage = null;
$this->errorObject = null;
$this->error = null;
}

public function isActive(): bool
Expand All @@ -86,15 +77,39 @@ public function isOutdated(): bool
return $this->status === ProjectionStatus::Outdated;
}

public function error(Throwable|string|null $error = null): void
public function error(ProjectionError|null $error = null): void
{
$this->status = ProjectionStatus::Error;
$this->errorMessage = $error instanceof Throwable ? $error->getMessage() : $error;
$this->errorObject = $error instanceof Throwable ? $error : null;
$this->error = $error;
}

public function isError(): bool
{
return $this->status === ProjectionStatus::Error;
}

public function incrementRetry(): void
{
$this->retry++;
}

public function retry(): int
{
return $this->retry;
}

public function resetRetry(): void
{
$this->retry = 0;
}

public function disallowRetry(): void
{
$this->retry = -1;
}

public function isRetryDisallowed(): bool
{
return $this->retry === -1;
}
}
21 changes: 21 additions & 0 deletions src/Projection/Projection/ProjectionError.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Projection\Projection;

use Throwable;

final class ProjectionError
{
public function __construct(
public readonly string $errorMessage,
public readonly Throwable|null $errorObject = null,
) {
}

public static function fromThrowable(Throwable $error): self
{
return new self($error->getMessage(), $error);
}
}
57 changes: 37 additions & 20 deletions src/Projection/Projection/Store/DoctrineStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,24 @@
use Doctrine\DBAL\Types\Types;
use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCollection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionError;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionId;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionNotFound;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus;
use Patchlevel\EventSourcing\Schema\SchemaConfigurator;

use function array_map;

/** @psalm-type Data = array{
* name: string,
* version: int,
* position: int,
* status: string,
* error_message: string|null,
* error_object: string|null,
* retry: int,
* }
*/
final class DoctrineStore implements ProjectionStore, SchemaConfigurator
{
public function __construct(
Expand All @@ -32,7 +43,7 @@ public function get(ProjectionId $projectionId): Projection
->where('name = :name AND version = :version')
->getSQL();

/** @var array{name: string, version: int, position: int, status: string, error_message: string|null}|false $result */
/** @var Data|false $result */
$result = $this->connection->fetchAssociative($sql, [
'name' => $projectionId->name(),
'version' => $projectionId->version(),
Expand All @@ -42,12 +53,7 @@ public function get(ProjectionId $projectionId): Projection
throw new ProjectionNotFound($projectionId);
}

return new Projection(
$projectionId,
ProjectionStatus::from($result['status']),
$result['position'],
$result['error_message'],
);
return $this->createProjection($result);
}

public function all(): ProjectionCollection
Expand All @@ -57,40 +63,48 @@ public function all(): ProjectionCollection
->from($this->projectionTable)
->getSQL();

/** @var list<array{name: string, version: int, position: int, status: string, error_message: string|null, error_object: string|null}> $result */
/** @var list<Data> $result */
$result = $this->connection->fetchAllAssociative($sql);

return new ProjectionCollection(
array_map(
static function (array $data) {
return new Projection(
new ProjectionId($data['name'], $data['version']),
ProjectionStatus::from($data['status']),
$data['position'],
$data['error_message'],
ErrorSerializer::unserialize($data['error_object']),
);
},
fn (array $data) => $this->createProjection($data),
$result,
),
);
}

/** @param Data $row */
private function createProjection(array $row): Projection
{
return new Projection(
new ProjectionId($row['name'], $row['version']),
ProjectionStatus::from($row['status']),
$row['position'],
$row['error_message'] ? new ProjectionError(
$row['error_message'],
ErrorSerializer::unserialize($row['error_object']),
) : null,
$row['retry'],
);
}

public function save(Projection ...$projections): void
{
$this->connection->transactional(
function (Connection $connection) use ($projections): void {
foreach ($projections as $projection) {
$errorObject = ErrorSerializer::serialize($projection->errorObject());
$errorObject = ErrorSerializer::serialize($projection->projectionError()?->errorObject);

try {
$effectedRows = (int)$connection->update(
$this->projectionTable,
[
'position' => $projection->position(),
'status' => $projection->status()->value,
'error_message' => $projection->errorMessage(),
'error_message' => $projection->projectionError()?->errorMessage,
'error_object' => $errorObject,
'retry' => $projection->retry(),
],
[
'name' => $projection->id()->name(),
Expand All @@ -109,8 +123,9 @@ function (Connection $connection) use ($projections): void {
'version' => $projection->id()->version(),
'position' => $projection->position(),
'status' => $projection->status()->value,
'error_message' => $projection->errorMessage(),
'error_message' => $projection->projectionError()?->errorMessage,
'error_object' => $errorObject,
'retry' => $projection->retry(),
],
);
}
Expand Down Expand Up @@ -149,6 +164,8 @@ public function configureSchema(Schema $schema, Connection $connection): void
->setNotnull(false);
$table->addColumn('error_object', Types::BLOB)
->setNotnull(false);
$table->addColumn('retry', Types::INTEGER)
->setNotnull(true);

$table->setPrimaryKey(['name', 'version']);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Projection/Projection/Store/ErrorSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static function serialize(Throwable|null $error): string|null
}

try {
return serialize($error);
return @serialize($error);
} catch (Throwable) {
return null;
}
Expand Down
Loading

0 comments on commit 4b966ea

Please sign in to comment.