diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index ef88738..2ef25ff 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -247,7 +247,7 @@ private void onTcpSenderException(Exception e) {
 
     private void onHttpSenderException(Exception e) {
         closeSenderSilently();
-        if (e.getMessage().contains("failed to parse line protocol")) { // hack to detect data parsing errors
+        if (e.getMessage() != null && (e.getMessage().contains("failed to parse line protocol") || e.getMessage().contains("cast error"))) { // hack to detect data parsing errors
             // ok, we have a parsing error, let's try to send records one by one to find the problematic record
             // and we will report it to the error handler. the rest of the records will make it to QuestDB
             sender = createSender();
diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
index 40748df..91c287c 100644
--- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
+++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -265,18 +265,31 @@ public void testDeadLetterQueue_badColumnType() {
                 httpPort, QuestDBUtils.Endpoint.EXEC);
 
-        connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}");
-        connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}");
-
-        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 60_000, "dlq");
-        Assertions.assertEquals(1, fetchedRecords.count());
-        ConsumerRecord<byte[], byte[]> dqlRecord = fetchedRecords.iterator().next();
-        Assertions.assertEquals("{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}", new String(dqlRecord.value()));
-
-        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
-                        + "\"John\",\"Doe\",42\r\n",
-                "select firstname,lastname,age from " + topicName,
-                1000, httpPort);
+        String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+        String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
+        String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
+        String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
+        String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+
+        // interleave good and bad records
+        connect.kafka().produce(topicName, "key", goodRecordA);
+        connect.kafka().produce(topicName, "key", badRecordA);
+        connect.kafka().produce(topicName, "key", goodRecordB);
+        connect.kafka().produce(topicName, "key", badRecordB);
+        connect.kafka().produce(topicName, "key", goodRecordC);
+
+        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(2, 60_000, "dlq");
+        Assertions.assertEquals(2, fetchedRecords.count());
+        Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
+        Assertions.assertEquals(badRecordA, new String(iterator.next().value()));
+        Assertions.assertEquals(badRecordB, new String(iterator.next().value()));
+
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"id\"\r\n"
+                        + "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d041\r\n"
+                        + "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d042\r\n"
+                        + "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d043\r\n",
+                "select firstname,lastname,age, id from " + topicName,
+                httpPort);
     }