Skip to content

Commit

Permalink
add new exceptions for repository & store
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Jan 6, 2024
1 parent 81b322a commit 4fec8c5
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 9 deletions.
15 changes: 13 additions & 2 deletions src/EventBus/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ public function event(): object
return $this->event;
}

/** @return class-string<AggregateRoot> */
/**
* @return class-string<AggregateRoot>
*
* @throws HeaderNotFound
*/
public function aggregateClass(): string
{
$value = $this->aggregateClass;
Expand All @@ -81,6 +85,7 @@ public function withAggregateClass(string $value): self
return $message;
}

/** @throws HeaderNotFound */
public function aggregateId(): string
{
$value = $this->aggregateId;
Expand All @@ -100,7 +105,11 @@ public function withAggregateId(string $value): self
return $message;
}

/** @return positive-int */
/**
* @return positive-int
*
* @throws HeaderNotFound
*/
public function playhead(): int
{
$value = $this->playhead;
Expand All @@ -121,6 +130,7 @@ public function withPlayhead(int $value): self
return $message;
}

/** @throws HeaderNotFound */
public function recordedOn(): DateTimeImmutable
{
$value = $this->recordedOn;
Expand Down Expand Up @@ -166,6 +176,7 @@ public function withArchived(bool $value): self
return $message;
}

/** @throws HeaderNotFound */
public function customHeader(string $name): mixed
{
if (array_keys($this->customHeaders, $name)) {
Expand Down
11 changes: 10 additions & 1 deletion src/Projection/Projectionist/Projectionist.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,22 @@

interface Projectionist
{
/**
* @throws ProjectionistError
* @throws ProjectorNotFound
*/
public function boot(
ProjectionCriteria $criteria = new ProjectionCriteria(),
int|null $limit = null,
bool $throwByError = false,
): void;

/** @param positive-int $limit */
/**
* @param positive-int $limit
*
* @throws ProjectionistError
* @throws ProjectorNotFound
*/
public function run(
ProjectionCriteria $criteria = new ProjectionCriteria(),
int|null $limit = null,
Expand Down
25 changes: 25 additions & 0 deletions src/Repository/AggregateAlreadyExists.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Repository;

use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Aggregate\AggregateRootId;

use function sprintf;

final class AggregateAlreadyExists extends RepositoryException
{
/** @param class-string<AggregateRoot> $aggregate */
public function __construct(string $aggregate, AggregateRootId $id)
{
parent::__construct(
sprintf(
'aggregate %s with id %s already exists',
$aggregate,
$id->toString(),
),
);
}
}
25 changes: 25 additions & 0 deletions src/Repository/AggregateOutdated.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Repository;

use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Aggregate\AggregateRootId;

use function sprintf;

final class AggregateOutdated extends RepositoryException
{
/** @param class-string<AggregateRoot> $aggregate */
public function __construct(string $aggregate, AggregateRootId $id)
{
parent::__construct(
sprintf(
'Aggregate %s with id %s is outdated. There are new events in the store. Please reload the aggregate.',
$aggregate,
$id->toString(),
),
);
}
}
17 changes: 14 additions & 3 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Patchlevel\EventSourcing\Store\CriteriaBuilder;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Store\UniqueConstraintViolation;
use Psr\Clock\ClockInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
Expand Down Expand Up @@ -142,8 +143,9 @@ public function save(AggregateRoot $aggregate): void
}

$playhead = $aggregate->playhead() - $eventCount;
$newAggregate = $playhead === 0;

if (!isset($this->aggregateIsValid[$aggregate]) && $playhead !== 0) {
if (!isset($this->aggregateIsValid[$aggregate]) && !$newAggregate) {
throw new AggregateUnknown($aggregate::class, $aggregate->aggregateRootId());
}

Expand Down Expand Up @@ -176,8 +178,17 @@ static function (object $event) use ($aggregate, &$playhead, $messageDecorator,
$events,
);

$this->store->transactional(function () use ($messages): void {
$this->store->save(...$messages);
$this->store->transactional(function () use ($messages, $aggregate, $newAggregate): void {
try {
$this->store->save(...$messages);
} catch (UniqueConstraintViolation) {
if ($newAggregate) {
throw new AggregateAlreadyExists($aggregate::class, $aggregate->aggregateRootId());
}

throw new AggregateOutdated($aggregate::class, $aggregate->aggregateRootId());
}

$this->archive(...$messages);
$this->eventBus->dispatch(...$messages);
});
Expand Down
17 changes: 15 additions & 2 deletions src/Repository/Repository.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,24 @@
/** @template T of AggregateRoot */
interface Repository
{
/** @return T */
/**
* @return T
*
* @throws AggregateNotFound
*/
public function load(AggregateRootId $id): AggregateRoot;

public function has(AggregateRootId $id): bool;

/** @param T $aggregate */
/**
* @param T $aggregate
*
* @throws WrongAggregate
* @throws AggregateDetached
* @throws AggregateUnknown
* @throws PlayheadMismatch
* @throws AggregateAlreadyExists
* @throws AggregateOutdated
*/
public function save(AggregateRoot $aggregate): void;
}
4 changes: 4 additions & 0 deletions src/Serializer/Encoder/Encoder.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ interface Encoder
/**
* @param array<string, mixed> $data
* @param array<string, mixed> $options
*
* @throws EncodeNotPossible
*/
public function encode(array $data, array $options = []): string;

/**
* @param array<string, mixed> $options
*
* @return array<string, mixed>
*
* @throws DecodeNotPossible
*/
public function decode(string $data, array $options = []): array;
}
7 changes: 7 additions & 0 deletions src/Snapshot/SnapshotStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

interface SnapshotStore
{
/**
* @throws SnapshotNotConfigured
* @throws AdapterNotFound
*/
public function save(AggregateRoot $aggregateRoot): void;

/**
Expand All @@ -18,6 +22,9 @@ public function save(AggregateRoot $aggregateRoot): void;
* @return T
*
* @throws SnapshotNotFound
* @throws SnapshotVersionInvalid
* @throws SnapshotNotConfigured
* @throws AdapterNotFound
*
* @template T of AggregateRoot
*/
Expand Down
7 changes: 6 additions & 1 deletion src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Closure;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Types\Type;
Expand Down Expand Up @@ -183,7 +184,11 @@ function (Connection $connection) use ($messages): void {
implode("),\n(", $placeholders),
);

$connection->executeStatement($query, $parameters, $types);
try {
$connection->executeStatement($query, $parameters, $types);
} catch (UniqueConstraintViolationException $e) {
throw new UniqueConstraintViolation($e);
}
},
);
}
Expand Down
4 changes: 4 additions & 0 deletions src/Store/Store.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ public function load(

public function count(Criteria|null $criteria = null): int;

/**
* @throws MissingDataForStorage
* @throws UniqueConstraintViolation
*/
public function save(Message ...$messages): void;

/**
Expand Down
15 changes: 15 additions & 0 deletions src/Store/UniqueConstraintViolation.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store;

use Throwable;

final class UniqueConstraintViolation extends StoreException
{
public function __construct(Throwable|null $previous = null)
{
parent::__construct('unique constraint violation', 0, $previous);
}
}
78 changes: 78 additions & 0 deletions tests/Unit/Repository/DefaultRepositoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory;
use Patchlevel\EventSourcing\Repository\AggregateAlreadyExists;
use Patchlevel\EventSourcing\Repository\AggregateDetached;
use Patchlevel\EventSourcing\Repository\AggregateNotFound;
use Patchlevel\EventSourcing\Repository\AggregateOutdated;
use Patchlevel\EventSourcing\Repository\AggregateUnknown;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\WrongAggregate;
Expand All @@ -20,6 +22,7 @@
use Patchlevel\EventSourcing\Store\ArrayStream;
use Patchlevel\EventSourcing\Store\Criteria;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\UniqueConstraintViolation;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\Profile;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated;
Expand Down Expand Up @@ -404,6 +407,81 @@ public function testUnknownException(): void
$repository->save($aggregate);
}

public function testDuplicate(): void
{
$this->expectException(AggregateAlreadyExists::class);

$store = $this->prophesize(Store::class);
$store->save(
Argument::type(Message::class),
)->willThrow(new UniqueConstraintViolation());

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0]()
);

$eventBus = $this->prophesize(EventBus::class);
$eventBus->dispatch(Argument::type('object'))->shouldNotBeCalled();

$repository = new DefaultRepository(
$store->reveal(),
$eventBus->reveal(),
Profile::metadata(),
);

$aggregate = Profile::createProfile(
ProfileId::fromString('1'),
Email::fromString('[email protected]'),
);

$repository->save($aggregate);
}

public function testOutdated(): void
{
$this->expectException(AggregateOutdated::class);

$store = $this->prophesize(Store::class);

$store->save(
Argument::that(static function (Message $message) {
return $message->playhead() === 1;
}),
)->shouldBeCalled();

$store->save(
Argument::that(static function (Message $message) {
return $message->playhead() === 2;
}),
)->willThrow(new UniqueConstraintViolation());

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0]()
);

$eventBus = $this->prophesize(EventBus::class);
$eventBus->dispatch(Argument::type('object'))->shouldBeCalled();

$repository = new DefaultRepository(
$store->reveal(),
$eventBus->reveal(),
Profile::metadata(),
);

$aggregate = Profile::createProfile(
ProfileId::fromString('1'),
Email::fromString('[email protected]'),
);

$repository->save($aggregate);

$aggregate->visitProfile(ProfileId::fromString('2'));

$repository->save($aggregate);
}

public function testSaveAggregateWithSplitStream(): void
{
$store = $this->prophesize(Store::class);
Expand Down

0 comments on commit 4fec8c5

Please sign in to comment.