From 76e6becdd186145313034c7f273e9c6c5d69c607 Mon Sep 17 00:00:00 2001 From: David Badura Date: Fri, 8 Mar 2024 15:02:08 +0100 Subject: [PATCH] allow to free result after stream closed --- Makefile | 4 +++ docker-compose.yaml | 8 ++++++ src/Store/DoctrineDbalStoreStream.php | 36 ++++++++++++++++++++++++--- src/Store/Stream.php | 15 +++++++++-- src/Store/StreamClosed.php | 15 +++++++++++ tests/Integration/Store/StoreTest.php | 28 +++++++++++++-------- 6 files changed, 90 insertions(+), 16 deletions(-) create mode 100644 docker-compose.yaml create mode 100644 src/Store/StreamClosed.php diff --git a/Makefile b/Makefile index 1fc5334de..e0e363d1a 100644 --- a/Makefile +++ b/Makefile @@ -36,6 +36,10 @@ phpunit: vendor phpunit-unit phpunit-integration # phpunit-integration: vendor ## run phpunit integration tests vendor/bin/phpunit --testsuite=integration +.PHONY: phpunit-integration-postgres +phpunit-integration-postgres: vendor ## run phpunit integration tests on postgres + DB_URL="pdo-pgsql://postgres:postgres@localhost:5432/eventstore?charset=utf8" vendor/bin/phpunit --testsuite=integration + .PHONY: phpunit-unit phpunit-unit: vendor ## run phpunit unit tests XDEBUG_MODE=coverage vendor/bin/phpunit --testsuite=unit diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 000000000..fd68b02a0 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,8 @@ +services: + postgres: + image: postgres:alpine + environment: + - POSTGRES_PASSWORD=postgres + - POSTGRES_DB=eventstore + expose: + - 5432 \ No newline at end of file diff --git a/src/Store/DoctrineDbalStoreStream.php b/src/Store/DoctrineDbalStoreStream.php index 48bc371b0..11a8a385c 100644 --- a/src/Store/DoctrineDbalStoreStream.php +++ b/src/Store/DoctrineDbalStoreStream.php @@ -18,8 +18,10 @@ /** @implements IteratorAggregate */ final class DoctrineDbalStoreStream implements Stream, IteratorAggregate { + private Result|null $result; + /** @var Generator */ - private readonly Generator $generator; + private Generator|null $generator; /** @var positive-int|0|null */ private int|null $position; @@ -28,11 +30,12 @@ final class DoctrineDbalStoreStream implements Stream, IteratorAggregate private int|null $index; public function __construct( - private readonly Result $result, + Result $result, EventSerializer $eventSerializer, HeadersSerializer $headersSerializer, AbstractPlatform $platform, ) { + $this->result = $result; $this->generator = $this->buildGenerator($result, $eventSerializer, $headersSerializer, $platform); $this->position = null; $this->index = null; @@ -40,27 +43,46 @@ public function __construct( public function close(): void { - $this->result->free(); + $this->result?->free(); + + $this->result = null; + $this->generator = null; } public function next(): void { + if ($this->result === null || $this->generator === null) { + throw new StreamClosed(); + } + $this->generator->next(); } public function end(): bool { + if ($this->result === null || $this->generator === null) { + throw new StreamClosed(); + } + return !$this->generator->valid(); } public function current(): Message|null { + if ($this->result === null || $this->generator === null) { + throw new StreamClosed(); + } + return $this->generator->current() ?: null; } /** @return positive-int|0|null */ public function position(): int|null { + if ($this->result === null || $this->generator === null) { + throw new StreamClosed(); + } + if ($this->position === null) { $this->generator->key(); } @@ -71,6 +93,10 @@ public function position(): int|null /** @return positive-int|null */ public function index(): int|null { + if ($this->result === null || $this->generator === null) { + throw new StreamClosed(); + } + if ($this->index === null) { $this->generator->key(); } @@ -81,6 +107,10 @@ public function index(): int|null /** @return Traversable */ public function getIterator(): Traversable { + if ($this->result === null || $this->generator === null) { + throw new StreamClosed(); + } + return $this->generator; } diff --git a/src/Store/Stream.php b/src/Store/Stream.php index 9fa960e50..7ca0c48d0 100644 --- a/src/Store/Stream.php +++ b/src/Store/Stream.php @@ -12,15 +12,26 @@ interface Stream extends Traversable { public function close(): void; + /** @throws StreamClosed */ public function next(): void; + /** @throws StreamClosed */ public function current(): Message|null; + /** @throws StreamClosed */ public function end(): bool; - /** @return positive-int|0|null */ + /** + * @return positive-int|0|null + * + * @throws StreamClosed + */ public function position(): int|null; - /** @return positive-int|null */ + /** + * @return positive-int|null + * + * @throws StreamClosed + */ public function index(): int|null; } diff --git a/src/Store/StreamClosed.php b/src/Store/StreamClosed.php new file mode 100644 index 000000000..ac68dd8ad --- /dev/null +++ b/src/Store/StreamClosed.php @@ -0,0 +1,15 @@ +store->save($message); - $stream = $this->store->load(); + $stream = null; - self::assertSame(1, $stream->index()); - self::assertSame(0, $stream->position()); + try { + $stream = $this->store->load(); - $loadedMessage = $stream->current(); + self::assertSame(1, $stream->index()); + self::assertSame(0, $stream->position()); - self::assertInstanceOf(Message::class, $loadedMessage); - self::assertNotSame($message, $loadedMessage); - self::assertEquals($message->event(), $loadedMessage->event()); - self::assertEquals($message->header(AggregateHeader::class)->aggregateId, $loadedMessage->header(AggregateHeader::class)->aggregateId); - self::assertEquals($message->header(AggregateHeader::class)->aggregateName, $loadedMessage->header(AggregateHeader::class)->aggregateName); - self::assertEquals($message->header(AggregateHeader::class)->playhead, $loadedMessage->header(AggregateHeader::class)->playhead); - self::assertEquals($message->header(AggregateHeader::class)->recordedOn, $loadedMessage->header(AggregateHeader::class)->recordedOn); + $loadedMessage = $stream->current(); + + self::assertInstanceOf(Message::class, $loadedMessage); + self::assertNotSame($message, $loadedMessage); + self::assertEquals($message->event(), $loadedMessage->event()); + self::assertEquals($message->header(AggregateHeader::class)->aggregateId, $loadedMessage->header(AggregateHeader::class)->aggregateId); + self::assertEquals($message->header(AggregateHeader::class)->aggregateName, $loadedMessage->header(AggregateHeader::class)->aggregateName); + self::assertEquals($message->header(AggregateHeader::class)->playhead, $loadedMessage->header(AggregateHeader::class)->playhead); + self::assertEquals($message->header(AggregateHeader::class)->recordedOn, $loadedMessage->header(AggregateHeader::class)->recordedOn); + } finally { + $stream?->close(); + } } }