From 9b60c71eaf267b9dc404a80dbdc3d78f62d20e8c Mon Sep 17 00:00:00 2001 From: Gianluca Finocchiaro Date: Sat, 14 Dec 2024 12:02:15 +0100 Subject: [PATCH] Avoid getting updates for non-routable items (#30) --- .../processor/RecordConsumerSupport.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/consumers/processor/RecordConsumerSupport.java b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/consumers/processor/RecordConsumerSupport.java index 0e4caa67..47c4ebdd 100644 --- a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/consumers/processor/RecordConsumerSupport.java +++ b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/consumers/processor/RecordConsumerSupport.java @@ -229,13 +229,17 @@ public void process(ConsumerRecord record) throws ValueException { log.atDebug().log(() -> "Mapped Kafka record"); Set routables = mappedRecord.route(subscribedItems); - - log.atDebug().log(() -> "Filtering updates"); - Map updates = mappedRecord.fieldsMap(); - - for (SubscribedItem sub : routables) { - log.atDebug().log(() -> "Sending updates: %s".formatted(updates)); - listener.smartUpdate(sub.itemHandle(), updates, false); + if (routables.size() > 0) { + log.atDebug().log(() -> "Filtering updates"); + Map updates = mappedRecord.fieldsMap(); + + log.atInfo().log("Routing record to {} items", routables.size()); + for (SubscribedItem sub : routables) { + log.atDebug().log(() -> "Sending updates: %s".formatted(updates)); + listener.smartUpdate(sub.itemHandle(), updates, false); + } + } else { + log.atInfo().log("No routable items found"); } } }