Skip to content

Commit

Permalink
add from now feature for projections
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Feb 17, 2024
1 parent 5262c76 commit 56e915c
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 6 deletions.
12 changes: 6 additions & 6 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,18 @@
<code><![CDATA[$projectionError->errorContext]]></code>
</PossiblyNullPropertyFetch>
</file>
<file src="src/Projection/Projector/InMemoryProjectorRepository.php">
<InvalidOperand>
<code><![CDATA[$this->projectors]]></code>
</InvalidOperand>
</file>
<file src="src/Projection/Projector/MetadataProjectorResolver.php">
<file src="src/Projection/Projectionist/DefaultProjectionist.php">
<MixedMethodCall>
<code>$method</code>
<code>$method</code>
<code>$method</code>
</MixedMethodCall>
</file>
<file src="src/Projection/Projector/InMemoryProjectorRepository.php">
<InvalidOperand>
<code><![CDATA[$this->projectors]]></code>
</InvalidOperand>
</file>
<file src="src/Repository/DefaultRepository.php">
<PropertyTypeCoercion>
<code>new WeakMap()</code>
Expand Down
1 change: 1 addition & 0 deletions src/Attribute/Projector.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ final class Projector
{
public function __construct(
public readonly string $id,
public readonly bool $fromNow = false,

Check warning on line 14 in src/Attribute/Projector.php

View workflow job for this annotation

GitHub Actions / Mutation tests (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "FalseValue": --- Original +++ New @@ @@ #[Attribute(Attribute::TARGET_CLASS)] final class Projector { - public function __construct(public readonly string $id, public readonly bool $fromNow = false) + public function __construct(public readonly string $id, public readonly bool $fromNow = true) { } }
) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public function metadata(string $projector): ProjectorMetadata

$metadata = new ProjectorMetadata(
$projectorInfo->id,
$projectorInfo->fromNow,
$subscribeMethods,
$createMethod,
$dropMethod,
Expand Down
1 change: 1 addition & 0 deletions src/Metadata/Projector/ProjectorMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ final class ProjectorMetadata
{
public function __construct(
public readonly string $id,
public readonly bool $fromNow = false,
/** @var array<class-string|"*", list<string>> */
public readonly array $subscribeMethods = [],
public readonly string|null $setupMethod = null,
Expand Down
46 changes: 46 additions & 0 deletions src/Projection/Projectionist/DefaultProjectionist.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
use Psr\Log\LoggerInterface;
use Throwable;

use function array_map;
use function array_merge;
use function sprintf;

final class DefaultProjectionist implements Projectionist
Expand Down Expand Up @@ -109,6 +111,8 @@ public function boot(
}
}

$this->handleFromNowProjections($projections);

$projections = $projections->filterByProjectionStatus(ProjectionStatus::Booting);

if ($projections->count() === 0) {
Expand Down Expand Up @@ -596,6 +600,41 @@ private function handleRetryProjections(ProjectionCollection $projections): void
}
}

private function handleFromNowProjections(ProjectionCollection $projections): void
{
$latestIndex = null;

foreach ($projections->filterByProjectionStatus(ProjectionStatus::Booting) as $projection) {
$projector = $this->projector($projection->id());

if (!$projector) {
continue;
}

$metadata = $this->metadataFactory->metadata($projector::class);

if (!$metadata->fromNow) {
continue;
}

if ($latestIndex === null) {
$latestIndex = $this->latestIndex();
}

$projection->changePosition($latestIndex);
$projection->active();
$this->projectionStore->save($projection);

$this->logger?->info(
sprintf(
'Projectionist: Projector "%s" for "%s" is in "from now" mode: skip past messages and set to active.',
$projector::class,
$projection->id(),
),
);
}
}

private function resolveSetupMethod(object $projector): Closure|null
{
$metadata = $this->metadataFactory->metadata($projector::class);
Expand Down Expand Up @@ -641,4 +680,11 @@ private function projectorId(object $projector): string
{
return $this->metadataFactory->metadata($projector::class)->id;
}

private function latestIndex(): int
{
$stream = $this->streamableMessageStore->load(null, 1, null, true);

return $stream->index() ?: 1;
}
}

0 comments on commit 56e915c

Please sign in to comment.