diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
index 84dd0dbf21090..4f4c329bd6a4c 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
@@ -215,7 +215,7 @@ protected final void handleSegmentWithDeleteSegmentStartedState(RemoteLogSegment
     }
 
     private void handleSegmentWithDeleteSegmentFinishedState(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
-        log.debug("Removing the entry as it reached the terminal state: [{}]", remoteLogSegmentMetadata);
+        log.error("Removing the entry as it reached the terminal state: [{}]", remoteLogSegmentMetadata);
 
         doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,
             (leaderEpoch, remoteLogLeaderEpochState, startOffset, segmentId) ->
@@ -236,6 +236,8 @@ private void doHandleSegmentStateTransitionForLeaderEpochs(RemoteLogSegmentMetad
             Integer leaderEpoch = entry.getKey();
             Long startOffset = entry.getValue();
             // leaderEpochEntries will be empty when resorting the metadata from snapshot.
+            log.error("Loading leaderEpochEntries: [{}] while handling state transition for remoteLogSegmentMetadata {} with state {}",
+                      leaderEpochEntries, remoteLogSegmentMetadata, remoteLogSegmentMetadata.state());
             RemoteLogLeaderEpochState remoteLogLeaderEpochState = leaderEpochEntries.computeIfAbsent(
                     leaderEpoch, x -> new RemoteLogLeaderEpochState());
             action.accept(leaderEpoch, remoteLogLeaderEpochState, startOffset, remoteLogSegmentId);
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
index 8e4d281af2f9f..41bc38ab2a727 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
@@ -147,12 +147,14 @@ public Iterator listRemoteLogSegments(TopicIdPartition
 
     private FileBasedRemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
             throws RemoteResourceNotFoundException {
+        log.error("Getting RemoteLogMetadataCache for topicIdPartition: {} from {}", topicIdPartition, idToRemoteLogMetadataCache);
         FileBasedRemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
         if (remoteLogMetadataCache == null) {
             throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition);
         }
         log.error("remoteLogMetadataCache.isInitialized(): {}", remoteLogMetadataCache.isInitialized());
-        log.error("remoteLogMetadataCache: {}", remoteLogMetadataCache);
+        log.error("remoteLogMetadataCache epoch entries: {}", remoteLogMetadataCache.leaderEpochEntries);
+        log.error("remoteLogMetadataCache idToSegmentMetadata entries: {}", remoteLogMetadataCache.idToSegmentMetadata);
         if (!remoteLogMetadataCache.isInitialized()) {
             // Throwing a retriable ReplicaNotAvailableException here for clients retry. We can introduce a new more
             // appropriate exception with a KIP in the future.
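
For context on the second hunk: the new log line fires just before the `computeIfAbsent` call that lazily creates per-epoch state, which is why `leaderEpochEntries` can legitimately be empty when metadata is replayed from a snapshot. A minimal standalone sketch of that pattern, using a plain `ConcurrentHashMap` and a stand-in `EpochState` class (hypothetical names, not the actual Kafka types):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ComputeIfAbsentSketch {
    // Stand-in for RemoteLogLeaderEpochState: per-epoch bookkeeping created on demand.
    static class EpochState {
        int segmentCount = 0;
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, EpochState> leaderEpochEntries = new ConcurrentHashMap<>();

        // The first access for an epoch creates its state; later accesses reuse it.
        // Before the first access the map is empty, matching the comment in the hunk.
        for (int epoch : new int[]{0, 0, 1}) {
            EpochState state = leaderEpochEntries.computeIfAbsent(epoch, x -> new EpochState());
            state.segmentCount++;
        }

        System.out.println(leaderEpochEntries.get(0).segmentCount); // 2
        System.out.println(leaderEpochEntries.get(1).segmentCount); // 1
    }
}
```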