Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow to free result after stream closed #528

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ phpunit: vendor phpunit-unit phpunit-integration #
phpunit-integration: vendor ## run phpunit integration tests
vendor/bin/phpunit --testsuite=integration

.PHONY: phpunit-integration-postgres
phpunit-integration-postgres: vendor ## run phpunit integration tests on postgres
DB_URL="pdo-pgsql://postgres:postgres@localhost:5432/eventstore?charset=utf8" vendor/bin/phpunit --testsuite=integration

.PHONY: phpunit-unit
phpunit-unit: vendor ## run phpunit unit tests
XDEBUG_MODE=coverage vendor/bin/phpunit --testsuite=unit
Expand Down
8 changes: 8 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
services:
postgres:
image: postgres:alpine
environment:
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=eventstore
expose:
- 5432
36 changes: 33 additions & 3 deletions src/Store/DoctrineDbalStoreStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
/** @implements IteratorAggregate<Message> */
final class DoctrineDbalStoreStream implements Stream, IteratorAggregate
{
private Result|null $result;

/** @var Generator<Message> */
private readonly Generator $generator;
private Generator|null $generator;

/** @var positive-int|0|null */
private int|null $position;
Expand All @@ -28,39 +30,59 @@ final class DoctrineDbalStoreStream implements Stream, IteratorAggregate
private int|null $index;

public function __construct(
private readonly Result $result,
Result $result,
EventSerializer $eventSerializer,
HeadersSerializer $headersSerializer,
AbstractPlatform $platform,
) {
$this->result = $result;
$this->generator = $this->buildGenerator($result, $eventSerializer, $headersSerializer, $platform);
$this->position = null;
$this->index = null;
}

public function close(): void
{
$this->result->free();
$this->result?->free();

$this->result = null;
$this->generator = null;
}

public function next(): void
{
if ($this->result === null || $this->generator === null) {
throw new StreamClosed();
}

$this->generator->next();
}

public function end(): bool
{
if ($this->result === null || $this->generator === null) {
throw new StreamClosed();
}

return !$this->generator->valid();
}

public function current(): Message|null
{
if ($this->result === null || $this->generator === null) {
throw new StreamClosed();
}

return $this->generator->current() ?: null;
}

/** @return positive-int|0|null */
public function position(): int|null
{
if ($this->result === null || $this->generator === null) {
throw new StreamClosed();
}

if ($this->position === null) {
$this->generator->key();
}
Expand All @@ -71,6 +93,10 @@ public function position(): int|null
/** @return positive-int|null */
public function index(): int|null
{
if ($this->result === null || $this->generator === null) {
throw new StreamClosed();
}

if ($this->index === null) {
$this->generator->key();
}
Expand All @@ -81,6 +107,10 @@ public function index(): int|null
/** @return Traversable<Message> */
public function getIterator(): Traversable
{
if ($this->result === null || $this->generator === null) {
throw new StreamClosed();
}

return $this->generator;
}

Expand Down
15 changes: 13 additions & 2 deletions src/Store/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,26 @@ interface Stream extends Traversable
{
public function close(): void;

/** @throws StreamClosed */
public function next(): void;

/** @throws StreamClosed */
public function current(): Message|null;

/** @throws StreamClosed */
public function end(): bool;

/** @return positive-int|0|null */
/**
* @return positive-int|0|null
*
* @throws StreamClosed
*/
public function position(): int|null;

/** @return positive-int|null */
/**
* @return positive-int|null
*
* @throws StreamClosed
*/
public function index(): int|null;
}
15 changes: 15 additions & 0 deletions src/Store/StreamClosed.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store;

use RuntimeException;

final class StreamClosed extends RuntimeException
{
public function __construct()
{
parent::__construct('Stream is already closed.');
}
}
28 changes: 17 additions & 11 deletions tests/Integration/Store/StoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,25 @@ public function testLoad(): void

$this->store->save($message);

$stream = $this->store->load();
$stream = null;

self::assertSame(1, $stream->index());
self::assertSame(0, $stream->position());
try {
$stream = $this->store->load();

$loadedMessage = $stream->current();
self::assertSame(1, $stream->index());
self::assertSame(0, $stream->position());

self::assertInstanceOf(Message::class, $loadedMessage);
self::assertNotSame($message, $loadedMessage);
self::assertEquals($message->event(), $loadedMessage->event());
self::assertEquals($message->header(AggregateHeader::class)->aggregateId, $loadedMessage->header(AggregateHeader::class)->aggregateId);
self::assertEquals($message->header(AggregateHeader::class)->aggregateName, $loadedMessage->header(AggregateHeader::class)->aggregateName);
self::assertEquals($message->header(AggregateHeader::class)->playhead, $loadedMessage->header(AggregateHeader::class)->playhead);
self::assertEquals($message->header(AggregateHeader::class)->recordedOn, $loadedMessage->header(AggregateHeader::class)->recordedOn);
$loadedMessage = $stream->current();

self::assertInstanceOf(Message::class, $loadedMessage);
self::assertNotSame($message, $loadedMessage);
self::assertEquals($message->event(), $loadedMessage->event());
self::assertEquals($message->header(AggregateHeader::class)->aggregateId, $loadedMessage->header(AggregateHeader::class)->aggregateId);
self::assertEquals($message->header(AggregateHeader::class)->aggregateName, $loadedMessage->header(AggregateHeader::class)->aggregateName);
self::assertEquals($message->header(AggregateHeader::class)->playhead, $loadedMessage->header(AggregateHeader::class)->playhead);
self::assertEquals($message->header(AggregateHeader::class)->recordedOn, $loadedMessage->header(AggregateHeader::class)->recordedOn);
} finally {
$stream?->close();
}
}
}
Loading