From df946bbbb3bbb5e0f792a9eb2462495846571356 Mon Sep 17 00:00:00 2001 From: Anatolii Popov Date: Fri, 5 Jan 2024 11:11:19 +0200 Subject: [PATCH] RemoteFetcherTierStatemachine logging --- .../java/kafka/server/ReplicaFetcherTierStateMachine.java | 1 + core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala | 2 ++ .../metadata/storage/RemotePartitionMetadataStore.java | 5 +++-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java index 0f0c5c7bebcda..ca96f7bd50a16 100644 --- a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java +++ b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java @@ -97,6 +97,7 @@ public PartitionFetchState start(TopicPartition topicPartition, log.error("Starting the tier state machine for partition: {} with currentFetchState: {} and fetchPartitionData: {}", topicPartition, currentFetchState, fetchPartitionData); + log.error("the leader is: {}", leader); OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch()); log.error("Fetched the earliest local offset for partition: {} with epochAndLeaderLocalStartOffset: {}", topicPartition, epochAndLeaderLocalStartOffset); int epoch = epochAndLeaderLocalStartOffset.leaderEpoch(); diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala index cb42a612d1969..09c2310044468 100644 --- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala +++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala @@ -132,7 +132,9 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = { val partition = replicaManager.getPartitionOrException(topicPartition) val localLogStartOffset = partition.localLogOrException.localLogStartOffset() + error(s"Fetching earliest local offset for $topicPartition returned $localLogStartOffset and partition $partition") val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(localLogStartOffset) + error(s"Got epoch $epoch from ${partition.localLogOrException.leaderEpochCache}") new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0)) } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java index f9394eee99f36..8e4d281af2f9f 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java @@ -151,7 +151,8 @@ private FileBasedRemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartiti if (remoteLogMetadataCache == null) { throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition); } - + log.error("remoteLogMetadataCache.isInitialized(): {}", remoteLogMetadataCache.isInitialized()); + log.error("remoteLogMetadataCache: {}", remoteLogMetadataCache); if (!remoteLogMetadataCache.isInitialized()) { // Throwing a retriable ReplicaNotAvailableException here for clients retry. We can introduce a new more // appropriate exception with a KIP in the future. @@ -196,7 +197,7 @@ public void maybeLoadPartition(TopicIdPartition partition) { @Override public void markInitialized(TopicIdPartition partition) { idToRemoteLogMetadataCache.get(partition).markInitialized(); - log.trace("Remote log components are initialized for user-partition: {}", partition); + log.error("Remote log components are initialized for user-partition: {}", partition); } @Override