fix: DLQ for client-side validation failures (#29)
When an individual message fails client-side validation, it is now sent to the DLQ (if configured).
jerrinot authored Nov 27, 2024
1 parent af36753 commit 913a224
Showing 5 changed files with 197 additions and 13 deletions.
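
For context: routing failed records to a dead letter queue is standard Kafka Connect sink configuration, not something this connector enables on its own. Below is a minimal sketch of the relevant properties, mirroring the settings used by the tests in this commit; the topic name "dlq" is simply the example value those tests use, and the standalone HashMap setup is illustrative rather than taken from the repository:

import java.util.HashMap;
import java.util.Map;

// Minimal sketch: enable Kafka Connect's dead letter queue so that records
// failing client-side validation are reported to the DLQ topic instead of
// failing the connector task. Keys and values mirror the tests below.
Map<String, String> props = new HashMap<>();
props.put("errors.deadletterqueue.topic.name", "dlq");              // DLQ topic (example name)
props.put("errors.deadletterqueue.topic.replication.factor", "1");  // single-broker test setup
props.put("errors.tolerance", "all");                               // keep running on bad records

With these properties set, the new code path below reports an individual record via context.errantRecordReporter() when client-side validation fails; without them, the connector fails the task as before.
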
18 changes: 18 additions & 0 deletions connector/src/main/java/io/questdb/kafka/BufferingSender.java
@@ -97,6 +97,24 @@ public Sender boolColumn(CharSequence name, boolean value) {
return this;
}

@Override
public void cancelRow() {
symbolColumnNames.clear();
symbolColumnValues.clear();
stringNames.clear();
stringValues.clear();
longNames.clear();
longValues.clear();
doubleNames.clear();
doubleValues.clear();
boolNames.clear();
boolValues.clear();
timestampNames.clear();
timestampValues.clear();

sender.cancelRow();
}

@Override
public Sender timestampColumn(CharSequence name, long value, ChronoUnit unit) {
if (symbolColumns.contains(name)) {
14 changes: 14 additions & 0 deletions connector/src/main/java/io/questdb/kafka/InvalidDataException.java
@@ -0,0 +1,14 @@
package io.questdb.kafka;

import io.questdb.std.NumericException;
import org.apache.kafka.connect.errors.ConnectException;

public final class InvalidDataException extends ConnectException {
public InvalidDataException(String message) {
super(message);
}

public InvalidDataException(String message, Throwable e) {
super(message, e);
}
}
53 changes: 42 additions & 11 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -172,7 +172,22 @@ public void put(Collection<SinkRecord> collection) {
if (httpTransport) {
inflightSinkRecords.add(record);
}
handleSingleRecord(record);
try {
handleSingleRecord(record);
} catch (InvalidDataException ex) {
// data format error generated on client-side

if (httpTransport && reporter != null) {
// we have DLQ set, let's report this single object

// remove the last item from in-flight records
inflightSinkRecords.setPos(inflightSinkRecords.size() - 1);
context.errantRecordReporter().report(record, ex);
} else {
// ok, no DLQ, let's fail the connector
throw ex;
}
}
}

if (httpTransport) {
Expand Down Expand Up @@ -257,7 +272,7 @@ private void onTcpSenderException(Exception e) {
private void onHttpSenderException(Exception e) {
closeSenderSilently();
if (
(reporter != null && e.getMessage() != null) // hack to detect data parsing errors
(reporter != null && e.getMessage() != null) // hack to detect data parsing errors originating on the server side
&& (e.getMessage().contains("error in line") || e.getMessage().contains("failed to parse line protocol"))
) {
// ok, we have a parsing error, let's try to send records one by one to find the problematic record
Expand Down Expand Up @@ -300,16 +315,32 @@ private void handleSingleRecord(SinkRecord record) {
assert timestampColumnValue == Long.MIN_VALUE;

CharSequence tableName = recordToTable.apply(record);
sender.table(tableName);
if (tableName == null || tableName.equals("")) {
throw new InvalidDataException("Table name cannot be empty");
}

if (config.isIncludeKey()) {
handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
try {
sender.table(tableName);
if (config.isIncludeKey()) {
handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
}
handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);
} catch (InvalidDataException ex) {
if (httpTransport) {
sender.cancelRow();
}
throw ex;
} catch (LineSenderException ex) {
if (httpTransport) {
sender.cancelRow();
}
throw new InvalidDataException("object contains invalid data", ex);
}
handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);

if (kafkaTimestampsEnabled) {
timestampColumnValue = TimeUnit.MILLISECONDS.toNanos(record.timestamp());
}

if (timestampColumnValue == Long.MIN_VALUE) {
sender.atNow();
} else {
Expand Down Expand Up @@ -338,7 +369,7 @@ private void handleMap(String name, Map<?, ?> value, String fallbackName) {
for (Map.Entry<?, ?> entry : value.entrySet()) {
Object mapKey = entry.getKey();
if (!(mapKey instanceof String)) {
throw new ConnectException("Map keys must be strings");
throw new InvalidDataException("Map keys must be strings");
}
String mapKeyName = (String) mapKey;
String entryName = name.isEmpty() ? mapKeyName : name + STRUCT_FIELD_SEPARATOR + mapKeyName;
@@ -365,7 +396,7 @@ private void handleObject(String name, Schema schema, Object value, String fallbackName) {
if (isDesignatedColumnName(name, fallbackName)) {
assert timestampColumnValue == Long.MIN_VALUE;
if (value == null) {
throw new ConnectException("Timestamp column value cannot be null");
throw new InvalidDataException("Timestamp column value cannot be null");
}
timestampColumnValue = resolveDesignatedTimestampColumnValue(value, schema);
return;
Expand Down Expand Up @@ -393,7 +424,7 @@ private long resolveDesignatedTimestampColumnValue(Object value, Schema schema)
return parseToMicros((String) value) * 1000;
}
if (!(value instanceof Long)) {
throw new ConnectException("Unsupported timestamp column type: " + value.getClass());
throw new InvalidDataException("Unsupported timestamp column type: " + value.getClass());
}
long longValue = (Long) value;
TimeUnit inputUnit;
Expand Down Expand Up @@ -453,7 +484,7 @@ private long parseToMicros(String timestamp) {
try {
return dataFormat.parse(timestamp, DateFormatUtils.EN_LOCALE);
} catch (NumericException e) {
throw new ConnectException("Cannot parse timestamp: " + timestamp + " with the configured format '" + config.getTimestampFormat() +"' use '"
throw new InvalidDataException("Cannot parse timestamp: " + timestamp + " with the configured format '" + config.getTimestampFormat() +"' use '"
+ QuestDBSinkConnectorConfig.TIMESTAMP_FORMAT + "' to configure the right timestamp format. " +
"See https://questdb.io/docs/reference/function/date-time/#date-and-timestamp-format for timestamp parser documentation. ", e);
}
Expand Down Expand Up @@ -513,7 +544,7 @@ private void onUnsupportedType(String name, Object type) {
if (config.isSkipUnsupportedTypes()) {
log.debug("Skipping unsupported type: {}, name: {}", type, name);
} else {
throw new ConnectException("Unsupported type: " + type + ", name: " + name);
throw new InvalidDataException("Unsupported type: " + type + ", name: " + name);
}
}

123 changes: 122 additions & 1 deletion connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -52,7 +52,7 @@
public final class QuestDBSinkConnectorEmbeddedTest {
private static int httpPort = -1;
private static int ilpPort = -1;
private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.1.1";
private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.2.0";
private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true;

private EmbeddedConnectCluster connect;
Expand Down Expand Up @@ -248,6 +248,127 @@ public void testDeadLetterQueue_wrongJson(boolean useHttp) {
Assertions.assertEquals("{\"not valid json}", new String(dqlRecord.value()));
}

@Test
public void testDeadLetterQueue_invalidTableName() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
props.put("value.converter.schemas.enable", "false");
props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${key}");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

// we send this with an invalid key - it contains commas
String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":88}";

connect.kafka().produce(topicName, topicName, "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
connect.kafka().produce(topicName, "k,e,y", badObjectString);
connect.kafka().produce(topicName, topicName, "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");

QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",42\r\n"
+ "\"Jane\",\"Doe\",41\r\n",
"select firstname,lastname,age from " + topicName,
httpPort);

ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
Assertions.assertEquals(1, fetchedRecords.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
Assertions.assertEquals(badObjectString, new String(iterator.next().value()));
}

@Test
public void testDeadLetterQueue_invalidColumnName() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
props.put("value.converter.schemas.enable", "false");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

// invalid column - contains a star
String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"a*g*e\":88}";

connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
connect.kafka().produce(topicName, "key", badObjectString);
connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");

QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",42\r\n"
+ "\"Jane\",\"Doe\",41\r\n",
"select firstname,lastname,age from " + topicName,
httpPort);

ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
Assertions.assertEquals(1, fetchedRecords.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
Assertions.assertEquals(badObjectString, new String(iterator.next().value()));
}

@Test
public void testDeadLetterQueue_unsupportedType() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
props.put("value.converter.schemas.enable", "false");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

// contains array - not supported
String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":[1, 2, 3]}";

connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
connect.kafka().produce(topicName, "key", badObjectString);
connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");

QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",42\r\n"
+ "\"Jane\",\"Doe\",41\r\n",
"select firstname,lastname,age from " + topicName,
httpPort);

ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
Assertions.assertEquals(1, fetchedRecords.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
Assertions.assertEquals(badObjectString, new String(iterator.next().value()));
}

@Test
public void testDeadLetterQueue_emptyTable() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${key}");
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
props.put("value.converter.schemas.enable", "false");
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

connect.kafka().produce(topicName, "tab", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
String emptyRecordValue = "{\"firstname\":\"empty\",\"lastname\":\"\",\"age\":-41}";
connect.kafka().produce(topicName, "", emptyRecordValue);
connect.kafka().produce(topicName, "tab", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");

QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",42\r\n"
+ "\"Jane\",\"Doe\",41\r\n",
"select firstname,lastname,age from tab",
httpPort);

ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
Assertions.assertEquals(1, fetchedRecords.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
Assertions.assertEquals(emptyRecordValue, new String(iterator.next().value()));
}

@Test
public void testDeadLetterQueue_badColumnType() {
connect.kafka().createTopic(topicName, 1);
2 changes: 1 addition & 1 deletion pom.xml
@@ -84,7 +84,7 @@
<dependency>
<groupId>org.questdb</groupId>
<artifactId>questdb</artifactId>
<version>7.4.0</version>
<version>8.2.0</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
