Skip to content

Commit

Permalink
RemoteFetcherTierStatemachine logging
Browse files Browse the repository at this point in the history
  • Loading branch information
AnatolyPopov committed Jan 8, 2024
1 parent b2df14b commit 0b50546
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ public void run() {

log.debug("Polling consumer to receive remote log metadata topic records");
final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs));
if (hasAssignmentChanged) {
log.warn("The assignment has changed. Update processedAssignmentOfUserTopicIdPartitions records");
maybeWaitForPartitionAssignments();
}
for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
processConsumerRecord(record);
}
Expand All @@ -157,7 +161,7 @@ private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value());
log.error("Received remote log metadata: {} from partition: {} with offset: {}",
remoteLogMetadata, record.partition(), record.offset());
if (shouldProcess(record.partition(), record.offset())) {
if (shouldProcess(remoteLogMetadata, record.partition(), record.offset())) {
log.error("Processing remote log metadata: {} from partition: {} with offset: {}",
remoteLogMetadata, record.partition(), record.offset());
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
Expand All @@ -177,12 +181,13 @@ private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
// }
}

private boolean shouldProcess(final int recordPartition, final long recordOffset) {
private boolean shouldProcess(final RemoteLogMetadata metadata, final int recordPartition, final long recordOffset) {
final TopicIdPartition tpId = metadata.topicIdPartition();
final Long readOffset = readOffsetsByMetadataPartition.get(recordPartition);
log.error("Checking if the event should be processed. Read offset: {} and record offset: {} and record partition {} ",
readOffset, recordOffset, recordPartition);
log.error("Checking if the event {} should be processed. Read offset: {} and record offset: {} and record partition {} ",
metadata, readOffset, recordOffset, recordPartition);
// log.error("processedAssignmentOfUserTopicIdPartitions does not contain {}: {}",tpId, processedAssignmentOfUserTopicIdPartitions);
return (readOffset == null || readOffset < recordOffset);
return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && (readOffset == null || readOffset < recordOffset);
}

private void maybeMarkUserPartitionsAsReady() {
Expand Down Expand Up @@ -284,7 +289,7 @@ void maybeWaitForPartitionAssignments() throws InterruptedException {
processedAssignmentPartitions.add(utp.topicIdPartition);
});
log.error("Processed assignment partitions: {}", processedAssignmentPartitions);
// processedAssignmentOfUserTopicIdPartitions = new HashSet<>(processedAssignmentPartitions);
processedAssignmentOfUserTopicIdPartitions = new HashSet<>(processedAssignmentPartitions);
clearResourcesForUnassignedUserTopicPartitions(processedAssignmentPartitions);
isAllUserTopicPartitionsInitialized = false;
uninitializedAt = time.milliseconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,11 @@ public void testCanProcessRecord() throws InterruptedException {
// shouldn't read tpId2 records because it's not assigned
addRecord(consumer, metadataPartition, tpId2, 3);
TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)), "Couldn't read record");
assertEquals(4, handler.metadataCounter);
assertEquals(3, handler.metadataCounter);

addRecord(consumer, metadataPartition, tpId1, 4);
TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)), "Couldn't read record");
assertEquals(5, handler.metadataCounter);
assertEquals(4, handler.metadataCounter);
}

@Test
Expand Down

0 comments on commit 0b50546

Please sign in to comment.