diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index d8c51ae..612eb53 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -467,6 +467,18 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) { return false; } + @Override + public Map preCommit(Map currentOffsets) { + if (sender != null) { + flush(currentOffsets); + return currentOffsets; + } else { + // null sender indicates there was an error and we cannot guarantee that the data was actually sent + // returning empty map will cause the task to avoid committing offsets to Kafka + return Collections.emptyMap(); + } + } + @Override public void flush(Map map) { if (httpTransport) {