diff --git a/docs/pages/projection.md b/docs/pages/projection.md index 06170be0a..c88e54b60 100644 --- a/docs/pages/projection.md +++ b/docs/pages/projection.md @@ -231,21 +231,60 @@ final class ProfileProjector The default group is `default` and the projectionist takes all groups if none are given to him. -### From Now Mode +### Run Mode -Certain projectors operate exclusively on post-release events, disregarding historical data. -Consider, for instance, the scenario of launching a fresh email service. +The run mode determines how the projector should behave when it is booted. +There are three different modes: + +#### From Beginning + +This is the default mode. +The projector will start from the beginning of the event stream and process all events. + +```php +use Patchlevel\EventSourcing\Attribute\Projector; +use Patchlevel\EventSourcing\Projection\Projection\RunMode; + +#[Projector('welcome_email', runMode: RunMode::FromBeginning)] +final class WelcomeEmailProjector +{ + // ... +} +``` + +#### From Now + +Certain projectors operate exclusively on post-release events, disregarding historical data. +This is useful for projectors that are only interested in events that occur after a certain point in time. +As example, a welcome email projector that only wants to send emails to new users. ```php use Patchlevel\EventSourcing\Attribute\Projector; +use Patchlevel\EventSourcing\Projection\Projection\RunMode; -#[Projector('welcome_email', fromNow: true)] +#[Projector('welcome_email', runMode: RunMode::FromNow)] final class WelcomeEmailProjector { // ... } ``` +#### Once + +This mode is useful for projectors that only need to run once. +This is useful for projectors to create reports or to migrate data. + +```php +use Patchlevel\EventSourcing\Attribute\Projector; +use Patchlevel\EventSourcing\Projection\Projection\RunMode; + +#[Projector('migration', runMode: RunMode::Once)] +final class MigrationProjector +{ + // ... +} +``` + ## Projectionist The projectionist manages individual projectors and keeps the projections running. @@ -287,6 +326,8 @@ stateDiagram-v2 Booting --> Error Active --> Outdated Active --> Error + Active --> Finished + Finished --> Outdated Error --> Active Error --> [*] Outdated --> Active @@ -311,6 +352,11 @@ When the process is finished, the projection is set to active. The active status describes the projections currently being actively managed by the projectionist. These projections have a projector, follow the event stream and should be up-to-date. +### Finished + +A projection is finished if the projector has the mode `RunMode::Once`. +This means that the projection is only run once and then set to finished if it reaches the end of the event stream. + ### Outdated If a projection exists in the projection store diff --git a/src/Attribute/Projector.php b/src/Attribute/Projector.php index ae9f00036..a45a7586a 100644 --- a/src/Attribute/Projector.php +++ b/src/Attribute/Projector.php @@ -5,14 +5,16 @@ namespace Patchlevel\EventSourcing\Attribute; use Attribute; +use Patchlevel\EventSourcing\Projection\Projection\Projection; +use Patchlevel\EventSourcing\Projection\Projection\RunMode; #[Attribute(Attribute::TARGET_CLASS)] final class Projector { public function __construct( public readonly string $id, - public readonly string $group = 'default', - public readonly bool $fromNow = false, + public readonly string $group = Projection::DEFAULT_GROUP, + public readonly RunMode $runMode = RunMode::FromBeginning, ) { } } diff --git a/src/Metadata/Projector/AttributeProjectorMetadataFactory.php b/src/Metadata/Projector/AttributeProjectorMetadataFactory.php index c3e5d4b3c..0a3bcd116 100644 --- a/src/Metadata/Projector/AttributeProjectorMetadataFactory.php +++ b/src/Metadata/Projector/AttributeProjectorMetadataFactory.php @@ -80,7 +80,7 @@ public function metadata(string $projector): ProjectorMetadata $metadata = new ProjectorMetadata( $projectorInfo->id, $projectorInfo->group, - $projectorInfo->fromNow, + $projectorInfo->runMode, $subscribeMethods, $createMethod, $dropMethod, diff --git a/src/Metadata/Projector/ProjectorMetadata.php b/src/Metadata/Projector/ProjectorMetadata.php index b3e629e46..5d776f4cc 100644 --- a/src/Metadata/Projector/ProjectorMetadata.php +++ b/src/Metadata/Projector/ProjectorMetadata.php @@ -4,12 +4,15 @@ namespace Patchlevel\EventSourcing\Metadata\Projector; +use Patchlevel\EventSourcing\Projection\Projection\Projection; +use Patchlevel\EventSourcing\Projection\Projection\RunMode; + final class ProjectorMetadata { public function __construct( public readonly string $id, - public readonly string $group = 'default', - public readonly bool $fromNow = false, + public readonly string $group = Projection::DEFAULT_GROUP, + public readonly RunMode $runMode = RunMode::FromBeginning, /** @var array> */ public readonly array $subscribeMethods = [], public readonly string|null $setupMethod = null, diff --git a/src/Projection/Projection/Projection.php b/src/Projection/Projection/Projection.php index 9773f56d5..1d3078ce0 100644 --- a/src/Projection/Projection/Projection.php +++ b/src/Projection/Projection/Projection.php @@ -11,6 +11,7 @@ final class Projection public function __construct( private readonly string $id, private readonly string $group = self::DEFAULT_GROUP, + private readonly RunMode $runMode = RunMode::FromBeginning, private ProjectionStatus $status = ProjectionStatus::New, private int $position = 0, private ProjectionError|null $error = null, @@ -28,6 +29,11 @@ public function group(): string return $this->group; } + public function runMode(): RunMode + { + return $this->runMode; + } + public function status(): ProjectionStatus { return $this->status; @@ -75,6 +81,17 @@ public function isActive(): bool return $this->status === ProjectionStatus::Active; } + public function finished(): void + { + $this->status = ProjectionStatus::Finished; + $this->error = null; + } + + public function isFinished(): bool + { + return $this->status === ProjectionStatus::Finished; + } + public function outdated(): void { $this->status = ProjectionStatus::Outdated; diff --git a/src/Projection/Projection/ProjectionStatus.php b/src/Projection/Projection/ProjectionStatus.php index 36c47f262..b4b10f792 100644 --- a/src/Projection/Projection/ProjectionStatus.php +++ b/src/Projection/Projection/ProjectionStatus.php @@ -9,6 +9,7 @@ enum ProjectionStatus: string case New = 'new'; case Booting = 'booting'; case Active = 'active'; + case Finished = 'finished'; case Outdated = 'outdated'; case Error = 'error'; } diff --git a/src/Projection/Projection/RunMode.php b/src/Projection/Projection/RunMode.php new file mode 100644 index 000000000..bfb0f6a14 --- /dev/null +++ b/src/Projection/Projection/RunMode.php @@ -0,0 +1,12 @@ + $projection->id(), 'group_name' => $projection->group(), - 'position' => $projection->position(), + 'run_mode' => $projection->runMode()->value, 'status' => $projection->status()->value, + 'position' => $projection->position(), 'error_message' => $projectionError?->errorMessage, 'error_context' => $projectionError?->errorContext !== null ? json_encode($projectionError->errorContext, JSON_THROW_ON_ERROR) : null, 'retry' => $projection->retry(), @@ -135,8 +138,9 @@ public function update(Projection $projection): void $this->projectionTable, [ 'group_name' => $projection->group(), - 'position' => $projection->position(), + 'run_mode' => $projection->runMode()->value, 'status' => $projection->status()->value, + 'position' => $projection->position(), 'error_message' => $projectionError?->errorMessage, 'error_context' => $projectionError?->errorContext !== null ? json_encode($projectionError->errorContext, JSON_THROW_ON_ERROR) : null, 'retry' => $projection->retry(), @@ -181,6 +185,9 @@ public function configureSchema(Schema $schema, Connection $connection): void $table->addColumn('group_name', Types::STRING) ->setLength(32) ->setNotnull(true); + $table->addColumn('run_mode', Types::STRING) + ->setLength(16) + ->setNotnull(true); $table->addColumn('position', Types::INTEGER) ->setNotnull(true); $table->addColumn('status', Types::STRING) @@ -209,6 +216,7 @@ private function createProjection(array $row): Projection return new Projection( $row['id'], $row['group_name'], + RunMode::from($row['run_mode']), ProjectionStatus::from($row['status']), $row['position'], $row['error_message'] !== null ? new ProjectionError( diff --git a/src/Projection/Projectionist/DefaultProjectionist.php b/src/Projection/Projectionist/DefaultProjectionist.php index bb2f65f4a..003cd66f5 100644 --- a/src/Projection/Projectionist/DefaultProjectionist.php +++ b/src/Projection/Projectionist/DefaultProjectionist.php @@ -8,11 +8,13 @@ use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\EventBus\Message; use Patchlevel\EventSourcing\Metadata\Projector\AttributeProjectorMetadataFactory; +use Patchlevel\EventSourcing\Metadata\Projector\ProjectorMetadata; use Patchlevel\EventSourcing\Metadata\Projector\ProjectorMetadataFactory; use Patchlevel\EventSourcing\Projection\Projection\Projection; use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria; use Patchlevel\EventSourcing\Projection\Projection\ProjectionError; use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus; +use Patchlevel\EventSourcing\Projection\Projection\RunMode; use Patchlevel\EventSourcing\Projection\Projection\Store\LockableProjectionStore; use Patchlevel\EventSourcing\Projection\Projection\Store\ProjectionStore; use Patchlevel\EventSourcing\Store\Criteria; @@ -149,6 +151,18 @@ function ($projections) use ($limit, $throwByError): void { continue; } + if ($projection->runMode() === RunMode::Once) { + $projection->finished(); + $this->projectionStore->update($projection); + + $this->logger?->info(sprintf( + 'Projectionist: Projection "%s" run only once and has been set to finished.', + $projection->id(), + )); + + continue; + } + $projection->active(); $this->projectionStore->update($projection); @@ -534,7 +548,7 @@ private function projector(string $projectorId): object|null $this->projectorIndex = []; foreach ($this->projectors as $projector) { - $projectorId = $this->projectorId($projector); + $projectorId = $this->projectorMetadata($projector)->id; $this->projectorIndex[$projectorId] = $projector; } @@ -626,9 +640,7 @@ private function fastForwardFromNowProjections(array $projections): array continue; } - $metadata = $this->metadataFactory->metadata($projector::class); - - if (!$metadata->fromNow) { + if ($projection->runMode() !== RunMode::FromNow) { $forwardedProjections[] = $projection; continue; @@ -727,20 +739,24 @@ private function discoverNewProjections(): void new ProjectionCriteria(), function (array $projections): void { foreach ($this->projectors as $projector) { - $projectorId = $this->projectorId($projector); + $metadata = $this->projectorMetadata($projector); foreach ($projections as $projection) { - if ($projection->id() === $projectorId) { + if ($projection->id() === $metadata->id) { continue 2; } } - $this->projectionStore->add(new Projection($projectorId)); + $this->projectionStore->add(new Projection( + $metadata->id, + $metadata->group, + $metadata->runMode, + )); $this->logger?->info( sprintf( 'Projectionist: New Projector "%s" was found and added to the projection store.', - $projectorId, + $metadata->id, ), ); } @@ -789,9 +805,9 @@ private function resolveSubscribeMethods(object $projector, Message $message): i ); } - private function projectorId(object $projector): string + private function projectorMetadata(object $projector): ProjectorMetadata { - return $this->metadataFactory->metadata($projector::class)->id; + return $this->metadataFactory->metadata($projector::class); } private function latestIndex(): int diff --git a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php index f8c179393..baa663ad3 100644 --- a/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php +++ b/tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php @@ -15,6 +15,7 @@ use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria; use Patchlevel\EventSourcing\Projection\Projection\ProjectionError; use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus; +use Patchlevel\EventSourcing\Projection\Projection\RunMode; use Patchlevel\EventSourcing\Projection\Projection\Store\ErrorContext; use Patchlevel\EventSourcing\Projection\Projection\Store\LockableProjectionStore; use Patchlevel\EventSourcing\Projection\Projection\Store\ProjectionStore; @@ -77,12 +78,27 @@ class { $projectionist->boot(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::New), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::New, + ), ], $projectionStore->addedProjections); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Booting), - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + ), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + ), ], $projectionStore->updatedProjections); } @@ -111,9 +127,26 @@ class { $projectionist->boot(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Booting), - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Booting, 1), - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 1), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + ), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + 1, + ), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 1, + ), ], $projectionStore->updatedProjections); } @@ -154,13 +187,35 @@ public function handle(Message $message): void $projectionist->boot(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::New), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::New, + ), ], $projectionStore->addedProjections); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Booting), - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Booting, 1), - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 1), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + ), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + 1, + ), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 1, + ), ], $projectionStore->updatedProjections); self::assertTrue($projector->created); @@ -204,12 +259,28 @@ public function handle(Message $message): void $projectionist->boot(new ProjectionistCriteria(), 1); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::New), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::New, + ), ], $projectionStore->addedProjections); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Booting), - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Booting, 1), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + ), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + 1, + ), ], $projectionStore->updatedProjections); self::assertTrue($projector->created); @@ -243,8 +314,19 @@ public function handle(Message $message): void }; $projectionStore = new DummyStore([ - new Projection($projectionId1, Projection::DEFAULT_GROUP, ProjectionStatus::Booting), - new Projection($projectionId2, Projection::DEFAULT_GROUP, ProjectionStatus::Booting, 1), + new Projection( + $projectionId1, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + ), + new Projection( + $projectionId2, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + 1, + ), ]); $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); @@ -261,9 +343,27 @@ public function handle(Message $message): void $projectionist->boot(); self::assertEquals([ - new Projection($projectionId1, Projection::DEFAULT_GROUP, ProjectionStatus::Booting, 1), - new Projection($projectionId1, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 1), - new Projection($projectionId2, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 1), + new Projection( + $projectionId1, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + 1, + ), + new Projection( + $projectionId1, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 1, + ), + new Projection( + $projectionId2, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 1, + ), ], $projectionStore->updatedProjections); self::assertSame($message, $projector1->message); @@ -307,6 +407,7 @@ public function create(): void new Projection( $projectionId, Projection::DEFAULT_GROUP, + RunMode::FromBeginning, ProjectionStatus::Error, 0, new ProjectionError('ERROR', ErrorContext::fromThrowable($projector->exception)), @@ -332,7 +433,14 @@ public function handle(Message $message): void } }; - $projectionStore = new DummyStore([new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Booting)]); + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + ), + ]); $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $message2 = new Message(new ProfileVisited(ProfileId::fromString('test'))); @@ -349,9 +457,27 @@ public function handle(Message $message): void $projectionist->boot(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Booting, 1), - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Booting, 3), - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 3), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + 1, + ), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + 3, + ), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 3, + ), ], $projectionStore->updatedProjections); self::assertSame([$message1, $message2], $projector->messages); @@ -360,7 +486,7 @@ public function handle(Message $message): void public function testBootingWithFromNow(): void { $projectionId = 'test'; - $projector = new #[ProjectionAttribute('test', fromNow: true)] + $projector = new #[ProjectionAttribute('test', runMode: RunMode::FromNow)] class { public Message|null $message = null; @@ -371,7 +497,14 @@ public function handle(Message $message): void } }; - $projectionStore = new DummyStore([new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Booting)]); + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromNow, + ProjectionStatus::Booting, + ), + ]); $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); @@ -387,12 +520,74 @@ public function handle(Message $message): void $projectionist->boot(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 1), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromNow, + ProjectionStatus::Active, + 1, + ), ], $projectionStore->updatedProjections); self::assertNull($projector->message); } + public function testBootingWithOnlyOnce(): void + { + $projectionId = 'test'; + $projector = new #[ProjectionAttribute('test', runMode: RunMode::Once)] + class { + public Message|null $message = null; + + #[Subscribe(ProfileVisited::class)] + public function handle(Message $message): void + { + $this->message = $message; + } + }; + + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::Once, + ProjectionStatus::Booting, + ), + ]); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); + + $projectionist = new DefaultProjectionist( + $streamableStore->reveal(), + $projectionStore, + [$projector], + ); + + $projectionist->boot(); + + self::assertEquals([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::Once, + ProjectionStatus::Booting, + 1, + ), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::Once, + ProjectionStatus::Finished, + 1, + ), + ], $projectionStore->updatedProjections); + + self::assertEquals($message1, $projector->message); + } + public function testRunDiscoverNewProjectors(): void { $projectionId = 'test'; @@ -412,7 +607,12 @@ class { $projectionist->run(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::New), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::New, + ), ], $projectionStore->addedProjections); } @@ -430,7 +630,14 @@ public function handle(Message $message): void } }; - $projectionStore = new DummyStore([new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active)]); + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + ), + ]); $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); @@ -446,7 +653,13 @@ public function handle(Message $message): void $projectionist->run(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 1), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 1, + ), ], $projectionStore->updatedProjections); self::assertSame($message, $projector->message); @@ -466,7 +679,14 @@ public function handle(Message $message): void } }; - $projectionStore = new DummyStore([new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active)]); + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + ), + ]); $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $message2 = new Message(new ProfileVisited(ProfileId::fromString('test'))); @@ -486,7 +706,13 @@ public function handle(Message $message): void $projectionist->run(new ProjectionistCriteria(), 1); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 1), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 1, + ), ], $projectionStore->updatedProjections); self::assertSame($message1, $projector->message); @@ -519,8 +745,19 @@ public function handle(Message $message): void }; $projectionStore = new DummyStore([ - new Projection($projectionId1, Projection::DEFAULT_GROUP, ProjectionStatus::Active), - new Projection($projectionId2, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 1), + new Projection( + $projectionId1, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + ), + new Projection( + $projectionId2, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 1, + ), ]); $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); @@ -537,7 +774,13 @@ public function handle(Message $message): void $projectionist->run(); self::assertEquals([ - new Projection($projectionId1, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 1), + new Projection( + $projectionId1, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 1, + ), ], $projectionStore->updatedProjections); self::assertSame($message, $projector1->message); @@ -561,7 +804,14 @@ public function handle(Message $message): void } }; - $projectionStore = new DummyStore([new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active)]); + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + ), + ]); $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); @@ -581,6 +831,7 @@ public function handle(Message $message): void new Projection( $projectionId, Projection::DEFAULT_GROUP, + RunMode::FromBeginning, ProjectionStatus::Error, 0, new ProjectionError('ERROR', ErrorContext::fromThrowable($projector->exception)), @@ -595,7 +846,14 @@ public function testRunningMarkOutdated(): void { $projectionId = 'test'; - $projectionStore = new DummyStore([new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active)]); + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + ), + ]); $streamableStore = $this->prophesize(Store::class); $streamableStore->load($this->criteria())->shouldNotBeCalled(); @@ -609,7 +867,13 @@ public function testRunningMarkOutdated(): void $projectionist->run(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Outdated, 0), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Outdated, + 0, + ), ], $projectionStore->updatedProjections); } @@ -617,7 +881,14 @@ public function testRunningWithoutActiveProjectors(): void { $projectionId = 'test'; - $projectionStore = new DummyStore([new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Booting)]); + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Booting, + ), + ]); $streamableStore = $this->prophesize(Store::class); $streamableStore->load($this->criteria())->shouldNotBeCalled(); @@ -648,7 +919,14 @@ public function handle(Message $message): void } }; - $projectionStore = new DummyStore([new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active)]); + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + ), + ]); $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $message2 = new Message(new ProfileVisited(ProfileId::fromString('test'))); @@ -665,8 +943,20 @@ public function handle(Message $message): void $projectionist->run(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 1), - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 3), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 1, + ), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 3, + ), ], $projectionStore->updatedProjections); self::assertSame([$message1, $message2], $projector->messages); @@ -691,7 +981,12 @@ class { $projectionist->teardown(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::New), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::New, + ), ], $projectionStore->addedProjections); } @@ -702,7 +997,12 @@ public function testTeardownWithoutTeardownMethod(): void class { }; - $projection = new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Outdated); + $projection = new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Outdated, + ); $projectionStore = new DummyStore([$projection]); @@ -735,7 +1035,12 @@ public function drop(): void } }; - $projection = new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Outdated); + $projection = new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Outdated, + ); $projectionStore = new DummyStore([$projection]); @@ -769,7 +1074,14 @@ public function drop(): void } }; - $projectionStore = new DummyStore([new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Outdated)]); + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Outdated, + ), + ]); $streamableStore = $this->prophesize(Store::class); @@ -789,7 +1101,14 @@ public function testTeardownWithoutProjector(): void { $projectorId = 'test'; - $projectionStore = new DummyStore([new Projection($projectorId, Projection::DEFAULT_GROUP, ProjectionStatus::Outdated)]); + $projectionStore = new DummyStore([ + new Projection( + $projectorId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Outdated, + ), + ]); $streamableStore = $this->prophesize(Store::class); @@ -824,7 +1143,12 @@ class { $projectionist->remove(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::New), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::New, + ), ], $projectionStore->addedProjections); } @@ -842,7 +1166,12 @@ public function drop(): void } }; - $projection = new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Outdated); + $projection = new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Outdated, + ); $projectionStore = new DummyStore([$projection]); $streamableStore = $this->prophesize(Store::class); @@ -867,7 +1196,12 @@ public function testRemoveWithoutDropMethod(): void class { }; - $projection = new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Outdated); + $projection = new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Outdated, + ); $projectionStore = new DummyStore([$projection]); $streamableStore = $this->prophesize(Store::class); @@ -898,7 +1232,12 @@ public function drop(): void } }; - $projection = new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Outdated); + $projection = new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Outdated, + ); $projectionStore = new DummyStore([$projection]); $streamableStore = $this->prophesize(Store::class); @@ -919,7 +1258,12 @@ public function testRemoveWithoutProjector(): void { $projectorId = 'test'; - $projection = new Projection($projectorId, Projection::DEFAULT_GROUP, ProjectionStatus::Outdated); + $projection = new Projection( + $projectorId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Outdated, + ); $projectionStore = new DummyStore([$projection]); $streamableStore = $this->prophesize(Store::class); @@ -955,7 +1299,12 @@ class { $projectionist->reactivate(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::New), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::New, + ), ], $projectionStore->addedProjections); } @@ -966,7 +1315,14 @@ public function testReactivate(): void class { }; - $projectionStore = new DummyStore([new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Error)]); + $projectionStore = new DummyStore([ + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Error, + ), + ]); $streamableStore = $this->prophesize(Store::class); @@ -979,7 +1335,13 @@ class { $projectionist->reactivate(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::Active, 0), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::Active, + 0, + ), ], $projectionStore->updatedProjections); } @@ -1002,11 +1364,21 @@ class { $projections = $projectionist->projections(); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::New), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::New, + ), ], $projectionStore->addedProjections); self::assertEquals([ - new Projection($projectionId, Projection::DEFAULT_GROUP, ProjectionStatus::New), + new Projection( + $projectionId, + Projection::DEFAULT_GROUP, + RunMode::FromBeginning, + ProjectionStatus::New, + ), ], $projections); } @@ -1056,7 +1428,7 @@ class { $projectionStore = $this->prophesize(LockableProjectionStore::class); $projectionStore->inLock(Argument::type(Closure::class))->will( - /** @param array{Closure} $args */ + /** @param array{Closure} $args */ static fn (array $args): mixed => $args[0]() )->shouldBeCalled(); $projectionStore->find(Argument::any())->willReturn([])->shouldBeCalled();