Skip to content

Commit

Permalink
feat: support for templating in table names (#24)
Browse files Browse the repository at this point in the history
This changeset introduces a simple templating language in the target
`table` configuration.

Previously, there were two options:
1. Unset Table Configuration (=default): The connector used the same QuestDB
   table as the topic name from which the message originated. When the connector
   was set to listen on multiple topics, it ingested into multiple QuestDB
   tables.

2. Explicit Table Configuration: The connector ingested into the one configured
   table, regardless of the topic from which the messages originated.

This change allows the use of templates in table configurations.
For example, setting table to `${topic}` will cause ingestion
into the table named after the topic from which the message
originated. This behavior mirrors the scenario where the table
configuration is not set.

However, it also supports more advanced scenarios, such as `${topic}_${key}`,
where key is a string representation of the message key.

Supported Placeholders:
1. `${topic}`
2. `${key}` - string representation of the message key, or the literal text 'null' when the message has no key

More placeholders may be added in the future.

Caveats:
1. The key is intended for use with simple values and not with complex objects
   such as structs or arrays.
2. Using `${key}` can result in a large number of tables. QuestDB might require
   tuning when handling thousands of tables.
  • Loading branch information
jerrinot authored Jun 18, 2024
1 parent 35fb9bd commit 45e4b7c
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 2 deletions.
6 changes: 4 additions & 2 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public final class QuestDBSinkTask extends SinkTask {
private static final char STRUCT_FIELD_SEPARATOR = '_';
private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key";
private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value";

private Function<SinkRecord, ? extends CharSequence> recordToTable;
private static final Logger log = LoggerFactory.getLogger(QuestDBSinkTask.class);
private Sender sender;
private QuestDBSinkConnectorConfig config;
Expand Down Expand Up @@ -83,6 +85,7 @@ public void start(Map<String, String> map) {
this.timestampUnits = config.getTimestampUnitsOrNull();
this.allowedLag = config.getAllowedLag();
this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
this.recordToTable = Templating.newTableTableFn(config.getTable());
}

private Sender createRawSender() {
Expand Down Expand Up @@ -247,8 +250,7 @@ private void closeSenderSilently() {
private void handleSingleRecord(SinkRecord record) {
assert timestampColumnValue == Long.MIN_VALUE;

String explicitTable = config.getTable();
String tableName = explicitTable == null ? record.topic() : explicitTable;
CharSequence tableName = recordToTable.apply(record);
sender.table(tableName);

if (config.isIncludeKey()) {
Expand Down
77 changes: 77 additions & 0 deletions connector/src/main/java/io/questdb/kafka/Templating.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.questdb.kafka;

import io.questdb.std.str.StringSink;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Turns the connector's {@code table} setting into a function that computes the
 * destination table name for a given {@link SinkRecord}.
 *
 * <p>A template is plain text interleaved with placeholders written as
 * {@code ${name}}. Two placeholder names are recognised:
 * <ul>
 *   <li>{@code ${topic}} - the topic of the record</li>
 *   <li>{@code ${key}} - {@code toString()} of the record key, or the text
 *       {@code "null"} when the key is {@code null}</li>
 * </ul>
 */
final class Templating {

    private Templating() {
    }

    /**
     * Parses {@code template} once and returns a function that renders it per record.
     *
     * <p>Behaviour by input:
     * <ul>
     *   <li>{@code null} or empty template: the record's topic is returned.</li>
     *   <li>Template without any {@code ${...}} placeholder: the template text itself
     *       is returned for every record.</li>
     *   <li>Otherwise: literal text and placeholder values are concatenated in order.</li>
     * </ul>
     *
     * <p>When placeholders are present, the returned function writes into a single
     * {@link StringSink} that is created here, cleared at the start of every call and
     * handed back as the result. The returned {@code CharSequence} is therefore
     * overwritten by the next invocation; callers must consume (or copy) it before
     * calling the function again. No synchronization is applied to that buffer here.
     *
     * @param template the configured table name, possibly containing placeholders; may be {@code null}
     * @return a function mapping a record to its table name
     * @throws ConnectException if a placeholder is unterminated, contains another
     *                          {@code ${}, is empty, or uses an unknown name
     */
    static Function<SinkRecord, ? extends CharSequence> newTableTableFn(String template) {
        if (template == null || template.isEmpty()) {
            return SinkRecord::topic;
        }

        List<Function<SinkRecord, String>> segments = new ArrayList<>();
        int cursor = 0;
        int open;
        while ((open = template.indexOf("${", cursor)) != -1) {
            int close = template.indexOf('}', open + 2);
            if (close == -1) {
                throw new ConnectException("Unbalanced brackets in a table template, missing closing '}', table template: '" + template + "'");
            }
            // A second "${" appearing before the closing brace means the placeholders overlap.
            int followingOpen = template.indexOf("${", open + 1);
            if (followingOpen != -1 && followingOpen < close) {
                throw new ConnectException("Nesting templates in a table name are not supported, table template: '" + template + "'");
            }
            String placeholderName = template.substring(open + 2, close);
            if (placeholderName.isEmpty()) {
                throw new ConnectException("Empty template in table name, table template: '" + template + "'");
            }

            // Text between the previous placeholder (or the start) and this one.
            appendLiteral(segments, template.substring(cursor, open));
            segments.add(resolvePlaceholder(placeholderName, template));
            cursor = close + 1;
        }

        // Every loop iteration adds a placeholder segment, so an empty list means
        // the template contained no placeholder at all.
        if (segments.isEmpty()) {
            return record -> template;
        }

        // Trailing text after the last placeholder.
        appendLiteral(segments, template.substring(cursor));

        StringSink buffer = new StringSink();
        return record -> {
            buffer.clear();
            for (int i = 0, n = segments.size(); i < n; i++) {
                buffer.put(segments.get(i).apply(record));
            }
            return buffer;
        };
    }

    /** Adds a constant segment for {@code text}, unless it is empty. */
    private static void appendLiteral(List<Function<SinkRecord, String>> segments, String text) {
        if (text.isEmpty()) {
            return;
        }
        segments.add(record -> text);
    }

    /**
     * Maps a placeholder name to the function that extracts its value from a record.
     *
     * @param name     the text found between {@code ${} and {@code }}
     * @param template the full template, used only for the error message
     * @throws ConnectException if {@code name} is not a supported placeholder
     */
    private static Function<SinkRecord, String> resolvePlaceholder(String name, String template) {
        if ("topic".equals(name)) {
            return SinkRecord::topic;
        }
        if ("key".equals(name)) {
            return record -> {
                Object key = record.key();
                return key == null ? "null" : key.toString();
            };
        }
        throw new ConnectException("Unknown template in table name, table template: '" + template + "'");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,68 @@ public void testSmoke(boolean useHttp) {
httpPort);
}

/**
 * End-to-end check of the {@code ${topic}.${key}} table template with schema-carrying
 * values: two records with keys "john" and "jane" must each land in their own table
 * named {@code <topic>.<key>}.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testTableTemplateWithKey_withSchema(boolean useHttp) {
    connect.kafka().createTopic(topicName, 1);

    Map<String, String> connectorProps = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
    connectorProps.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${topic}.${key}");
    connectorProps.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
    connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, connectorProps);
    ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

    Schema personSchema = SchemaBuilder.struct()
            .name("com.example.Person")
            .field("firstname", Schema.STRING_SCHEMA)
            .field("lastname", Schema.STRING_SCHEMA)
            .field("age", Schema.INT8_SCHEMA)
            .build();

    Struct john = new Struct(personSchema);
    john.put("firstname", "John");
    john.put("lastname", "Doe");
    john.put("age", (byte) 42);

    Struct jane = new Struct(personSchema);
    jane.put("firstname", "Jane");
    jane.put("lastname", "Doe");
    jane.put("age", (byte) 41);

    // The record key is what the ${key} placeholder resolves to.
    String johnPayload = new String(converter.fromConnectData(topicName, personSchema, john));
    connect.kafka().produce(topicName, "john", johnPayload);
    String janePayload = new String(converter.fromConnectData(topicName, personSchema, jane));
    connect.kafka().produce(topicName, "jane", janePayload);

    String csvHeader = "\"firstname\",\"lastname\",\"age\"\r\n";
    String selectFrom = "select firstname,lastname,age from ";
    String tablePrefix = topicName + ".";

    QuestDBUtils.assertSqlEventually(
            csvHeader + "\"John\",\"Doe\",42\r\n",
            selectFrom + tablePrefix + "john",
            httpPort);
    QuestDBUtils.assertSqlEventually(
            csvHeader + "\"Jane\",\"Doe\",41\r\n",
            selectFrom + tablePrefix + "jane",
            httpPort);
}

/**
 * End-to-end check of a template mixing literal text with both placeholders
 * ({@code literal_${topic}_literal_${key}_literal}) using schemaless JSON values:
 * each record key must yield its own table.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testTableTemplateWithKey_schemaless(boolean useHttp) {
    connect.kafka().createTopic(topicName, 1);

    Map<String, String> connectorProps = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
    connectorProps.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "literal_${topic}_literal_${key}_literal");
    connectorProps.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
    connectorProps.put("value.converter.schemas.enable", "false");
    connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, connectorProps);
    ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

    String johnJson = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}";
    String janeJson = "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}";
    connect.kafka().produce(topicName, "john", johnJson);
    connect.kafka().produce(topicName, "jane", janeJson);

    String csvHeader = "\"firstname\",\"lastname\",\"age\"\r\n";
    // Everything in the rendered table name up to (not including) the key.
    String queryUpToKey = "select firstname,lastname,age from literal_" + topicName + "_literal_";

    QuestDBUtils.assertSqlEventually(
            csvHeader + "\"John\",\"Doe\",42\r\n",
            queryUpToKey + "john_literal",
            httpPort);
    QuestDBUtils.assertSqlEventually(
            csvHeader + "\"Jane\",\"Doe\",41\r\n",
            queryUpToKey + "jane_literal",
            httpPort);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDeadLetterQueue_wrongJson(boolean useHttp) {
Expand Down
105 changes: 105 additions & 0 deletions connector/src/test/java/io/questdb/kafka/TemplatingTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package io.questdb.kafka;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Assert;
import org.junit.Test;

import java.util.function.Function;

/**
 * Unit tests for {@link Templating#newTableTableFn(String)}: plain names, the
 * {@code ${topic}} and {@code ${key}} placeholders, literal text around them,
 * and the error messages produced for malformed templates.
 */
public class TemplatingTest {

    // --- templates without placeholders -------------------------------------

    @Test
    public void testPlainTableName() {
        assertTableName(Templating.newTableTableFn("table"), newSinkRecord("topic", "key"), "table");
    }

    @Test
    public void testEmptyTableName() {
        // An empty template falls back to the record's topic.
        assertTableName(Templating.newTableTableFn(""), newSinkRecord("topic", "key"), "topic");
    }

    @Test
    public void testNullTableName() {
        // A null template falls back to the record's topic.
        assertTableName(Templating.newTableTableFn(null), newSinkRecord("topic", "key"), "topic");
    }

    // --- valid placeholders ----------------------------------------------------

    @Test
    public void testSimpleTopicTemplate() {
        assertTableName(Templating.newTableTableFn("${topic}"), newSinkRecord("mytopic", "key"), "mytopic");
    }

    @Test
    public void testTopicWithKeyTemplates() {
        assertTableName(Templating.newTableTableFn("${topic}_${key}"), newSinkRecord("mytopic", "mykey"), "mytopic_mykey");
    }

    @Test
    public void testTopicWithNullKey() {
        // A null key is rendered as the text "null".
        assertTableName(Templating.newTableTableFn("${topic}_${key}"), newSinkRecord("mytopic", null), "mytopic_null");
    }

    @Test
    public void testTopicWithEmptyKey() {
        assertTableName(Templating.newTableTableFn("${topic}_${key}"), newSinkRecord("mytopic", ""), "mytopic_");
    }

    @Test
    public void testSuffixLiteral() {
        assertTableName(Templating.newTableTableFn("${topic}_suffix"), newSinkRecord("mytopic", "key"), "mytopic_suffix");
    }

    // --- malformed templates -------------------------------------------------

    @Test
    public void testMissingClosingBrackets() {
        assertIllegalTemplate("${topic", "Unbalanced brackets in a table template, missing closing '}', table template: '${topic'");
    }

    @Test
    public void testOverlappingTemplates() {
        assertIllegalTemplate("${topic${key}", "Nesting templates in a table name are not supported, table template: '${topic${key}'");
    }

    @Test
    public void testEmptyTemplate() {
        assertIllegalTemplate("${}", "Empty template in table name, table template: '${}'");
    }

    @Test
    public void testIllegalTemplate() {
        assertIllegalTemplate("${unknown}", "Unknown template in table name, table template: '${unknown}'");
    }

    // --- helpers -------------------------------------------------------------

    /**
     * Asserts that compiling {@code template} throws a {@link ConnectException}
     * carrying exactly {@code expectedMessage}; fails if nothing is thrown.
     */
    private static void assertIllegalTemplate(String template, String expectedMessage) {
        ConnectException thrown = null;
        try {
            Templating.newTableTableFn(template);
        } catch (ConnectException e) {
            thrown = e;
        }
        if (thrown == null) {
            Assert.fail();
        }
        Assert.assertEquals(expectedMessage, thrown.getMessage());
    }

    /** Applies {@code fn} to {@code record} and compares the rendered name with {@code expectedTable}. */
    private static void assertTableName(Function<SinkRecord, ? extends CharSequence> fn, SinkRecord record, String expectedTable) {
        CharSequence rendered = fn.apply(record);
        Assert.assertEquals(expectedTable, rendered.toString());
    }

    /** Builds a record with only a topic and a key; schemas and value are left null. */
    private static SinkRecord newSinkRecord(String topic, String key) {
        return new SinkRecord(
                topic,
                /* partition */ 0,
                /* keySchema */ null,
                key,
                /* valueSchema */ null,
                /* value */ null,
                /* kafkaOffset */ 0);
    }

}

0 comments on commit 45e4b7c

Please sign in to comment.