Skip to content

Commit

Permalink
rename outbox consumer into processor
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Feb 6, 2024
1 parent 6111a01 commit b366f7e
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 35 deletions.
17 changes: 11 additions & 6 deletions docs/pages/outbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,33 @@ This stores the events to be dispatched in the database.
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Outbox\OutboxEventBus;

$outboxEventBus = new OutboxEventBus($store);
$eventBus = new OutboxEventBus($store);

$repositoryManager = new DefaultRepositoryManager(
$aggregateRootRegistry,
$store,
$outboxEventBus
$eventBus
);
```

And then you have to define the consumer. This gets the right event bus.
It is used to load the events to be dispatched from the database, dispatch the events and then empty the outbox table.

```php
use Patchlevel\EventSourcing\EventBus\DefaultConsumer;
use Patchlevel\EventSourcing\Outbox\EventBusPublisher;
use Patchlevel\EventSourcing\Outbox\StoreOutboxConsumer;
use Patchlevel\EventSourcing\Outbox\StoreOutboxProcessor;

$consumer = new StoreOutboxConsumer(
$consumer = DefaultConsumer::create([
$mailListener,
]);

$processor = new StoreOutboxProcessor(
$store,
new EventBusPublisher($realEventBus)
new EventBusPublisher($consumer)
);

$consumer->consume();
$processor->process();
```

## Using outbox
Expand Down
6 changes: 3 additions & 3 deletions src/Console/Command/OutboxConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Outbox\OutboxConsumer;
use Patchlevel\EventSourcing\Outbox\OutboxProcessor;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
Expand All @@ -21,7 +21,7 @@
final class OutboxConsumeCommand extends Command
{
public function __construct(
private readonly OutboxConsumer $consumer,
private readonly OutboxProcessor $processor,
) {
parent::__construct();
}
Expand Down Expand Up @@ -76,7 +76,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

$worker = DefaultWorker::create(
function () use ($messageLimit): void {
$this->consumer->consume($messageLimit);
$this->processor->process($messageLimit);
},
[
'runLimit' => $runLimit,
Expand Down
10 changes: 0 additions & 10 deletions src/Outbox/OutboxConsumer.php

This file was deleted.

10 changes: 10 additions & 0 deletions src/Outbox/OutboxProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Outbox;

interface OutboxProcessor
{
public function process(int|null $limit = null): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

namespace Patchlevel\EventSourcing\Outbox;

final class StoreOutboxConsumer implements OutboxConsumer
final class StoreOutboxProcessor implements OutboxProcessor
{
public function __construct(
private readonly OutboxStore $store,
private readonly OutboxPublisher $publisher,
) {
}

public function consume(int|null $limit = null): void
public function process(int|null $limit = null): void
{
$messages = $this->store->retrieveOutboxMessages($limit);

Expand Down
6 changes: 3 additions & 3 deletions tests/Integration/Outbox/OutboxTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use Patchlevel\EventSourcing\Outbox\DoctrineOutboxStore;
use Patchlevel\EventSourcing\Outbox\EventBusPublisher;
use Patchlevel\EventSourcing\Outbox\OutboxEventBus;
use Patchlevel\EventSourcing\Outbox\StoreOutboxConsumer;
use Patchlevel\EventSourcing\Outbox\StoreOutboxProcessor;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
Expand Down Expand Up @@ -116,12 +116,12 @@ public function testSuccessful(): void
$message->event(),
);

$consumer = new StoreOutboxConsumer(
$consumer = new StoreOutboxProcessor(
$outboxStore,
new EventBusPublisher($eventBusConsumer),
);

$consumer->consume();
$consumer->process();

self::assertSame(0, $outboxStore->countOutboxMessages());
self::assertCount(0, $outboxStore->retrieveOutboxMessages());
Expand Down
10 changes: 5 additions & 5 deletions tests/Unit/Console/Command/OutboxConsumeCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Patchlevel\EventSourcing\Tests\Unit\Console\Command;

use Patchlevel\EventSourcing\Console\Command\OutboxConsumeCommand;
use Patchlevel\EventSourcing\Outbox\OutboxConsumer;
use Patchlevel\EventSourcing\Outbox\OutboxProcessor;
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;
use Symfony\Component\Console\Input\ArrayInput;
Expand All @@ -18,8 +18,8 @@ final class OutboxConsumeCommandTest extends TestCase

public function testSuccessful(): void
{
$consumer = $this->prophesize(OutboxConsumer::class);
$consumer->consume(100)->shouldBeCalled();
$consumer = $this->prophesize(OutboxProcessor::class);
$consumer->process(100)->shouldBeCalled();

$command = new OutboxConsumeCommand(
$consumer->reveal(),
Expand All @@ -35,8 +35,8 @@ public function testSuccessful(): void

public function testSuccessfulWithAllLimits(): void
{
$consumer = $this->prophesize(OutboxConsumer::class);
$consumer->consume(200)->shouldBeCalled();
$consumer = $this->prophesize(OutboxProcessor::class);
$consumer->process(200)->shouldBeCalled();

$command = new OutboxConsumeCommand(
$consumer->reveal(),
Expand Down
12 changes: 6 additions & 6 deletions tests/Unit/Outbox/StoreOutboxConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Outbox\OutboxPublisher;
use Patchlevel\EventSourcing\Outbox\OutboxStore;
use Patchlevel\EventSourcing\Outbox\StoreOutboxConsumer;
use Patchlevel\EventSourcing\Outbox\StoreOutboxProcessor;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId;
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;

/** @covers \Patchlevel\EventSourcing\Outbox\StoreOutboxConsumer */
/** @covers \Patchlevel\EventSourcing\Outbox\StoreOutboxProcessor */
final class StoreOutboxConsumerTest extends TestCase
{
use ProphecyTrait;
Expand All @@ -35,8 +35,8 @@ public function testConsume(): void
$eventBus = $this->prophesize(OutboxPublisher::class);
$eventBus->publish($message)->shouldBeCalled();

$consumer = new StoreOutboxConsumer($store->reveal(), $eventBus->reveal());
$consumer->consume();
$consumer = new StoreOutboxProcessor($store->reveal(), $eventBus->reveal());
$consumer->process();
}

public function testConsumeWithLimit(): void
Expand All @@ -55,7 +55,7 @@ public function testConsumeWithLimit(): void
$eventBus = $this->prophesize(OutboxPublisher::class);
$eventBus->publish($message)->shouldBeCalled();

$consumer = new StoreOutboxConsumer($store->reveal(), $eventBus->reveal());
$consumer->consume(100);
$consumer = new StoreOutboxProcessor($store->reveal(), $eventBus->reveal());
$consumer->process(100);
}
}

0 comments on commit b366f7e

Please sign in to comment.