diff --git a/phpcs.xml.dist b/phpcs.xml.dist
index e09648280..b6748a932 100644
--- a/phpcs.xml.dist
+++ b/phpcs.xml.dist
@@ -13,5 +13,6 @@
+
diff --git a/src/Console/Command/ProjectionStatusCommand.php b/src/Console/Command/ProjectionStatusCommand.php
index 2746f85ff..e07d7c581 100644
--- a/src/Console/Command/ProjectionStatusCommand.php
+++ b/src/Console/Command/ProjectionStatusCommand.php
@@ -57,7 +57,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$projection->id()->version(),
$projection->position(),
$projection->status()->value,
- $projection->errorMessage(),
+ $projection->projectionError()?->errorMessage,
],
[...$projections],
),
@@ -84,12 +84,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$projection->id()->version(),
$projection->position(),
$projection->status()->value,
- $projection->errorMessage(),
+ $projection->projectionError()?->errorMessage,
],
],
);
- $errorObject = $projection->errorObject();
+ $errorObject = $projection->projectionError()?->errorObject;
if ($errorObject instanceof Throwable) {
$io->throwable($errorObject);
diff --git a/src/Projection/Projection/Projection.php b/src/Projection/Projection/Projection.php
index 6ac6e4b8a..16a181eb3 100644
--- a/src/Projection/Projection/Projection.php
+++ b/src/Projection/Projection/Projection.php
@@ -4,16 +4,14 @@
namespace Patchlevel\EventSourcing\Projection\Projection;
-use Throwable;
-
final class Projection
{
public function __construct(
private readonly ProjectionId $id,
private ProjectionStatus $status = ProjectionStatus::New,
private int $position = 0,
- private string|null $errorMessage = null,
- private Throwable|null $errorObject = null,
+ private ProjectionError|null $error = null,
+ private int $retry = 0,
) {
}
@@ -32,14 +30,9 @@ public function position(): int
return $this->position;
}
- public function errorMessage(): string|null
+ public function projectionError(): ProjectionError|null
{
- return $this->errorMessage;
- }
-
- public function errorObject(): Throwable|null
- {
- return $this->errorObject;
+ return $this->error;
}
public function incrementPosition(): void
@@ -55,8 +48,7 @@ public function isNew(): bool
public function booting(): void
{
$this->status = ProjectionStatus::Booting;
- $this->errorMessage = null;
- $this->errorObject = null;
+ $this->error = null;
}
public function isBooting(): bool
@@ -67,8 +59,7 @@ public function isBooting(): bool
public function active(): void
{
$this->status = ProjectionStatus::Active;
- $this->errorMessage = null;
- $this->errorObject = null;
+ $this->error = null;
}
public function isActive(): bool
@@ -86,15 +77,39 @@ public function isOutdated(): bool
return $this->status === ProjectionStatus::Outdated;
}
- public function error(Throwable|string|null $error = null): void
+ public function error(ProjectionError|null $error = null): void
{
$this->status = ProjectionStatus::Error;
- $this->errorMessage = $error instanceof Throwable ? $error->getMessage() : $error;
- $this->errorObject = $error instanceof Throwable ? $error : null;
+ $this->error = $error;
}
public function isError(): bool
{
return $this->status === ProjectionStatus::Error;
}
+
+ public function incrementRetry(): void
+ {
+ $this->retry++;
+ }
+
+ public function retry(): int
+ {
+ return $this->retry;
+ }
+
+ public function resetRetry(): void
+ {
+ $this->retry = 0;
+ }
+
+ public function disallowRetry(): void
+ {
+ $this->retry = -1;
+ }
+
+ public function isRetryDisallowed(): bool
+ {
+ return $this->retry === -1;
+ }
}
diff --git a/src/Projection/Projection/ProjectionError.php b/src/Projection/Projection/ProjectionError.php
new file mode 100644
index 000000000..beba99847
--- /dev/null
+++ b/src/Projection/Projection/ProjectionError.php
@@ -0,0 +1,21 @@
+getMessage(), $error);
+ }
+}
diff --git a/src/Projection/Projection/Store/DoctrineStore.php b/src/Projection/Projection/Store/DoctrineStore.php
index 9a01ff879..2b03090f7 100644
--- a/src/Projection/Projection/Store/DoctrineStore.php
+++ b/src/Projection/Projection/Store/DoctrineStore.php
@@ -9,6 +9,7 @@
use Doctrine\DBAL\Types\Types;
use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCollection;
+use Patchlevel\EventSourcing\Projection\Projection\ProjectionError;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionId;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionNotFound;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus;
@@ -16,6 +17,16 @@
use function array_map;
+/** @psalm-type Data = array{
+ * name: string,
+ * version: int,
+ * position: int,
+ * status: string,
+ * error_message: string|null,
+ * error_object: string|null,
+ * retry: int,
+ * }
+ */
final class DoctrineStore implements ProjectionStore, SchemaConfigurator
{
public function __construct(
@@ -32,7 +43,7 @@ public function get(ProjectionId $projectionId): Projection
->where('name = :name AND version = :version')
->getSQL();
- /** @var array{name: string, version: int, position: int, status: string, error_message: string|null}|false $result */
+ /** @var Data|false $result */
$result = $this->connection->fetchAssociative($sql, [
'name' => $projectionId->name(),
'version' => $projectionId->version(),
@@ -42,12 +53,7 @@ public function get(ProjectionId $projectionId): Projection
throw new ProjectionNotFound($projectionId);
}
- return new Projection(
- $projectionId,
- ProjectionStatus::from($result['status']),
- $result['position'],
- $result['error_message'],
- );
+ return $this->createProjection($result);
}
public function all(): ProjectionCollection
@@ -57,31 +63,38 @@ public function all(): ProjectionCollection
->from($this->projectionTable)
->getSQL();
- /** @var list $result */
+ /** @var list $result */
$result = $this->connection->fetchAllAssociative($sql);
return new ProjectionCollection(
array_map(
- static function (array $data) {
- return new Projection(
- new ProjectionId($data['name'], $data['version']),
- ProjectionStatus::from($data['status']),
- $data['position'],
- $data['error_message'],
- ErrorSerializer::unserialize($data['error_object']),
- );
- },
+ fn (array $data) => $this->createProjection($data),
$result,
),
);
}
+ /** @param Data $row */
+ private function createProjection(array $row): Projection
+ {
+ return new Projection(
+ new ProjectionId($row['name'], $row['version']),
+ ProjectionStatus::from($row['status']),
+ $row['position'],
+ $row['error_message'] ? new ProjectionError(
+ $row['error_message'],
+ ErrorSerializer::unserialize($row['error_object']),
+ ) : null,
+ $row['retry'],
+ );
+ }
+
public function save(Projection ...$projections): void
{
$this->connection->transactional(
function (Connection $connection) use ($projections): void {
foreach ($projections as $projection) {
- $errorObject = ErrorSerializer::serialize($projection->errorObject());
+ $errorObject = ErrorSerializer::serialize($projection->projectionError()?->errorObject);
try {
$effectedRows = (int)$connection->update(
@@ -89,8 +102,9 @@ function (Connection $connection) use ($projections): void {
[
'position' => $projection->position(),
'status' => $projection->status()->value,
- 'error_message' => $projection->errorMessage(),
+ 'error_message' => $projection->projectionError()?->errorMessage,
'error_object' => $errorObject,
+ 'retry' => $projection->retry(),
],
[
'name' => $projection->id()->name(),
@@ -109,8 +123,9 @@ function (Connection $connection) use ($projections): void {
'version' => $projection->id()->version(),
'position' => $projection->position(),
'status' => $projection->status()->value,
- 'error_message' => $projection->errorMessage(),
+ 'error_message' => $projection->projectionError()?->errorMessage,
'error_object' => $errorObject,
+ 'retry' => $projection->retry(),
],
);
}
@@ -149,6 +164,8 @@ public function configureSchema(Schema $schema, Connection $connection): void
->setNotnull(false);
$table->addColumn('error_object', Types::BLOB)
->setNotnull(false);
+ $table->addColumn('retry', Types::INTEGER)
+ ->setNotnull(true);
$table->setPrimaryKey(['name', 'version']);
}
diff --git a/src/Projection/Projection/Store/ErrorSerializer.php b/src/Projection/Projection/Store/ErrorSerializer.php
index 28a74a19c..523af671b 100644
--- a/src/Projection/Projection/Store/ErrorSerializer.php
+++ b/src/Projection/Projection/Store/ErrorSerializer.php
@@ -18,7 +18,7 @@ public static function serialize(Throwable|null $error): string|null
}
try {
- return serialize($error);
+ return @serialize($error);
} catch (Throwable) {
return null;
}
diff --git a/src/Projection/Projectionist/DefaultProjectionist.php b/src/Projection/Projectionist/DefaultProjectionist.php
index 2b3c340c9..cadabc914 100644
--- a/src/Projection/Projectionist/DefaultProjectionist.php
+++ b/src/Projection/Projectionist/DefaultProjectionist.php
@@ -8,6 +8,7 @@
use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCollection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
+use Patchlevel\EventSourcing\Projection\Projection\ProjectionError;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionId;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus;
use Patchlevel\EventSourcing\Projection\Projection\Store\ProjectionStore;
@@ -23,6 +24,8 @@
final class DefaultProjectionist implements Projectionist
{
+ private const RETRY_LIMIT = 5;
+
/** @var array|null */
private array|null $projectors = null;
@@ -87,9 +90,6 @@ public function boot(
$e->getMessage(),
));
- $projection->error($e);
- $this->projectionStore->save($projection);
-
if ($throwByError) {
throw new ProjectionistError(
$projector::class,
@@ -97,6 +97,10 @@ public function boot(
$e,
);
}
+
+ $projection->error(ProjectionError::fromThrowable($e));
+ $projection->disallowRetry();
+ $this->projectionStore->save($projection);
}
}
@@ -160,22 +164,10 @@ public function run(
int|null $limit = null,
bool $throwByError = false,
): void {
- $projections = $this->projections()
- ->filterByProjectionStatus(ProjectionStatus::Active)
- ->filterByCriteria($criteria);
-
- foreach ($projections as $projection) {
- $projector = $this->projector($projection->id());
-
- if ($projector) {
- continue;
- }
-
- $projection->outdated();
- $this->projectionStore->save($projection);
+ $projections = $this->projections()->filterByCriteria($criteria);
- $this->logger?->info(sprintf('projection "%s" has been marked as outdated', $projection->id()->toString()));
- }
+ $this->handleOutdatedProjections($projections);
+ $this->handleRetryProjections($projections);
$projections = $projections->filterByProjectionStatus(ProjectionStatus::Active);
@@ -378,44 +370,51 @@ private function handleMessage(Message $message, Projection $projection, bool $t
$subscribeMethod = $this->projectorResolver->resolveSubscribeMethod($projector, $message);
- if ($subscribeMethod) {
- try {
- $subscribeMethod($message);
+ if (!$subscribeMethod) {
+ $projection->incrementPosition();
+ $this->projectionStore->save($projection);
- $this->logger?->debug(
- sprintf(
- 'projector "%s" for "%s" processed the event "%s"',
- $projector::class,
- $projection->id()->toString(),
- $message->event()::class,
- ),
- );
- } catch (Throwable $e) {
- $this->logger?->error(
- sprintf(
- 'projector "%s" for "%s" could not process the event: %s',
- $projector::class,
- $projection->id()->toString(),
- $e->getMessage(),
- ),
- );
+ return;
+ }
- $projection->error($e);
- $this->projectionStore->save($projection);
+ try {
+ $subscribeMethod($message);
- if ($throwByError) {
- throw new ProjectionistError(
- $projector::class,
- $projection->id(),
- $e,
- );
- }
+ $this->logger?->debug(
+ sprintf(
+ 'projector "%s" for "%s" processed the event "%s"',
+ $projector::class,
+ $projection->id()->toString(),
+ $message->event()::class,
+ ),
+ );
+ } catch (Throwable $e) {
+ $this->logger?->error(
+ sprintf(
+ 'projector "%s" for "%s" could not process the event: %s',
+ $projector::class,
+ $projection->id()->toString(),
+ $e->getMessage(),
+ ),
+ );
- return;
+ if ($throwByError) {
+ throw new ProjectionistError(
+ $projector::class,
+ $projection->id(),
+ $e,
+ );
}
+
+ $projection->error(ProjectionError::fromThrowable($e));
+ $projection->incrementRetry();
+ $this->projectionStore->save($projection);
+
+ return;
}
$projection->incrementPosition();
+ $projection->resetRetry();
$this->projectionStore->save($projection);
}
@@ -441,4 +440,42 @@ private function projectors(): array
return $this->projectors;
}
+
+ private function handleOutdatedProjections(ProjectionCollection $projections): void
+ {
+ foreach ($projections as $projection) {
+ if ($projection->isRetryDisallowed()) {
+ continue;
+ }
+
+ if (!$projection->isActive() && !$projection->isError()) {
+ continue;
+ }
+
+ $projector = $this->projector($projection->id());
+
+ if ($projector) {
+ continue;
+ }
+
+ $projection->outdated();
+ $this->projectionStore->save($projection);
+
+ $this->logger?->info(sprintf('projection "%s" has been marked as outdated', $projection->id()->toString()));
+ }
+ }
+
+ private function handleRetryProjections(ProjectionCollection $projections): void
+ {
+ foreach ($projections->filterByProjectionStatus(ProjectionStatus::Error) as $projection) {
+ if ($projection->retry() >= self::RETRY_LIMIT) {
+ continue;
+ }
+
+ $projection->active();
+ $this->projectionStore->save($projection);
+
+ $this->logger?->info(sprintf('retry projection "%s"', $projection->id()->toString()));
+ }
+ }
}
diff --git a/tests/Unit/Projection/Projection/ProjectionTest.php b/tests/Unit/Projection/Projection/ProjectionTest.php
index 6aab00acf..44468a60b 100644
--- a/tests/Unit/Projection/Projection/ProjectionTest.php
+++ b/tests/Unit/Projection/Projection/ProjectionTest.php
@@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Tests\Unit\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\Projection;
+use Patchlevel\EventSourcing\Projection\Projection\ProjectionError;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionId;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus;
use PHPUnit\Framework\TestCase;
@@ -68,7 +69,7 @@ public function testError(): void
$exception = new RuntimeException('test');
- $projection->error($exception);
+ $projection->error(ProjectionError::fromThrowable($exception));
self::assertEquals(ProjectionStatus::Error, $projection->status());
self::assertFalse($projection->isNew());
@@ -76,8 +77,7 @@ public function testError(): void
self::assertFalse($projection->isActive());
self::assertTrue($projection->isError());
self::assertFalse($projection->isOutdated());
- self::assertEquals('test', $projection->errorMessage());
- self::assertEquals($exception, $projection->errorObject());
+ self::assertEquals(new ProjectionError('test', $exception), $projection->projectionError());
}
public function testOutdated(): void
diff --git a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php
index 4c7dbff67..c4497ea93 100644
--- a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php
+++ b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php
@@ -9,6 +9,7 @@
use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCollection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
+use Patchlevel\EventSourcing\Projection\Projection\ProjectionError;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionId;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus;
use Patchlevel\EventSourcing\Projection\Projection\Store\ProjectionStore;
@@ -250,8 +251,8 @@ public function create(): void
$projectionId,
ProjectionStatus::Error,
0,
- 'ERROR',
- $projector->exception,
+ new ProjectionError('ERROR', $projector->exception),
+ -1,
),
],
$projectionStore->savedProjections,
@@ -452,8 +453,8 @@ public function handle(Message $message): void
$projectionId,
ProjectionStatus::Error,
0,
- 'ERROR',
- $projector->exception,
+ new ProjectionError('ERROR', $projector->exception),
+ 1,
),
],
$projectionStore->savedProjections,