Skip to content

Commit

Permalink
allow subscribe all
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Jan 11, 2024
1 parent 6d1bfe3 commit a692bf0
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 56 deletions.
4 changes: 3 additions & 1 deletion src/Attribute/Subscribe.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
#[Attribute(Attribute::TARGET_METHOD | Attribute::IS_REPEATABLE)]
final class Subscribe
{
/** @param class-string $eventClass */
public const ALL = '*';

/** @param class-string|'*' $eventClass */
public function __construct(
public readonly string $eventClass,
) {
Expand Down
12 changes: 10 additions & 2 deletions src/EventBus/AttributeListenerProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Patchlevel\EventSourcing\Attribute\Subscribe;
use ReflectionClass;

use function array_merge;

final class AttributeListenerProvider implements ListenerProvider
{
/** @var array<string, list<ListenerDescriptor>>|null */
Expand All @@ -22,7 +24,10 @@ public function __construct(
public function listenersForEvent(object $event): iterable
{
if ($this->subscribeMethods !== null) {
return $this->subscribeMethods[$event::class] ?? [];
return array_merge(
$this->subscribeMethods[$event::class] ?? [],
$this->subscribeMethods['*'] ?? [],
);
}

$this->subscribeMethods = [];
Expand All @@ -45,6 +50,9 @@ public function listenersForEvent(object $event): iterable
}
}

return $this->subscribeMethods[$event::class] ?? [];
return array_merge(
$this->subscribeMethods[$event::class] ?? [],
$this->subscribeMethods['*'] ?? [],
);
}
}
7 changes: 7 additions & 0 deletions src/Metadata/Projector/AttributeProjectorMetadataFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ public function metadata(string $projector): ProjectorMetadata
$instance = $attribute->newInstance();
$eventClass = $instance->eventClass;

if ($eventClass === '*') {
throw new SubscribeAllNotSupported(
$projector,
$method->getName(),
);
}

if (array_key_exists($eventClass, $subscribeMethods)) {
throw new DuplicateSubscribeMethod(
$projector,
Expand Down
24 changes: 24 additions & 0 deletions src/Metadata/Projector/SubscribeAllNotSupported.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Metadata\Projector;

use Patchlevel\EventSourcing\Metadata\MetadataException;

use function sprintf;

final class SubscribeAllNotSupported extends MetadataException
{
/** @param class-string $projector */
public function __construct(string $projector, string $method)
{
parent::__construct(
sprintf(
'subscribe all (*) not supported in projector "%s" method "%s"',
$projector,
$method,
),
);
}
}
30 changes: 0 additions & 30 deletions src/WatchServer/WatchEventBusWrapper.php

This file was deleted.

26 changes: 26 additions & 0 deletions src/WatchServer/WatchListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\WatchServer;

use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\EventBus\Message;

final class WatchListener
{
public function __construct(
private readonly WatchServerClient $client,
) {
}

#[Subscribe('*')]
public function __invoke(Message $message): void
{
try {
$this->client->send($message);
} catch (SendingFailed) {
// to nothing
}
}
}
50 changes: 50 additions & 0 deletions tests/Unit/EventBus/AttributeListenerProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,54 @@ public function __invoke(Message $message): void
new ListenerDescriptor($listener2->__invoke(...)),
], $listeners);
}

public function testSubscribeAll(): void
{
$listener = new class {
#[Subscribe('*')]
public function __invoke(Message $message): void
{
}
};

$event = new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('[email protected]'),
);

$eventBus = new AttributeListenerProvider([$listener]);
$listeners = $eventBus->listenersForEvent($event);

self::assertEquals([
new ListenerDescriptor($listener->__invoke(...)),
], $listeners);
}

public function testMixedSubscribeTypes(): void
{
$listener = new class {
#[Subscribe('*')]
public function foo(Message $message): void
{
}

#[Subscribe(ProfileCreated::class)]
public function bar(Message $message): void
{
}
};

$event = new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('[email protected]'),
);

$eventBus = new AttributeListenerProvider([$listener]);
$listeners = $eventBus->listenersForEvent($event);

self::assertEquals([
new ListenerDescriptor($listener->bar(...)),
new ListenerDescriptor($listener->foo(...)),
], $listeners);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,39 @@

namespace Patchlevel\EventSourcing\Tests\Unit\WatchServer;

use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited;
use Patchlevel\EventSourcing\WatchServer\SendingFailed;
use Patchlevel\EventSourcing\WatchServer\WatchEventBusWrapper;
use Patchlevel\EventSourcing\WatchServer\WatchListener;
use Patchlevel\EventSourcing\WatchServer\WatchServerClient;
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;

/** @covers \Patchlevel\EventSourcing\WatchServer\WatchEventBusWrapper */
final class WatchEventBusWrapperTest extends TestCase
/** @covers \Patchlevel\EventSourcing\WatchServer\WatchListener */
final class WatchListenerTest extends TestCase
{
use ProphecyTrait;

public function testWrapper(): void
public function testListener(): void
{
$message = new Message(new ProfileVisited(ProfileId::fromString('1')));

$eventBus = $this->prophesize(EventBus::class);
$eventBus->dispatch($message)->shouldBeCalled();

$client = $this->prophesize(WatchServerClient::class);
$client->send($message)->shouldBeCalled();

$wrapper = new WatchEventBusWrapper(
$eventBus->reveal(),
$client->reveal(),
);

$wrapper->dispatch($message);
$listener = new WatchListener($client->reveal());
$listener->__invoke($message);
}

public function testIgnoreErrors(): void
{
$message = new Message(new ProfileVisited(ProfileId::fromString('1')));

$eventBus = $this->prophesize(EventBus::class);
$eventBus->dispatch($message)->shouldBeCalled();

$client = $this->prophesize(WatchServerClient::class);
$client->send($message)->shouldBeCalled()->willThrow(SendingFailed::class);

$wrapper = new WatchEventBusWrapper(
$eventBus->reveal(),
$client->reveal(),
);

$wrapper->dispatch($message);
$listener = new WatchListener($client->reveal());
$listener->__invoke($message);
}
}

0 comments on commit a692bf0

Please sign in to comment.