Skip to content

Commit

Permalink
Merge pull request #421 from patchlevel/close-stream
Browse files Browse the repository at this point in the history
close stream
  • Loading branch information
DavidBadura authored Oct 13, 2023
2 parents 2bffd7a + d171e42 commit 0923b9d
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 63 deletions.
2 changes: 2 additions & 0 deletions src/Console/Command/ShowAggregateCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$console->message($this->serializer, $message);
}

$stream->close();

if ($hasMessage) {
return 0;
}
Expand Down
2 changes: 2 additions & 0 deletions src/Console/Command/ShowCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
);
} while ($continue);

$stream->close();

return 0;
}
}
84 changes: 48 additions & 36 deletions src/Projection/Projectionist/DefaultProjectionist.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,26 +113,32 @@ public function boot(

$this->logger?->debug(sprintf('event stream is processed from position %s', $currentPosition));

$criteria = (new CriteriaBuilder())->fromIndex($currentPosition)->build();
$stream = $this->streamableMessageStore->load($criteria);
$stream = null;

$messageCounter = 0;
try {
$criteria = (new CriteriaBuilder())->fromIndex($currentPosition)->build();
$stream = $this->streamableMessageStore->load($criteria);

foreach ($stream as $message) {
foreach ($projections->filterByProjectionStatus(ProjectionStatus::Booting) as $projection) {
$this->handleMessage($message, $projection, $throwByError);
}
$messageCounter = 0;

foreach ($stream as $message) {
foreach ($projections->filterByProjectionStatus(ProjectionStatus::Booting) as $projection) {
$this->handleMessage($message, $projection, $throwByError);
}

$currentPosition++;
$messageCounter++;
$currentPosition++;
$messageCounter++;

$this->logger?->info(sprintf('current event stream position: %s', $currentPosition));
$this->logger?->info(sprintf('current event stream position: %s', $currentPosition));

if ($limit !== null && $messageCounter >= $limit) {
$this->logger?->info('message limit reached, finish');
if ($limit !== null && $messageCounter >= $limit) {
$this->logger?->info('message limit reached, finish');

return;
return;
}
}
} finally {
$stream?->close();
}

$this->logger?->info('end of stream has been reached');
Expand Down Expand Up @@ -184,38 +190,44 @@ public function run(

$this->logger?->debug(sprintf('event stream is processed from position %s', $currentPosition));

$criteria = (new CriteriaBuilder())->fromIndex($currentPosition)->build();
$stream = $this->streamableMessageStore->load($criteria);
$stream = null;

$messageCounter = 0;
try {
$criteria = (new CriteriaBuilder())->fromIndex($currentPosition)->build();
$stream = $this->streamableMessageStore->load($criteria);

foreach ($stream as $message) {
foreach ($projections->filterByProjectionStatus(ProjectionStatus::Active) as $projection) {
if ($projection->position() > $currentPosition) {
$this->logger?->debug(
sprintf(
'projection "%s" is farther than the current position (%s) and will be skipped',
$projection->id()->toString(),
$projection->position(),
),
);
$messageCounter = 0;

continue;
}
foreach ($stream as $message) {
foreach ($projections->filterByProjectionStatus(ProjectionStatus::Active) as $projection) {
if ($projection->position() > $currentPosition) {
$this->logger?->debug(
sprintf(
'projection "%s" is farther than the current position (%s) and will be skipped',
$projection->id()->toString(),
$projection->position(),
),
);

$this->handleMessage($message, $projection, $throwByError);
}
continue;
}

$this->handleMessage($message, $projection, $throwByError);
}

$currentPosition++;
$messageCounter++;
$currentPosition++;
$messageCounter++;

$this->logger?->info(sprintf('current event stream position: %s', $currentPosition));
$this->logger?->info(sprintf('current event stream position: %s', $currentPosition));

if ($limit !== null && $messageCounter >= $limit) {
$this->logger?->info('message limit reached, finish');
if ($limit !== null && $messageCounter >= $limit) {
$this->logger?->info('message limit reached, finish');

return;
return;
}
}
} finally {
$stream?->close();
}

$this->logger?->debug('end of stream has been reached, finish');
Expand Down
64 changes: 37 additions & 27 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,27 @@ public function load(string $id): AggregateRoot
->archived(false)
->build();

$stream = $this->store->load($criteria);
$stream = null;

$firstMessage = $stream->current();
try {
$stream = $this->store->load($criteria);

if ($firstMessage === null) {
throw new AggregateNotFound($this->metadata->className, $id);
}
$firstMessage = $stream->current();

$aggregate = $this->metadata->className::createFromEvents(
$this->unpack($stream),
$firstMessage->playhead() - 1,
);
if ($firstMessage === null) {
throw new AggregateNotFound($this->metadata->className, $id);
}

if ($this->snapshotStore && $this->metadata->snapshot) {
$this->saveSnapshot($aggregate, $stream);
$aggregate = $this->metadata->className::createFromEvents(
$this->unpack($stream),
$firstMessage->playhead() - 1,
);

if ($this->snapshotStore && $this->metadata->snapshot) {
$this->saveSnapshot($aggregate, $stream->position());
}
} finally {
$stream?->close();
}

$this->aggregateIsValid[$aggregate] = true;
Expand Down Expand Up @@ -200,40 +206,44 @@ private function loadFromSnapshot(string $aggregateClass, string $id): Aggregate
->fromPlayhead($aggregate->playhead())
->build();

$stream = $this->store->load($criteria);
$stream = null;

if ($stream->current() === null) {
$this->aggregateIsValid[$aggregate] = true;
try {
$stream = $this->store->load($criteria);

return $aggregate;
}
if ($stream->current() === null) {
$this->aggregateIsValid[$aggregate] = true;

try {
$aggregate->catchUp($this->unpack($stream));
} catch (Throwable $exception) {
throw new SnapshotRebuildFailed($aggregateClass, $id, $exception);
}
return $aggregate;
}

$this->saveSnapshot($aggregate, $stream);
try {
$aggregate->catchUp($this->unpack($stream));
} catch (Throwable $exception) {
throw new SnapshotRebuildFailed($aggregateClass, $id, $exception);
}

$this->saveSnapshot($aggregate, $stream->position());
} finally {
$stream?->close();
}

$this->aggregateIsValid[$aggregate] = true;

return $aggregate;
}

/** @param T $aggregate */
private function saveSnapshot(AggregateRoot $aggregate, Stream $stream): void
private function saveSnapshot(AggregateRoot $aggregate, int|null $streamPosition): void
{
assert($this->snapshotStore instanceof SnapshotStore);

$position = $stream->position();

if ($position === null) {
if ($streamPosition === null) {
return;
}

$batchSize = $this->metadata->snapshot?->batch ?: 1;
$count = $position + 1;
$count = $streamPosition + 1;

if ($count < $batchSize) {
return;
Expand Down

0 comments on commit 0923b9d

Please sign in to comment.