Skip to content

Commit

Permalink
RemoteFetcherTierStatemachine logging
Browse files Browse the repository at this point in the history
  • Loading branch information
AnatolyPopov committed Jan 8, 2024
1 parent 3df4601 commit 5921104
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,12 @@ private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer",
remoteLogMetadata);
}
log.error("Updating consumed offset: {} for partition {}", record.offset(), record.partition());
readOffsetsByMetadataPartition.put(record.partition(), record.offset());
if(!readOffsetsByMetadataPartition.containsKey(record.partition()) ||
readOffsetsByMetadataPartition.get(record.partition()) == record.offset() -1) {
log.error("Updating consumed offset: {} for partition {} previously read offsets {}",
record.offset(), record.partition(), readOffsetsByMetadataPartition.get(record.partition()));
readOffsetsByMetadataPartition.put(record.partition(), record.offset());
}
}

private boolean shouldProcess(final RemoteLogMetadata metadata, final long recordOffset) {
Expand Down Expand Up @@ -259,8 +263,8 @@ void maybeWaitForPartitionAssignments() throws InterruptedException {
remoteLogPartitions.stream()
.filter(tp -> !seekToBeginOffsetPartitions.contains(tp) &&
readOffsetsByMetadataPartition.containsKey(tp.partition()))
.peek(tp -> log.error("Reading from the offset where the processing left last time for partition: {}",
tp.partition()))
.peek(tp -> log.error("Reading from the offset {} where the processing left last time for partition: {}",
readOffsetsByMetadataPartition.get(tp.partition()), tp.partition()))
.forEach(tp -> consumer.seek(tp, readOffsetsByMetadataPartition.get(tp.partition())));
Set<TopicIdPartition> processedAssignmentPartitions = new HashSet<>();
// mark all the user-topic-partitions as assigned to the consumer.
Expand Down

0 comments on commit 5921104

Please sign in to comment.