From 0b505461c4b36a06c861706d896bc6a534dd111a Mon Sep 17 00:00:00 2001 From: Anatolii Popov Date: Mon, 8 Jan 2024 19:19:09 +0200 Subject: [PATCH] RemoteFetcherTierStatemachine logging --- .../remote/metadata/storage/ConsumerTask.java | 17 +++++++++++------ .../metadata/storage/ConsumerTaskTest.java | 4 ++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java index 084cf657661ae..a683749e9f62f 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java @@ -131,6 +131,10 @@ public void run() { log.debug("Polling consumer to receive remote log metadata topic records"); final ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs)); + if (hasAssignmentChanged) { + log.warn("The assignment has changed. Update processedAssignmentOfUserTopicIdPartitions records"); + maybeWaitForPartitionAssignments(); + } for (ConsumerRecord record : consumerRecords) { processConsumerRecord(record); } @@ -157,7 +161,7 @@ private void processConsumerRecord(ConsumerRecord record) { final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value()); log.error("Received remote log metadata: {} from partition: {} with offset: {}", remoteLogMetadata, record.partition(), record.offset()); - if (shouldProcess(record.partition(), record.offset())) { + if (shouldProcess(remoteLogMetadata, record.partition(), record.offset())) { log.error("Processing remote log metadata: {} from partition: {} with offset: {}", remoteLogMetadata, record.partition(), record.offset()); remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata); @@ -177,12 +181,13 @@ private void processConsumerRecord(ConsumerRecord record) { // } } - private boolean shouldProcess(final int recordPartition, final long recordOffset) { + private boolean shouldProcess(final RemoteLogMetadata metadata, final int recordPartition, final long recordOffset) { + final TopicIdPartition tpId = metadata.topicIdPartition(); final Long readOffset = readOffsetsByMetadataPartition.get(recordPartition); - log.error("Checking if the event should be processed. Read offset: {} and record offset: {} and record partition {} ", - readOffset, recordOffset, recordPartition); + log.error("Checking if the event {} should be processed. Read offset: {} and record offset: {} and record partition {} ", + metadata, readOffset, recordOffset, recordPartition); // log.error("processedAssignmentOfUserTopicIdPartitions does not contain {}: {}",tpId, processedAssignmentOfUserTopicIdPartitions); - return (readOffset == null || readOffset < recordOffset); + return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && (readOffset == null || readOffset < recordOffset); } private void maybeMarkUserPartitionsAsReady() { @@ -284,7 +289,7 @@ void maybeWaitForPartitionAssignments() throws InterruptedException { processedAssignmentPartitions.add(utp.topicIdPartition); }); log.error("Processed assignment partitions: {}", processedAssignmentPartitions); -// processedAssignmentOfUserTopicIdPartitions = new HashSet<>(processedAssignmentPartitions); + processedAssignmentOfUserTopicIdPartitions = new HashSet<>(processedAssignmentPartitions); clearResourcesForUnassignedUserTopicPartitions(processedAssignmentPartitions); isAllUserTopicPartitionsInitialized = false; uninitializedAt = time.milliseconds(); diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java index 0b2250586f3d2..0d2c6b896b3b0 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java @@ -251,11 +251,11 @@ public void testCanProcessRecord() throws InterruptedException { // shouldn't read tpId2 records because it's not assigned addRecord(consumer, metadataPartition, tpId2, 3); TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)), "Couldn't read record"); - assertEquals(4, handler.metadataCounter); + assertEquals(3, handler.metadataCounter); addRecord(consumer, metadataPartition, tpId1, 4); TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)), "Couldn't read record"); - assertEquals(5, handler.metadataCounter); + assertEquals(4, handler.metadataCounter); } @Test