Skip to content

Commit

Permalink
Merge pull request #403 from patchlevel/remove-deprecated-projector-l…
Browse files Browse the repository at this point in the history
…istener

replace deprecated projector listener
  • Loading branch information
DavidBadura authored Aug 10, 2023
2 parents edb84a9 + ac8910a commit f1d8f53
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\EventBus\Message;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\Store\FlockStore;

final class RunProjectionistEventBusWrapper implements EventBus
final class SyncProjectionistEventBusWrapper implements EventBus
{
public function __construct(
private readonly EventBus $parentEventBus,
Expand All @@ -33,4 +34,15 @@ public function dispatch(Message ...$messages): void
$lock->release();
}
}

public static function createWithDefaultLockStrategy(EventBus $parentEventBus, Projectionist $projectionist): self
{
return new self(
$parentEventBus,
$projectionist,
new LockFactory(
new FlockStore(),
),
);
}
}
23 changes: 0 additions & 23 deletions src/Projection/Projector/SyncProjectorListener.php

This file was deleted.

37 changes: 27 additions & 10 deletions tests/Benchmark/WriteEventsBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Projection\Projector\SyncProjectorListener;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\Repository;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
Expand All @@ -22,6 +24,8 @@
use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\ProfileId;
use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Projection\ProfileProjector;
use PhpBench\Attributes as Bench;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\Store\InMemoryStore as LockInMemoryStore;

use function file_exists;
use function unlink;
Expand All @@ -47,20 +51,33 @@ public function setUp(): void
'path' => self::DB_PATH,
]);

$this->store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
(new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/BasicImplementation/Aggregate']),
'eventstore',
);

$profileProjection = new ProfileProjector($connection);
$projectionRepository = new InMemoryProjectorRepository(
[$profileProjection],
);

$this->bus = new DefaultEventBus();
$this->bus->addListener(new SyncProjectorListener($projectionRepository));
$this->bus->addListener(new SendEmailProcessor());
$projectionist = new DefaultProjectionist(
$this->store,
new InMemoryStore(),
$projectionRepository,
);

$this->store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
(new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/BasicImplementation/Aggregate']),
'eventstore',
$innerEventStream = new DefaultEventBus();
$innerEventStream->addListener(new SendEmailProcessor());

$this->bus = new SyncProjectionistEventBusWrapper(
$innerEventStream,
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
);

$this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata());
Expand All @@ -71,7 +88,7 @@ public function setUp(): void
);

$schemaDirector->create();
$profileProjection->create();
$projectionist->boot();

$this->profile = Profile::create(ProfileId::fromString('1'), 'Peter');
$this->repository->save($this->profile);
Expand Down
33 changes: 24 additions & 9 deletions tests/Integration/BankAccountSplitStream/IntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory;
use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Projection\Projector\SyncProjectorListener;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
Expand All @@ -24,6 +26,8 @@
use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Projection\BankAccountProjection;
use Patchlevel\EventSourcing\Tests\Integration\DbalManager;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\Store\InMemoryStore as LockInMemoryStore;

use function count;

Expand All @@ -42,21 +46,32 @@ public function tearDown(): void
$this->connection->close();
}

public function testSingleTableSuccessful(): void
public function testSuccessful(): void
{
$bankAccountProjection = new BankAccountProjection($this->connection);
$projectionRepository = new InMemoryProjectorRepository([$bankAccountProjection]);

$eventStream = new DefaultEventBus();
$eventStream->addListener(new SyncProjectorListener($projectionRepository));

$store = new DoctrineDbalStore(
$this->connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
(new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']),
'eventstore',
);

$bankAccountProjection = new BankAccountProjection($this->connection);
$projectionRepository = new InMemoryProjectorRepository([$bankAccountProjection]);

$projectionist = new DefaultProjectionist(
$store,
new InMemoryStore(),
$projectionRepository,
);

$eventStream = new SyncProjectionistEventBusWrapper(
new DefaultEventBus(),
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
);

$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['bank_account' => BankAccount::class]),
$store,
Expand All @@ -74,7 +89,7 @@ public function testSingleTableSuccessful(): void
);

$schemaDirector->create();
$bankAccountProjection->create();
$projectionist->boot();

$bankAccount = BankAccount::create(AccountId::fromString('1'), 'John');
$bankAccount->addBalance(100);
Expand Down
98 changes: 71 additions & 27 deletions tests/Integration/BasicImplementation/BasicIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
use Patchlevel\EventSourcing\EventBus\SymfonyEventBus;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory;
use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper;
use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository;
use Patchlevel\EventSourcing\Projection\Projector\SyncProjectorListener;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
Expand All @@ -23,6 +25,8 @@
use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Projection\ProfileProjection;
use Patchlevel\EventSourcing\Tests\Integration\DbalManager;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\Store\InMemoryStore as LockInMemoryStore;

/** @coversNothing */
final class BasicIntegrationTest extends TestCase
Expand All @@ -42,20 +46,33 @@ public function tearDown(): void

public function testSuccessful(): void
{
$store = new DoctrineDbalStore(
$this->connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
(new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']),
'eventstore',
);

$profileProjection = new ProfileProjection($this->connection);
$projectorRepository = new InMemoryProjectorRepository(
[$profileProjection],
);

$eventStream = new DefaultEventBus();
$eventStream->addListener(new SyncProjectorListener($projectorRepository));
$eventStream->addListener(new SendEmailProcessor());
$projectionist = new DefaultProjectionist(
$store,
new InMemoryStore(),
$projectorRepository,
);

$store = new DoctrineDbalStore(
$this->connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
(new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']),
'eventstore',
$innerEventStream = new DefaultEventBus();
$innerEventStream->addListener(new SendEmailProcessor());

$eventStream = new SyncProjectionistEventBusWrapper(
$innerEventStream,
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
);

$manager = new DefaultRepositoryManager(
Expand All @@ -73,7 +90,7 @@ public function testSuccessful(): void
);

$schemaDirector->create();
$profileProjection->create();
$projectionist->boot();

$profile = Profile::create(ProfileId::fromString('1'), 'John');
$repository->save($profile);
Expand Down Expand Up @@ -102,28 +119,42 @@ public function testSuccessful(): void

public function testWithSymfonySuccessful(): void
{
$store = new DoctrineDbalStore(
$this->connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
(new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']),
'eventstore',
);

$profileProjection = new ProfileProjection($this->connection);
$projectorRepository = new InMemoryProjectorRepository(
[$profileProjection],
);

$eventStream = SymfonyEventBus::create([
new SyncProjectorListener($projectorRepository),
$projectionist = new DefaultProjectionist(
$store,
new InMemoryStore(),
$projectorRepository,
);

$innerEventStream = SymfonyEventBus::create([
new SendEmailProcessor(),
]);

$store = new DoctrineDbalStore(
$this->connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
(new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']),
'eventstore',
$eventStream = new SyncProjectionistEventBusWrapper(
$innerEventStream,
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
);

$manager = new DefaultRepositoryManager(
new AggregateRootRegistry(['profile' => Profile::class]),
$store,
$eventStream,
);

$repository = $manager->get(Profile::class);

$schemaDirector = new DoctrineSchemaDirector(
Expand All @@ -132,7 +163,7 @@ public function testWithSymfonySuccessful(): void
);

$schemaDirector->create();
$profileProjection->create();
$projectionist->boot();

$profile = Profile::create(ProfileId::fromString('1'), 'John');
$repository->save($profile);
Expand Down Expand Up @@ -164,20 +195,33 @@ public function testWithSymfonySuccessful(): void

public function testSnapshot(): void
{
$store = new DoctrineDbalStore(
$this->connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
(new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']),
'eventstore',
);

$profileProjection = new ProfileProjection($this->connection);
$projectorRepository = new InMemoryProjectorRepository(
[$profileProjection],
);

$eventStream = new DefaultEventBus();
$eventStream->addListener(new SyncProjectorListener($projectorRepository));
$eventStream->addListener(new SendEmailProcessor());
$projectionist = new DefaultProjectionist(
$store,
new InMemoryStore(),
$projectorRepository,
);

$store = new DoctrineDbalStore(
$this->connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
(new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']),
'eventstore',
$innerEventStream = new DefaultEventBus();
$innerEventStream->addListener(new SendEmailProcessor());

$eventStream = new SyncProjectionistEventBusWrapper(
$innerEventStream,
$projectionist,
new LockFactory(
new LockInMemoryStore(),
),
);

$manager = new DefaultRepositoryManager(
Expand All @@ -195,7 +239,7 @@ public function testSnapshot(): void
);

$schemaDirector->create();
$profileProjection->create();
$projectionist->boot();

$profile = Profile::create(ProfileId::fromString('1'), 'John');
$repository->save($profile);
Expand Down
Loading

0 comments on commit f1d8f53

Please sign in to comment.