Skip to content

Commit

Permalink
Merge pull request #556 from patchlevel/fix-stream-close
Browse files Browse the repository at this point in the history
throw stream close exception in array stream implementation
  • Loading branch information
DavidBadura authored Mar 27, 2024
2 parents c2c089e + 950a7c9 commit a985655
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 8 deletions.
29 changes: 27 additions & 2 deletions src/Store/ArrayStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
/** @implements IteratorAggregate<Message> */
final class ArrayStream implements Stream, IteratorAggregate
{
/** @var Iterator<Message> $iterator */
private readonly Iterator $iterator;
/** @var Iterator<Message>|null $iterator */
private Iterator|null $iterator;

/** @var positive-int|0|null */
private int|null $position;
Expand All @@ -33,17 +33,26 @@ public function __construct(array $messages = [])

public function close(): void
{
$this->iterator = null;
}

/** @return Traversable<Message> */
public function getIterator(): Traversable
{
if ($this->iterator === null) {
throw new StreamClosed();
}

return $this->iterator;
}

/** @return positive-int|0|null */
public function position(): int|null
{
if ($this->iterator === null) {
throw new StreamClosed();
}

if ($this->position === null) {
$this->iterator->key();
}
Expand All @@ -58,6 +67,10 @@ public function position(): int|null
*/
public function index(): int|null
{
if ($this->iterator === null) {
throw new StreamClosed();
}

if ($this->index === null) {
$this->iterator->key();
}
Expand All @@ -67,16 +80,28 @@ public function index(): int|null

public function next(): void
{
if ($this->iterator === null) {
throw new StreamClosed();
}

$this->iterator->next();
}

public function end(): bool
{
if ($this->iterator === null) {
throw new StreamClosed();
}

return !$this->iterator->valid();
}

public function current(): Message|null
{
if ($this->iterator === null) {
throw new StreamClosed();
}

return $this->iterator->current() ?: null;
}

Expand Down
3 changes: 2 additions & 1 deletion src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ function (array $subscriptions) use ($limit): void {
}
}
} finally {
$endIndex = $stream?->index();
$stream?->close();

if ($messageCounter > 0) {
Expand Down Expand Up @@ -381,7 +382,7 @@ function (array $subscriptions) use ($limit): void {
$this->logger?->info(
sprintf(
'Subscription Engine: End of stream on position "%d" has been reached, finish processing.',
$stream->index() ?: 'unknown',
$endIndex ?: 'unknown',
),
);
},
Expand Down
9 changes: 4 additions & 5 deletions tests/Unit/Store/ArrayStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\ArrayStream;
use Patchlevel\EventSourcing\Store\StreamClosed;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId;
Expand Down Expand Up @@ -139,6 +140,8 @@ public function testWithNoList(): void

public function testClose(): void
{
$this->expectException(StreamClosed::class);

$message = Message::create(
new ProfileCreated(
ProfileId::fromString('foo'),
Expand All @@ -156,10 +159,6 @@ public function testClose(): void
self::assertSame(false, $stream->end());

$stream->close();

self::assertSame(1, $stream->index());
self::assertSame(0, $stream->position());
self::assertSame($message, $stream->current());
self::assertSame(false, $stream->end());
$stream->index();
}
}
Loading

0 comments on commit a985655

Please sign in to comment.