Skip to content

Commit

Permalink
fix: Fix NPE when Kafka Connect requests a commit right after an error
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrinot committed May 24, 2024
1 parent 8985184 commit 8a1b9e4
Showing 1 changed file with 12 additions and 0 deletions.
12 changes: 12 additions & 0 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,18 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) {
return false;
}

@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
if (sender != null) {
flush(currentOffsets);
return currentOffsets;
} else {
// null sender indicates there was an error and we cannot guarantee that the data was actually sent
// returning empty map will cause the task to avoid committing offsets to Kafka
return Collections.emptyMap();
}
}

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
if (httpTransport) {
Expand Down

0 comments on commit 8a1b9e4

Please sign in to comment.