Skip to content

Commit

Permalink
decouple consume and publish in outbox
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Jan 14, 2024
1 parent 7b52991 commit e4ee47c
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 14 deletions.
9 changes: 8 additions & 1 deletion docs/pages/outbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,14 @@ And then you have to define the consumer. This gets the right event bus.
It is used to load the events to be dispatched from the database, dispatch the events and then empty the outbox table.

```php
$consumer = new StoreOutboxConsumer($store, $realEventBus);
use Patchlevel\EventSourcing\Outbox\EventBusPublisher;
use Patchlevel\EventSourcing\Outbox\StoreOutboxConsumer;

$consumer = new StoreOutboxConsumer(
$store,
new EventBusPublisher($realEventBus)
);

$consumer->consume();
```

Expand Down
21 changes: 21 additions & 0 deletions src/Outbox/EventBusPublisher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Outbox;

use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\EventBus\Message;

final class EventBusPublisher implements OutboxPublisher
{
public function __construct(
private readonly EventBus $eventBus,
) {
}

public function publish(Message $message): void
{
$this->eventBus->dispatch($message);
}
}
12 changes: 12 additions & 0 deletions src/Outbox/OutboxPublisher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Outbox;

use Patchlevel\EventSourcing\EventBus\Message;

interface OutboxPublisher
{
public function publish(Message $message): void;
}
12 changes: 5 additions & 7 deletions src/Outbox/StoreOutboxConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,21 @@

namespace Patchlevel\EventSourcing\Outbox;

use Patchlevel\EventSourcing\EventBus\EventBus;

final class StoreOutboxConsumer implements OutboxConsumer
{
public function __construct(
private readonly OutboxStore $outboxStore,
private readonly EventBus $eventBus,
private readonly OutboxStore $store,
private readonly OutboxPublisher $publisher,
) {
}

public function consume(int|null $limit = null): void
{
$messages = $this->outboxStore->retrieveOutboxMessages($limit);
$messages = $this->store->retrieveOutboxMessages($limit);

foreach ($messages as $message) {
$this->eventBus->dispatch($message);
$this->outboxStore->markOutboxMessageConsumed($message);
$this->publisher->publish($message);
$this->store->markOutboxMessageConsumed($message);
}
}
}
10 changes: 9 additions & 1 deletion tests/Integration/Outbox/OutboxTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory;
use Patchlevel\EventSourcing\Outbox\DoctrineOutboxStore;
use Patchlevel\EventSourcing\Outbox\EventBusPublisher;
use Patchlevel\EventSourcing\Outbox\OutboxEventBus;
use Patchlevel\EventSourcing\Outbox\StoreOutboxConsumer;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
Expand Down Expand Up @@ -63,9 +64,12 @@ public function testSuccessful(): void
'outbox',
);

<<<<<<< Updated upstream

Check failure on line 67 in tests/Integration/Outbox/OutboxTest.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

ParseError

tests/Integration/Outbox/OutboxTest.php:67:1: ParseError: Syntax error, unexpected T_SL on line 67 (see https://psalm.dev/173)

Check failure on line 67 in tests/Integration/Outbox/OutboxTest.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

UndefinedConstant

tests/Integration/Outbox/OutboxTest.php:67:9: UndefinedConstant: Const Updated is not defined (see https://psalm.dev/020)

Check failure on line 67 in tests/Integration/Outbox/OutboxTest.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

UndefinedConstant

tests/Integration/Outbox/OutboxTest.php:67:17: UndefinedConstant: Const upstream is not defined (see https://psalm.dev/020)
$realEventBus = new DefaultEventBus();
$realEventBus->addListener(new SendEmailProcessor());

=======

Check failure on line 71 in tests/Integration/Outbox/OutboxTest.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

ParseError

tests/Integration/Outbox/OutboxTest.php:71:1: ParseError: Syntax error, unexpected T_IS_IDENTICAL on line 71 (see https://psalm.dev/173)
>>>>>>> Stashed changes

Check failure on line 72 in tests/Integration/Outbox/OutboxTest.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

UndefinedConstant

tests/Integration/Outbox/OutboxTest.php:72:9: UndefinedConstant: Const Stashed is not defined (see https://psalm.dev/020)

Check failure on line 72 in tests/Integration/Outbox/OutboxTest.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

UndefinedConstant

tests/Integration/Outbox/OutboxTest.php:72:17: UndefinedConstant: Const changes is not defined (see https://psalm.dev/020)
$outboxEventBus = new OutboxEventBus($outboxStore);

$profileProjector = new ProfileProjector($this->connection);
Expand Down Expand Up @@ -122,7 +126,11 @@ public function testSuccessful(): void
$message->event(),
);

$consumer = new StoreOutboxConsumer($outboxStore, $realEventBus);
$consumer = new StoreOutboxConsumer(
$outboxStore,
new EventBusPublisher($realEventBus),
);

$consumer->consume();

self::assertSame(0, $outboxStore->countOutboxMessages());
Expand Down
36 changes: 36 additions & 0 deletions tests/Unit/Outbox/EventBusPublisherTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Tests\Unit\Outbox;

use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Outbox\EventBusPublisher;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId;
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;

/** @covers \Patchlevel\EventSourcing\Outbox\EventBusPublisher */
final class EventBusPublisherTest extends TestCase
{
use ProphecyTrait;

public function testPublish(): void
{
$message = new Message(
new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('[email protected]'),
),
);

$eventBus = $this->prophesize(EventBus::class);
$eventBus->dispatch($message)->shouldBeCalled();

$publisher = new EventBusPublisher($eventBus->reveal());
$publisher->publish($message);
}
}
10 changes: 5 additions & 5 deletions tests/Unit/Outbox/StoreOutboxConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

namespace Patchlevel\EventSourcing\Tests\Unit\Outbox;

use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Outbox\OutboxPublisher;
use Patchlevel\EventSourcing\Outbox\OutboxStore;
use Patchlevel\EventSourcing\Outbox\StoreOutboxConsumer;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email;
Expand All @@ -32,8 +32,8 @@ public function testConsume(): void
$store->retrieveOutboxMessages(null)->willReturn([$message]);
$store->markOutboxMessageConsumed($message)->shouldBeCalled();

$eventBus = $this->prophesize(EventBus::class);
$eventBus->dispatch($message)->shouldBeCalled();
$eventBus = $this->prophesize(OutboxPublisher::class);
$eventBus->publish($message)->shouldBeCalled();

$consumer = new StoreOutboxConsumer($store->reveal(), $eventBus->reveal());
$consumer->consume();
Expand All @@ -52,8 +52,8 @@ public function testConsumeWithLimit(): void
$store->retrieveOutboxMessages(100)->willReturn([$message]);
$store->markOutboxMessageConsumed($message)->shouldBeCalled();

$eventBus = $this->prophesize(EventBus::class);
$eventBus->dispatch($message)->shouldBeCalled();
$eventBus = $this->prophesize(OutboxPublisher::class);
$eventBus->publish($message)->shouldBeCalled();

$consumer = new StoreOutboxConsumer($store->reveal(), $eventBus->reveal());
$consumer->consume(100);
Expand Down

0 comments on commit e4ee47c

Please sign in to comment.