Skip to content

Commit

Permalink
Merge pull request #491 from patchlevel/add-consumer
Browse files Browse the repository at this point in the history
extract consumer logic and rename outbox consumer into processor
  • Loading branch information
DavidBadura authored Feb 6, 2024
2 parents 9b672e8 + b366f7e commit 066739e
Show file tree
Hide file tree
Showing 17 changed files with 231 additions and 237 deletions.
25 changes: 23 additions & 2 deletions docs/pages/event_bus.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,46 @@ $eventBus = DefaultEventBus::create([

The order in which the listeners are executed is determined by the order in which they are passed to the factory.

Internally, the event bus uses the `Consumer` to consume the messages and call the listeners.

## Consumer

The consumer is responsible for consuming the messages and calling the listeners.

```php
use Patchlevel\EventSourcing\EventBus\DefaultConsumer;

$consumer = DefaultConsumer::create([
$mailListener,
];

$consumer->consume($message);
```

Internally, the consumer uses the `ListenerProvider` to find the listeners for the message.

## Listener provider

The listener provider is responsible for finding all listeners for a specific event.
The default listener provider uses attributes to find the listeners.

```php
use Patchlevel\EventSourcing\EventBus\DefaultConsumer;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\AttributeListenerProvider;

$listenerProvider = new AttributeListenerProvider([
$mailListener,
]);

$eventBus = new DefaultEventBus($listenerProvider);
$eventBus = new DefaultEventBus(
new DefaultConsumer($listenerProvider)
);
```

!!! tip

The `DefaultEventBus::create` method uses the `AttributeListenerProvider` by default.
The `DefaultEventBus::create` method uses the `DefaultConsumer` and `AttributeListenerProvider` by default.

### Custom listener provider

Expand Down
17 changes: 11 additions & 6 deletions docs/pages/outbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,33 @@ This stores the events to be dispatched in the database.
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Outbox\OutboxEventBus;

$outboxEventBus = new OutboxEventBus($store);
$eventBus = new OutboxEventBus($store);

$repositoryManager = new DefaultRepositoryManager(
$aggregateRootRegistry,
$store,
$outboxEventBus
$eventBus
);
```

And then you have to define the consumer. This gets the right event bus.
It is used to load the events to be dispatched from the database, dispatch the events and then empty the outbox table.

```php
use Patchlevel\EventSourcing\EventBus\DefaultConsumer;
use Patchlevel\EventSourcing\Outbox\EventBusPublisher;
use Patchlevel\EventSourcing\Outbox\StoreOutboxConsumer;
use Patchlevel\EventSourcing\Outbox\StoreOutboxProcessor;

$consumer = new StoreOutboxConsumer(
$consumer = DefaultConsumer::create([
$mailListener,
]);

$processor = new StoreOutboxProcessor(
$store,
new EventBusPublisher($realEventBus)
new EventBusPublisher($consumer)
);

$consumer->consume();
$processor->process();
```

## Using outbox
Expand Down
6 changes: 3 additions & 3 deletions src/Console/Command/OutboxConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Outbox\OutboxConsumer;
use Patchlevel\EventSourcing\Outbox\OutboxProcessor;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
Expand All @@ -21,7 +21,7 @@
final class OutboxConsumeCommand extends Command
{
public function __construct(
private readonly OutboxConsumer $consumer,
private readonly OutboxProcessor $processor,
) {
parent::__construct();
}
Expand Down Expand Up @@ -76,7 +76,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

$worker = DefaultWorker::create(
function () use ($messageLimit): void {
$this->consumer->consume($messageLimit);
$this->processor->process($messageLimit);
},
[
'runLimit' => $runLimit,
Expand Down
10 changes: 10 additions & 0 deletions src/EventBus/Consumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus;

interface Consumer
{
public function consume(Message $message): void;
}
46 changes: 46 additions & 0 deletions src/EventBus/DefaultConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus;

use Psr\Log\LoggerInterface;

use function sprintf;

final class DefaultConsumer implements Consumer
{
public function __construct(
private readonly ListenerProvider $listenerProvider,
private readonly LoggerInterface|null $logger = null,
) {
}

public function consume(Message $message): void
{
$eventClass = $message->event()::class;

$this->logger?->debug(sprintf(
'EventBus: Consume message "%s".',
$eventClass,
));

$listeners = $this->listenerProvider->listenersForEvent($eventClass);

foreach ($listeners as $listener) {
$this->logger?->info(sprintf(
'EventBus: Listener "%s" consume message with event "%s".',
$listener->name(),
$eventClass,
));

($listener->callable())($message);
}
}

/** @param iterable<object> $listeners */
public static function create(iterable $listeners = []): self
{
return new self(new AttributeListenerProvider($listeners));
}
}
23 changes: 3 additions & 20 deletions src/EventBus/DefaultEventBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ final class DefaultEventBus implements EventBus
private bool $processing;

public function __construct(
private readonly ListenerProvider $listenerProvider,
private readonly Consumer $consumer,
private readonly LoggerInterface|null $logger = null,
) {
$this->queue = [];
Expand Down Expand Up @@ -46,24 +46,7 @@ public function dispatch(Message ...$messages): void
$this->logger?->debug('EventBus: Start processing queue.');

while ($message = array_shift($this->queue)) {
$eventClass = $message->event()::class;

$this->logger?->debug(sprintf(
'EventBus: Dispatch message "%s" to listeners.',
$eventClass,
));

$listeners = $this->listenerProvider->listenersForEvent($eventClass);

foreach ($listeners as $listener) {
$this->logger?->info(sprintf(
'EventBus: Dispatch message with event "%s" to listener "%s".',
$eventClass,
$listener->name(),
));

($listener->callable())($message);
}
$this->consumer->consume($message);
}
} finally {
$this->processing = false;
Expand All @@ -75,6 +58,6 @@ public function dispatch(Message ...$messages): void
/** @param iterable<object> $listeners */
public static function create(iterable $listeners = []): self
{
return new self(new AttributeListenerProvider($listeners));
return new self(DefaultConsumer::create($listeners));
}
}
6 changes: 3 additions & 3 deletions src/Outbox/EventBusPublisher.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@

namespace Patchlevel\EventSourcing\Outbox;

use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\EventBus\Consumer;
use Patchlevel\EventSourcing\EventBus\Message;

final class EventBusPublisher implements OutboxPublisher
{
public function __construct(
private readonly EventBus $eventBus,
private readonly Consumer $consumer,
) {
}

public function publish(Message $message): void
{
$this->eventBus->dispatch($message);
$this->consumer->consume($message);
}
}
10 changes: 0 additions & 10 deletions src/Outbox/OutboxConsumer.php

This file was deleted.

11 changes: 11 additions & 0 deletions src/Outbox/OutboxEventBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,27 @@

use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\EventBus\Message;
use Psr\Log\LoggerInterface;

use function sprintf;

final class OutboxEventBus implements EventBus
{
public function __construct(
private readonly OutboxStore $store,
private readonly LoggerInterface|null $logger = null,
) {
}

public function dispatch(Message ...$messages): void
{
$this->store->saveOutboxMessage(...$messages);

foreach ($messages as $message) {
$this->logger?->debug(sprintf(
'EventBus: Message "%s" added to queue.',
$message->event()::class,
));
}
}
}
10 changes: 10 additions & 0 deletions src/Outbox/OutboxProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Outbox;

interface OutboxProcessor
{
public function process(int|null $limit = null): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

namespace Patchlevel\EventSourcing\Outbox;

final class StoreOutboxConsumer implements OutboxConsumer
final class StoreOutboxProcessor implements OutboxProcessor
{
public function __construct(
private readonly OutboxStore $store,
private readonly OutboxPublisher $publisher,
) {
}

public function consume(int|null $limit = null): void
public function process(int|null $limit = null): void
{
$messages = $this->store->retrieveOutboxMessages($limit);

Expand Down
14 changes: 6 additions & 8 deletions tests/Integration/Outbox/OutboxTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
namespace Patchlevel\EventSourcing\Tests\Integration\Outbox;

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\DefaultConsumer;
use Patchlevel\EventSourcing\Outbox\DoctrineOutboxStore;
use Patchlevel\EventSourcing\Outbox\EventBusPublisher;
use Patchlevel\EventSourcing\Outbox\OutboxEventBus;
use Patchlevel\EventSourcing\Outbox\StoreOutboxConsumer;
use Patchlevel\EventSourcing\Outbox\StoreOutboxProcessor;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
Expand Down Expand Up @@ -74,9 +74,7 @@ public function testSuccessful(): void
$projectorRepository,
);

$realEventBus = DefaultEventBus::create([
new SendEmailProcessor(),
]);
$eventBusConsumer = DefaultConsumer::create([new SendEmailProcessor()]);

$eventStream = new SyncProjectionistEventBusWrapper(
$outboxEventBus,
Expand Down Expand Up @@ -118,12 +116,12 @@ public function testSuccessful(): void
$message->event(),
);

$consumer = new StoreOutboxConsumer(
$consumer = new StoreOutboxProcessor(
$outboxStore,
new EventBusPublisher($realEventBus),
new EventBusPublisher($eventBusConsumer),
);

$consumer->consume();
$consumer->process();

self::assertSame(0, $outboxStore->countOutboxMessages());
self::assertCount(0, $outboxStore->retrieveOutboxMessages());
Expand Down
10 changes: 5 additions & 5 deletions tests/Unit/Console/Command/OutboxConsumeCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Patchlevel\EventSourcing\Tests\Unit\Console\Command;

use Patchlevel\EventSourcing\Console\Command\OutboxConsumeCommand;
use Patchlevel\EventSourcing\Outbox\OutboxConsumer;
use Patchlevel\EventSourcing\Outbox\OutboxProcessor;
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;
use Symfony\Component\Console\Input\ArrayInput;
Expand All @@ -18,8 +18,8 @@ final class OutboxConsumeCommandTest extends TestCase

public function testSuccessful(): void
{
$consumer = $this->prophesize(OutboxConsumer::class);
$consumer->consume(100)->shouldBeCalled();
$consumer = $this->prophesize(OutboxProcessor::class);
$consumer->process(100)->shouldBeCalled();

$command = new OutboxConsumeCommand(
$consumer->reveal(),
Expand All @@ -35,8 +35,8 @@ public function testSuccessful(): void

public function testSuccessfulWithAllLimits(): void
{
$consumer = $this->prophesize(OutboxConsumer::class);
$consumer->consume(200)->shouldBeCalled();
$consumer = $this->prophesize(OutboxProcessor::class);
$consumer->process(200)->shouldBeCalled();

$command = new OutboxConsumeCommand(
$consumer->reveal(),
Expand Down
Loading

0 comments on commit 066739e

Please sign in to comment.