Skip to content

Commit

Permalink
tests added
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrinot committed Nov 26, 2024
1 parent 10a1d50 commit bf450b6
Showing 1 changed file with 60 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,66 @@ public void testDeadLetterQueue_wrongJson(boolean useHttp) {
Assertions.assertEquals("{\"not valid json}", new String(dqlRecord.value()));
}

@Test
public void testDeadLetterQueue_unsupportedType() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
props.put("value.converter.schemas.enable", "false");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

// contains array - not supported
String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":[1, 2, 3]}";

connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
connect.kafka().produce(topicName, "key", badObjectString);
connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");

QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",42\r\n"
+ "\"Jane\",\"Doe\",41\r\n",
"select firstname,lastname,age from " + topicName,
httpPort);

ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
Assertions.assertEquals(1, fetchedRecords.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
Assertions.assertEquals(badObjectString, new String(iterator.next().value()));
}

@Test
public void testDeadLetterQueue_emptyTable() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${key}");
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
props.put("value.converter.schemas.enable", "false");
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

connect.kafka().produce(topicName, "tab", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
String emptyRecordValue = "{\"firstname\":\"empty\",\"lastname\":\"\",\"age\":-41}";
connect.kafka().produce(topicName, "", emptyRecordValue);
connect.kafka().produce(topicName, "tab", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");

QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",42\r\n"
+ "\"Jane\",\"Doe\",41\r\n",
"select firstname,lastname,age from tab",
httpPort);

ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
Assertions.assertEquals(1, fetchedRecords.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
Assertions.assertEquals(emptyRecordValue, new String(iterator.next().value()));
}

@Test
public void testDeadLetterQueue_badColumnType() {
connect.kafka().createTopic(topicName, 1);
Expand Down

0 comments on commit bf450b6

Please sign in to comment.