Skip to content

Commit

Permalink
add new run mode: once
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Feb 19, 2024
1 parent 2c4b5be commit c8b62c6
Show file tree
Hide file tree
Showing 10 changed files with 554 additions and 77 deletions.
54 changes: 50 additions & 4 deletions docs/pages/projection.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,21 +231,60 @@ final class ProfileProjector

The default group is `default` and the projectionist takes all groups if none are given to him.

### From Now Mode
### Run Mode

Certain projectors operate exclusively on post-release events, disregarding historical data.
Consider, for instance, the scenario of launching a fresh email service.
The run mode determines how the projector should behave when it is booted.
There are three different modes:

#### From Beginning

This is the default mode.
The projector will start from the beginning of the event stream and process all events.

```php
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Projection\Projection\RunMode;

#[Projector('welcome_email', runMode: RunMode::FromBeginning)]
final class WelcomeEmailProjector
{
// ...
}
```

#### From Now

Certain projectors operate exclusively on post-release events, disregarding historical data.
This is useful for projectors that are only interested in events that occur after a certain point in time.
As example, a welcome email projector that only wants to send emails to new users.

```php
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Projection\Projection\RunMode;

#[Projector('welcome_email', fromNow: true)]
#[Projector('welcome_email', runMode: RunMode::FromNow)]
final class WelcomeEmailProjector
{
// ...
}
```

#### Once

This mode is useful for projectors that only need to run once.
This is useful for projectors to create reports or to migrate data.

```php
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Projection\Projection\RunMode;

#[Projector('migration', runMode: RunMode::Once)]
final class MigrationProjector
{
// ...
}
```

## Projectionist

The projectionist manages individual projectors and keeps the projections running.
Expand Down Expand Up @@ -287,6 +326,8 @@ stateDiagram-v2
Booting --> Error
Active --> Outdated
Active --> Error
Active --> Finished
Finished --> Outdated
Error --> Active
Error --> [*]
Outdated --> Active
Expand All @@ -311,6 +352,11 @@ When the process is finished, the projection is set to active.
The active status describes the projections currently being actively managed by the projectionist.
These projections have a projector, follow the event stream and should be up-to-date.

### Finished

A projection is finished if the projector has the mode `RunMode::Once`.
This means that the projection is only run once and then set to finished if it reaches the end of the event stream.

### Outdated

If a projection exists in the projection store
Expand Down
6 changes: 4 additions & 2 deletions src/Attribute/Projector.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
namespace Patchlevel\EventSourcing\Attribute;

use Attribute;
use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\RunMode;

#[Attribute(Attribute::TARGET_CLASS)]
final class Projector
{
public function __construct(
public readonly string $id,
public readonly string $group = 'default',
public readonly bool $fromNow = false,
public readonly string $group = Projection::DEFAULT_GROUP,
public readonly RunMode $runMode = RunMode::FromBeginning,
) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public function metadata(string $projector): ProjectorMetadata
$metadata = new ProjectorMetadata(
$projectorInfo->id,
$projectorInfo->group,
$projectorInfo->fromNow,
$projectorInfo->runMode,
$subscribeMethods,
$createMethod,
$dropMethod,
Expand Down
7 changes: 5 additions & 2 deletions src/Metadata/Projector/ProjectorMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

namespace Patchlevel\EventSourcing\Metadata\Projector;

use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\RunMode;

final class ProjectorMetadata
{
public function __construct(
public readonly string $id,
public readonly string $group = 'default',
public readonly bool $fromNow = false,
public readonly string $group = Projection::DEFAULT_GROUP,
public readonly RunMode $runMode = RunMode::FromBeginning,
/** @var array<class-string|"*", list<string>> */
public readonly array $subscribeMethods = [],
public readonly string|null $setupMethod = null,
Expand Down
17 changes: 17 additions & 0 deletions src/Projection/Projection/Projection.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ final class Projection
public function __construct(
private readonly string $id,
private readonly string $group = self::DEFAULT_GROUP,
private readonly RunMode $runMode = RunMode::FromBeginning,
private ProjectionStatus $status = ProjectionStatus::New,
private int $position = 0,
private ProjectionError|null $error = null,
Expand All @@ -28,6 +29,11 @@ public function group(): string
return $this->group;
}

public function runMode(): RunMode
{
return $this->runMode;
}

public function status(): ProjectionStatus
{
return $this->status;
Expand Down Expand Up @@ -75,6 +81,17 @@ public function isActive(): bool
return $this->status === ProjectionStatus::Active;
}

public function finished(): void
{
$this->status = ProjectionStatus::Finished;
$this->error = null;
}

public function isFinished(): bool
{
return $this->status === ProjectionStatus::Finished;
}

public function outdated(): void
{
$this->status = ProjectionStatus::Outdated;
Expand Down
1 change: 1 addition & 0 deletions src/Projection/Projection/ProjectionStatus.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ enum ProjectionStatus: string
case New = 'new';
case Booting = 'booting';
case Active = 'active';
case Finished = 'finished';
case Outdated = 'outdated';
case Error = 'error';
}
12 changes: 12 additions & 0 deletions src/Projection/Projection/RunMode.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Projection\Projection;

enum RunMode: string
{
case FromBeginning = 'from_beginning';
case FromNow = 'from_now';
case Once = 'once';
}
12 changes: 10 additions & 2 deletions src/Projection/Projection/Store/DoctrineStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Patchlevel\EventSourcing\Projection\Projection\ProjectionError;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionNotFound;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus;
use Patchlevel\EventSourcing\Projection\Projection\RunMode;
use Patchlevel\EventSourcing\Schema\SchemaConfigurator;

use function array_map;
Expand All @@ -27,6 +28,7 @@
/** @psalm-type Data = array{
* id: string,
* group_name: string,
* run_mode: string,
* position: int,
* status: string,
* error_message: string|null,
Expand Down Expand Up @@ -118,8 +120,9 @@ public function add(Projection $projection): void
[
'id' => $projection->id(),
'group_name' => $projection->group(),
'position' => $projection->position(),
'run_mode' => $projection->runMode()->value,
'status' => $projection->status()->value,
'position' => $projection->position(),
'error_message' => $projectionError?->errorMessage,
'error_context' => $projectionError?->errorContext !== null ? json_encode($projectionError->errorContext, JSON_THROW_ON_ERROR) : null,
'retry' => $projection->retry(),
Expand All @@ -135,8 +138,9 @@ public function update(Projection $projection): void
$this->projectionTable,
[
'group_name' => $projection->group(),
'position' => $projection->position(),
'run_mode' => $projection->runMode()->value,
'status' => $projection->status()->value,
'position' => $projection->position(),
'error_message' => $projectionError?->errorMessage,
'error_context' => $projectionError?->errorContext !== null ? json_encode($projectionError->errorContext, JSON_THROW_ON_ERROR) : null,
'retry' => $projection->retry(),
Expand Down Expand Up @@ -181,6 +185,9 @@ public function configureSchema(Schema $schema, Connection $connection): void
$table->addColumn('group_name', Types::STRING)
->setLength(32)
->setNotnull(true);
$table->addColumn('run_mode', Types::STRING)
->setLength(16)
->setNotnull(true);
$table->addColumn('position', Types::INTEGER)
->setNotnull(true);
$table->addColumn('status', Types::STRING)
Expand Down Expand Up @@ -209,6 +216,7 @@ private function createProjection(array $row): Projection
return new Projection(
$row['id'],
$row['group_name'],
RunMode::from($row['run_mode']),
ProjectionStatus::from($row['status']),
$row['position'],
$row['error_message'] !== null ? new ProjectionError(
Expand Down
36 changes: 26 additions & 10 deletions src/Projection/Projectionist/DefaultProjectionist.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Metadata\Projector\AttributeProjectorMetadataFactory;
use Patchlevel\EventSourcing\Metadata\Projector\ProjectorMetadata;
use Patchlevel\EventSourcing\Metadata\Projector\ProjectorMetadataFactory;
use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionError;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus;
use Patchlevel\EventSourcing\Projection\Projection\RunMode;
use Patchlevel\EventSourcing\Projection\Projection\Store\LockableProjectionStore;
use Patchlevel\EventSourcing\Projection\Projection\Store\ProjectionStore;
use Patchlevel\EventSourcing\Store\Criteria;
Expand Down Expand Up @@ -149,6 +151,18 @@ function ($projections) use ($limit, $throwByError): void {
continue;
}

if ($projection->runMode() === RunMode::Once) {
$projection->finished();
$this->projectionStore->update($projection);

$this->logger?->info(sprintf(
'Projectionist: Projection "%s" run only once and has been set to finished.',
$projection->id(),
));

continue;
}

$projection->active();
$this->projectionStore->update($projection);

Expand Down Expand Up @@ -534,7 +548,7 @@ private function projector(string $projectorId): object|null
$this->projectorIndex = [];

foreach ($this->projectors as $projector) {
$projectorId = $this->projectorId($projector);
$projectorId = $this->projectorMetadata($projector)->id;

$this->projectorIndex[$projectorId] = $projector;
}
Expand Down Expand Up @@ -626,9 +640,7 @@ private function fastForwardFromNowProjections(array $projections): array
continue;
}

$metadata = $this->metadataFactory->metadata($projector::class);

if (!$metadata->fromNow) {
if ($projection->runMode() !== RunMode::FromNow) {
$forwardedProjections[] = $projection;

continue;
Expand Down Expand Up @@ -727,20 +739,24 @@ private function discoverNewProjections(): void
new ProjectionCriteria(),
function (array $projections): void {
foreach ($this->projectors as $projector) {
$projectorId = $this->projectorId($projector);
$metadata = $this->projectorMetadata($projector);

foreach ($projections as $projection) {
if ($projection->id() === $projectorId) {
if ($projection->id() === $metadata->id) {
continue 2;
}
}

$this->projectionStore->add(new Projection($projectorId));
$this->projectionStore->add(new Projection(
$metadata->id,
$metadata->group,
$metadata->runMode,
));

$this->logger?->info(
sprintf(
'Projectionist: New Projector "%s" was found and added to the projection store.',
$projectorId,
$metadata->id,
),
);
}
Expand Down Expand Up @@ -789,9 +805,9 @@ private function resolveSubscribeMethods(object $projector, Message $message): i
);
}

private function projectorId(object $projector): string
private function projectorMetadata(object $projector): ProjectorMetadata
{
return $this->metadataFactory->metadata($projector::class)->id;
return $this->metadataFactory->metadata($projector::class);
}

private function latestIndex(): int
Expand Down
Loading

0 comments on commit c8b62c6

Please sign in to comment.