feat: DLQ for client-side validation failures #29

Merged · 3 commits · Nov 27, 2024
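
For context: this PR routes records that fail client-side validation (empty table names, invalid column names, unsupported types) to Kafka Connect's dead letter queue instead of failing the whole connector task. A minimal sketch of the configuration involved, lifted from the tests below — baseConnectorProps and CONNECTOR_NAME are this repo's test helpers, and the trailing boolean flag is assumed to select the HTTP transport:

Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("errors.deadletterqueue.topic.name", "dlq");               // topic that receives the bad records
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");                                // keep the task running on data errors
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);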
18 changes: 18 additions & 0 deletions connector/src/main/java/io/questdb/kafka/BufferingSender.java
@@ -97,6 +97,24 @@ public Sender boolColumn(CharSequence name, boolean value) {
return this;
}

@Override
public void cancelRow() {
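// drop everything buffered for the current row so no stale column
// data leaks into the next row, then cancel on the delegate sender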
symbolColumnNames.clear();
symbolColumnValues.clear();
stringNames.clear();
stringValues.clear();
longNames.clear();
longValues.clear();
doubleNames.clear();
doubleValues.clear();
boolNames.clear();
boolValues.clear();
timestampNames.clear();
timestampValues.clear();

sender.cancelRow();
}

@Override
public Sender timestampColumn(CharSequence name, long value, ChronoUnit unit) {
if (symbolColumns.contains(name)) {
14 changes: 14 additions & 0 deletions connector/src/main/java/io/questdb/kafka/InvalidDataException.java
@@ -0,0 +1,14 @@
package io.questdb.kafka;

import io.questdb.std.NumericException;
import org.apache.kafka.connect.errors.ConnectException;

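/**
 * Signals a client-side data validation failure. QuestDBSinkTask catches this
 * to report the offending record to the dead letter queue (when one is
 * configured) instead of failing the whole task.
 */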
public final class InvalidDataException extends ConnectException {
public InvalidDataException(String message) {
super(message);
}

public InvalidDataException(String message, Throwable e) {
super(message, e);
}
}
53 changes: 42 additions & 11 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -172,7 +172,22 @@ public void put(Collection<SinkRecord> collection) {
if (httpTransport) {
inflightSinkRecords.add(record);
}
handleSingleRecord(record);
try {
handleSingleRecord(record);
} catch (InvalidDataException ex) {
// data format error generated on the client side

if (httpTransport && reporter != null) {
// we have a DLQ set, let's report just this record

// remove the last item from in-flight records
inflightSinkRecords.setPos(inflightSinkRecords.size() - 1);
context.errantRecordReporter().report(record, ex);
} else {
// ok, no DLQ, let's fail the connector
throw ex;
}
}
}

if (httpTransport) {
@@ -257,7 +272,7 @@ private void onTcpSenderException(Exception e) {
private void onHttpSenderException(Exception e) {
closeSenderSilently();
if (
(reporter != null && e.getMessage() != null) // hack to detect data parsing errors
(reporter != null && e.getMessage() != null) // hack to detect data parsing errors originating on the server side
&& (e.getMessage().contains("error in line") || e.getMessage().contains("failed to parse line protocol"))
) {
// ok, we have a parsing error, let's try to send records one by one to find the problematic record
@@ -300,16 +315,32 @@ private void handleSingleRecord(SinkRecord record) {
assert timestampColumnValue == Long.MIN_VALUE;

CharSequence tableName = recordToTable.apply(record);
sender.table(tableName);
if (tableName == null || tableName.length() == 0) {
throw new InvalidDataException("Table name cannot be empty");
}

if (config.isIncludeKey()) {
handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
try {
sender.table(tableName);
if (config.isIncludeKey()) {
handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
}
handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);
} catch (InvalidDataException ex) {
if (httpTransport) {
sender.cancelRow();
}
throw ex;
} catch (LineSenderException ex) {
if (httpTransport) {
sender.cancelRow();
}
throw new InvalidDataException("object contains invalid data", ex);
}
handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);

if (kafkaTimestampsEnabled) {
timestampColumnValue = TimeUnit.MILLISECONDS.toNanos(record.timestamp());
}

if (timestampColumnValue == Long.MIN_VALUE) {
sender.atNow();
} else {
@@ -338,7 +369,7 @@ private void handleMap(String name, Map<?, ?> value, String fallbackName) {
for (Map.Entry<?, ?> entry : value.entrySet()) {
Object mapKey = entry.getKey();
if (!(mapKey instanceof String)) {
throw new ConnectException("Map keys must be strings");
throw new InvalidDataException("Map keys must be strings");
}
String mapKeyName = (String) mapKey;
String entryName = name.isEmpty() ? mapKeyName : name + STRUCT_FIELD_SEPARATOR + mapKeyName;
@@ -365,7 +396,7 @@ private void handleObject(String name, Schema schema, Object value, String fallbackName) {
if (isDesignatedColumnName(name, fallbackName)) {
assert timestampColumnValue == Long.MIN_VALUE;
if (value == null) {
throw new ConnectException("Timestamp column value cannot be null");
throw new InvalidDataException("Timestamp column value cannot be null");
}
timestampColumnValue = resolveDesignatedTimestampColumnValue(value, schema);
return;
@@ -393,7 +424,7 @@ private long resolveDesignatedTimestampColumnValue(Object value, Schema schema) {
return parseToMicros((String) value) * 1000;
}
if (!(value instanceof Long)) {
throw new ConnectException("Unsupported timestamp column type: " + value.getClass());
throw new InvalidDataException("Unsupported timestamp column type: " + value.getClass());
}
long longValue = (Long) value;
TimeUnit inputUnit;
@@ -453,7 +484,7 @@ private long parseToMicros(String timestamp) {
try {
return dataFormat.parse(timestamp, DateFormatUtils.EN_LOCALE);
} catch (NumericException e) {
throw new ConnectException("Cannot parse timestamp: " + timestamp + " with the configured format '" + config.getTimestampFormat() +"' use '"
throw new InvalidDataException("Cannot parse timestamp: " + timestamp + " with the configured format '" + config.getTimestampFormat() +"' use '"
+ QuestDBSinkConnectorConfig.TIMESTAMP_FORMAT + "' to configure the right timestamp format. " +
"See https://questdb.io/docs/reference/function/date-time/#date-and-timestamp-format for timestamp parser documentation. ", e);
}
@@ -513,7 +544,7 @@ private void onUnsupportedType(String name, Object type) {
if (config.isSkipUnsupportedTypes()) {
log.debug("Skipping unsupported type: {}, name: {}", type, name);
} else {
throw new ConnectException("Unsupported type: " + type + ", name: " + name);
throw new InvalidDataException("Unsupported type: " + type + ", name: " + name);
}
}

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -52,7 +52,7 @@
public final class QuestDBSinkConnectorEmbeddedTest {
private static int httpPort = -1;
private static int ilpPort = -1;
private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.1.1";
private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.2.0";
private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true;

private EmbeddedConnectCluster connect;
@@ -248,6 +248,127 @@ public void testDeadLetterQueue_wrongJson(boolean useHttp) {
Assertions.assertEquals("{\"not valid json}", new String(dqlRecord.value()));
}

@Test
public void testDeadLetterQueue_invalidTableName() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
props.put("value.converter.schemas.enable", "false");
props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${key}");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

// we send this one with an invalid key - it contains commas
String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":88}";

connect.kafka().produce(topicName, topicName, "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
connect.kafka().produce(topicName, "k,e,y", badObjectString);
connect.kafka().produce(topicName, topicName, "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");

QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",42\r\n"
+ "\"Jane\",\"Doe\",41\r\n",
"select firstname,lastname,age from " + topicName,
httpPort);

ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
Assertions.assertEquals(1, fetchedRecords.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
Assertions.assertEquals(badObjectString, new String(iterator.next().value()));
}

@Test
public void testDeadLetterQueue_invalidColumnName() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
props.put("value.converter.schemas.enable", "false");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

// invalid column name - contains stars
String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"a*g*e\":88}";

connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
connect.kafka().produce(topicName, "key", badObjectString);
connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");

QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",42\r\n"
+ "\"Jane\",\"Doe\",41\r\n",
"select firstname,lastname,age from " + topicName,
httpPort);

ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
Assertions.assertEquals(1, fetchedRecords.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
Assertions.assertEquals(badObjectString, new String(iterator.next().value()));
}

@Test
public void testDeadLetterQueue_unsupportedType() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
props.put("value.converter.schemas.enable", "false");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

// contains array - not supported
String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":[1, 2, 3]}";

connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
connect.kafka().produce(topicName, "key", badObjectString);
connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");

QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",42\r\n"
+ "\"Jane\",\"Doe\",41\r\n",
"select firstname,lastname,age from " + topicName,
httpPort);

ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
Assertions.assertEquals(1, fetchedRecords.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
Assertions.assertEquals(badObjectString, new String(iterator.next().value()));
}

@Test
public void testDeadLetterQueue_emptyTable() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${key}");
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
props.put("value.converter.schemas.enable", "false");
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

connect.kafka().produce(topicName, "tab", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
String emptyRecordValue = "{\"firstname\":\"empty\",\"lastname\":\"\",\"age\":-41}";
connect.kafka().produce(topicName, "", emptyRecordValue);
connect.kafka().produce(topicName, "tab", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");

QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",42\r\n"
+ "\"Jane\",\"Doe\",41\r\n",
"select firstname,lastname,age from tab",
httpPort);

ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 120_000, "dlq");
Assertions.assertEquals(1, fetchedRecords.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
Assertions.assertEquals(emptyRecordValue, new String(iterator.next().value()));
}

@Test
public void testDeadLetterQueue_badColumnType() {
connect.kafka().createTopic(topicName, 1);
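A usage note, not part of this PR: Kafka Connect can additionally stamp failure context onto dead-lettered records via its standard error-handling setting, which makes it easier to tell which validation failed when inspecting the dlq topic:

props.put("errors.deadletterqueue.context.headers.enable", "true"); // adds __connect.errors.* headers (original topic, exception class/message, ...)
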
2 changes: 1 addition & 1 deletion pom.xml
@@ -84,7 +84,7 @@
<dependency>
<groupId>org.questdb</groupId>
<artifactId>questdb</artifactId>
<version>7.4.0</version>
<version>8.2.0</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>