Skip to content

Commit

Permalink
RemoteFetcherTierStatemachine logging
Browse files Browse the repository at this point in the history
  • Loading branch information
AnatolyPopov committed Jan 5, 2024
1 parent 020b330 commit df946bb
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public PartitionFetchState start(TopicPartition topicPartition,

log.error("Starting the tier state machine for partition: {} with currentFetchState: {} and fetchPartitionData: {}",
topicPartition, currentFetchState, fetchPartitionData);
log.error("the leader is: {}", leader);
OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
log.error("Fetched the earliest local offset for partition: {} with epochAndLeaderLocalStartOffset: {}", topicPartition, epochAndLeaderLocalStartOffset);
int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
val partition = replicaManager.getPartitionOrException(topicPartition)
val localLogStartOffset = partition.localLogOrException.localLogStartOffset()
error(s"Fetching earliest local offset for $topicPartition returned $localLogStartOffset and partition $partition")
val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(localLogStartOffset)
error(s"Got epoch $epoch from ${partition.localLogOrException.leaderEpochCache}")
new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ private FileBasedRemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartiti
if (remoteLogMetadataCache == null) {
throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition);
}

log.error("remoteLogMetadataCache.isInitialized(): {}", remoteLogMetadataCache.isInitialized());
log.error("remoteLogMetadataCache: {}", remoteLogMetadataCache);
if (!remoteLogMetadataCache.isInitialized()) {
// Throwing a retriable ReplicaNotAvailableException here for clients retry. We can introduce a new more
// appropriate exception with a KIP in the future.
Expand Down Expand Up @@ -196,7 +197,7 @@ public void maybeLoadPartition(TopicIdPartition partition) {
@Override
public void markInitialized(TopicIdPartition partition) {
idToRemoteLogMetadataCache.get(partition).markInitialized();
log.trace("Remote log components are initialized for user-partition: {}", partition);
log.error("Remote log components are initialized for user-partition: {}", partition);
}

@Override
Expand Down

0 comments on commit df946bb

Please sign in to comment.