diff --git a/src/Store/ArrayStream.php b/src/Store/ArrayStream.php index 4fd62dce7..2dab74282 100644 --- a/src/Store/ArrayStream.php +++ b/src/Store/ArrayStream.php @@ -14,8 +14,8 @@ /** @implements IteratorAggregate */ final class ArrayStream implements Stream, IteratorAggregate { - /** @var Iterator $iterator */ - private readonly Iterator $iterator; + /** @var Iterator|null $iterator */ + private Iterator|null $iterator; /** @var positive-int|0|null */ private int|null $position; @@ -33,17 +33,26 @@ public function __construct(array $messages = []) public function close(): void { + $this->iterator = null; } /** @return Traversable */ public function getIterator(): Traversable { + if ($this->iterator === null) { + throw new StreamClosed(); + } + return $this->iterator; } /** @return positive-int|0|null */ public function position(): int|null { + if ($this->iterator === null) { + throw new StreamClosed(); + } + if ($this->position === null) { $this->iterator->key(); } @@ -58,6 +67,10 @@ public function position(): int|null */ public function index(): int|null { + if ($this->iterator === null) { + throw new StreamClosed(); + } + if ($this->index === null) { $this->iterator->key(); } @@ -67,16 +80,28 @@ public function index(): int|null public function next(): void { + if ($this->iterator === null) { + throw new StreamClosed(); + } + $this->iterator->next(); } public function end(): bool { + if ($this->iterator === null) { + throw new StreamClosed(); + } + return !$this->iterator->valid(); } public function current(): Message|null { + if ($this->iterator === null) { + throw new StreamClosed(); + } + return $this->iterator->current() ?: null; } diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index 8cb1b160b..dcf5166d4 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -347,6 +347,7 @@ function (array $subscriptions) use ($limit): void { } } } finally { + $endIndex = $stream?->index(); $stream?->close(); if ($messageCounter > 0) { @@ -381,7 +382,7 @@ function (array $subscriptions) use ($limit): void { $this->logger?->info( sprintf( 'Subscription Engine: End of stream on position "%d" has been reached, finish processing.', - $stream->index() ?: 'unknown', + $endIndex ?: 'unknown', ), ); }, diff --git a/tests/Unit/Store/ArrayStreamTest.php b/tests/Unit/Store/ArrayStreamTest.php index 5bab74a72..e6921a6e4 100644 --- a/tests/Unit/Store/ArrayStreamTest.php +++ b/tests/Unit/Store/ArrayStreamTest.php @@ -6,6 +6,7 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Store\ArrayStream; +use Patchlevel\EventSourcing\Store\StreamClosed; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; @@ -139,6 +140,8 @@ public function testWithNoList(): void public function testClose(): void { + $this->expectException(StreamClosed::class); + $message = Message::create( new ProfileCreated( ProfileId::fromString('foo'), @@ -156,10 +159,6 @@ public function testClose(): void self::assertSame(false, $stream->end()); $stream->close(); - - self::assertSame(1, $stream->index()); - self::assertSame(0, $stream->position()); - self::assertSame($message, $stream->current()); - self::assertSame(false, $stream->end()); + $stream->index(); } } diff --git a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php index e447795eb..e6ce5068c 100644 --- a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php +++ b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php @@ -33,6 +33,7 @@ use PHPUnit\Framework\TestCase; use Prophecy\Argument; use Prophecy\PhpUnit\ProphecyTrait; +use Psr\Log\NullLogger; use RuntimeException; /** @covers \Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine */ @@ -51,6 +52,7 @@ public function testNothingToSetup(): void $streamableStore->reveal(), $store, new MetadataSubscriberAccessorRepository([]), + logger: new NullLogger(), ); $engine->setup(); @@ -77,6 +79,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->setup(); @@ -125,6 +128,7 @@ public function create(): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->setup(); @@ -180,6 +184,7 @@ public function create(): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->setup(); @@ -228,6 +233,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->setup(null, true); @@ -267,6 +273,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->setup(); @@ -305,6 +312,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->setup(); @@ -331,6 +339,7 @@ public function testNothingToBoot(): void $streamableStore->reveal(), $store, new MetadataSubscriberAccessorRepository([]), + logger: new NullLogger(), ); $engine->boot(); @@ -355,6 +364,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->boot(); @@ -403,6 +413,7 @@ public function handle(Message $message): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->boot(); @@ -461,6 +472,7 @@ public function handle(Message $message): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->boot(new SubscriptionEngineCriteria(), 1); @@ -531,6 +543,7 @@ public function handle(Message $message): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber1, $subscriber2]), + logger: new NullLogger(), ); $engine->boot(); @@ -604,6 +617,7 @@ public function handle(Message $message): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->boot(); @@ -660,6 +674,7 @@ public function handle(Message $message): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->boot(); @@ -698,6 +713,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->run(); @@ -744,6 +760,7 @@ public function handle(Message $message): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->run(); @@ -797,6 +814,7 @@ public function handle(Message $message): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->run(new SubscriptionEngineCriteria(), 1); @@ -865,6 +883,7 @@ public function handle(Message $message): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber1, $subscriber2]), + logger: new NullLogger(), ); $engine->run(); @@ -925,6 +944,7 @@ public function handle(Message $message): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->run(); @@ -968,6 +988,7 @@ public function testRunningMarkOutdated(): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([]), + logger: new NullLogger(), ); $engine->run(); @@ -1003,6 +1024,7 @@ public function testRunningWithoutActiveSubscribers(): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([]), + logger: new NullLogger(), ); $engine->run(); @@ -1044,6 +1066,7 @@ public function handle(Message $message): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->run(); @@ -1093,6 +1116,7 @@ public function handle(Message $message): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->run(); @@ -1131,6 +1155,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->teardown(); @@ -1167,6 +1192,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->teardown(); @@ -1205,6 +1231,7 @@ public function drop(): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->teardown(); @@ -1244,6 +1271,7 @@ public function drop(): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->teardown(); @@ -1271,6 +1299,7 @@ public function testTeardownWithoutSubscriber(): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([]), + logger: new NullLogger(), ); $engine->teardown(); @@ -1293,6 +1322,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->remove(); @@ -1335,6 +1365,7 @@ public function drop(): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->remove(); @@ -1365,6 +1396,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->remove(); @@ -1401,6 +1433,7 @@ public function drop(): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->remove(); @@ -1427,6 +1460,7 @@ public function testRemoveWithoutSubscriber(): void $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([]), + logger: new NullLogger(), ); $engine->remove(); @@ -1449,6 +1483,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->reactivate(); @@ -1487,6 +1522,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->reactivate(); @@ -1524,6 +1560,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->reactivate(); @@ -1560,6 +1597,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->reactivate(); @@ -1596,6 +1634,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->reactivate(); @@ -1624,6 +1663,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->pause(); @@ -1660,6 +1700,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->pause(); @@ -1696,6 +1737,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->pause(); @@ -1734,6 +1776,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->pause(); @@ -1764,6 +1807,7 @@ class { $streamableStore->reveal(), $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $subscriptions = $engine->subscriptions(); @@ -1823,6 +1867,7 @@ public function subscribe(): void $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), $retryStrategy->reveal(), + new NullLogger(), ); $engine->run(); @@ -1875,6 +1920,7 @@ class { $subscriptionStore, new MetadataSubscriberAccessorRepository([$subscriber]), $retryStrategy->reveal(), + new NullLogger(), ); $engine->run(); @@ -1909,6 +1955,7 @@ class { $streamableStore->reveal(), $subscriptionStore->reveal(), new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engineCriteria = new SubscriptionEngineCriteria( @@ -1949,6 +1996,7 @@ class { $streamableStore->reveal(), $subscriptionStore->reveal(), new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), ); $engine->{$method}();