Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make the event bus optional #547

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions docs/pages/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ After we have defined everything, we still have to plug the whole thing together

```php
use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Projection\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Projection\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Projection\Subscriber\MetadataSubscriberAccessorRepository;
Expand Down Expand Up @@ -306,12 +305,9 @@ $projectionist = new DefaultSubscriptionEngine(
$projectorRepository,
);

$eventBus = DefaultEventBus::create();

$repositoryManager = new DefaultRepositoryManager(
$aggregateRegistry,
$eventStore,
$eventBus,
);

$hotelRepository = $repositoryManager->get(Hotel::class);
Expand Down
55 changes: 39 additions & 16 deletions docs/pages/repository.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
# Repository

A `repository` takes care of storing and loading the `aggregates`.
He is also responsible for building [messages](event_bus.md) from the events and then dispatching them to the event bus.

Every aggregate needs a repository to be stored.
And each repository is only responsible for one aggregate.
He is also responsible for building [messages](event_bus.md) from the events
and optionally dispatching them to the event bus.

## Create a repository

Expand All @@ -13,8 +11,7 @@ This helps to build the repository correctly.

The `DefaultRepositoryManager` needs some services to work.
For one, it needs [AggregateRootRegistry](aggregate.md#aggregate-root-registry) so that it knows which aggregates exist.
The [store](store.md), which is then given to the repository so that it can save and load the events at the end.
And the [EventBus](event_bus.md) to publish the new events.
And the [store](store.md), which is then given to the repository so that it can save and load the events at the end.

After plugging the `DefaultRepositoryManager` together, you can create the repository associated with the aggregate.

Expand All @@ -24,7 +21,6 @@ use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
$repositoryManager = new DefaultRepositoryManager(
$aggregateRootRegistry,
$store,
$eventBus,
);

$repository = $repositoryManager->get(Profile::class);
Expand All @@ -33,6 +29,40 @@ $repository = $repositoryManager->get(Profile::class);

The same repository instance is always returned for a specific aggregate.

### Event Bus

You can pass an event bus to the `DefaultRepositoryManager` to dispatch events synchronously.
This is useful if you want to react to events in the same transaction.

```php
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;

$eventBus = DefaultEventBus::create([/* listeners */]);

$repositoryManager = new DefaultRepositoryManager(
$aggregateRootRegistry,
$store,
$eventBus,
);

$repository = $repositoryManager->get(Profile::class);
```
!!! warning

If you use the event bus, you should be aware that the events are dispatched synchronously.
You may encounter "at least once" problems.

!!! note

You can find out more about event bus [here](event_bus.md).

!!! tip

In most cases it is better to react to events asynchronously,
that's why we recommend the subscription engine.
More information can be found [here](subscription.md).

### Snapshots

Loading events for an aggregate is superfast.
Expand All @@ -51,7 +81,7 @@ $snapshotStore = new DefaultSnapshotStore(['default' => $adapter]);
$repositoryManager = new DefaultRepositoryManager(
$aggregateRootRegistry,
$store,
$eventBus,
null,
$snapshotStore,
);

Expand All @@ -73,7 +103,7 @@ $decorator = new ApplicationIdDecorator();
$repositoryManager = new DefaultRepositoryManager(
$aggregateRootRegistry,
$store,
$eventBus,
null,
null,
$decorator,
);
Expand All @@ -94,8 +124,6 @@ saving it or checking whether it exists.
An `aggregate` can be `saved`.
All new events that have not yet been written to the database are fetched from the aggregate.
These events are then also append to the database.
After the events have been written,
the new events are dispatched on the [event bus](./event_bus.md).

```php
use Patchlevel\EventSourcing\Aggregate\Uuid;
Expand All @@ -109,11 +137,6 @@ $repository->save($profile);

All events are written to the database with one transaction in order to ensure data consistency.

!!! tip

If you want to make sure that dispatching events and storing events is transaction safe,
then you should look at the [outbox](outbox.md) pattern.

### Load an aggregate

An `aggregate` can be loaded using the `load` method.
Expand Down
4 changes: 2 additions & 2 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ final class DefaultRepository implements Repository
/** @param AggregateRootMetadata<T> $metadata */
public function __construct(
private Store $store,
private EventBus $eventBus,
private readonly AggregateRootMetadata $metadata,
private EventBus|null $eventBus = null,
private SnapshotStore|null $snapshotStore = null,
private MessageDecorator|null $messageDecorator = null,
ClockInterface|null $clock = null,
Expand Down Expand Up @@ -267,7 +267,7 @@ static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $
}

$this->archive(...$messages);
$this->eventBus->dispatch(...$messages);
$this->eventBus?->dispatch(...$messages);
});

$this->aggregateIsValid[$aggregate] = true;
Expand Down
4 changes: 2 additions & 2 deletions src/Repository/DefaultRepositoryManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ final class DefaultRepositoryManager implements RepositoryManager
public function __construct(
private AggregateRootRegistry $aggregateRootRegistry,
private Store $store,
private EventBus $eventBus,
private EventBus|null $eventBus = null,
private SnapshotStore|null $snapshotStore = null,
private MessageDecorator|null $messageDecorator = null,
ClockInterface|null $clock = null,
Expand Down Expand Up @@ -66,8 +66,8 @@ public function get(string $aggregateClass): Repository

return $this->instances[$aggregateClass] = new DefaultRepository(
$this->store,
$this->eventBus,
$this->metadataFactory->metadata($aggregateClass),
$this->eventBus,
$this->snapshotStore,
$this->messageDecorator,
$this->clock,
Expand Down
7 changes: 1 addition & 6 deletions tests/Benchmark/SimpleSetupBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
namespace Patchlevel\EventSourcing\Tests\Benchmark;

use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\Repository;
Expand All @@ -23,7 +21,6 @@
final class SimpleSetupBench
{
private Store $store;
private EventBus $bus;
private Repository $repository;

private AggregateRootId $id;
Expand All @@ -32,8 +29,6 @@ public function setUp(): void
{
$connection = DbalManager::createConnection();

$this->bus = DefaultEventBus::create();

$this->store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
Expand All @@ -44,7 +39,7 @@ public function setUp(): void
'eventstore',
);

$this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata());
$this->repository = new DefaultRepository($this->store, Profile::metadata());

$schemaDirector = new DoctrineSchemaDirector(
$connection,
Expand Down
7 changes: 1 addition & 6 deletions tests/Benchmark/SnapshotsBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
namespace Patchlevel\EventSourcing\Tests\Benchmark;

use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\Repository;
Expand All @@ -26,7 +24,6 @@
final class SnapshotsBench
{
private Store $store;
private EventBus $bus;
private SnapshotStore $snapshotStore;
private Repository $repository;

Expand All @@ -38,8 +35,6 @@ public function setUp(): void
{
$connection = DbalManager::createConnection();

$this->bus = DefaultEventBus::create();

$this->store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
Expand All @@ -54,7 +49,7 @@ public function setUp(): void

$this->snapshotStore = new DefaultSnapshotStore(['default' => $this->adapter]);

$this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata(), $this->snapshotStore);
$this->repository = new DefaultRepository($this->store, Profile::metadata(), null, $this->snapshotStore);

$schemaDirector = new DoctrineSchemaDirector(
$connection,
Expand Down
7 changes: 1 addition & 6 deletions tests/Benchmark/SplitStreamBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
namespace Patchlevel\EventSourcing\Tests\Benchmark;

use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
Expand All @@ -27,7 +25,6 @@
final class SplitStreamBench
{
private Store $store;
private EventBus $bus;
private Repository $repository;

private AggregateRootId $id;
Expand All @@ -36,8 +33,6 @@ public function setUp(): void
{
$connection = DbalManager::createConnection();

$this->bus = DefaultEventBus::create();

$this->store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
Expand All @@ -50,9 +45,9 @@ public function setUp(): void

$this->repository = new DefaultRepository(
$this->store,
$this->bus,
Profile::metadata(),
null,
null,
new SplitStreamDecorator(
new AttributeEventMetadataFactory(),
),
Expand Down
7 changes: 1 addition & 6 deletions tests/Benchmark/SubscriptionEngineBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
namespace Patchlevel\EventSourcing\Tests\Benchmark;

use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\Repository;
Expand All @@ -30,7 +28,6 @@
final class SubscriptionEngineBench
{
private Store $store;
private EventBus $bus;
private Repository $repository;

private SubscriptionEngine $subscriptionEngine;
Expand All @@ -41,8 +38,6 @@ public function setUp(): void
{
$connection = DbalManager::createConnection();

$this->bus = DefaultEventBus::create();

$this->store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
Expand All @@ -55,7 +50,7 @@ public function setUp(): void
'eventstore',
);

$this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata());
$this->repository = new DefaultRepository($this->store, Profile::metadata());

$subscriptionStore = new DoctrineSubscriptionStore(
$connection,
Expand Down
5 changes: 1 addition & 4 deletions tests/Benchmark/blackfire.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Doctrine\DBAL\Driver\PDO\SQLite\Driver;
use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
Expand All @@ -26,8 +25,6 @@
'path' => DB_PATH,
]);

$bus = DefaultEventBus::create();

$store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
Expand All @@ -38,7 +35,7 @@
'eventstore',
);

$repository = new DefaultRepository($store, $bus, Profile::metadata());
$repository = new DefaultRepository($store, Profile::metadata());

$schemaDirector = new DoctrineSchemaDirector(
$connection,
Expand Down
13 changes: 7 additions & 6 deletions tests/Integration/BankAccountSplitStream/IntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream;

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory;
Expand Down Expand Up @@ -63,12 +62,10 @@ public function testSuccessful(): void
new MetadataSubscriberAccessorRepository([$bankAccountProjector]),
);

$eventBus = DefaultEventBus::create([$bankAccountProjector]);

$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
$eventBus,
null,
null,
new ChainMessageDecorator([
new SplitStreamDecorator(new AttributeEventMetadataFactory()),
Expand All @@ -91,6 +88,8 @@ public function testSuccessful(): void
$bankAccount->addBalance(500);
$repository->save($bankAccount);

$engine->run();

$result = $this->connection->fetchAssociative('SELECT * FROM projection_bank_account WHERE id = ?', ['1']);

self::assertIsArray($result);
Expand All @@ -102,7 +101,7 @@ public function testSuccessful(): void
$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
$eventBus,
null,
null,
new ChainMessageDecorator([
new SplitStreamDecorator(new AttributeEventMetadataFactory()),
Expand All @@ -125,6 +124,8 @@ public function testSuccessful(): void
$bankAccount->addBalance(200);
$repository->save($bankAccount);

$engine->run();

$result = $this->connection->fetchAssociative('SELECT * FROM projection_bank_account WHERE id = ?', ['1']);

self::assertIsArray($result);
Expand All @@ -136,7 +137,7 @@ public function testSuccessful(): void
$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
$eventBus,
null,
null,
new ChainMessageDecorator([
new SplitStreamDecorator(new AttributeEventMetadataFactory()),
Expand Down
Loading
Loading