Skip to content

Commit

Permalink
refactor event bus & drop symfony event bus
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Jan 11, 2024
1 parent 65b5875 commit 00a3299
Show file tree
Hide file tree
Showing 34 changed files with 525 additions and 728 deletions.
3 changes: 1 addition & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@
"symfony/messenger": "for an advanced event bus"
},
"conflict": {
"doctrine/migrations": "<3.3",
"symfony/messenger": "<5.4"
"doctrine/migrations": "<3.3"
},
"config": {
"preferred-install": {
Expand Down
142 changes: 71 additions & 71 deletions composer.lock

Large diffs are not rendered by default.

28 changes: 2 additions & 26 deletions docs/pages/event_bus.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,40 +137,16 @@ $eventBus = new SymfonyEventBus($symfonyMessenger);

## Listener

A listener must implement the `Listener` interface and define the `__invoke` method.
You can listen for specific events with the attribute `Subscribe`.
This listener is then called for all saved events / messages.

```php
use Patchlevel\EventSourcing\EventBus\Listener;
use Patchlevel\EventSourcing\EventBus\Message;

final class WelcomeListener implements Listener
{
public function __invoke(Message $message): void
{
if ($message->event() instanceof ProfileCreated) {
echo 'Welcome!';
}
}
}
```

!!! warning

If you only want to listen to certain messages,
then you have to check it in the `__invoke` method or use the subscriber.

## Subscriber

A `Subscriber` is a listener, except that it has implemented the invoke method itself.
Instead, you can define your own and multiple methods and listen for specific events with the attribute `Subscribe`.

```php
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\EventBus\Listener;
use Patchlevel\EventSourcing\EventBus\Message;

final class WelcomeSubscriber extends Subscriber
final class WelcomeSubscriber
{
#[Subscribe(ProfileCreated::class)]
public function onProfileCreated(Message $message): void
Expand Down
17 changes: 13 additions & 4 deletions docs/pages/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\EventBus\Subscriber;

final class SendCheckInEmailProcessor extends Subscriber
final class SendCheckInEmailProcessor
{
public function __construct(
private readonly Mailer $mailer
Expand Down Expand Up @@ -283,6 +283,7 @@ use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projector\SyncProjectorListener;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
Expand All @@ -294,9 +295,6 @@ $connection = DriverManager::getConnection([

$mailer = /* your own mailer */;

$eventBus = new DefaultEventBus();
$eventBus->addListener(new SendCheckInEmailProcessor($mailer));

$serializer = DefaultEventSerializer::createFromPaths(['src/Domain/Hotel/Event']);
$aggregateRegistry = (new AttributeAggregateRootRegistryFactory)->create(['src/Domain/Hotel']);

Expand All @@ -320,6 +318,17 @@ $projectionist = new DefaultProjectionist(
$projectorRepository
);

$eventBus = SyncProjectionistEventBusWrapper::createWithDefaultLockStrategy(
DefaultEventBus::create([
new SyncProjectorListener($projectionist),
]),
$projectionist
);

$eventBus = DefaultEventBus::create([
new SendCheckInEmailProcessor($mailer),
]);

$repositoryManager = new DefaultRepositoryManager(
$aggregateRegistry,
$store,
Expand Down
50 changes: 50 additions & 0 deletions src/EventBus/AttributeListenerProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus;

use Patchlevel\EventSourcing\Attribute\Subscribe;
use ReflectionClass;

final class AttributeListenerProvider implements ListenerProvider
{
/** @var array<string, list<ListenerDescriptor>>|null */
private array|null $subscribeMethods = null;

/** @param iterable<object> $listeners */
public function __construct(
private readonly iterable $listeners,
) {
}

/** @return iterable<ListenerDescriptor> */
public function listenersForEvent(object $event): iterable
{
if ($this->subscribeMethods !== null) {
return $this->subscribeMethods[$event::class] ?? [];
}

$this->subscribeMethods = [];

foreach ($this->listeners as $listener) {
$reflection = new ReflectionClass($listener);
$methods = $reflection->getMethods();

foreach ($methods as $method) {
$attributes = $method->getAttributes(Subscribe::class);

foreach ($attributes as $attribute) {
$instance = $attribute->newInstance();
$eventClass = $instance->eventClass;

$this->subscribeMethods[$eventClass][] = new ListenerDescriptor(
$listener->{$method->getName()}(...),

Check failure on line 42 in src/EventBus/AttributeListenerProvider.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

MixedMethodCall

src/EventBus/AttributeListenerProvider.php:42:37: MixedMethodCall: Cannot determine the type of $listener (see https://psalm.dev/015)
);
}
}
}

return $this->subscribeMethods[$event::class] ?? [];
}
}
45 changes: 37 additions & 8 deletions src/EventBus/DefaultEventBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,73 @@

namespace Patchlevel\EventSourcing\EventBus;

use function array_merge;
use Psr\Log\LoggerInterface;

use function array_shift;
use function sprintf;

final class DefaultEventBus implements EventBus
{
/** @var array<Message<object>> */
private array $queue;
private bool $processing;

/** @param list<Listener> $listeners */
public function __construct(
private array $listeners = [],
private readonly ListenerProvider $listenerProvider,
private readonly LoggerInterface|null $logger = null,
) {
$this->queue = [];
$this->processing = false;
}

public function dispatch(Message ...$messages): void
{
$this->queue = array_merge($this->queue, $messages);
foreach ($messages as $message) {
$this->logger?->debug(sprintf(
'EventBus: Add message "%s" to queue.',
$message->event()::class,
));

$this->queue[] = $message;
}

if ($this->processing) {
$this->logger?->debug('EventBus: Is already processing, dont start new processing.');

return;
}

$this->processing = true;

$this->logger?->debug('EventBus: Start processing queue.');

while ($message = array_shift($this->queue)) {
foreach ($this->listeners as $listener) {
$listener($message);
$this->logger?->debug(sprintf(
'EventBus: Dispatch message "%s" to listeners.',
$message->event()::class,
));

$listeners = $this->listenerProvider->listenersForEvent($message->event());

foreach ($listeners as $listener) {
$this->logger?->info(sprintf(
'EventBus: Dispatch message with event "%s" to listener "%s".',
$message->event()::class,
$listener->name(),
));

($listener->callable())($message);
}
}

$this->processing = false;

$this->logger?->debug('EventBus: Finished processing queue.');
}

public function addListener(Listener $listener): void
/** @param iterable<object> $listeners */
public static function create(iterable $listeners = []): self
{
$this->listeners[] = $listener;
return new self(new AttributeListenerProvider($listeners));
}
}
27 changes: 0 additions & 27 deletions src/EventBus/DuplicateSubscribeMethod.php

This file was deleted.

10 changes: 0 additions & 10 deletions src/EventBus/Listener.php

This file was deleted.

43 changes: 43 additions & 0 deletions src/EventBus/ListenerDescriptor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus;

use Closure;
use ReflectionFunction;

final class ListenerDescriptor
{
private readonly Closure $callable;
private readonly string $name;

public function __construct(callable $callable)
{
$callable = $callable(...);

$this->callable = $callable;

$r = new ReflectionFunction($callable);

if ($r->isAnonymous()) {
$this->name = 'Closure';
} elseif (!$callable = $r->getClosureThis()) {
$class = $r->getClosureCalledClass();

$this->name = ($class ? $class->name . '::' : '') . $r->name;
} else {
$this->name = $callable::class . '::' . $r->name;
}
}

public function name(): string
{
return $this->name;
}

public function callable(): callable
{
return $this->callable;
}
}
11 changes: 11 additions & 0 deletions src/EventBus/ListenerProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus;

interface ListenerProvider
{
/** @return iterable<ListenerDescriptor> */
public function listenersForEvent(object $event): iterable;
}
59 changes: 0 additions & 59 deletions src/EventBus/Subscriber.php

This file was deleted.

Loading

0 comments on commit 00a3299

Please sign in to comment.