From 5921104f71d65d58c6523b4d2d5421f087eb190b Mon Sep 17 00:00:00 2001 From: Anatolii Popov Date: Mon, 8 Jan 2024 11:38:07 +0200 Subject: [PATCH] RemoteFetcherTierStatemachine logging --- .../log/remote/metadata/storage/ConsumerTask.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java index 8ef1cea0fb937..57007352067d8 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java @@ -164,8 +164,12 @@ private void processConsumerRecord(ConsumerRecord record) { log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer", remoteLogMetadata); } - log.error("Updating consumed offset: {} for partition {}", record.offset(), record.partition()); - readOffsetsByMetadataPartition.put(record.partition(), record.offset()); + if(!readOffsetsByMetadataPartition.containsKey(record.partition()) || + readOffsetsByMetadataPartition.get(record.partition()) == record.offset() -1) { + log.error("Updating consumed offset: {} for partition {} previously read offsets {}", + record.offset(), record.partition(), readOffsetsByMetadataPartition.get(record.partition())); + readOffsetsByMetadataPartition.put(record.partition(), record.offset()); + } } private boolean shouldProcess(final RemoteLogMetadata metadata, final long recordOffset) { @@ -259,8 +263,8 @@ void maybeWaitForPartitionAssignments() throws InterruptedException { remoteLogPartitions.stream() .filter(tp -> !seekToBeginOffsetPartitions.contains(tp) && readOffsetsByMetadataPartition.containsKey(tp.partition())) - .peek(tp -> log.error("Reading from the offset where the processing left last time for partition: {}", - tp.partition())) + .peek(tp -> log.error("Reading from the offset {} where the processing left last time for partition: {}", + readOffsetsByMetadataPartition.get(tp.partition()), tp.partition())) .forEach(tp -> consumer.seek(tp, readOffsetsByMetadataPartition.get(tp.partition()))); Set processedAssignmentPartitions = new HashSet<>(); // mark all the user-topic-partitions as assigned to the consumer.