Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

overhaul error handling in projectionist #504

Merged
merged 5 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,9 @@
<code><![CDATA[next]]></code>
</InvalidMethodCall>
</file>
<file src="tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php">
<PossiblyUndefinedArrayOffset>
<code><![CDATA[$update1]]></code>
</PossiblyUndefinedArrayOffset>
</file>
</files>
55 changes: 47 additions & 8 deletions docs/pages/projection.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,17 @@ stateDiagram-v2
direction LR
[*] --> New
New --> Booting
New --> Error
Booting --> Active
Booting --> Finished
Booting --> Error
Active --> Finished
Active --> Outdated
Active --> Error
Active --> Finished
Finished --> Active
Finished --> Outdated
Error --> New
Error --> Booting
Error --> Active
Error --> [*]
Outdated --> Active
Expand Down Expand Up @@ -356,10 +361,11 @@ These projections have a projector, follow the event stream and should be up-to-

A projection is finished if the projector has the mode `RunMode::Once`.
This means that the projection is only run once and then set to finished if it reaches the end of the event stream.
You can also reactivate the projection if you want so that it continues.

### Outdated

If a projection exists in the projection store
If an active or finished projection exists in the projection store
that does not have a projector in the source code with a corresponding projector ID,
then this projection is marked as outdated.
This happens when either the projector has been deleted
Expand All @@ -377,10 +383,16 @@ There are two options to reactivate the projection:
### Error

If an error occurs in a projector, then the target projection is set to Error.
This projection will then no longer run until the projection is activated again.
This can happen in the create process, in the boot process or in the run process.
This projection will then no longer boot/run until the projection is reactivate or retried.

The projectionist has a retry strategy to retry projections that have failed.
It tries to reactivate the projection after a certain time and a certain number of attempts.
If this does not work, the projection is set to error and must be manually reactivated.

There are two options here:

* Reactivate the projection, so that the projection is active again.
* Reactivate the projection, so that the projection is in the previous state again.
* Remove the projection and rebuild it from scratch.

## Setup
Expand Down Expand Up @@ -420,19 +432,43 @@ $schemaDirector = new DoctrineSchemaDirector(

You can find more about schema configurator [here](./store.md)

### Retry Strategy

The projectionist uses a retry strategy to retry projections that have failed.
Our default strategy can be configured with the following parameters:

* `baseDelay` - The base delay in seconds.
* `delayFactor` - The factor by which the delay is multiplied after each attempt.
* `maxAttempts` - The maximum number of attempts.

```php
use Patchlevel\EventSourcing\Projection\RetryStrategy\ClockBasedRetryStrategy;

$retryStrategy = new ClockBasedRetryStrategy(
baseDelay: 5,
delayFactor: 2,
maxAttempts: 5,
);
```

!!! tip

You can reactivate the projection manually or remove it and rebuild it from scratch.

### Projectionist

Now we can create the projectionist and plug together the necessary services.
The event store is needed to load the events, the Projection Store to store the projection state
and the respective projectors.
and the respective projectors. Optionally, we can also pass a retry strategy.

```php
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;

$projectionist = new DefaultProjectionist(
$eventStore,
$projectionStore,
[$projector1, $projector2, $projector3]
[$projector1, $projector2, $projector3],
$retryStrategy,
);
```

Expand Down Expand Up @@ -512,6 +548,9 @@ foreach ($projections as $projection) {
}
```

!!! note
## Learn more

There are also [cli commands](./cli.md) for all commands.
* [How to use CLI commands](./cli.md)
* [How to use Pipeline](./pipeline.md)
* [How to use Event Bus](./event_bus.md)
* [How to Test](./testing.md)
2 changes: 1 addition & 1 deletion phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ parameters:
path: src/EventBus/Message.php

-
message: "#^Parameter \\#2 \\$errorContext of class Patchlevel\\\\EventSourcing\\\\Projection\\\\Projection\\\\ProjectionError constructor expects array\\<int, array\\{class\\: class\\-string, message\\: string, code\\: int\\|string, file\\: string, line\\: int, trace\\: array\\<int, array\\{file\\?\\: string, line\\?\\: int, function\\?\\: string, class\\?\\: string, type\\?\\: string, args\\?\\: array\\}\\>\\}\\>\\|null, mixed given\\.$#"
message: "#^Parameter \\#3 \\$errorContext of class Patchlevel\\\\EventSourcing\\\\Projection\\\\Projection\\\\ProjectionError constructor expects array\\<int, array\\{class\\: class\\-string, message\\: string, code\\: int\\|string, file\\: string, line\\: int, trace\\: array\\<int, array\\{file\\?\\: string, line\\?\\: int, function\\?\\: string, class\\?\\: string, type\\?\\: string, args\\?\\: array\\}\\>\\}\\>\\|null, mixed given\\.$#"
count: 1
path: src/Projection/Projection/Store/DoctrineStore.php

Expand Down
9 changes: 1 addition & 8 deletions src/Console/Command/ProjectionBootCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,15 @@ public function configure(): void
null,
InputOption::VALUE_REQUIRED,
'How many messages should be consumed in one run',
)
->addOption(
'throw-by-error',
null,
InputOption::VALUE_NONE,
'throw exception by error',
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$limit = InputHelper::nullablePositiveInt($input->getOption('limit'));
$throwByError = InputHelper::bool($input->getOption('throw-by-error'));

$criteria = $this->projectionCriteria($input);
$this->projectionist->boot($criteria, $limit, $throwByError);
$this->projectionist->boot($criteria, $limit);

return 0;
}
Expand Down
19 changes: 1 addition & 18 deletions src/Console/Command/ProjectionRebuildCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@

namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Console\OutputStyle;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(
Expand All @@ -17,19 +15,6 @@
)]
final class ProjectionRebuildCommand extends ProjectionCommand
{
public function configure(): void
{
parent::configure();

$this
->addOption(
'throw-by-error',
null,
InputOption::VALUE_NONE,
'throw exception by error',
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new OutputStyle($input, $output);
Expand All @@ -40,10 +25,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
return 1;
}

$throwByError = InputHelper::bool($input->getOption('throw-by-error'));

$this->projectionist->remove($criteria);
$this->projectionist->boot($criteria, null, $throwByError);
$this->projectionist->boot($criteria, null);

return 0;
}
Expand Down
13 changes: 3 additions & 10 deletions src/Console/Command/ProjectionRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ protected function configure(): void
'How much time should elapse before the next job is executed in milliseconds',
1000,
)
->addOption(
'throw-by-error',
null,
InputOption::VALUE_NONE,
'throw exception by error',
)
->addOption(
'rebuild',
null,
Expand All @@ -76,16 +70,15 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$memoryLimit = InputHelper::nullableString($input->getOption('memory-limit'));
$timeLimit = InputHelper::nullablePositiveInt($input->getOption('time-limit'));
$sleep = InputHelper::positiveIntOrZero($input->getOption('sleep'));
$throwByError = InputHelper::bool($input->getOption('throw-by-error'));
$rebuild = InputHelper::bool($input->getOption('rebuild'));

$criteria = $this->projectionCriteria($input);

$logger = new ConsoleLogger($output);

$worker = DefaultWorker::create(
function () use ($criteria, $messageLimit, $throwByError): void {
$this->projectionist->run($criteria, $messageLimit, $throwByError);
function () use ($criteria, $messageLimit): void {
$this->projectionist->run($criteria, $messageLimit);
},
[
'runLimit' => $runLimit,
Expand All @@ -97,7 +90,7 @@ function () use ($criteria, $messageLimit, $throwByError): void {

if ($rebuild) {
$this->projectionist->remove($criteria);
$this->projectionist->boot($criteria, null, $throwByError);
$this->projectionist->boot($criteria);
}

$worker->run($sleep);
Expand Down
4 changes: 2 additions & 2 deletions src/Console/Command/ProjectionStatusCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Console\OutputStyle;
use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionError;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionNotFound;
use Patchlevel\EventSourcing\Projection\Projection\Store\ErrorContext;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
Expand All @@ -18,7 +18,7 @@
use function is_array;
use function sprintf;

/** @psalm-import-type Context from ErrorContext */
/** @psalm-import-type Context from ProjectionError */
#[AsCommand(
'event-sourcing:projection:status',
'View the current status of the projections',
Expand Down
15 changes: 15 additions & 0 deletions src/Projection/Projection/NoErrorToRetry.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Projection\Projection;

use RuntimeException;

final class NoErrorToRetry extends RuntimeException
{
public function __construct()
{
parent::__construct('No error to retry');
}
}
48 changes: 36 additions & 12 deletions src/Projection/Projection/Projection.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

namespace Patchlevel\EventSourcing\Projection\Projection;

use DateTimeImmutable;
use Throwable;

final class Projection
{
public const DEFAULT_GROUP = 'default';
Expand All @@ -15,7 +18,8 @@ public function __construct(
private ProjectionStatus $status = ProjectionStatus::New,
private int $position = 0,
private ProjectionError|null $error = null,
private int $retry = 0,
private int $retryAttempt = 0,
private DateTimeImmutable|null $lastSavedAt = null,
) {
}

Expand Down Expand Up @@ -54,6 +58,12 @@ public function changePosition(int $position): void
$this->position = $position;
}

public function new(): void
{
$this->status = ProjectionStatus::New;
$this->error = null;
}

public function isNew(): bool
{
return $this->status === ProjectionStatus::New;
Expand Down Expand Up @@ -102,39 +112,53 @@ public function isOutdated(): bool
return $this->status === ProjectionStatus::Outdated;
}

public function error(ProjectionError|null $error = null): void
public function error(Throwable|string $error): void
{
$previousStatus = $this->status;
$this->status = ProjectionStatus::Error;
$this->error = $error;

if ($error instanceof Throwable) {
$this->error = ProjectionError::fromThrowable($previousStatus, $error);

return;
}

$this->error = new ProjectionError($error, $previousStatus);
}

public function isError(): bool
{
return $this->status === ProjectionStatus::Error;
}

public function incrementRetry(): void
public function retryAttempt(): int
{
$this->retry++;
return $this->retryAttempt;
}

public function retry(): int
public function doRetry(): void
{
return $this->retry;
if ($this->error === null) {
throw new NoErrorToRetry();
}

$this->retryAttempt++;
$this->status = $this->error->previousStatus;
$this->error = null;
}

public function resetRetry(): void
{
$this->retry = 0;
$this->retryAttempt = 0;
}

public function disallowRetry(): void
public function lastSavedAt(): DateTimeImmutable|null
{
$this->retry = -1;
return $this->lastSavedAt;
}

public function isRetryDisallowed(): bool
public function updateLastSavedAt(DateTimeImmutable $lastSavedAt): void
{
return $this->retry === -1;
$this->lastSavedAt = $lastSavedAt;
}
}
11 changes: 7 additions & 4 deletions src/Projection/Projection/ProjectionError.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@

namespace Patchlevel\EventSourcing\Projection\Projection;

use Patchlevel\EventSourcing\Projection\Projection\Store\ErrorContext;
use Throwable;

/** @psalm-import-type Context from ErrorContext */
/**
* @psalm-type Trace = array{file?: string, line?: int, function?: string, class?: string, type?: string, args?: array<array-key, mixed>}
* @psalm-type Context = array{class: class-string, message: string, code: int|string, file: string, line: int, trace: list<Trace>}
*/
final class ProjectionError
{
/** @param list<Context>|null $errorContext */
public function __construct(
public readonly string $errorMessage,
public readonly ProjectionStatus $previousStatus,
public readonly array|null $errorContext = null,
) {
}

public static function fromThrowable(Throwable $error): self
public static function fromThrowable(ProjectionStatus $projectionStatus, Throwable $error): self
{
return new self($error->getMessage(), ErrorContext::fromThrowable($error));
return new self($error->getMessage(), $projectionStatus, ThrowableToErrorContextTransformer::transform($error));
}
}
Loading
Loading