From 7bb855b824b3c8b33c51649b69eca1f8ba7bc732 Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Fri, 6 Sep 2024 15:20:03 +0200
Subject: [PATCH 1/8] fix: records with mismatched data types go to the DLQ
 instead of failing the connector

fixes #26

this is a first implementation. it could be optimized further, but I
assume bad data is rare and it already does the job as it is.

there is a potential issue with `inflightSinkRecords` referencing all
in-flight data: in theory, this can roughly double memory consumption,
but we need the original SinkRecord instances so we can send them to
the DLQ.
---
 .../io/questdb/kafka/QuestDBSinkTask.java     | 47 +++++++++++++++++--
 .../QuestDBSinkConnectorEmbeddedTest.java     | 36 +++++++++++++-
 2 files changed, 76 insertions(+), 7 deletions(-)

diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index cdd3ea8..ef88738 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -4,6 +4,7 @@
 import io.questdb.cutlass.http.client.HttpClientException;
 import io.questdb.cutlass.line.LineSenderException;
 import io.questdb.std.NumericException;
+import io.questdb.std.ObjList;
 import io.questdb.std.datetime.DateFormat;
 import io.questdb.std.datetime.microtime.Timestamps;
 import io.questdb.std.datetime.millitime.DateFormatUtils;
@@ -48,6 +49,7 @@ public final class QuestDBSinkTask extends SinkTask {
     private long nextFlushNanos;
     private int pendingRows;
     private final FlushConfig flushConfig = new FlushConfig();
+    private final ObjList<SinkRecord> inflightSinkRecords = new ObjList<>();
 
     @Override
     public String version() {
@@ -159,6 +161,9 @@ public void put(Collection<SinkRecord> collection) {
             sender = createSender();
         }
         for (SinkRecord record : collection) {
+            if (httpTransport) {
+                inflightSinkRecords.add(record);
+            }
             handleSingleRecord(record);
         }
 
@@ -208,22 +213,27 @@ public void put(Collection<SinkRecord> collection) {
     private void flushAndResetCounters() {
         log.debug("Flushing data to QuestDB");
         try {
-            sender.flush();
+            if (sender != null) {
+                sender.flush();
+            }
             nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
             pendingRows = 0;
         } catch (LineSenderException | HttpClientException e) {
             onSenderException(e);
+        } finally {
+            inflightSinkRecords.clear();
         }
     }
 
     private void onSenderException(Exception e) {
         if (httpTransport) {
-            closeSenderSilently();
-            nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
-            pendingRows = 0;
-            throw new ConnectException("Failed to send data to QuestDB", e);
+            onHttpSenderException(e);
+        } else {
+            onTcpSenderException(e);
         }
+    }
 
+    private void onTcpSenderException(Exception e) {
         batchesSinceLastError = 0;
         if (--remainingRetries > 0) {
             closeSenderSilently();
@@ -235,6 +245,33 @@
     }
 
+    private void onHttpSenderException(Exception e) {
+        closeSenderSilently();
+        if (e.getMessage().contains("failed to parse line protocol")) { // hack to detect data parsing errors
+            // ok, we have a parsing error, let's try to send records one by one to find the problematic record
+            // and we will report it to the error handler. the rest of the records will make it to QuestDB
+            sender = createSender();
+            for (int i = 0; i < inflightSinkRecords.size(); i++) {
+                SinkRecord sinkRecord = inflightSinkRecords.get(i);
+                try {
+                    handleSingleRecord(sinkRecord);
+                    sender.flush();
+                } catch (Exception ex) {
+                    context.errantRecordReporter().report(sinkRecord, ex);
+                    closeSenderSilently();
+                    sender = createSender();
+                }
+            }
+            nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
+            pendingRows = 0;
+        } else {
+            // ok, this is not a parsing error, let's just close the sender and rethrow the exception
+            nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
+            pendingRows = 0;
+            throw new ConnectException("Failed to send data to QuestDB", e);
+        }
+    }
+
     private void closeSenderSilently() {
         if (sender != null) {
             try {
diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
index 84b39e8..a93cebd 100644
--- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
+++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -52,7 +52,7 @@ public final class QuestDBSinkConnectorEmbeddedTest {
     private static int httpPort = -1;
     private static int ilpPort = -1;
 
-    private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:7.4.0";
+    private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.1.1";
     private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true;
 
     private EmbeddedConnectCluster connect;
@@ -223,7 +223,7 @@ public void testTableTemplateWithKey_schemaless(boolean useHttp) {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = {true/*, false*/})
     public void testDeadLetterQueue_wrongJson(boolean useHttp) {
         connect.kafka().createTopic(topicName, 1);
         Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
@@ -248,6 +248,38 @@ public void testDeadLetterQueue_wrongJson(boolean useHttp) {
         Assertions.assertEquals("{\"not valid json}", new String(dqlRecord.value()));
     }
 
+    @Test
+    public void testDeadLetterQueue_badColumnType() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("value.converter.schemas.enable", "false");
+        props.put("errors.deadletterqueue.topic.name", "dlq");
+        props.put("errors.deadletterqueue.topic.replication.factor", "1");
+        props.put("errors.tolerance", "all");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        QuestDBUtils.assertSql(
+                "{\"ddl\":\"OK\"}",
+                "create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
+                httpPort,
+                QuestDBUtils.Endpoint.EXEC);
+
+        connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}");
+        connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}");
+
+        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 60_000, "dlq");
+        Assertions.assertEquals(1, fetchedRecords.count());
+        ConsumerRecord<byte[], byte[]> dqlRecord = fetchedRecords.iterator().next();
+        Assertions.assertEquals("{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}", new String(dqlRecord.value()));
+
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
+                        + "\"John\",\"Doe\",42\r\n",
+                "select firstname,lastname,age from " + topicName,
+                1000, httpPort);
+
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testSymbol(boolean useHttp) {
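Note: the `context.errantRecordReporter()` call used above only returns a usable reporter when the connector is deployed with Kafka Connect's error handling enabled; without it, a bad record still fails the task. A minimal sketch of the properties involved, mirroring the embedded test in this patch (the topic name "dlq" is illustrative):

    // Minimal sketch (mirrors the embedded test above): connector-level
    // properties that activate Connect's errant-record reporter / DLQ routing.
    // Without errors.tolerance=all, the task fails on the first bad record.
    Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
    props.put("errors.tolerance", "all");                              // keep running on record-level errors
    props.put("errors.deadletterqueue.topic.name", "dlq");             // target topic for rejected records
    props.put("errors.deadletterqueue.topic.replication.factor", "1"); // single-broker test cluster
    connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
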
From d15b91084c5a84d562214000d550cdb943a527a6 Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Fri, 6 Sep 2024 15:21:29 +0200
Subject: [PATCH 2/8] leftover: re-enable the TCP case of
 testDeadLetterQueue_wrongJson

---
 .../java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
index a93cebd..40748df 100644
--- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
+++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -223,7 +223,7 @@ public void testTableTemplateWithKey_schemaless(boolean useHttp) {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true/*, false*/})
+    @ValueSource(booleans = {true, false})
     public void testDeadLetterQueue_wrongJson(boolean useHttp) {
         connect.kafka().createTopic(topicName, 1);
         Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
From ffcb17754b41af821123d02ef07735ca048fca5b Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Fri, 6 Sep 2024 19:22:56 +0200
Subject: [PATCH 3/8] catch more cases of bad data

also: improved the test
---
 .../io/questdb/kafka/QuestDBSinkTask.java     |  2 +-
 .../QuestDBSinkConnectorEmbeddedTest.java     | 37 +++++++++++++------
 2 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index ef88738..2ef25ff 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -247,7 +247,7 @@ private void onTcpSenderException(Exception e) {
 
     private void onHttpSenderException(Exception e) {
         closeSenderSilently();
-        if (e.getMessage().contains("failed to parse line protocol")) { // hack to detect data parsing errors
+        if (e.getMessage() != null && e.getMessage().contains("failed to parse line protocol") || e.getMessage().contains("cast error")) { // hack to detect data parsing errors
             // ok, we have a parsing error, let's try to send records one by one to find the problematic record
             // and we will report it to the error handler. the rest of the records will make it to QuestDB
             sender = createSender();
diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
index 40748df..91c287c 100644
--- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
+++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -265,18 +265,31 @@ public void testDeadLetterQueue_badColumnType() {
                 httpPort,
                 QuestDBUtils.Endpoint.EXEC);
 
-        connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}");
-        connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}");
-
-        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 60_000, "dlq");
-        Assertions.assertEquals(1, fetchedRecords.count());
-        ConsumerRecord<byte[], byte[]> dqlRecord = fetchedRecords.iterator().next();
-        Assertions.assertEquals("{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}", new String(dqlRecord.value()));
-
-        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
-                        + "\"John\",\"Doe\",42\r\n",
-                "select firstname,lastname,age from " + topicName,
-                1000, httpPort);
+        String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+        String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
+        String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
+        String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
+        String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+
+        // interleave good and bad records
+        connect.kafka().produce(topicName, "key", goodRecordA);
+        connect.kafka().produce(topicName, "key", badRecordA);
+        connect.kafka().produce(topicName, "key", goodRecordB);
+        connect.kafka().produce(topicName, "key", badRecordB);
+        connect.kafka().produce(topicName, "key", goodRecordC);
+
+        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(2, 60_000, "dlq");
+        Assertions.assertEquals(2, fetchedRecords.count());
+        Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
+        Assertions.assertEquals(badRecordA, new String(iterator.next().value()));
+        Assertions.assertEquals(badRecordB, new String(iterator.next().value()));
+
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"id\"\r\n"
+                        + "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d041\r\n"
+                        + "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d042\r\n"
+                        + "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d043\r\n",
+                "select firstname,lastname,age, id from " + topicName,
+                httpPort);
 
     }
From f40da2dd795ef90586ba3656445122446ca5e752 Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Mon, 9 Sep 2024 10:05:22 +0200
Subject: [PATCH 4/8] more general error msg pattern

---
 connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index 2ef25ff..8d85b99 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -247,7 +247,7 @@ private void onTcpSenderException(Exception e) {
 
     private void onHttpSenderException(Exception e) {
         closeSenderSilently();
-        if (e.getMessage() != null && e.getMessage().contains("failed to parse line protocol") || e.getMessage().contains("cast error")) { // hack to detect data parsing errors
+        if (e.getMessage() != null && e.getMessage().contains("error in line")) { // hack to detect data parsing errors
             // ok, we have a parsing error, let's try to send records one by one to find the problematic record
             // and we will report it to the error handler. the rest of the records will make it to QuestDB
             sender = createSender();
From 1950c00f7640c163088333d250714132aef5a804 Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Mon, 9 Sep 2024 10:20:19 +0200
Subject: [PATCH 5/8] compatibility with Kafka older than 2.6

---
 .../io/questdb/kafka/QuestDBSinkTask.java     | 10 ++++++-
 .../QuestDBSinkConnectorEmbeddedTest.java     | 30 +++++++++++++++++++
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index 8d85b99..b26c382 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -16,6 +16,7 @@
 import org.apache.kafka.connect.data.*;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
@@ -50,6 +51,7 @@ public final class QuestDBSinkTask extends SinkTask {
     private int pendingRows;
     private final FlushConfig flushConfig = new FlushConfig();
     private final ObjList<SinkRecord> inflightSinkRecords = new ObjList<>();
+    private ErrantRecordReporter reporter;
 
     @Override
     public String version() {
@@ -88,6 +90,12 @@ public void start(Map<String, String> map) {
         this.allowedLag = config.getAllowedLag();
         this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
         this.recordToTable = Templating.newTableTableFn(config.getTable());
+        try {
+            reporter = context.errantRecordReporter();
+        } catch (NoSuchMethodError | NoClassDefFoundError e) {
+            // Kafka older than 2.6
+            reporter = null;
+        }
     }
 
     private Sender createRawSender() {
@@ -247,7 +255,7 @@ private void onTcpSenderException(Exception e) {
 
     private void onHttpSenderException(Exception e) {
         closeSenderSilently();
-        if (e.getMessage() != null && e.getMessage().contains("error in line")) { // hack to detect data parsing errors
+        if (reporter != null & e.getMessage() != null && e.getMessage().contains("error in line")) { // hack to detect data parsing errors
             // ok, we have a parsing error, let's try to send records one by one to find the problematic record
             // and we will report it to the error handler. the rest of the records will make it to QuestDB
             sender = createSender();
diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
index 91c287c..87ff741 100644
--- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
+++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -293,6 +293,36 @@ public void testDeadLetterQueue_badColumnType() {
 
     }
 
+    @Test
+    public void testbadColumnType_noDLQ() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("value.converter.schemas.enable", "false");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        QuestDBUtils.assertSql(
+                "{\"ddl\":\"OK\"}",
+                "create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
+                httpPort,
+                QuestDBUtils.Endpoint.EXEC);
+
+        String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+        String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
+        String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
+        String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
+        String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+
+        // interleave good and bad records
+        connect.kafka().produce(topicName, "key", goodRecordA);
+        connect.kafka().produce(topicName, "key", badRecordA);
+        connect.kafka().produce(topicName, "key", goodRecordB);
+        connect.kafka().produce(topicName, "key", badRecordB);
+        connect.kafka().produce(topicName, "key", goodRecordC);
+
+        ConnectTestUtils.assertConnectorTaskStateEventually(connect, AbstractStatus.State.FAILED);
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testSymbol(boolean useHttp) {
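For context: `SinkTaskContext.errantRecordReporter()` was introduced in Kafka 2.6 (KIP-610), which is what the `NoSuchMethodError`/`NoClassDefFoundError` guard in `start()` covers; on 2.6+ the method simply returns null when no error reporter has been configured. A condensed sketch of the resulting contract at the reporting call site (illustrative, not the literal patch code):

    // Sketch: with a reporter available, hand the bad record to Connect's
    // error handling (DLQ, log, or ignore, per the errors.* settings);
    // otherwise fail the task, matching the pre-DLQ behaviour.
    if (reporter != null) {
        reporter.report(sinkRecord, ex); // asynchronous; returns a Future<Void>
    } else {
        throw new ConnectException("Failed to send data to QuestDB", ex);
    }
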
From 7bd5e6d67245cee712b06a846d5e4aa25310bfbb Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Mon, 9 Sep 2024 10:23:29 +0200
Subject: [PATCH 6/8] increase test timeout

---
 .../java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
index 87ff741..1d416dd 100644
--- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
+++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -278,7 +278,7 @@ public void testDeadLetterQueue_badColumnType() {
         connect.kafka().produce(topicName, "key", badRecordB);
         connect.kafka().produce(topicName, "key", goodRecordC);
 
-        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(2, 60_000, "dlq");
+        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(2, 120_000, "dlq");
         Assertions.assertEquals(2, fetchedRecords.count());
         Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
         Assertions.assertEquals(badRecordA, new String(iterator.next().value()));
From a910647b2cdfdb29b48b362c807dcfff9d57c08c Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Mon, 9 Sep 2024 10:53:17 +0200
Subject: [PATCH 7/8] better msg pattern

---
 .../src/main/java/io/questdb/kafka/QuestDBSinkTask.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index b26c382..4a803fd 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -255,7 +255,10 @@ private void onTcpSenderException(Exception e) {
 
     private void onHttpSenderException(Exception e) {
         closeSenderSilently();
-        if (reporter != null & e.getMessage() != null && e.getMessage().contains("error in line")) { // hack to detect data parsing errors
+        if (
+                (reporter != null && e.getMessage() != null) // hack to detect data parsing errors
+                && (e.getMessage().contains("error in line") || e.getMessage().contains("failed to parse line protocol"))
+        ) {
             // ok, we have a parsing error, let's try to send records one by one to find the problematic record
             // and we will report it to the error handler. the rest of the records will make it to QuestDB
             sender = createSender();

From e2b7d435eac5c01b67cfe3c90a1b2af8d229de89 Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Mon, 9 Sep 2024 11:12:24 +0200
Subject: [PATCH 8/8] archive snapshot artifacts

---
 .github/workflows/ci.yml | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 0e0a1d9..be45c34 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -18,4 +18,9 @@ jobs:
           distribution: 'temurin'
           cache: maven
       - name: Build with Maven
-        run: mvn -B package --file pom.xml
\ No newline at end of file
+        run: mvn -B package --file pom.xml
+      - name: Archive connector artifact
+        uses: actions/upload-artifact@v4
+        with:
+          name: connector-snapshot
+          path: connector/target/kafka-questdb-connector-*-bin.zip
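As the tests above assert, a record routed to the DLQ carries the original message bytes, so failures can be inspected with a plain consumer. A minimal, self-contained sketch (the bootstrap address, group id, and the "dlq" topic name are illustrative):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public final class DlqInspector {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-inspector");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("dlq")); // DLQ topic name from the tests above
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<byte[], byte[]> r : records) {
                    // the value is the original rejected payload, byte for byte
                    System.out.println(new String(r.value()));
                }
            }
        }
    }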