Skip to content

Commit

Permalink
make the event bus optional
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Mar 21, 2024
1 parent 3b6277f commit f2139dc
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 188 deletions.
4 changes: 0 additions & 4 deletions docs/pages/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ After we have defined everything, we still have to plug the whole thing together

```php
use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Projection\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Projection\Subscriber\MetadataSubscriberAccessorRepository;
use Patchlevel\EventSourcing\Projection\Store\DoctrineSubscriptionStore;
Expand Down Expand Up @@ -320,12 +319,9 @@ $projectionist = new DefaultSubscriptionEngine(
$projectorRepository,
);

$eventBus = DefaultEventBus::create();

$repositoryManager = new DefaultRepositoryManager(
$aggregateRegistry,
$eventStore,
$eventBus,
);

$hotelRepository = $repositoryManager->get(Hotel::class);
Expand Down
58 changes: 41 additions & 17 deletions docs/pages/repository.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
# Repository

A `repository` takes care of storing and loading the `aggregates`.
He is also responsible for building [messages](event_bus.md) from the events and then dispatching them to the event bus.

Every aggregate needs a repository to be stored.
And each repository is only responsible for one aggregate.
He is also responsible for building [messages](event_bus.md) from the events
and optionally dispatching them to the event bus.

## Create a repository

Expand All @@ -13,8 +11,7 @@ This helps to build the repository correctly.

The `DefaultRepositoryManager` needs some services to work.
For one, it needs [AggregateRootRegistry](aggregate.md#aggregate-root-registry) so that it knows which aggregates exist.
The [store](store.md), which is then given to the repository so that it can save and load the events at the end.
And the [EventBus](event_bus.md) to publish the new events.
And the [store](store.md), which is then given to the repository so that it can save and load the events at the end.

After plugging the `DefaultRepositoryManager` together, you can create the repository associated with the aggregate.

Expand All @@ -24,7 +21,6 @@ use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
$repositoryManager = new DefaultRepositoryManager(
$aggregateRootRegistry,
$store,
$eventBus
);

$repository = $repositoryManager->get(Profile::class);
Expand All @@ -34,6 +30,41 @@ $repository = $repositoryManager->get(Profile::class);

The same repository instance is always returned for a specific aggregate.

### Event Bus

You can pass an event bus to the `DefaultRepositoryManager` to dispatch events synchronously.
This is useful if you want to react to events in the same transaction.

```php
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;

$eventBus = DefaultEventBus::create([/* listeners */]);

$repositoryManager = new DefaultRepositoryManager(
$aggregateRootRegistry,
$store,
$eventBus,
);

$repository = $repositoryManager->get(Profile::class);
```

!!! warning

If you use the event bus, you should be aware that the events are dispatched synchronously.
You may encounter "at least once" problems.

!!! note

You can find out more about event bus [here](event_bus.md).

!!! tip

In most cases it is better to react to events asynchronously,
that's why we recommend the subscription engine.
More information can be found [here](subscription.md).

### Snapshots

Loading events for an aggregate is superfast.
Expand All @@ -54,7 +85,7 @@ $snapshotStore = new DefaultSnapshotStore([
$repositoryManager = new DefaultRepositoryManager(
$aggregateRootRegistry,
$store,
$eventBus,
null,
$snapshotStore
);

Expand All @@ -77,7 +108,7 @@ $decorator = new ApplicationIdDecorator();
$repositoryManager = new DefaultRepositoryManager(
$aggregateRootRegistry,
$store,
$eventBus,
null,
null,
$decorator
);
Expand All @@ -98,9 +129,7 @@ saving it or checking whether it exists.

An `aggregate` can be `saved`.
All new events that have not yet been written to the database are fetched from the aggregate.
These events are then also append to the database.
After the events have been written,
the new events are dispatched on the [event bus](./event_bus.md).
These events are then also append to the database.

```php
use Patchlevel\EventSourcing\Aggregate\Uuid;
Expand All @@ -115,11 +144,6 @@ $repository->save($profile);

All events are written to the database with one transaction in order to ensure data consistency.

!!! tip

If you want to make sure that dispatching events and storing events is transaction safe,
then you should look at the [outbox](outbox.md) pattern.

### Load an aggregate

An `aggregate` can be loaded using the `load` method.
Expand Down
4 changes: 2 additions & 2 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ final class DefaultRepository implements Repository
/** @param AggregateRootMetadata<T> $metadata */
public function __construct(
private Store $store,
private EventBus $eventBus,
private readonly AggregateRootMetadata $metadata,
private EventBus|null $eventBus = null,
private SnapshotStore|null $snapshotStore = null,
private MessageDecorator|null $messageDecorator = null,
ClockInterface|null $clock = null,
Expand Down Expand Up @@ -267,7 +267,7 @@ static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $
}

$this->archive(...$messages);
$this->eventBus->dispatch(...$messages);
$this->eventBus?->dispatch(...$messages);
});

$this->aggregateIsValid[$aggregate] = true;
Expand Down
4 changes: 2 additions & 2 deletions src/Repository/DefaultRepositoryManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ final class DefaultRepositoryManager implements RepositoryManager
public function __construct(
private AggregateRootRegistry $aggregateRootRegistry,
private Store $store,
private EventBus $eventBus,
private EventBus|null $eventBus = null,
private SnapshotStore|null $snapshotStore = null,
private MessageDecorator|null $messageDecorator = null,
ClockInterface|null $clock = null,
Expand Down Expand Up @@ -66,8 +66,8 @@ public function get(string $aggregateClass): Repository

return $this->instances[$aggregateClass] = new DefaultRepository(
$this->store,
$this->eventBus,
$this->metadataFactory->metadata($aggregateClass),
$this->eventBus,
$this->snapshotStore,
$this->messageDecorator,
$this->clock,
Expand Down
7 changes: 1 addition & 6 deletions tests/Benchmark/SimpleSetupBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
namespace Patchlevel\EventSourcing\Tests\Benchmark;

use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\Repository;
Expand All @@ -23,7 +21,6 @@
final class SimpleSetupBench
{
private Store $store;
private EventBus $bus;
private Repository $repository;

private AggregateRootId $id;
Expand All @@ -32,8 +29,6 @@ public function setUp(): void
{
$connection = DbalManager::createConnection();

$this->bus = DefaultEventBus::create();

$this->store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
Expand All @@ -44,7 +39,7 @@ public function setUp(): void
'eventstore',
);

$this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata());
$this->repository = new DefaultRepository($this->store, Profile::metadata());

$schemaDirector = new DoctrineSchemaDirector(
$connection,
Expand Down
7 changes: 1 addition & 6 deletions tests/Benchmark/SnapshotsBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
namespace Patchlevel\EventSourcing\Tests\Benchmark;

use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\Repository;
Expand All @@ -26,7 +24,6 @@
final class SnapshotsBench
{
private Store $store;
private EventBus $bus;
private SnapshotStore $snapshotStore;
private Repository $repository;

Expand All @@ -38,8 +35,6 @@ public function setUp(): void
{
$connection = DbalManager::createConnection();

$this->bus = DefaultEventBus::create();

$this->store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
Expand All @@ -54,7 +49,7 @@ public function setUp(): void

$this->snapshotStore = new DefaultSnapshotStore(['default' => $this->adapter]);

$this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata(), $this->snapshotStore);
$this->repository = new DefaultRepository($this->store, Profile::metadata(), null, $this->snapshotStore);

$schemaDirector = new DoctrineSchemaDirector(
$connection,
Expand Down
7 changes: 1 addition & 6 deletions tests/Benchmark/SplitStreamBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
namespace Patchlevel\EventSourcing\Tests\Benchmark;

use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
Expand All @@ -27,7 +25,6 @@
final class SplitStreamBench
{
private Store $store;
private EventBus $bus;
private Repository $repository;

private AggregateRootId $id;
Expand All @@ -36,8 +33,6 @@ public function setUp(): void
{
$connection = DbalManager::createConnection();

$this->bus = DefaultEventBus::create();

$this->store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
Expand All @@ -50,9 +45,9 @@ public function setUp(): void

$this->repository = new DefaultRepository(
$this->store,
$this->bus,
Profile::metadata(),
null,
null,
new SplitStreamDecorator(
new AttributeEventMetadataFactory(),
),
Expand Down
7 changes: 1 addition & 6 deletions tests/Benchmark/SubscriptionEngineBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
namespace Patchlevel\EventSourcing\Tests\Benchmark;

use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\Repository;
Expand All @@ -30,7 +28,6 @@
final class SubscriptionEngineBench
{
private Store $store;
private EventBus $bus;
private Repository $repository;

private SubscriptionEngine $subscriptionEngine;
Expand All @@ -41,8 +38,6 @@ public function setUp(): void
{
$connection = DbalManager::createConnection();

$this->bus = DefaultEventBus::create();

$this->store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
Expand All @@ -55,7 +50,7 @@ public function setUp(): void
'eventstore',
);

$this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata());
$this->repository = new DefaultRepository($this->store, Profile::metadata());

$subscriptionStore = new DoctrineSubscriptionStore(
$connection,
Expand Down
13 changes: 7 additions & 6 deletions tests/Integration/BankAccountSplitStream/IntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream;

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory;
Expand Down Expand Up @@ -63,12 +62,10 @@ public function testSuccessful(): void
new MetadataSubscriberAccessorRepository([$bankAccountProjector]),
);

$eventBus = DefaultEventBus::create([$bankAccountProjector]);

$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
$eventBus,
null,
null,
new ChainMessageDecorator([
new SplitStreamDecorator(new AttributeEventMetadataFactory()),
Expand All @@ -91,6 +88,8 @@ public function testSuccessful(): void
$bankAccount->addBalance(500);
$repository->save($bankAccount);

$engine->run();

$result = $this->connection->fetchAssociative('SELECT * FROM projection_bank_account WHERE id = ?', ['1']);

self::assertIsArray($result);
Expand All @@ -102,7 +101,7 @@ public function testSuccessful(): void
$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
$eventBus,
null,
null,
new ChainMessageDecorator([
new SplitStreamDecorator(new AttributeEventMetadataFactory()),
Expand All @@ -125,6 +124,8 @@ public function testSuccessful(): void
$bankAccount->addBalance(200);
$repository->save($bankAccount);

$engine->run();

$result = $this->connection->fetchAssociative('SELECT * FROM projection_bank_account WHERE id = ?', ['1']);

self::assertIsArray($result);
Expand All @@ -136,7 +137,7 @@ public function testSuccessful(): void
$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
$eventBus,
null,
null,
new ChainMessageDecorator([
new SplitStreamDecorator(new AttributeEventMetadataFactory()),
Expand Down
4 changes: 2 additions & 2 deletions tests/Integration/Pipeline/PipelineChangeStoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public function testSuccessful(): void

$newSchemaDirector->create();

$oldRepository = new DefaultRepository($oldStore, DefaultEventBus::create(), Profile::metadata());
$newRepository = new DefaultRepository($newStore, DefaultEventBus::create(), Profile::metadata());
$oldRepository = new DefaultRepository($oldStore, Profile::metadata());
$newRepository = new DefaultRepository($newStore, Profile::metadata());

$profileId = ProfileId::fromString('1');
$profile = Profile::create($profileId);
Expand Down
Loading

0 comments on commit f2139dc

Please sign in to comment.