From 8a1b9e4c8e2e9e5ab7b902a1a75620eaceee275f Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Fri, 24 May 2024 10:24:08 +0200 Subject: [PATCH] fix: Fix NPE when Kafka Connect requests a commit right after an error --- .../main/java/io/questdb/kafka/QuestDBSinkTask.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index d8c51ae..612eb53 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -467,6 +467,18 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) { return false; } + @Override + public Map preCommit(Map currentOffsets) { + if (sender != null) { + flush(currentOffsets); + return currentOffsets; + } else { + // null sender indicates there was an error and we cannot guarantee that the data was actually sent + // returning empty map will cause the task to avoid committing offsets to Kafka + return Collections.emptyMap(); + } + } + @Override public void flush(Map map) { if (httpTransport) {