diff --git a/src/Projection/Projectionist/DefaultProjectionist.php b/src/Projection/Projectionist/DefaultProjectionist.php index 17e8a4001..52a334fb0 100644 --- a/src/Projection/Projectionist/DefaultProjectionist.php +++ b/src/Projection/Projectionist/DefaultProjectionist.php @@ -43,6 +43,10 @@ public function boot( int|null $limit = null, bool $throwByError = false, ): void { + $this->logger?->info( + 'Projectionist: Start booting.', + ); + $projections = $this->projections() ->filter(static fn (Projection $projection) => $projection->isNew() || $projection->isBooting()) ->filterByCriteria($criteria); @@ -58,7 +62,7 @@ public function boot( $this->projectionStore->save($projection); $this->logger?->info(sprintf( - 'projector "%s" for "%s" has been set to booting', + 'Projectionist: New Projector "%s" for "%s" was found and is now booting.', $projector::class, $projection->id()->toString(), )); @@ -66,8 +70,8 @@ public function boot( $setupMethod = $this->projectorResolver->resolveSetupMethod($projector); if (!$setupMethod) { - $this->logger?->info(sprintf( - 'projector "%s" for "%s" has no "setup" method', + $this->logger?->debug(sprintf( + 'Projectionist: Projector "%s" for "%s" has no setup method, continue.', $projector::class, $projection->id()->toString(), )); @@ -77,14 +81,14 @@ public function boot( try { $setupMethod(); - $this->logger?->info(sprintf( - 'projector "%s" for "%s" prepared', + $this->logger?->debug(sprintf( + 'Projectionist: For Projector "%s" for "%s" the setup method has been executed and is now prepared for data.', $projector::class, $projection->id()->toString(), )); } catch (Throwable $e) { $this->logger?->error(sprintf( - 'preparing error in "%s" for "%s": %s', + 'Projectionist: Projector "%s" for "%s" has an error in the setup method: %s', $projector::class, $projection->id()->toString(), $e->getMessage(), @@ -107,14 +111,19 @@ public function boot( $projections = $projections->filterByProjectionStatus(ProjectionStatus::Booting); if ($projections->count() === 0) { - $this->logger?->info('no projections to process, finish'); + $this->logger?->info('Projectionist: No projections in booting status, finish booting.'); return; } $currentPosition = $projections->getLowestProjectionPosition(); - $this->logger?->debug(sprintf('event stream is processed from position %s', $currentPosition)); + $this->logger?->debug( + sprintf( + 'Projectionist: Event stream is processed for booting from position %s.', + $currentPosition, + ), + ); $stream = null; @@ -132,10 +141,20 @@ public function boot( $currentPosition++; $messageCounter++; - $this->logger?->info(sprintf('current event stream position: %s', $currentPosition)); + $this->logger?->debug( + sprintf( + 'Projectionist: Current event stream position for booting: %s', + $currentPosition, + ), + ); if ($limit !== null && $messageCounter >= $limit) { - $this->logger?->info('message limit reached, finish'); + $this->logger?->info( + sprintf( + 'Projectionist: Message limit (%d) reached, finish booting.', + $limit, + ), + ); return; } @@ -144,19 +163,19 @@ public function boot( $stream?->close(); } - $this->logger?->info('end of stream has been reached'); + $this->logger?->debug('Projectionist: End of stream for booting has been reached.'); foreach ($projections->filterByProjectionStatus(ProjectionStatus::Booting) as $projection) { $projection->active(); $this->projectionStore->save($projection); $this->logger?->info(sprintf( - 'projection "%s" has been set to active', + 'Projectionist: Projection "%s" has been set to active after booting.', $projection->id()->toString(), )); } - $this->logger?->info('finish'); + $this->logger?->info('Projectionist: Finish booting.'); } public function run( @@ -164,6 +183,8 @@ public function run( int|null $limit = null, bool $throwByError = false, ): void { + $this->logger?->info('Projectionist: Start processing.'); + $projections = $this->projections()->filterByCriteria($criteria); $this->handleOutdatedProjections($projections); @@ -172,14 +193,19 @@ public function run( $projections = $projections->filterByProjectionStatus(ProjectionStatus::Active); if ($projections->count() === 0) { - $this->logger?->info('no projections to process, finish'); + $this->logger?->info('Projectionist: No projections to process, finish processing.'); return; } $currentPosition = $projections->getLowestProjectionPosition(); - $this->logger?->debug(sprintf('event stream is processed from position %s', $currentPosition)); + $this->logger?->debug( + sprintf( + 'Projectionist: Event stream is processed from position %d.', + $currentPosition, + ), + ); $stream = null; @@ -194,9 +220,10 @@ public function run( if ($projection->position() > $currentPosition) { $this->logger?->debug( sprintf( - 'projection "%s" is farther than the current position (%s) and will be skipped', + 'Projectionist: Projection "%s" is farther than the current position (%d > %d), continue processing.', $projection->id()->toString(), $projection->position(), + $currentPosition, ), ); @@ -209,10 +236,15 @@ public function run( $currentPosition++; $messageCounter++; - $this->logger?->info(sprintf('current event stream position: %s', $currentPosition)); + $this->logger?->debug(sprintf('Projectionist: Current event stream position: %s', $currentPosition)); if ($limit !== null && $messageCounter >= $limit) { - $this->logger?->info('message limit reached, finish'); + $this->logger?->info( + sprintf( + 'Projectionist: Message limit (%d) reached, finish processing.', + $limit, + ), + ); return; } @@ -221,11 +253,18 @@ public function run( $stream?->close(); } - $this->logger?->debug('end of stream has been reached, finish'); + $this->logger?->info( + sprintf( + 'Projectionist: End of stream on position "%d" has been reached, finish processing.', + $currentPosition, + ), + ); } public function teardown(ProjectionCriteria $criteria = new ProjectionCriteria()): void { + $this->logger?->info('Projectionist: Start teardown outdated projections.'); + $projections = $this ->projections() ->filterByProjectionStatus(ProjectionStatus::Outdated) @@ -236,7 +275,10 @@ public function teardown(ProjectionCriteria $criteria = new ProjectionCriteria() if (!$projector) { $this->logger?->warning( - sprintf('projector for "%s" not found, skipped', $projection->id()->toString()), + sprintf( + 'Projectionist: Projector for "%s" to teardown not found, skipped.', + $projection->id()->toString(), + ), ); continue; @@ -244,24 +286,51 @@ public function teardown(ProjectionCriteria $criteria = new ProjectionCriteria() $teardownMethod = $this->projectorResolver->resolveTeardownMethod($projector); - if ($teardownMethod) { - try { - $teardownMethod(); - } catch (Throwable $e) { - $this->logger?->error( - sprintf('projection for "%s" could not be removed, skipped', $projection->id()->toString()), - ); - $this->logger?->error($e->getMessage()); - continue; - } + if (!$teardownMethod) { + $this->projectionStore->remove($projection->id()); + + $this->logger?->info( + sprintf( + 'Projectionist: Projector "%s" for "%s" has no teardown method and was immediately removed.', + $projector::class, + $projection->id()->toString(), + ), + ); + + continue; + } + + try { + $teardownMethod(); + + $this->logger?->debug(sprintf( + 'Projectionist: For Projector "%s" for "%s" the teardown method has been executed and is now prepared to be removed.', + $projector::class, + $projection->id()->toString(), + )); + } catch (Throwable $e) { + $this->logger?->error( + sprintf( + 'Projectionist: Projection "%s" for "%s" has an error in the teardown method, skipped: %s', + $projector::class, + $projection->id()->toString(), + $e->getMessage(), + ), + ); + continue; } $this->projectionStore->remove($projection->id()); $this->logger?->info( - sprintf('projection for "%s" removed', $projection->id()->toString()), + sprintf( + 'Projectionist: Projection "%s" removed.', + $projection->id()->toString(), + ), ); } + + $this->logger?->info('Projectionist: Finish teardown.'); } public function remove(ProjectionCriteria $criteria = new ProjectionCriteria()): void @@ -275,7 +344,7 @@ public function remove(ProjectionCriteria $criteria = new ProjectionCriteria()): $this->projectionStore->remove($projection->id()); $this->logger?->info( - sprintf('projection "%s" removed without a suitable projector', $projection->id()->toString()), + sprintf('Projectionist: Projection "%s" removed without a suitable projector.', $projection->id()->toString()), ); continue; @@ -287,7 +356,7 @@ public function remove(ProjectionCriteria $criteria = new ProjectionCriteria()): $this->projectionStore->remove($projection->id()); $this->logger?->info( - sprintf('projection "%s" removed', $projection->id()->toString()), + sprintf('Projectionist: Projection "%s" removed.', $projection->id()->toString()), ); continue; @@ -298,17 +367,17 @@ public function remove(ProjectionCriteria $criteria = new ProjectionCriteria()): } catch (Throwable $e) { $this->logger?->error( sprintf( - 'projector "%s" teardown method could not be executed:', + 'Projectionist: Projector "%s" teardown method could not be executed: %s', $projector::class, + $e->getMessage(), ), ); - $this->logger?->error($e->getMessage()); } $this->projectionStore->remove($projection->id()); $this->logger?->info( - sprintf('projection "%s" removed', $projection->id()->toString()), + sprintf('Projectionist: Projection "%s" removed.', $projection->id()->toString()), ); } } @@ -324,8 +393,8 @@ public function reactivate(ProjectionCriteria $criteria = new ProjectionCriteria $projector = $this->projector($projection->id()); if (!$projector) { - $this->logger?->info( - sprintf('projector for "%s" not found, skipped', $projection->id()->toString()), + $this->logger?->debug( + sprintf('Projectionist: Projector for "%s" not found, skipped.', $projection->id()->toString()), ); continue; @@ -335,7 +404,7 @@ public function reactivate(ProjectionCriteria $criteria = new ProjectionCriteria $this->projectionStore->save($projection); $this->logger?->info(sprintf( - 'projector "%s" for "%s" is reactivated', + 'Projectionist: Projector "%s" for "%s" is reactivated.', $projector::class, $projection->id()->toString(), )); @@ -375,26 +444,27 @@ private function handleMessage(Message $message, Projection $projection, bool $t $projection->incrementPosition(); $this->projectionStore->save($projection); - return; - } - - try { - $subscribeMethod($message); - $this->logger?->debug( sprintf( - 'projector "%s" for "%s" processed the event "%s"', + 'Projectionist: Projector "%s" for "%s" has no subscribe method for "%s", continue.', $projector::class, $projection->id()->toString(), $message->event()::class, ), ); + + return; + } + + try { + $subscribeMethod($message); } catch (Throwable $e) { $this->logger?->error( sprintf( - 'projector "%s" for "%s" could not process the event: %s', + 'Projectionist: Projector "%s" for "%s" could not process the event "%s": %s', $projector::class, $projection->id()->toString(), + $message->event()::class, $e->getMessage(), ), ); @@ -417,6 +487,15 @@ private function handleMessage(Message $message, Projection $projection, bool $t $projection->incrementPosition(); $projection->resetRetry(); $this->projectionStore->save($projection); + + $this->logger?->debug( + sprintf( + 'Projectionist: Projector "%s" for "%s" processed the event "%s".', + $projector::class, + $projection->id()->toString(), + $message->event()::class, + ), + ); } private function projector(ProjectionId $projectorId): object|null @@ -462,7 +541,12 @@ private function handleOutdatedProjections(ProjectionCollection $projections): v $projection->outdated(); $this->projectionStore->save($projection); - $this->logger?->info(sprintf('projection "%s" has been marked as outdated', $projection->id()->toString())); + $this->logger?->info( + sprintf( + 'Projectionist: Projector for "%s" not found and has been marked as outdated.', + $projection->id()->toString(), + ), + ); } } @@ -476,7 +560,14 @@ private function handleRetryProjections(ProjectionCollection $projections): void $projection->active(); $this->projectionStore->save($projection); - $this->logger?->info(sprintf('retry projection "%s"', $projection->id()->toString())); + $this->logger?->info( + sprintf( + 'Projectionist: Retry projection "%s" (%d/%d) and set to active.', + $projection->id()->toString(), + $projection->retry(), + self::RETRY_LIMIT, + ), + ); } } } diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index eacdfa70f..4d7f4304c 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -63,13 +63,31 @@ public function load(AggregateRootId $id): AggregateRoot { if ($this->snapshotStore && $this->metadata->snapshot) { try { - return $this->loadFromSnapshot($this->metadata->className, $id); + $aggregate = $this->loadFromSnapshot($this->metadata->className, $id); + + $this->logger->debug( + sprintf( + 'Repository: Aggregate "%s" with the id "%s" loaded from snapshot.', + $this->metadata->className, + $id->toString(), + ), + ); + + return $aggregate; } catch (SnapshotRebuildFailed $exception) { + $this->logger->error( + sprintf( + 'Repository: Aggregate "%s" with the id "%s" could not be rebuild from snapshot.', + $this->metadata->className, + $id->toString(), + ), + ); + $this->logger->error($exception->getMessage()); } catch (SnapshotNotFound) { $this->logger->debug( sprintf( - 'snapshot for aggregate "%s" with the id "%s" not found', + 'Repository: Snapshot for aggregate "%s" with the id "%s" not found.', $this->metadata->className, $id->toString(), ), @@ -77,7 +95,7 @@ public function load(AggregateRootId $id): AggregateRoot } catch (SnapshotVersionInvalid) { $this->logger->debug( sprintf( - 'snapshot for aggregate "%s" with the id "%s" is invalid', + 'Repository: Snapshot for aggregate "%s" with the id "%s" is invalid.', $this->metadata->className, $id->toString(), ), @@ -99,6 +117,14 @@ public function load(AggregateRootId $id): AggregateRoot $firstMessage = $stream->current(); if ($firstMessage === null) { + $this->logger->debug( + sprintf( + 'Repository: Aggregate "%s" with the id "%s" not found.', + $this->metadata->className, + $id->toString(), + ), + ); + throw new AggregateNotFound($this->metadata->className, $id); } @@ -116,6 +142,14 @@ public function load(AggregateRootId $id): AggregateRoot $this->aggregateIsValid[$aggregate] = true; + $this->logger->debug( + sprintf( + 'Repository: Aggregate "%s" with the id "%s" loaded from store.', + $this->metadata->className, + $id->toString(), + ), + ); + return $aggregate; } @@ -146,10 +180,28 @@ public function save(AggregateRoot $aggregate): void $newAggregate = $playhead === 0; if (!isset($this->aggregateIsValid[$aggregate]) && !$newAggregate) { + $this->logger->error( + sprintf( + 'Repository: Aggregate "%s" with the id "%s" is unknown.', + $this->metadata->className, + $aggregate->aggregateRootId()->toString(), + ), + ); + throw new AggregateUnknown($aggregate::class, $aggregate->aggregateRootId()); } if ($playhead < 0) { + $this->logger->error( + sprintf( + 'Repository: Aggregate "%s" with the id "%s" has a playhead mismatch. Expected "%d" but got "%d".', + $this->metadata->className, + $aggregate->aggregateRootId()->toString(), + $aggregate->playhead(), + $eventCount, + ), + ); + throw new PlayheadMismatch( $aggregate::class, $aggregate->aggregateRootId(), @@ -183,9 +235,25 @@ static function (object $event) use ($aggregate, &$playhead, $messageDecorator, $this->store->save(...$messages); } catch (UniqueConstraintViolation) { if ($newAggregate) { + $this->logger->error( + sprintf( + 'Repository: Aggregate "%s" with the id "%s" already exists.', + $aggregate::class, + $aggregate->aggregateRootId()->toString(), + ), + ); + throw new AggregateAlreadyExists($aggregate::class, $aggregate->aggregateRootId()); } + $this->logger->error( + sprintf( + 'Repository: Aggregate "%s" with the id "%s" is outdated.', + $aggregate::class, + $aggregate->aggregateRootId()->toString(), + ), + ); + throw new AggregateOutdated($aggregate::class, $aggregate->aggregateRootId()); } @@ -194,6 +262,14 @@ static function (object $event) use ($aggregate, &$playhead, $messageDecorator, }); $this->aggregateIsValid[$aggregate] = true; + + $this->logger->debug( + sprintf( + 'Repository: Aggregate "%s" with the id "%s" saved.', + $this->metadata->className, + $aggregate->aggregateRootId()->toString(), + ), + ); } catch (Throwable $exception) { $this->aggregateIsValid[$aggregate] = false; @@ -261,6 +337,14 @@ private function saveSnapshot(AggregateRoot $aggregate, int|null $streamPosition return; } + $this->logger->debug( + sprintf( + 'Repository: Save snapshot for aggregate "%s" with the id "%s".', + $this->metadata->className, + $aggregate->aggregateRootId()->toString(), + ), + ); + $this->snapshotStore->save($aggregate); } @@ -300,6 +384,15 @@ private function archive(Message ...$messages): void $lastMessageWithNewStreamStart->aggregateId(), $lastMessageWithNewStreamStart->playhead(), ); + + $this->logger->debug( + sprintf( + 'Repository: Archive messages for aggregate "%s" with the id "%s" until playhead "%d".', + $lastMessageWithNewStreamStart->aggregateClass(), + $lastMessageWithNewStreamStart->aggregateId(), + $lastMessageWithNewStreamStart->playhead(), + ), + ); } /** @return Traversable */