Skip to content

Commit

Permalink
Fix arhived and newStreamStart handling
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielBadura committed Feb 8, 2024
1 parent 3c14d09 commit bb09358
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 37 deletions.
4 changes: 1 addition & 3 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,7 @@ static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $
->withAggregateName($aggregateName)
->withAggregateId($aggregateId)
->withPlayhead(++$playhead)
->withRecordedOn($clock->now())
->withArchived(false)
->withNewStreamStart(false);
->withRecordedOn($clock->now());

if ($messageDecorator) {
return $messageDecorator($message);
Expand Down
28 changes: 19 additions & 9 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -160,18 +160,28 @@ function (Connection $connection) use ($messages): void {

$parameters[] = $message->recordedOn();
$types[$offset + 5] = $dateTimeType;

$parameters[] = $message->newStreamStart();
$types[$offset + 6] = $booleanType;

$parameters[] = $message->archived();
$types[$offset + 7] = $booleanType;

$parameters[] = $message->customHeaders();
$types[$offset + 8] = $jsonType;
} catch (HeaderNotFound $e) {
throw new MissingDataForStorage($e->name, $e);
}

try {
$newStreamStart = $message->newStreamStart();
} catch (HeaderNotFound) {
$newStreamStart = false;
}
$parameters[] = $newStreamStart;
$types[$offset + 6] = $booleanType;

try {
$archived = $message->archived();
} catch (HeaderNotFound) {
$archived = false;
}
$parameters[] = $archived;
$types[$offset + 7] = $booleanType;

$parameters[] = $message->customHeaders();
$types[$offset + 8] = $jsonType;
}

$query = sprintf(
Expand Down
22 changes: 12 additions & 10 deletions tests/Integration/Store/StoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,12 @@ public function testSave(): void
->withAggregateName('profile')
->withAggregateId('test')
->withPlayhead(1)
->withRecordedOn(new DateTimeImmutable('2020-01-01 00:00:00'))
->withArchived(false)
->withNewStreamStart(false),
->withRecordedOn(new DateTimeImmutable('2020-01-01 00:00:00')),
Message::create(new ProfileCreated(ProfileId::fromString('test'), 'test'))
->withAggregateName('profile')
->withAggregateId('test')
->withPlayhead(2)
->withRecordedOn(new DateTimeImmutable('2020-01-02 00:00:00'))
->withArchived(false)
->withNewStreamStart(false),
->withRecordedOn(new DateTimeImmutable('2020-01-02 00:00:00')),
];

$this->store->save(...$messages);
Expand Down Expand Up @@ -97,16 +93,22 @@ public function testLoad(): void
->withAggregateName('profile')
->withAggregateId('test')
->withPlayhead(1)
->withRecordedOn(new DateTimeImmutable('2020-01-01 00:00:00'))
->withArchived(false)
->withNewStreamStart(false);
->withRecordedOn(new DateTimeImmutable('2020-01-01 00:00:00'));

$this->store->save($message);

$stream = $this->store->load();

self::assertSame(1, $stream->index());
self::assertSame(0, $stream->position());
self::assertEquals($message, $stream->current());

$loadedMessage = $stream->current();

self::assertNotSame($message, $loadedMessage);
self::assertEquals($message->aggregateId(), $loadedMessage->aggregateId());

Check failure on line 108 in tests/Integration/Store/StoreTest.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

PossiblyNullReference

tests/Integration/Store/StoreTest.php:108:69: PossiblyNullReference: Cannot call method aggregateId on possibly null value (see https://psalm.dev/083)
self::assertEquals($message->aggregateName(), $loadedMessage->aggregateName());
self::assertEquals($message->playhead(), $loadedMessage->playhead());
self::assertEquals($message->event(), $loadedMessage->event());
self::assertEquals($message->recordedOn(), $loadedMessage->recordedOn());
}
}
14 changes: 2 additions & 12 deletions tests/Unit/Outbox/DoctrineOutboxStoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ public function testSaveOutboxMessage(): void
->withAggregateName('profile')
->withAggregateId('1')
->withPlayhead(1)
->withRecordedOn($recordedOn)
->withNewStreamStart(false)
->withArchived(false);
->withRecordedOn($recordedOn);

$innerMockedConnection = $this->prophesize(Connection::class);
$innerMockedConnection->insert(
Expand Down Expand Up @@ -72,8 +70,6 @@ public function testMarkOutboxMessageConsumed(): void
->withAggregateId('1')
->withPlayhead(1)
->withRecordedOn($recordedOn)
->withNewStreamStart(false)
->withArchived(false)
->withHeader(DoctrineOutboxStore::HEADER_OUTBOX_IDENTIFIER, 42);

$innerMockedConnection = $this->prophesize(Connection::class);
Expand Down Expand Up @@ -108,9 +104,7 @@ public function testMarkOutboxMessageConsumedHeaderMissing(): void
->withAggregateName('profile')
->withAggregateId('1')
->withPlayhead(1)
->withRecordedOn($recordedOn)
->withNewStreamStart(false)
->withArchived(false);
->withRecordedOn($recordedOn);

$innerMockedConnection = $this->prophesize(Connection::class);
$innerMockedConnection->delete(
Expand Down Expand Up @@ -147,8 +141,6 @@ public function testMarkOutboxMessageConsumedHeaderInvalid(): void
->withAggregateId('1')
->withPlayhead(1)
->withRecordedOn($recordedOn)
->withNewStreamStart(false)
->withArchived(false)
->withHeader(DoctrineOutboxStore::HEADER_OUTBOX_IDENTIFIER, 'asd');

$innerMockedConnection = $this->prophesize(Connection::class);
Expand Down Expand Up @@ -254,8 +246,6 @@ public function testRetrieveOutboxMessages(): void
->withAggregateId('1')
->withPlayhead(1)
->withRecordedOn($recordedOn)
->withNewStreamStart(false)
->withArchived(false)
->withHeader(DoctrineOutboxStore::HEADER_OUTBOX_IDENTIFIER, 42);

$queryBuilder = $this->prophesize(QueryBuilder::class);
Expand Down
4 changes: 1 addition & 3 deletions tests/Unit/Store/DoctrineDbalStoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,7 @@ public function testSaveWithOneEvent(): void
->withAggregateName('profile')
->withAggregateId('1')
->withPlayhead(1)
->withRecordedOn($recordedOn)
->withNewStreamStart(false)
->withArchived(false);
->withRecordedOn($recordedOn);

$innerMockedConnection = $this->prophesize(Connection::class);

Expand Down

0 comments on commit bb09358

Please sign in to comment.