diff --git a/src/Console/Command/ShowAggregateCommand.php b/src/Console/Command/ShowAggregateCommand.php index 4025b29da..a4bdc6cbf 100644 --- a/src/Console/Command/ShowAggregateCommand.php +++ b/src/Console/Command/ShowAggregateCommand.php @@ -79,6 +79,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int $console->message($this->serializer, $message); } + $stream->close(); + if ($hasMessage) { return 0; } diff --git a/src/Console/Command/ShowCommand.php b/src/Console/Command/ShowCommand.php index 87409d7ef..3c42ac613 100644 --- a/src/Console/Command/ShowCommand.php +++ b/src/Console/Command/ShowCommand.php @@ -91,6 +91,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int ); } while ($continue); + $stream->close(); + return 0; } } diff --git a/src/Projection/Projectionist/DefaultProjectionist.php b/src/Projection/Projectionist/DefaultProjectionist.php index 091fefa77..696219ef3 100644 --- a/src/Projection/Projectionist/DefaultProjectionist.php +++ b/src/Projection/Projectionist/DefaultProjectionist.php @@ -113,26 +113,32 @@ public function boot( $this->logger?->debug(sprintf('event stream is processed from position %s', $currentPosition)); - $criteria = (new CriteriaBuilder())->fromIndex($currentPosition)->build(); - $stream = $this->streamableMessageStore->load($criteria); + $stream = null; - $messageCounter = 0; + try { + $criteria = (new CriteriaBuilder())->fromIndex($currentPosition)->build(); + $stream = $this->streamableMessageStore->load($criteria); - foreach ($stream as $message) { - foreach ($projections->filterByProjectionStatus(ProjectionStatus::Booting) as $projection) { - $this->handleMessage($message, $projection, $throwByError); - } + $messageCounter = 0; + + foreach ($stream as $message) { + foreach ($projections->filterByProjectionStatus(ProjectionStatus::Booting) as $projection) { + $this->handleMessage($message, $projection, $throwByError); + } - $currentPosition++; - $messageCounter++; + $currentPosition++; + $messageCounter++; - $this->logger?->info(sprintf('current event stream position: %s', $currentPosition)); + $this->logger?->info(sprintf('current event stream position: %s', $currentPosition)); - if ($limit !== null && $messageCounter >= $limit) { - $this->logger?->info('message limit reached, finish'); + if ($limit !== null && $messageCounter >= $limit) { + $this->logger?->info('message limit reached, finish'); - return; + return; + } } + } finally { + $stream?->close(); } $this->logger?->info('end of stream has been reached'); @@ -184,38 +190,44 @@ public function run( $this->logger?->debug(sprintf('event stream is processed from position %s', $currentPosition)); - $criteria = (new CriteriaBuilder())->fromIndex($currentPosition)->build(); - $stream = $this->streamableMessageStore->load($criteria); + $stream = null; - $messageCounter = 0; + try { + $criteria = (new CriteriaBuilder())->fromIndex($currentPosition)->build(); + $stream = $this->streamableMessageStore->load($criteria); - foreach ($stream as $message) { - foreach ($projections->filterByProjectionStatus(ProjectionStatus::Active) as $projection) { - if ($projection->position() > $currentPosition) { - $this->logger?->debug( - sprintf( - 'projection "%s" is farther than the current position (%s) and will be skipped', - $projection->id()->toString(), - $projection->position(), - ), - ); + $messageCounter = 0; - continue; - } + foreach ($stream as $message) { + foreach ($projections->filterByProjectionStatus(ProjectionStatus::Active) as $projection) { + if ($projection->position() > $currentPosition) { + $this->logger?->debug( + sprintf( + 'projection "%s" is farther than the current position (%s) and will be skipped', + $projection->id()->toString(), + $projection->position(), + ), + ); - $this->handleMessage($message, $projection, $throwByError); - } + continue; + } + + $this->handleMessage($message, $projection, $throwByError); + } - $currentPosition++; - $messageCounter++; + $currentPosition++; + $messageCounter++; - $this->logger?->info(sprintf('current event stream position: %s', $currentPosition)); + $this->logger?->info(sprintf('current event stream position: %s', $currentPosition)); - if ($limit !== null && $messageCounter >= $limit) { - $this->logger?->info('message limit reached, finish'); + if ($limit !== null && $messageCounter >= $limit) { + $this->logger?->info('message limit reached, finish'); - return; + return; + } } + } finally { + $stream?->close(); } $this->logger?->debug('end of stream has been reached, finish'); diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index 41348ebd0..af8f65864 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -89,21 +89,27 @@ public function load(string $id): AggregateRoot ->archived(false) ->build(); - $stream = $this->store->load($criteria); + $stream = null; - $firstMessage = $stream->current(); + try { + $stream = $this->store->load($criteria); - if ($firstMessage === null) { - throw new AggregateNotFound($this->metadata->className, $id); - } + $firstMessage = $stream->current(); - $aggregate = $this->metadata->className::createFromEvents( - $this->unpack($stream), - $firstMessage->playhead() - 1, - ); + if ($firstMessage === null) { + throw new AggregateNotFound($this->metadata->className, $id); + } - if ($this->snapshotStore && $this->metadata->snapshot) { - $this->saveSnapshot($aggregate, $stream); + $aggregate = $this->metadata->className::createFromEvents( + $this->unpack($stream), + $firstMessage->playhead() - 1, + ); + + if ($this->snapshotStore && $this->metadata->snapshot) { + $this->saveSnapshot($aggregate, $stream->position()); + } + } finally { + $stream?->close(); } $this->aggregateIsValid[$aggregate] = true; @@ -200,21 +206,27 @@ private function loadFromSnapshot(string $aggregateClass, string $id): Aggregate ->fromPlayhead($aggregate->playhead()) ->build(); - $stream = $this->store->load($criteria); + $stream = null; - if ($stream->current() === null) { - $this->aggregateIsValid[$aggregate] = true; + try { + $stream = $this->store->load($criteria); - return $aggregate; - } + if ($stream->current() === null) { + $this->aggregateIsValid[$aggregate] = true; - try { - $aggregate->catchUp($this->unpack($stream)); - } catch (Throwable $exception) { - throw new SnapshotRebuildFailed($aggregateClass, $id, $exception); - } + return $aggregate; + } - $this->saveSnapshot($aggregate, $stream); + try { + $aggregate->catchUp($this->unpack($stream)); + } catch (Throwable $exception) { + throw new SnapshotRebuildFailed($aggregateClass, $id, $exception); + } + + $this->saveSnapshot($aggregate, $stream->position()); + } finally { + $stream?->close(); + } $this->aggregateIsValid[$aggregate] = true; @@ -222,18 +234,16 @@ private function loadFromSnapshot(string $aggregateClass, string $id): Aggregate } /** @param T $aggregate */ - private function saveSnapshot(AggregateRoot $aggregate, Stream $stream): void + private function saveSnapshot(AggregateRoot $aggregate, int|null $streamPosition): void { assert($this->snapshotStore instanceof SnapshotStore); - $position = $stream->position(); - - if ($position === null) { + if ($streamPosition === null) { return; } $batchSize = $this->metadata->snapshot?->batch ?: 1; - $count = $position + 1; + $count = $streamPosition + 1; if ($count < $batchSize) { return;