Skip to content

Commit

Permalink
RemoteFetcherTierStatemachine logging
Browse files Browse the repository at this point in the history
  • Loading branch information
AnatolyPopov committed Jan 5, 2024
1 parent 815a323 commit c7eae32
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void run() {
maybeWaitForPartitionAssignments();
}

log.trace("Polling consumer to receive remote log metadata topic records");
log.debug("Polling consumer to receive remote log metadata topic records");
final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs));
for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
processConsumerRecord(record);
Expand All @@ -153,11 +153,15 @@ public void run() {

private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value());
log.error("Received remote log metadata: {} from partition: {} with offset: {}",
remoteLogMetadata, record.partition(), record.offset());
if (shouldProcess(remoteLogMetadata, record.offset())) {
log.error("Processing remote log metadata: {} from partition: {} with offset: {}",
remoteLogMetadata, record.partition(), record.offset());
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), record.offset());
} else {
log.trace("The event {} is skipped because it is either already processed or not assigned to this consumer",
log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer",
remoteLogMetadata);
}
log.trace("Updating consumed offset: {} for partition {}", record.offset(), record.partition());
Expand Down Expand Up @@ -254,6 +258,7 @@ void maybeWaitForPartitionAssignments() throws InterruptedException {
if (!utp.isAssigned) {
// Note that there can be a race between `remove` and `add` partition assignment. Calling the
// `maybeLoadPartition` here again to be sure that the partition gets loaded on the handler.
log.error("Loading partition {} from maybeWaitForPartitionAssignments", utp.topicIdPartition);
remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition);
utp.isAssigned = true;
}
Expand Down Expand Up @@ -463,4 +468,4 @@ public String toString() {
'}';
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,11 @@ public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
if (!initialized.get()) {
// If it is not yet initialized, then keep them as pending partitions and assign them
// when it is initialized successfully in initializeResources().
log.error("Pending assign partitions {}", pendingAssignPartitions);
this.pendingAssignPartitions.addAll(allPartitions);
log.error("added more partitions to pending {} .", allPartitions);
} else {
log.error("Assigning partitions {} while topic-based RLMM is already initialized.", allPartitions);
assignPartitions(allPartitions);
}
} finally {
Expand Down Expand Up @@ -426,6 +429,8 @@ private void initializeResources() {
}

if (!pendingAssignPartitions.isEmpty()) {
log.error("Pending assign partitions {} found while initializing topic-based RLMM resources.",
pendingAssignPartitions);
assignPartitions(pendingAssignPartitions);
pendingAssignPartitions.clear();
}
Expand Down

0 comments on commit c7eae32

Please sign in to comment.