Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"from now" mode for projections #500

Merged
merged 4 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,18 @@
<code><![CDATA[$projectionError->errorContext]]></code>
</PossiblyNullPropertyFetch>
</file>
<file src="src/Projection/Projector/InMemoryProjectorRepository.php">
<InvalidOperand>
<code><![CDATA[$this->projectors]]></code>
</InvalidOperand>
</file>
<file src="src/Projection/Projector/MetadataProjectorResolver.php">
<file src="src/Projection/Projectionist/DefaultProjectionist.php">
<MixedMethodCall>
<code>$method</code>
<code>$method</code>
<code>$method</code>
</MixedMethodCall>
</file>
<file src="src/Projection/Projector/InMemoryProjectorRepository.php">
<InvalidOperand>
<code><![CDATA[$this->projectors]]></code>
</InvalidOperand>
</file>
<file src="src/Repository/DefaultRepository.php">
<PropertyTypeCoercion>
<code>new WeakMap()</code>
Expand Down
23 changes: 11 additions & 12 deletions docs/pages/projection.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,7 @@ Otherwise the projectionist will not recognize that the projection has changed a
To do this, you can add a version to the `projectorId`:

```php
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Teardown;
use Patchlevel\EventSourcing\Attribute\Handle;
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\EventBus\Message;

#[Projector('profile_1')]
final class ProfileProjector
Expand All @@ -140,16 +135,20 @@ final class ProfileProjector

You can also use the `ProjectorUtil` to build the table/collection name.

## Projector Repository
## From Now

The projector repository is responsible for managing the projectors.
Certain projectors operate exclusively on post-release events, disregarding historical data.
Consider, for instance, the scenario of launching a fresh email service.
Its primary function is to dispatch welcome emails to newly registered users triggered by a `ProfileCreated` event.

```php
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Attribute\Projector;

$projectorRepository = new InMemoryProjectorRepository([
new ProfileProjection($connection)
]);
#[Projector('profile_1', fromNow: true)]
final class WelcomeEmailProjector
{
// ...
}
```

## Projectionist
Expand Down Expand Up @@ -305,7 +304,7 @@ use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
$projectionist = new DefaultProjectionist(
$eventStore,
$projectionStore,
$projectorRepository
[$projector1, $projector2, $projector3]
);
```

Expand Down
5 changes: 0 additions & 5 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ parameters:
count: 1
path: src/Projection/Projection/Store/DoctrineStore.php

-
message: "#^Method Patchlevel\\\\EventSourcing\\\\Projection\\\\Projector\\\\InMemoryProjectorRepository\\:\\:projectors\\(\\) should return array\\<int, object\\> but returns array\\<int\\|string, object\\>\\.$#"
count: 1
path: src/Projection/Projector/InMemoryProjectorRepository.php

-
message: "#^Parameter \\#2 \\$data of method Patchlevel\\\\Hydrator\\\\Hydrator\\:\\:hydrate\\(\\) expects array\\<string, mixed\\>, mixed given\\.$#"
count: 1
Expand Down
1 change: 1 addition & 0 deletions src/Attribute/Projector.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
{
public function __construct(
public readonly string $id,
public readonly bool $fromNow = false,

Check warning on line 14 in src/Attribute/Projector.php

View workflow job for this annotation

GitHub Actions / Mutation tests (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "FalseValue": --- Original +++ New @@ @@ #[Attribute(Attribute::TARGET_CLASS)] final class Projector { - public function __construct(public readonly string $id, public readonly bool $fromNow = false) + public function __construct(public readonly string $id, public readonly bool $fromNow = true) { } }
) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public function metadata(string $projector): ProjectorMetadata

$metadata = new ProjectorMetadata(
$projectorInfo->id,
$projectorInfo->fromNow,
$subscribeMethods,
$createMethod,
$dropMethod,
Expand Down
1 change: 1 addition & 0 deletions src/Metadata/Projector/ProjectorMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ final class ProjectorMetadata
{
public function __construct(
public readonly string $id,
public readonly bool $fromNow = false,
/** @var array<class-string|"*", list<string>> */
public readonly array $subscribeMethods = [],
public readonly string|null $setupMethod = null,
Expand Down
139 changes: 112 additions & 27 deletions src/Projection/Projectionist/DefaultProjectionist.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,39 @@

namespace Patchlevel\EventSourcing\Projection\Projectionist;

use Closure;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Metadata\Projector\AttributeProjectorMetadataFactory;
use Patchlevel\EventSourcing\Metadata\Projector\ProjectorMetadataFactory;
use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCollection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionError;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus;
use Patchlevel\EventSourcing\Projection\Projection\Store\ProjectionStore;
use Patchlevel\EventSourcing\Projection\Projector\MetadataProjectorResolver;
use Patchlevel\EventSourcing\Projection\Projector\ProjectorRepository;
use Patchlevel\EventSourcing\Projection\Projector\ProjectorResolver;
use Patchlevel\EventSourcing\Store\Criteria;
use Patchlevel\EventSourcing\Store\Store;
use Psr\Log\LoggerInterface;
use Throwable;

use function array_map;
use function array_merge;
use function sprintf;

final class DefaultProjectionist implements Projectionist
{
private const RETRY_LIMIT = 5;

/** @var array<string, object>|null */
private array|null $projectors = null;
private array|null $projectorIndex = null;

/** @param iterable<object> $projectors */
public function __construct(
private readonly Store $streamableMessageStore,
private readonly ProjectionStore $projectionStore,
private readonly ProjectorRepository $projectorRepository,
private readonly ProjectorResolver $projectorResolver = new MetadataProjectorResolver(),
private readonly iterable $projectors,
private readonly ProjectorMetadataFactory $metadataFactory = new AttributeProjectorMetadataFactory(),
private readonly LoggerInterface|null $logger = null,
) {
}
Expand Down Expand Up @@ -66,7 +70,7 @@ public function boot(
$projection->id(),
));

$setupMethod = $this->projectorResolver->resolveSetupMethod($projector);
$setupMethod = $this->resolveSetupMethod($projector);

if (!$setupMethod) {
$this->logger?->debug(sprintf(
Expand Down Expand Up @@ -107,6 +111,8 @@ public function boot(
}
}

$this->handleFromNowProjections($projections);

$projections = $projections->filterByProjectionStatus(ProjectionStatus::Booting);

if ($projections->count() === 0) {
Expand Down Expand Up @@ -306,7 +312,7 @@ public function teardown(ProjectionCriteria $criteria = new ProjectionCriteria()
continue;
}

$teardownMethod = $this->projectorResolver->resolveTeardownMethod($projector);
$teardownMethod = $this->resolveTeardownMethod($projector);

if (!$teardownMethod) {
$this->projectionStore->remove($projection->id());
Expand Down Expand Up @@ -372,7 +378,7 @@ public function remove(ProjectionCriteria $criteria = new ProjectionCriteria()):
continue;
}

$teardownMethod = $this->projectorResolver->resolveTeardownMethod($projector);
$teardownMethod = $this->resolveTeardownMethod($projector);

if (!$teardownMethod) {
$this->projectionStore->remove($projection->id());
Expand Down Expand Up @@ -436,10 +442,9 @@ public function reactivate(ProjectionCriteria $criteria = new ProjectionCriteria
public function projections(): ProjectionCollection
{
$projections = $this->projectionStore->all();
$projectors = $this->projectors();

foreach ($projectors as $projector) {
$projectorId = $this->projectorResolver->projectorId($projector);
foreach ($this->projectors as $projector) {
$projectorId = $this->projectorId($projector);

if ($projections->has($projectorId)) {
continue;
Expand All @@ -459,7 +464,7 @@ private function handleMessage(int $index, Message $message, Projection $project
throw ProjectorNotFound::forProjectionId($projection->id());
}

$subscribeMethods = $this->projectorResolver->resolveSubscribeMethods($projector, $message);
$subscribeMethods = $this->resolveSubscribeMethods($projector, $message);

if ($subscribeMethods === []) {
$projection->changePosition($index);
Expand Down Expand Up @@ -523,25 +528,17 @@ private function handleMessage(int $index, Message $message, Projection $project

private function projector(string $projectorId): object|null
{
$projectors = $this->projectors();

return $projectors[$projectorId] ?? null;
}

/** @return array<string, object> */
private function projectors(): array
{
if ($this->projectors === null) {
$this->projectors = [];
if ($this->projectorIndex === null) {
$this->projectorIndex = [];

foreach ($this->projectorRepository->projectors() as $projector) {
$projectorId = $this->projectorResolver->projectorId($projector);
foreach ($this->projectors as $projector) {
$projectorId = $this->projectorId($projector);

$this->projectors[$projectorId] = $projector;
$this->projectorIndex[$projectorId] = $projector;
}
}

return $this->projectors;
return $this->projectorIndex[$projectorId] ?? null;
}

private function handleOutdatedProjections(ProjectionCollection $projections): void
Expand Down Expand Up @@ -593,4 +590,92 @@ private function handleRetryProjections(ProjectionCollection $projections): void
);
}
}

private function handleFromNowProjections(ProjectionCollection $projections): void
DanielBadura marked this conversation as resolved.
Show resolved Hide resolved
{
$latestIndex = null;

foreach ($projections->filterByProjectionStatus(ProjectionStatus::Booting) as $projection) {
$projector = $this->projector($projection->id());

if (!$projector) {
continue;
}

$metadata = $this->metadataFactory->metadata($projector::class);

if (!$metadata->fromNow) {
continue;
}
DanielBadura marked this conversation as resolved.
Show resolved Hide resolved

if ($latestIndex === null) {
$latestIndex = $this->latestIndex();
}

$projection->changePosition($latestIndex);
$projection->active();
$this->projectionStore->save($projection);

$this->logger?->info(
sprintf(
'Projectionist: Projector "%s" for "%s" is in "from now" mode: skip past messages and set to active.',
$projector::class,
$projection->id(),
),
);
}
}

private function resolveSetupMethod(object $projector): Closure|null
{
$metadata = $this->metadataFactory->metadata($projector::class);
$method = $metadata->setupMethod;

if ($method === null) {
return null;
}

return $projector->$method(...);
}

private function resolveTeardownMethod(object $projector): Closure|null
{
$metadata = $this->metadataFactory->metadata($projector::class);
$method = $metadata->teardownMethod;

if ($method === null) {
return null;
}

return $projector->$method(...);
}

/** @return iterable<Closure> */
private function resolveSubscribeMethods(object $projector, Message $message): iterable
{
$event = $message->event();
$metadata = $this->metadataFactory->metadata($projector::class);

$methods = array_merge(
$metadata->subscribeMethods[$event::class] ?? [],
$metadata->subscribeMethods[Subscribe::ALL] ?? [],
);

return array_map(
static fn (string $method) => $projector->$method(...),
$methods,
);
}

private function projectorId(object $projector): string
{
return $this->metadataFactory->metadata($projector::class)->id;
}

private function latestIndex(): int
{
$stream = $this->streamableMessageStore->load(null, 1, null, true);

return $stream->index() ?: 1;
}
}
20 changes: 0 additions & 20 deletions src/Projection/Projector/InMemoryProjectorRepository.php

This file was deleted.

Loading
Loading