Skip to content

Commit

Permalink
improve projectionist logging
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Jan 9, 2024
1 parent e75f320 commit 637d533
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 31 deletions.
72 changes: 41 additions & 31 deletions src/Projection/Projectionist/DefaultProjectionist.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public function boot(
$this->projectionStore->save($projection);

$this->logger?->info(sprintf(
'projector "%s" for "%s" has been set to booting',
'Projectionist: Projector "%s" for "%s" has been set to booting.',
$projector::class,
$projection->id()->toString(),
));
Expand All @@ -67,7 +67,7 @@ public function boot(

if (!$setupMethod) {
$this->logger?->info(sprintf(
'projector "%s" for "%s" has no "setup" method',
'Projectionist: Projector "%s" for "%s" has no "setup" method.',
$projector::class,
$projection->id()->toString(),
));
Expand All @@ -78,13 +78,13 @@ public function boot(
try {
$setupMethod();
$this->logger?->info(sprintf(
'projector "%s" for "%s" prepared',
'Projectionist: Projector "%s" for "%s" prepared.',
$projector::class,
$projection->id()->toString(),
));
} catch (Throwable $e) {
$this->logger?->error(sprintf(
'preparing error in "%s" for "%s": %s',
'Projectionist: Preparing error in "%s" for "%s": %s',
$projector::class,
$projection->id()->toString(),
$e->getMessage(),
Expand All @@ -107,14 +107,14 @@ public function boot(
$projections = $projections->filterByProjectionStatus(ProjectionStatus::Booting);

if ($projections->count() === 0) {
$this->logger?->info('no projections to process, finish');
$this->logger?->info('Projectionist: No projections to process, finish.');

return;
}

$currentPosition = $projections->getLowestProjectionPosition();

$this->logger?->debug(sprintf('event stream is processed from position %s', $currentPosition));
$this->logger?->debug(sprintf('Projectionist: Event stream is processed from position %s', $currentPosition));

$stream = null;

Expand All @@ -132,10 +132,10 @@ public function boot(
$currentPosition++;
$messageCounter++;

$this->logger?->info(sprintf('current event stream position: %s', $currentPosition));
$this->logger?->info(sprintf('Projectionist: Current event stream position: %s', $currentPosition));

if ($limit !== null && $messageCounter >= $limit) {
$this->logger?->info('message limit reached, finish');
$this->logger?->info('Projectionist: Message limit reached, finish.');

return;
}
Expand All @@ -144,19 +144,19 @@ public function boot(
$stream?->close();
}

$this->logger?->info('end of stream has been reached');
$this->logger?->info('Projectionist: End of stream has been reached.');

foreach ($projections->filterByProjectionStatus(ProjectionStatus::Booting) as $projection) {
$projection->active();
$this->projectionStore->save($projection);

$this->logger?->info(sprintf(
'projection "%s" has been set to active',
'Projectionist: Projection "%s" has been set to active.',
$projection->id()->toString(),
));
}

$this->logger?->info('finish');
$this->logger?->info('Projectionist: Finish.');
}

public function run(
Expand All @@ -172,14 +172,14 @@ public function run(
$projections = $projections->filterByProjectionStatus(ProjectionStatus::Active);

if ($projections->count() === 0) {
$this->logger?->info('no projections to process, finish');
$this->logger?->info('Projectionist: No projections to process, finish.');

return;
}

$currentPosition = $projections->getLowestProjectionPosition();

$this->logger?->debug(sprintf('event stream is processed from position %s', $currentPosition));
$this->logger?->debug(sprintf('Projectionist: Event stream is processed from position %s.', $currentPosition));

$stream = null;

Expand All @@ -194,7 +194,7 @@ public function run(
if ($projection->position() > $currentPosition) {
$this->logger?->debug(
sprintf(
'projection "%s" is farther than the current position (%s) and will be skipped',
'Projectionist: Projection "%s" is farther than the current position (%s) and will be skipped.',
$projection->id()->toString(),
$projection->position(),
),
Expand All @@ -209,10 +209,10 @@ public function run(
$currentPosition++;
$messageCounter++;

$this->logger?->info(sprintf('current event stream position: %s', $currentPosition));
$this->logger?->info(sprintf('Projectionist: Current event stream position: %s', $currentPosition));

if ($limit !== null && $messageCounter >= $limit) {
$this->logger?->info('message limit reached, finish');
$this->logger?->info('Projectionist: Message limit reached, finish.');

return;
}
Expand All @@ -221,7 +221,7 @@ public function run(
$stream?->close();
}

$this->logger?->debug('end of stream has been reached, finish');
$this->logger?->debug('Projectionist: End of stream has been reached, finish.');
}

public function teardown(ProjectionCriteria $criteria = new ProjectionCriteria()): void
Expand All @@ -236,7 +236,7 @@ public function teardown(ProjectionCriteria $criteria = new ProjectionCriteria()

if (!$projector) {
$this->logger?->warning(
sprintf('projector for "%s" not found, skipped', $projection->id()->toString()),
sprintf('Projectionist: Projector for "%s" not found, skipped.', $projection->id()->toString()),
);

continue;
Expand All @@ -249,7 +249,7 @@ public function teardown(ProjectionCriteria $criteria = new ProjectionCriteria()
$teardownMethod();
} catch (Throwable $e) {
$this->logger?->error(
sprintf('projection for "%s" could not be removed, skipped', $projection->id()->toString()),
sprintf('Projectionist: Projection for "%s" could not be removed, skipped.', $projection->id()->toString()),
);
$this->logger?->error($e->getMessage());
continue;
Expand All @@ -259,7 +259,7 @@ public function teardown(ProjectionCriteria $criteria = new ProjectionCriteria()
$this->projectionStore->remove($projection->id());

$this->logger?->info(
sprintf('projection for "%s" removed', $projection->id()->toString()),
sprintf('Projectionist: Projection for "%s" removed.', $projection->id()->toString()),
);
}
}
Expand All @@ -275,7 +275,7 @@ public function remove(ProjectionCriteria $criteria = new ProjectionCriteria()):
$this->projectionStore->remove($projection->id());

$this->logger?->info(
sprintf('projection "%s" removed without a suitable projector', $projection->id()->toString()),
sprintf('Projectionist: Projection "%s" removed without a suitable projector.', $projection->id()->toString()),
);

continue;
Expand All @@ -287,7 +287,7 @@ public function remove(ProjectionCriteria $criteria = new ProjectionCriteria()):
$this->projectionStore->remove($projection->id());

$this->logger?->info(
sprintf('projection "%s" removed', $projection->id()->toString()),
sprintf('Projectionist: Projection "%s" removed.', $projection->id()->toString()),
);

continue;
Expand All @@ -298,17 +298,17 @@ public function remove(ProjectionCriteria $criteria = new ProjectionCriteria()):
} catch (Throwable $e) {
$this->logger?->error(
sprintf(
'projector "%s" teardown method could not be executed:',
'Projectionist: Projector "%s" teardown method could not be executed: %s',
$projector::class,
$e->getMessage(),
),
);
$this->logger?->error($e->getMessage());
}

$this->projectionStore->remove($projection->id());

$this->logger?->info(
sprintf('projection "%s" removed', $projection->id()->toString()),
sprintf('Projectionist: Projection "%s" removed.', $projection->id()->toString()),
);
}
}
Expand All @@ -325,7 +325,7 @@ public function reactivate(ProjectionCriteria $criteria = new ProjectionCriteria

if (!$projector) {
$this->logger?->info(
sprintf('projector for "%s" not found, skipped', $projection->id()->toString()),
sprintf('Projectionist: Projector for "%s" not found, skipped.', $projection->id()->toString()),
);

continue;
Expand All @@ -335,7 +335,7 @@ public function reactivate(ProjectionCriteria $criteria = new ProjectionCriteria
$this->projectionStore->save($projection);

$this->logger?->info(sprintf(
'projector "%s" for "%s" is reactivated',
'Projectionist: Projector "%s" for "%s" is reactivated.',
$projector::class,
$projection->id()->toString(),
));
Expand Down Expand Up @@ -383,7 +383,7 @@ private function handleMessage(Message $message, Projection $projection, bool $t

$this->logger?->debug(
sprintf(
'projector "%s" for "%s" processed the event "%s"',
'Projectionist: Projector "%s" for "%s" processed the event "%s".',
$projector::class,
$projection->id()->toString(),
$message->event()::class,
Expand All @@ -392,7 +392,7 @@ private function handleMessage(Message $message, Projection $projection, bool $t
} catch (Throwable $e) {
$this->logger?->error(
sprintf(
'projector "%s" for "%s" could not process the event: %s',
'Projectionist: Projector "%s" for "%s" could not process the event: %s',
$projector::class,
$projection->id()->toString(),
$e->getMessage(),
Expand Down Expand Up @@ -462,7 +462,12 @@ private function handleOutdatedProjections(ProjectionCollection $projections): v
$projection->outdated();
$this->projectionStore->save($projection);

$this->logger?->info(sprintf('projection "%s" has been marked as outdated', $projection->id()->toString()));
$this->logger?->info(
sprintf(
'Projectionist: Projection "%s" has been marked as outdated.',
$projection->id()->toString(),
),
);
}
}

Expand All @@ -476,7 +481,12 @@ private function handleRetryProjections(ProjectionCollection $projections): void
$projection->active();
$this->projectionStore->save($projection);

$this->logger?->info(sprintf('retry projection "%s"', $projection->id()->toString()));
$this->logger?->info(
sprintf(
'Projectionist: Retry projection "%s".',
$projection->id()->toString(),
),
);
}
}
}
51 changes: 51 additions & 0 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,28 @@ public function save(AggregateRoot $aggregate): void
$newAggregate = $playhead === 0;

if (!isset($this->aggregateIsValid[$aggregate]) && !$newAggregate) {
$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" is unknown.',
$this->metadata->className,
$aggregate->aggregateRootId()->toString(),
),
);

throw new AggregateUnknown($aggregate::class, $aggregate->aggregateRootId());
}

if ($playhead < 0) {
$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" has a playhead mismatch. Expected "%d" but got "%d".',
$this->metadata->className,
$aggregate->aggregateRootId()->toString(),
$aggregate->playhead(),
$eventCount,
),
);

throw new PlayheadMismatch(
$aggregate::class,
$aggregate->aggregateRootId(),
Expand Down Expand Up @@ -217,9 +235,25 @@ static function (object $event) use ($aggregate, &$playhead, $messageDecorator,
$this->store->save(...$messages);
} catch (UniqueConstraintViolation) {
if ($newAggregate) {
$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" already exists.',
$aggregate::class,
$aggregate->aggregateRootId()->toString(),
),
);

throw new AggregateAlreadyExists($aggregate::class, $aggregate->aggregateRootId());
}

$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" is outdated.',
$aggregate::class,
$aggregate->aggregateRootId()->toString(),
),
);

throw new AggregateOutdated($aggregate::class, $aggregate->aggregateRootId());
}

Expand All @@ -228,6 +262,14 @@ static function (object $event) use ($aggregate, &$playhead, $messageDecorator,
});

$this->aggregateIsValid[$aggregate] = true;

$this->logger->debug(
sprintf(
'Repository: Aggregate "%s" with the id "%s" saved.',
$this->metadata->className,
$aggregate->aggregateRootId()->toString(),
),
);
} catch (Throwable $exception) {
$this->aggregateIsValid[$aggregate] = false;

Expand Down Expand Up @@ -342,6 +384,15 @@ private function archive(Message ...$messages): void
$lastMessageWithNewStreamStart->aggregateId(),
$lastMessageWithNewStreamStart->playhead(),
);

$this->logger->debug(
sprintf(
'Repository: Archive messages for aggregate "%s" with the id "%s" until playhead "%d".',
$lastMessageWithNewStreamStart->aggregateClass(),
$lastMessageWithNewStreamStart->aggregateId(),
$lastMessageWithNewStreamStart->playhead(),
),
);
}

/** @return Traversable<object> */
Expand Down

0 comments on commit 637d533

Please sign in to comment.