Skip to content

Commit

Permalink
allow to free result after stream closed
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Mar 8, 2024
1 parent 5c5ec5a commit 9214346
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 17 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ phpunit: vendor phpunit-unit phpunit-integration #
phpunit-integration: vendor ## run phpunit integration tests
vendor/bin/phpunit --testsuite=integration

.PHONY: phpunit-integration-postgres
phpunit-integration-postgres: vendor ## run phpunit integration tests on postgres
DB_URL="pdo-pgsql://postgres:postgres@localhost:5432/eventstore?charset=utf8" vendor/bin/phpunit --testsuite=integration

.PHONY: phpunit-unit
phpunit-unit: vendor ## run phpunit unit tests
XDEBUG_MODE=coverage vendor/bin/phpunit --testsuite=unit
Expand Down
8 changes: 8 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
services:
postgres:
image: postgres:alpine
environment:
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=eventstore
expose:
- 5432
36 changes: 33 additions & 3 deletions src/Store/DoctrineDbalStoreStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
/** @implements IteratorAggregate<Message> */
final class DoctrineDbalStoreStream implements Stream, IteratorAggregate
{
private Result|null $result;

/** @var Generator<Message> */
private readonly Generator $generator;
private Generator|null $generator;

/** @var positive-int|0|null */
private int|null $position;
Expand All @@ -28,39 +30,59 @@ final class DoctrineDbalStoreStream implements Stream, IteratorAggregate
private int|null $index;

public function __construct(
private readonly Result $result,
Result $result,
EventSerializer $eventSerializer,
HeadersSerializer $headersSerializer,
AbstractPlatform $platform,
) {
$this->result = $result;
$this->generator = $this->buildGenerator($result, $eventSerializer, $headersSerializer, $platform);
$this->position = null;
$this->index = null;
}

public function close(): void
{
$this->result->free();
$this->result?->free();

$this->result = null;
$this->generator = null;
}

public function next(): void
{
if ($this->result === null || $this->generator === null) {
throw new StreamClosed();
}

$this->generator->next();
}

public function end(): bool
{
if ($this->result === null || $this->generator === null) {
throw new StreamClosed();
}

return !$this->generator->valid();
}

public function current(): Message|null
{
if ($this->result === null || $this->generator === null) {
throw new StreamClosed();
}

return $this->generator->current() ?: null;
}

/** @return positive-int|0|null */
public function position(): int|null
{
if ($this->result === null || $this->generator === null) {
throw new StreamClosed();
}

if ($this->position === null) {
$this->generator->key();
}
Expand All @@ -71,6 +93,10 @@ public function position(): int|null
/** @return positive-int|null */
public function index(): int|null
{
if ($this->result === null || $this->generator === null) {
throw new StreamClosed();
}

if ($this->index === null) {
$this->generator->key();
}
Expand All @@ -81,6 +107,10 @@ public function index(): int|null
/** @return Traversable<Message> */
public function getIterator(): Traversable
{
if ($this->result === null || $this->generator === null) {
throw new StreamClosed();
}

return $this->generator;
}

Expand Down
15 changes: 13 additions & 2 deletions src/Store/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,26 @@ interface Stream extends Traversable
{
public function close(): void;

/** @throws StreamClosed */
public function next(): void;

/** @throws StreamClosed */
public function current(): Message|null;

/** @throws StreamClosed */
public function end(): bool;

/** @return positive-int|0|null */
/**
* @return positive-int|0|null
*
* @throws StreamClosed
*/
public function position(): int|null;

/** @return positive-int|null */
/**
* @return positive-int|null
*
* @throws StreamClosed
*/
public function index(): int|null;
}
15 changes: 15 additions & 0 deletions src/Store/StreamClosed.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store;

use RuntimeException;

final class StreamClosed extends RuntimeException
{
public function __construct()
{
parent::__construct('Stream is already closed.');
}
}
8 changes: 7 additions & 1 deletion tests/DbalManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
use Doctrine\DBAL\Tools\DsnParser;
use Patchlevel\EventSourcing\Console\DoctrineHelper;
use RuntimeException;
use Throwable;

use function dd;
use function getenv;
use function in_array;
use function is_string;
Expand Down Expand Up @@ -45,7 +47,11 @@ public static function createConnection(string $dbName = self::DEFAULT_DB_NAME):
$databases = $schemaManager->listDatabases();

if (in_array($dbName, $databases, true)) {
$schemaManager->dropDatabase($dbName);
try {
$schemaManager->dropDatabase($dbName);
} catch (Throwable) {
dd($connection->fetchAllAssociative("SELECT * FROM pg_stat_activity WHERE datname = '{$dbName}';"));
}
}

$schemaManager->createDatabase($dbName);
Expand Down
28 changes: 17 additions & 11 deletions tests/Integration/Store/StoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,25 @@ public function testLoad(): void

$this->store->save($message);

$stream = $this->store->load();
$stream = null;

self::assertSame(1, $stream->index());
self::assertSame(0, $stream->position());
try {
$stream = $this->store->load();

$loadedMessage = $stream->current();
self::assertSame(1, $stream->index());
self::assertSame(0, $stream->position());

self::assertInstanceOf(Message::class, $loadedMessage);
self::assertNotSame($message, $loadedMessage);
self::assertEquals($message->event(), $loadedMessage->event());
self::assertEquals($message->header(AggregateHeader::class)->aggregateId, $loadedMessage->header(AggregateHeader::class)->aggregateId);
self::assertEquals($message->header(AggregateHeader::class)->aggregateName, $loadedMessage->header(AggregateHeader::class)->aggregateName);
self::assertEquals($message->header(AggregateHeader::class)->playhead, $loadedMessage->header(AggregateHeader::class)->playhead);
self::assertEquals($message->header(AggregateHeader::class)->recordedOn, $loadedMessage->header(AggregateHeader::class)->recordedOn);
$loadedMessage = $stream->current();

self::assertInstanceOf(Message::class, $loadedMessage);
self::assertNotSame($message, $loadedMessage);
self::assertEquals($message->event(), $loadedMessage->event());
self::assertEquals($message->header(AggregateHeader::class)->aggregateId, $loadedMessage->header(AggregateHeader::class)->aggregateId);
self::assertEquals($message->header(AggregateHeader::class)->aggregateName, $loadedMessage->header(AggregateHeader::class)->aggregateName);
self::assertEquals($message->header(AggregateHeader::class)->playhead, $loadedMessage->header(AggregateHeader::class)->playhead);
self::assertEquals($message->header(AggregateHeader::class)->recordedOn, $loadedMessage->header(AggregateHeader::class)->recordedOn);
} finally {
$stream?->close();
}
}
}

0 comments on commit 9214346

Please sign in to comment.