diff --git a/connector/src/main/java/io/questdb/kafka/BufferingSender.java b/connector/src/main/java/io/questdb/kafka/BufferingSender.java
index 08a84b1..36ad3d3 100644
--- a/connector/src/main/java/io/questdb/kafka/BufferingSender.java
+++ b/connector/src/main/java/io/questdb/kafka/BufferingSender.java
@@ -97,6 +97,24 @@ public Sender boolColumn(CharSequence name, boolean value) {
         return this;
     }
 
+    @Override
+    public void cancelRow() {
+        symbolColumnNames.clear();
+        symbolColumnValues.clear();
+        stringNames.clear();
+        stringValues.clear();
+        longNames.clear();
+        longValues.clear();
+        doubleNames.clear();
+        doubleValues.clear();
+        boolNames.clear();
+        boolValues.clear();
+        timestampNames.clear();
+        timestampValues.clear();
+
+        sender.cancelRow();
+    }
+
     @Override
     public Sender timestampColumn(CharSequence name, long value, ChronoUnit unit) {
         if (symbolColumns.contains(name)) {
diff --git a/connector/src/main/java/io/questdb/kafka/InvalidDataException.java b/connector/src/main/java/io/questdb/kafka/InvalidDataException.java
new file mode 100644
index 0000000..07a9308
--- /dev/null
+++ b/connector/src/main/java/io/questdb/kafka/InvalidDataException.java
@@ -0,0 +1,14 @@
+package io.questdb.kafka;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+/** Signals a data format error detected on the client side, as opposed to a parse failure reported by the server. */
+public final class InvalidDataException extends ConnectException {
+    public InvalidDataException(String message) {
+        super(message);
+    }
+
+    public InvalidDataException(String message, Throwable e) {
+        super(message, e);
+    }
+}
diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index dc41a5b..81f13bc 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -172,7 +172,22 @@ public void put(Collection<SinkRecord> collection) {
             if (httpTransport) {
                 inflightSinkRecords.add(record);
             }
-            handleSingleRecord(record);
+            try {
+                handleSingleRecord(record);
+            } catch (InvalidDataException ex) {
+                // data format error generated on the client side
+
+                if (httpTransport && reporter != null) {
+                    // we have a DLQ set, let's report this single record
+
+                    // remove the last item from in-flight records
+                    inflightSinkRecords.setPos(inflightSinkRecords.size() - 1);
+                    context.errantRecordReporter().report(record, ex);
+                } else {
+                    // ok, no DLQ, let's fail the connector
+                    throw ex;
+                }
+            }
         }
 
         if (httpTransport) {
@@ -257,7 +272,7 @@ private void onTcpSenderException(Exception e) {
     private void onHttpSenderException(Exception e) {
         closeSenderSilently();
         if (
-                (reporter != null && e.getMessage() != null) // hack to detect data parsing errors
+                (reporter != null && e.getMessage() != null) // hack to detect data parsing errors originating on the server side
                 && (e.getMessage().contains("error in line") || e.getMessage().contains("failed to parse line protocol"))
         ) {
             // ok, we have a parsing error, let's try to send records one by one to find the problematic record
@@ -300,16 +315,32 @@ private void handleSingleRecord(SinkRecord record) {
         assert timestampColumnValue == Long.MIN_VALUE;
 
         CharSequence tableName = recordToTable.apply(record);
-        sender.table(tableName);
+        if (tableName == null || tableName.length() == 0) {
+            throw new InvalidDataException("Table name cannot be empty");
+        }
 
-        if (config.isIncludeKey()) {
-            handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
+        try {
+            sender.table(tableName);
+            if (config.isIncludeKey()) {
+                handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
+            }
+            handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);
+        } catch (InvalidDataException ex) {
+            if (httpTransport) {
+                sender.cancelRow();
+            }
+            throw ex;
+        } catch (LineSenderException ex) {
+            if (httpTransport) {
+                sender.cancelRow();
+            }
+            throw new InvalidDataException("object contains invalid data", ex);
         }
-        handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);
 
         if (kafkaTimestampsEnabled) {
             timestampColumnValue = TimeUnit.MILLISECONDS.toNanos(record.timestamp());
         }
+
         if (timestampColumnValue == Long.MIN_VALUE) {
             sender.atNow();
         } else {
@@ -338,7 +369,7 @@ private void handleMap(String name, Map<?, ?> value, String fallbackName) {
         for (Map.Entry<?, ?> entry : value.entrySet()) {
             Object mapKey = entry.getKey();
             if (!(mapKey instanceof String)) {
-                throw new ConnectException("Map keys must be strings");
+                throw new InvalidDataException("Map keys must be strings");
             }
             String mapKeyName = (String) mapKey;
             String entryName = name.isEmpty() ? mapKeyName : name + STRUCT_FIELD_SEPARATOR + mapKeyName;
@@ -365,7 +396,7 @@ private void handleObject(String name, Schema schema, Object value, String fallbackName) {
         if (isDesignatedColumnName(name, fallbackName)) {
             assert timestampColumnValue == Long.MIN_VALUE;
             if (value == null) {
-                throw new ConnectException("Timestamp column value cannot be null");
+                throw new InvalidDataException("Timestamp column value cannot be null");
             }
             timestampColumnValue = resolveDesignatedTimestampColumnValue(value, schema);
             return;
@@ -393,7 +424,7 @@ private long resolveDesignatedTimestampColumnValue(Object value, Schema schema) {
             return parseToMicros((String) value) * 1000;
         }
         if (!(value instanceof Long)) {
-            throw new ConnectException("Unsupported timestamp column type: " + value.getClass());
+            throw new InvalidDataException("Unsupported timestamp column type: " + value.getClass());
         }
         long longValue = (Long) value;
         TimeUnit inputUnit;
@@ -453,7 +484,7 @@ private long parseToMicros(String timestamp) {
         try {
             return dataFormat.parse(timestamp, DateFormatUtils.EN_LOCALE);
         } catch (NumericException e) {
-            throw new ConnectException("Cannot parse timestamp: " + timestamp + " with the configured format '" + config.getTimestampFormat() +"' use '"
+            throw new InvalidDataException("Cannot parse timestamp: " + timestamp + " with the configured format '" + config.getTimestampFormat() +"' use '"
                     + QuestDBSinkConnectorConfig.TIMESTAMP_FORMAT + "' to configure the right timestamp format. "
                     + "See https://questdb.io/docs/reference/function/date-time/#date-and-timestamp-format for timestamp parser documentation. ", e);
", e); } @@ -513,7 +544,7 @@ private void onUnsupportedType(String name, Object type) { if (config.isSkipUnsupportedTypes()) { log.debug("Skipping unsupported type: {}, name: {}", type, name); } else { - throw new ConnectException("Unsupported type: " + type + ", name: " + name); + throw new InvalidDataException("Unsupported type: " + type + ", name: " + name); } } diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 1d416dd..11cccfe 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -52,7 +52,7 @@ public final class QuestDBSinkConnectorEmbeddedTest { private static int httpPort = -1; private static int ilpPort = -1; - private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.1.1"; + private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.2.0"; private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true; private EmbeddedConnectCluster connect; @@ -248,6 +248,127 @@ public void testDeadLetterQueue_wrongJson(boolean useHttp) { Assertions.assertEquals("{\"not valid json}", new String(dqlRecord.value())); } + @Test + public void testDeadLetterQueue_invalidTableName() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put("errors.deadletterqueue.topic.name", "dlq"); + props.put("errors.deadletterqueue.topic.replication.factor", "1"); + props.put("errors.tolerance", "all"); + props.put("value.converter.schemas.enable", "false"); + props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${key}"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // we send this with an invalid key - contains dots + String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":88}"; + + connect.kafka().produce(topicName, topicName, "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); + connect.kafka().produce(topicName, "k,e,y", badObjectString); + connect.kafka().produce(topicName, topicName, "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}"); + + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"John\",\"Doe\",42\r\n" + + "\"Jane\",\"Doe\",41\r\n", + "select firstname,lastname,age from " + topicName, + httpPort); + + ConsumerRecords fetchedRecords = connect.kafka().consume(1, 120_000, "dlq"); + Assertions.assertEquals(1, fetchedRecords.count()); + Iterator> iterator = fetchedRecords.iterator(); + Assertions.assertEquals(badObjectString, new String(iterator.next().value())); + } + + @Test + public void testDeadLetterQueue_invalidColumnName() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put("errors.deadletterqueue.topic.name", "dlq"); + props.put("errors.deadletterqueue.topic.replication.factor", "1"); + props.put("errors.tolerance", "all"); + props.put("value.converter.schemas.enable", "false"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // invalid column - contains a star + String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"a*g*e\":88}"; + + connect.kafka().produce(topicName, 
"key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); + connect.kafka().produce(topicName, "key", badObjectString); + connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}"); + + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"John\",\"Doe\",42\r\n" + + "\"Jane\",\"Doe\",41\r\n", + "select firstname,lastname,age from " + topicName, + httpPort); + + ConsumerRecords fetchedRecords = connect.kafka().consume(1, 120_000, "dlq"); + Assertions.assertEquals(1, fetchedRecords.count()); + Iterator> iterator = fetchedRecords.iterator(); + Assertions.assertEquals(badObjectString, new String(iterator.next().value())); + } + + @Test + public void testDeadLetterQueue_unsupportedType() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put("errors.deadletterqueue.topic.name", "dlq"); + props.put("errors.deadletterqueue.topic.replication.factor", "1"); + props.put("errors.tolerance", "all"); + props.put("value.converter.schemas.enable", "false"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // contains array - not supported + String badObjectString = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":[1, 2, 3]}"; + + connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); + connect.kafka().produce(topicName, "key", badObjectString); + connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}"); + + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"John\",\"Doe\",42\r\n" + + "\"Jane\",\"Doe\",41\r\n", + "select firstname,lastname,age from " + topicName, + httpPort); + + ConsumerRecords fetchedRecords = connect.kafka().consume(1, 120_000, "dlq"); + Assertions.assertEquals(1, fetchedRecords.count()); + Iterator> iterator = fetchedRecords.iterator(); + Assertions.assertEquals(badObjectString, new String(iterator.next().value())); + } + + @Test + public void testDeadLetterQueue_emptyTable() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${key}"); + props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); + props.put("value.converter.schemas.enable", "false"); + props.put("errors.deadletterqueue.topic.name", "dlq"); + props.put("errors.deadletterqueue.topic.replication.factor", "1"); + props.put("errors.tolerance", "all"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + connect.kafka().produce(topicName, "tab", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); + String emptyRecordValue = "{\"firstname\":\"empty\",\"lastname\":\"\",\"age\":-41}"; + connect.kafka().produce(topicName, "", emptyRecordValue); + connect.kafka().produce(topicName, "tab", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}"); + + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"John\",\"Doe\",42\r\n" + + "\"Jane\",\"Doe\",41\r\n", + "select firstname,lastname,age from tab", + httpPort); + + ConsumerRecords fetchedRecords = connect.kafka().consume(1, 120_000, "dlq"); + Assertions.assertEquals(1, fetchedRecords.count()); + Iterator> iterator = 
+        Assertions.assertEquals(emptyRecordValue, new String(iterator.next().value()));
+    }
+
     @Test
     public void testDeadLetterQueue_badColumnType() {
         connect.kafka().createTopic(topicName, 1);
diff --git a/pom.xml b/pom.xml
index eb88136..2dd74de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,7 +84,7 @@
         <dependency>
            <groupId>org.questdb</groupId>
            <artifactId>questdb</artifactId>
-           <version>7.4.0</version>
+           <version>8.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
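
Note: with this change, an InvalidDataException raised on the client side is reported to the same Kafka Connect dead letter queue that already receives server-side parse errors. Below is a minimal sketch of the error-handling configuration the new tests exercise; the property keys and the "dlq" topic name are taken verbatim from the tests above, while the class and method names are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

final class DlqConfigExample {
    // Builds the sink connector properties that enable DLQ routing.
    static Map<String, String> dlqProps() {
        Map<String, String> props = new HashMap<>();
        // topic that receives records the connector could not ingest
        props.put("errors.deadletterqueue.topic.name", "dlq");
        props.put("errors.deadletterqueue.topic.replication.factor", "1");
        // keep the task running and report bad records instead of failing
        props.put("errors.tolerance", "all");
        props.put("value.converter.schemas.enable", "false");
        return props;
    }
}
```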