Skip to content

Commit

Permalink
compatibility with Kafka older than 2.6
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrinot committed Sep 9, 2024
1 parent f40da2d commit 1950c00
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
10 changes: 9 additions & 1 deletion connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
Expand Down Expand Up @@ -50,6 +51,7 @@ public final class QuestDBSinkTask extends SinkTask {
private int pendingRows;
private final FlushConfig flushConfig = new FlushConfig();
private final ObjList<SinkRecord> inflightSinkRecords = new ObjList<>();
private ErrantRecordReporter reporter;

@Override
public String version() {
Expand Down Expand Up @@ -88,6 +90,12 @@ public void start(Map<String, String> map) {
this.allowedLag = config.getAllowedLag();
this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
this.recordToTable = Templating.newTableTableFn(config.getTable());
try {
reporter = context.errantRecordReporter();
} catch (NoSuchMethodError | NoClassDefFoundError e) {
// Kafka older than 2.6
reporter = null;
}
}

private Sender createRawSender() {
Expand Down Expand Up @@ -247,7 +255,7 @@ private void onTcpSenderException(Exception e) {

private void onHttpSenderException(Exception e) {
closeSenderSilently();
if (e.getMessage() != null && e.getMessage().contains("error in line")) { // hack to detect data parsing errors
if (reporter != null & e.getMessage() != null && e.getMessage().contains("error in line")) { // hack to detect data parsing errors
// ok, we have a parsing error, let's try to send records one by one to find the problematic record
// and we will report it to the error handler. the rest of the records will make it to QuestDB
sender = createSender();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,36 @@ public void testDeadLetterQueue_badColumnType() {

}

@Test
public void testbadColumnType_noDLQ() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("value.converter.schemas.enable", "false");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

QuestDBUtils.assertSql(
"{\"ddl\":\"OK\"}",
"create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
httpPort,
QuestDBUtils.Endpoint.EXEC);

String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";

// interleave good and bad records
connect.kafka().produce(topicName, "key", goodRecordA);
connect.kafka().produce(topicName, "key", badRecordA);
connect.kafka().produce(topicName, "key", goodRecordB);
connect.kafka().produce(topicName, "key", badRecordB);
connect.kafka().produce(topicName, "key", goodRecordC);

ConnectTestUtils.assertConnectorTaskStateEventually(connect, AbstractStatus.State.FAILED);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSymbol(boolean useHttp) {
Expand Down

0 comments on commit 1950c00

Please sign in to comment.