diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index bd4fa48..1fe8d56 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -248,6 +248,66 @@ public void testDeadLetterQueue_wrongJson(boolean useHttp) { Assertions.assertEquals("{\"not valid json}", new String(dqlRecord.value())); } + @Test + public void testDeadLetterQueue_unsupportedType() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put("errors.deadletterqueue.topic.name", "dlq"); + props.put("errors.deadletterqueue.topic.replication.factor", "1"); + props.put("errors.tolerance", "all"); + props.put("value.converter.schemas.enable", "false"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // contains array - not supported + String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":[1, 2, 3]}"; + + connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); + connect.kafka().produce(topicName, "key", badObjectString); + connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}"); + + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"John\",\"Doe\",42\r\n" + + "\"Jane\",\"Doe\",41\r\n", + "select firstname,lastname,age from " + topicName, + httpPort); + + ConsumerRecords fetchedRecords = connect.kafka().consume(1, 120_000, "dlq"); + Assertions.assertEquals(1, fetchedRecords.count()); + Iterator> iterator = fetchedRecords.iterator(); + Assertions.assertEquals(badObjectString, new String(iterator.next().value())); + } + + @Test + public void testDeadLetterQueue_emptyTable() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${key}"); + props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); + props.put("value.converter.schemas.enable", "false"); + props.put("errors.deadletterqueue.topic.name", "dlq"); + props.put("errors.deadletterqueue.topic.replication.factor", "1"); + props.put("errors.tolerance", "all"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + connect.kafka().produce(topicName, "tab", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); + String emptyRecordValue = "{\"firstname\":\"empty\",\"lastname\":\"\",\"age\":-41}"; + connect.kafka().produce(topicName, "", emptyRecordValue); + connect.kafka().produce(topicName, "tab", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}"); + + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"John\",\"Doe\",42\r\n" + + "\"Jane\",\"Doe\",41\r\n", + "select firstname,lastname,age from tab", + httpPort); + + ConsumerRecords fetchedRecords = connect.kafka().consume(1, 120_000, "dlq"); + Assertions.assertEquals(1, fetchedRecords.count()); + Iterator> iterator = fetchedRecords.iterator(); + Assertions.assertEquals(emptyRecordValue, new String(iterator.next().value())); + } + @Test public void testDeadLetterQueue_badColumnType() { connect.kafka().createTopic(topicName, 1);