Skip to content

Commit

Permalink
allow subscribe all in projectors
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Feb 17, 2024
1 parent da33974 commit 8583d29
Show file tree
Hide file tree
Showing 15 changed files with 105 additions and 123 deletions.
2 changes: 1 addition & 1 deletion baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<MixedMethodCall>
<code>$method</code>
<code>$method</code>
<code>$subscribeMethod</code>
<code>$method</code>
</MixedMethodCall>
</file>
<file src="src/Repository/DefaultRepository.php">
Expand Down
18 changes: 1 addition & 17 deletions src/Metadata/Projector/AttributeProjectorMetadataFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,7 @@ public function metadata(string $projector): ProjectorMetadata
$instance = $attribute->newInstance();
$eventClass = $instance->eventClass;

if ($eventClass === Subscribe::ALL) {
throw new SubscribeAllNotSupported(
$projector,
$method->getName(),
);
}

if (array_key_exists($eventClass, $subscribeMethods)) {
throw new DuplicateSubscribeMethod(
$projector,
$eventClass,
$subscribeMethods[$eventClass],
$method->getName(),
);
}

$subscribeMethods[$eventClass] = $method->getName();
$subscribeMethods[$eventClass][] = $method->getName();
}

if ($method->getAttributes(Setup::class)) {
Expand Down
29 changes: 0 additions & 29 deletions src/Metadata/Projector/DuplicateSubscribeMethod.php

This file was deleted.

2 changes: 1 addition & 1 deletion src/Metadata/Projector/ProjectorMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ final class ProjectorMetadata
public function __construct(
public readonly string $name,
public readonly int $version,
/** @var array<class-string, string> */
/** @var array<class-string|"*", list<string>> */
public readonly array $subscribeMethods = [],
public readonly string|null $setupMethod = null,
public readonly string|null $teardownMethod = null,
Expand Down
24 changes: 0 additions & 24 deletions src/Metadata/Projector/SubscribeAllNotSupported.php

This file was deleted.

8 changes: 3 additions & 5 deletions src/Pipeline/Target/ProjectorRepositoryTarget.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ public function save(Message ...$messages): void

foreach ($messages as $message) {
foreach ($projectors as $projector) {
$subscribeMethod = $this->projectorResolver->resolveSubscribeMethod($projector, $message);
$subscribeMethods = $this->projectorResolver->resolveSubscribeMethods($projector, $message);

if (!$subscribeMethod) {
continue;
foreach ($subscribeMethods as $subscribeMethod) {
$subscribeMethod($message);
}

$subscribeMethod($message);
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions src/Pipeline/Target/ProjectorTarget.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ public function __construct(
public function save(Message ...$messages): void
{
foreach ($messages as $message) {
$subscribeMethod = $this->projectorResolver->resolveSubscribeMethod($this->projector, $message);
$subscribeMethods = $this->projectorResolver->resolveSubscribeMethods($this->projector, $message);

if (!$subscribeMethod) {
continue;
foreach ($subscribeMethods as $subscribeMethod) {
$subscribeMethod($message);
}

$subscribeMethod($message);
}
}
}
10 changes: 6 additions & 4 deletions src/Projection/Projectionist/DefaultProjectionist.php
Original file line number Diff line number Diff line change
Expand Up @@ -461,15 +461,15 @@ private function handleMessage(int $index, Message $message, Projection $project
throw ProjectorNotFound::forProjectionId($projection->id());
}

$subscribeMethod = $this->projectorResolver->resolveSubscribeMethod($projector, $message);
$subscribeMethods = $this->projectorResolver->resolveSubscribeMethods($projector, $message);

if (!$subscribeMethod) {
if ($subscribeMethods === []) {
$projection->changePosition($index);
$this->projectionStore->save($projection);

$this->logger?->debug(
sprintf(
'Projectionist: Projector "%s" for "%s" has no subscribe method for "%s", continue.',
'Projectionist: Projector "%s" for "%s" has no subscribe methods for "%s", continue.',
$projector::class,
$projection->id()->toString(),
$message->event()::class,
Expand All @@ -480,7 +480,9 @@ private function handleMessage(int $index, Message $message, Projection $project
}

try {
$subscribeMethod($message);
foreach ($subscribeMethods as $subscribeMethod) {
$subscribeMethod($message);
}
} catch (Throwable $e) {
$this->logger?->error(
sprintf(
Expand Down
21 changes: 13 additions & 8 deletions src/Projection/Projector/MetadataProjectorResolver.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
namespace Patchlevel\EventSourcing\Projection\Projector;

use Closure;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Metadata\Projector\AttributeProjectorMetadataFactory;
use Patchlevel\EventSourcing\Metadata\Projector\ProjectorMetadataFactory;

use function array_key_exists;
use function array_map;
use function array_merge;

final class MetadataProjectorResolver implements ProjectorResolver
{
Expand Down Expand Up @@ -42,18 +44,21 @@ public function resolveTeardownMethod(object $projector): Closure|null
return $projector->$method(...);
}

public function resolveSubscribeMethod(object $projector, Message $message): Closure|null
/** @return iterable<Closure> */
public function resolveSubscribeMethods(object $projector, Message $message): iterable
{
$event = $message->event();
$metadata = $this->metadataFactory->metadata($projector::class);

if (!array_key_exists($event::class, $metadata->subscribeMethods)) {
return null;
}

$subscribeMethod = $metadata->subscribeMethods[$event::class];
$methods = array_merge(
$metadata->subscribeMethods[$event::class] ?? [],
$metadata->subscribeMethods[Subscribe::ALL] ?? [],
);

return $projector->$subscribeMethod(...);
return array_map(
static fn (string $method) => $projector->$method(...),
$methods,
);
}

public function projectorId(object $projector): ProjectorId
Expand Down
3 changes: 2 additions & 1 deletion src/Projection/Projector/ProjectorResolver.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ public function resolveSetupMethod(object $projector): Closure|null;

public function resolveTeardownMethod(object $projector): Closure|null;

public function resolveSubscribeMethod(object $projector, Message $message): Closure|null;
/** @return iterable<Closure> */
public function resolveSubscribeMethods(object $projector, Message $message): iterable;

public function projectorId(object $projector): ProjectorId;
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public function drop(): void
$metadata = $metadataFactory->metadata($projection::class);

self::assertEquals(
[ProfileVisited::class => 'handle'],
[ProfileVisited::class => ['handle']],
$metadata->subscribeMethods,
);

Expand All @@ -94,8 +94,29 @@ public function handle(): void

self::assertEquals(
[
ProfileVisited::class => 'handle',
ProfileCreated::class => 'handle',
ProfileVisited::class => ['handle'],
ProfileCreated::class => ['handle'],
],
$metadata->subscribeMethods,
);
}

public function testSubscribeAll(): void
{
$projection = new #[Projector('foo', 1)]
class {
#[Subscribe(Subscribe::ALL)]
public function handle(): void
{
}
};

$metadataFactory = new AttributeProjectorMetadataFactory();
$metadata = $metadataFactory->metadata($projection::class);

self::assertEquals(
[
'*' => ['handle'],
],
$metadata->subscribeMethods,
);
Expand Down
4 changes: 2 additions & 2 deletions tests/Unit/Pipeline/Target/ProjectorRepositoryTargetTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public function __invoke(Message $message): void
$projectorRepository->projectors()->shouldBeCalledOnce()->willReturn([$projector]);

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSubscribeMethod($projector, $message)->shouldBeCalledOnce()->willReturn($projector(...));
$projectorResolver->resolveSubscribeMethods($projector, $message)->shouldBeCalledOnce()->willReturn([$projector(...)]);

$projectorRepositoryTarget = new ProjectorRepositoryTarget($projectorRepository->reveal(), $projectorResolver->reveal());
$projectorRepositoryTarget->save($message);
Expand All @@ -65,7 +65,7 @@ public function __invoke(Message $message): void
$projectorRepository->projectors()->shouldBeCalledOnce()->willReturn([$projector]);

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSubscribeMethod($projector, $message)->shouldBeCalledOnce()->willReturn(null);
$projectorResolver->resolveSubscribeMethods($projector, $message)->shouldBeCalledOnce()->willReturn([]);

$projectorRepositoryTarget = new ProjectorRepositoryTarget($projectorRepository->reveal(), $projectorResolver->reveal());
$projectorRepositoryTarget->save($message);
Expand Down
4 changes: 2 additions & 2 deletions tests/Unit/Pipeline/Target/ProjectorTargetTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public function __invoke(Message $message): void
};

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSubscribeMethod($projector, $message)->shouldBeCalledOnce()->willReturn($projector(...));
$projectorResolver->resolveSubscribeMethods($projector, $message)->shouldBeCalledOnce()->willReturn([$projector(...)]);

$projectorTarget = new ProjectorTarget($projector, $projectorResolver->reveal());
$projectorTarget->save($message);
Expand All @@ -58,7 +58,7 @@ public function __invoke(Message $message): void
};

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSubscribeMethod($projector, $message)->shouldBeCalledOnce()->willReturn(null);
$projectorResolver->resolveSubscribeMethods($projector, $message)->shouldBeCalledOnce()->willReturn([]);

$projectorTarget = new ProjectorTarget($projector, $projectorResolver->reveal());
$projectorTarget->save($message);
Expand Down
24 changes: 12 additions & 12 deletions tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class {
$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->projectorId($projector)->willReturn($projectorId);
$projectorResolver->resolveSetupMethod($projector)->willReturn(null);
$projectorResolver->resolveSubscribeMethod($projector, $message)->willReturn(null);
$projectorResolver->resolveSubscribeMethods($projector, $message)->willReturn([]);

$projectionist = new DefaultProjectionist(
$streamableStore->reveal(),
Expand Down Expand Up @@ -131,7 +131,7 @@ public function handle(Message $message): void

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSetupMethod($projector)->willReturn($projector->create(...));
$projectorResolver->resolveSubscribeMethod($projector, $message)->willReturn($projector->handle(...));
$projectorResolver->resolveSubscribeMethods($projector, $message)->willReturn([$projector->handle(...)]);
$projectorResolver->projectorId($projector)->willReturn($projectorId);

$projectionist = new DefaultProjectionist(
Expand Down Expand Up @@ -185,7 +185,7 @@ public function handle(Message $message): void

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSetupMethod($projector)->willReturn($projector->create(...));
$projectorResolver->resolveSubscribeMethod($projector, $message)->willReturn($projector->handle(...));
$projectorResolver->resolveSubscribeMethods($projector, $message)->willReturn([$projector->handle(...)]);
$projectorResolver->projectorId($projector)->willReturn($projectorId);

$projectionist = new DefaultProjectionist(
Expand Down Expand Up @@ -246,7 +246,7 @@ public function handle(Message $message): void
$projectorRepository->projectors()->willReturn([$projector1, $projector2])->shouldBeCalledOnce();

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSubscribeMethod($projector1, $message)->willReturn($projector1->handle(...));
$projectorResolver->resolveSubscribeMethods($projector1, $message)->willReturn([$projector1->handle(...)]);
$projectorResolver->projectorId($projector1)->willReturn($projectorId1);
$projectorResolver->projectorId($projector2)->willReturn($projectorId2);

Expand Down Expand Up @@ -354,8 +354,8 @@ public function handle(Message $message): void
$projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSubscribeMethod($projector, $message1)->willReturn($projector->handle(...));
$projectorResolver->resolveSubscribeMethod($projector, $message2)->willReturn($projector->handle(...));
$projectorResolver->resolveSubscribeMethods($projector, $message1)->willReturn([$projector->handle(...)]);
$projectorResolver->resolveSubscribeMethods($projector, $message2)->willReturn([$projector->handle(...)]);
$projectorResolver->projectorId($projector)->willReturn($projectorId);

$projectionist = new DefaultProjectionist(
Expand Down Expand Up @@ -401,7 +401,7 @@ public function handle(Message $message): void
$projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSubscribeMethod($projector, $message)->willReturn($projector->handle(...));
$projectorResolver->resolveSubscribeMethods($projector, $message)->willReturn([$projector->handle(...)]);
$projectorResolver->projectorId($projector)->willReturn($projectorId);

$projectionist = new DefaultProjectionist(
Expand Down Expand Up @@ -449,7 +449,7 @@ public function handle(Message $message): void
$projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSubscribeMethod($projector, $message1)->willReturn($projector->handle(...));
$projectorResolver->resolveSubscribeMethods($projector, $message1)->willReturn([$projector->handle(...)]);
$projectorResolver->projectorId($projector)->willReturn($projectorId);

$projectionist = new DefaultProjectionist(
Expand Down Expand Up @@ -508,7 +508,7 @@ public function handle(Message $message): void
$projectorRepository->projectors()->willReturn([$projector1, $projector2])->shouldBeCalledOnce();

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSubscribeMethod($projector1, $message)->willReturn($projector1->handle(...));
$projectorResolver->resolveSubscribeMethods($projector1, $message)->willReturn([$projector1->handle(...)]);
$projectorResolver->projectorId($projector1)->willReturn($projectorId1);
$projectorResolver->projectorId($projector2)->willReturn($projectorId2);

Expand Down Expand Up @@ -557,7 +557,7 @@ public function handle(Message $message): void
$projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSubscribeMethod($projector, $message)->willReturn($projector->handle(...));
$projectorResolver->resolveSubscribeMethods($projector, $message)->willReturn([$projector->handle(...)]);
$projectorResolver->projectorId($projector)->willReturn($projectorId);

$projectionist = new DefaultProjectionist(
Expand Down Expand Up @@ -664,8 +664,8 @@ public function handle(Message $message): void
$projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();

$projectorResolver = $this->prophesize(ProjectorResolver::class);
$projectorResolver->resolveSubscribeMethod($projector, $message1)->willReturn($projector->handle(...));
$projectorResolver->resolveSubscribeMethod($projector, $message2)->willReturn($projector->handle(...));
$projectorResolver->resolveSubscribeMethods($projector, $message1)->willReturn([$projector->handle(...)]);
$projectorResolver->resolveSubscribeMethods($projector, $message2)->willReturn([$projector->handle(...)]);
$projectorResolver->projectorId($projector)->willReturn($projectorId);

$projectionist = new DefaultProjectionist(
Expand Down
Loading

0 comments on commit 8583d29

Please sign in to comment.