Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support for templating in table names #24

Merged
merged 3 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public final class QuestDBSinkTask extends SinkTask {
private static final char STRUCT_FIELD_SEPARATOR = '_';
private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key";
private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value";

private Function<SinkRecord, ? extends CharSequence> recordToTable;
private static final Logger log = LoggerFactory.getLogger(QuestDBSinkTask.class);
private Sender sender;
private QuestDBSinkConnectorConfig config;
Expand Down Expand Up @@ -83,6 +85,7 @@ public void start(Map<String, String> map) {
this.timestampUnits = config.getTimestampUnitsOrNull();
this.allowedLag = config.getAllowedLag();
this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
this.recordToTable = Templating.newTableTableFn(config.getTable());
}

private Sender createRawSender() {
Expand Down Expand Up @@ -247,8 +250,7 @@ private void closeSenderSilently() {
private void handleSingleRecord(SinkRecord record) {
assert timestampColumnValue == Long.MIN_VALUE;

String explicitTable = config.getTable();
String tableName = explicitTable == null ? record.topic() : explicitTable;
CharSequence tableName = recordToTable.apply(record);
sender.table(tableName);

if (config.isIncludeKey()) {
Expand Down
77 changes: 77 additions & 0 deletions connector/src/main/java/io/questdb/kafka/Templating.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.questdb.kafka;

import io.questdb.std.str.StringSink;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Compiles a table-name template into a function that maps a {@link SinkRecord} to a table name.
 *
 * <p>A template is plain text interleaved with {@code ${...}} placeholders. The placeholders
 * recognised here are {@code ${topic}} and {@code ${key}}.
 */
final class Templating {
    private Templating() {
        // static helpers only
    }

    /**
     * Builds the record-to-table-name function for the given template.
     *
     * <ul>
     *   <li>A {@code null} or empty template yields the record's topic.</li>
     *   <li>A template without any {@code ${...}} placeholder yields the template text itself.</li>
     *   <li>Otherwise literal text and placeholder values are concatenated in template order;
     *       a {@code null} record key is rendered as the text {@code "null"}.</li>
     * </ul>
     *
     * <p>For templates that contain placeholders, the returned function writes into a single
     * sink instance that it clears and refills on every call and then returns, so callers should
     * consume the result before invoking the function again.
     *
     * @param template table name template, may be {@code null}
     * @return function resolving the table name for a record
     * @throws ConnectException if a placeholder is not closed, contains another {@code ${},
     *                          is empty, or has an unknown name
     */
    static Function<SinkRecord, ? extends CharSequence> newTableTableFn(String template) {
        if (template == null || template.isEmpty()) {
            return SinkRecord::topic;
        }

        final List<Function<SinkRecord, String>> segments = new ArrayList<>();
        int cursor = 0;
        int open = template.indexOf("${");
        while (open != -1) {
            final int close = template.indexOf('}', open + 2);
            if (close == -1) {
                throw new ConnectException("Unbalanced brackets in a table template, missing closing '}', table template: '" + template + "'");
            }
            // a second "${" before the closing bracket means the placeholders overlap
            final int nestedOpen = template.indexOf("${", open + 1);
            if (nestedOpen != -1 && nestedOpen < close) {
                throw new ConnectException("Nesting templates in a table name are not supported, table template: '" + template + "'");
            }
            final String placeholder = template.substring(open + 2, close);
            if (placeholder.isEmpty()) {
                throw new ConnectException("Empty template in table name, table template: '" + template + "'");
            }
            final Function<SinkRecord, String> resolver = resolverFor(placeholder, template);

            // literal text between the previous placeholder (or the start) and this one
            if (open > cursor) {
                final String text = template.substring(cursor, open);
                segments.add(record -> text);
            }
            segments.add(resolver);

            cursor = close + 1;
            open = template.indexOf("${", cursor);
        }

        if (segments.isEmpty()) {
            // no placeholder at all: the template is the table name
            return record -> template;
        }
        if (cursor < template.length()) {
            final String tail = template.substring(cursor);
            segments.add(record -> tail);
        }

        final StringSink buffer = new StringSink();
        return record -> {
            buffer.clear();
            for (int i = 0, n = segments.size(); i < n; i++) {
                buffer.put(segments.get(i).apply(record));
            }
            return buffer;
        };
    }

    /**
     * Maps a placeholder name to the function producing its value.
     *
     * @param placeholder text found between {@code ${} and {@code }}
     * @param template    full template, used only for the error message
     * @throws ConnectException if the placeholder name is not supported
     */
    private static Function<SinkRecord, String> resolverFor(String placeholder, String template) {
        if ("topic".equals(placeholder)) {
            return SinkRecord::topic;
        }
        if ("key".equals(placeholder)) {
            // String.valueOf(Object) renders a null key as "null"
            return record -> String.valueOf(record.key());
        }
        throw new ConnectException("Unknown template in table name, table template: '" + template + "'");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,68 @@ public void testSmoke(boolean useHttp) {
httpPort);
}

/**
 * Verifies that the table template {@code ${topic}.${key}} routes records to a table named
 * after the topic and the record key when values carry a schema.
 *
 * <p>Two records keyed {@code john} and {@code jane} are produced to the same topic; each is
 * expected to show up in its own table, {@code <topic>.john} and {@code <topic>.jane}.
 *
 * @param useHttp passed through to {@code ConnectTestUtils.baseConnectorProps}
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testTableTemplateWithKey_withSchema(boolean useHttp) {
    connect.kafka().createTopic(topicName, 1);
    Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
    props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${topic}.${key}");
    props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
    connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
    ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
    Schema schema = SchemaBuilder.struct().name("com.example.Person")
            .field("firstname", Schema.STRING_SCHEMA)
            .field("lastname", Schema.STRING_SCHEMA)
            .field("age", Schema.INT8_SCHEMA)
            .build();

    Struct john = new Struct(schema)
            .put("firstname", "John")
            .put("lastname", "Doe")
            .put("age", (byte) 42);

    Struct jane = new Struct(schema)
            .put("firstname", "Jane")
            .put("lastname", "Doe")
            .put("age", (byte) 41);

    // Decode the serialized bytes with an explicit charset instead of the platform default,
    // so the test does not depend on the JVM's file.encoding.
    // NOTE(review): assumes the converter emits UTF-8 (true for Kafka's JsonConverter) - confirm.
    String johnPayload = new String(converter.fromConnectData(topicName, schema, john), java.nio.charset.StandardCharsets.UTF_8);
    String janePayload = new String(converter.fromConnectData(topicName, schema, jane), java.nio.charset.StandardCharsets.UTF_8);
    connect.kafka().produce(topicName, "john", johnPayload);
    connect.kafka().produce(topicName, "jane", janePayload);

    QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
                    + "\"John\",\"Doe\",42\r\n",
            "select firstname,lastname,age from " + topicName + "." + "john",
            httpPort);
    QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
                    + "\"Jane\",\"Doe\",41\r\n",
            "select firstname,lastname,age from " + topicName + "." + "jane",
            httpPort);
}

/**
 * Verifies table-name templating with literal text around {@code ${topic}} and {@code ${key}}
 * when values are schemaless JSON.
 *
 * <p>Records keyed {@code john} and {@code jane} are expected in the tables
 * {@code literal_<topic>_literal_john_literal} and {@code literal_<topic>_literal_jane_literal}.
 *
 * @param useHttp passed through to {@code ConnectTestUtils.baseConnectorProps}
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testTableTemplateWithKey_schemaless(boolean useHttp) {
    connect.kafka().createTopic(topicName, 1);

    Map<String, String> connectorProps = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
    connectorProps.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "literal_${topic}_literal_${key}_literal");
    connectorProps.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
    connectorProps.put("value.converter.schemas.enable", "false");
    connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, connectorProps);
    ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

    connect.kafka().produce(topicName, "john", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
    connect.kafka().produce(topicName, "jane", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");

    // pieces shared by both expectations
    String csvHeader = "\"firstname\",\"lastname\",\"age\"\r\n";
    String selectFrom = "select firstname,lastname,age from ";
    String tablePrefix = "literal_" + topicName + "_literal_";

    QuestDBUtils.assertSqlEventually(
            csvHeader + "\"John\",\"Doe\",42\r\n",
            selectFrom + tablePrefix + "john_literal",
            httpPort);
    QuestDBUtils.assertSqlEventually(
            csvHeader + "\"Jane\",\"Doe\",41\r\n",
            selectFrom + tablePrefix + "jane_literal",
            httpPort);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDeadLetterQueue_wrongJson(boolean useHttp) {
Expand Down
105 changes: 105 additions & 0 deletions connector/src/test/java/io/questdb/kafka/TemplatingTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package io.questdb.kafka;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Assert;
import org.junit.Test;

import java.util.function.Function;

/**
 * Unit tests for {@link Templating#newTableTableFn(String)}: plain names, topic/key placeholders,
 * surrounding literals and rejection of malformed templates.
 */
public class TemplatingTest {

    // ---- templates without placeholders ----

    @Test
    public void testPlainTableName() {
        Function<SinkRecord, ? extends CharSequence> tableFn = Templating.newTableTableFn("table");
        assertTableName(tableFn, newSinkRecord("topic", "key"), "table");
    }

    @Test
    public void testEmptyTableName() {
        // an empty template falls back to the topic name
        Function<SinkRecord, ? extends CharSequence> tableFn = Templating.newTableTableFn("");
        assertTableName(tableFn, newSinkRecord("topic", "key"), "topic");
    }

    @Test
    public void testNullTableName() {
        // a missing template falls back to the topic name
        Function<SinkRecord, ? extends CharSequence> tableFn = Templating.newTableTableFn(null);
        assertTableName(tableFn, newSinkRecord("topic", "key"), "topic");
    }

    // ---- templates with placeholders ----

    @Test
    public void testSimpleTopicTemplate() {
        Function<SinkRecord, ? extends CharSequence> tableFn = Templating.newTableTableFn("${topic}");
        assertTableName(tableFn, newSinkRecord("mytopic", "key"), "mytopic");
    }

    @Test
    public void testTopicWithKeyTemplates() {
        Function<SinkRecord, ? extends CharSequence> tableFn = Templating.newTableTableFn("${topic}_${key}");
        assertTableName(tableFn, newSinkRecord("mytopic", "mykey"), "mytopic_mykey");
    }

    @Test
    public void testTopicWithNullKey() {
        // a null key is rendered as the text "null"
        Function<SinkRecord, ? extends CharSequence> tableFn = Templating.newTableTableFn("${topic}_${key}");
        assertTableName(tableFn, newSinkRecord("mytopic", null), "mytopic_null");
    }

    @Test
    public void testTopicWithEmptyKey() {
        Function<SinkRecord, ? extends CharSequence> tableFn = Templating.newTableTableFn("${topic}_${key}");
        assertTableName(tableFn, newSinkRecord("mytopic", ""), "mytopic_");
    }

    @Test
    public void testSuffixLiteral() {
        Function<SinkRecord, ? extends CharSequence> tableFn = Templating.newTableTableFn("${topic}_suffix");
        assertTableName(tableFn, newSinkRecord("mytopic", "key"), "mytopic_suffix");
    }

    // ---- malformed templates ----

    @Test
    public void testMissingClosingBrackets() {
        assertIllegalTemplate("${topic", "Unbalanced brackets in a table template, missing closing '}', table template: '${topic'");
    }

    @Test
    public void testOverlappingTemplates() {
        assertIllegalTemplate("${topic${key}", "Nesting templates in a table name are not supported, table template: '${topic${key}'");
    }

    @Test
    public void testEmptyTemplate() {
        assertIllegalTemplate("${}", "Empty template in table name, table template: '${}'");
    }

    @Test
    public void testIllegalTemplate() {
        assertIllegalTemplate("${unknown}", "Unknown template in table name, table template: '${unknown}'");
    }

    // ---- helpers ----

    /**
     * Asserts that compiling {@code template} throws a {@link ConnectException} whose message
     * equals {@code expectedMessage}; fails if no exception is thrown.
     */
    private static void assertIllegalTemplate(String template, String expectedMessage) {
        ConnectException thrown = null;
        try {
            Templating.newTableTableFn(template);
        } catch (ConnectException e) {
            thrown = e;
        }
        if (thrown == null) {
            Assert.fail();
            return;
        }
        Assert.assertEquals(expectedMessage, thrown.getMessage());
    }

    /** Applies {@code fn} to {@code record} and compares the resulting text with {@code expectedTable}. */
    private static void assertTableName(Function<SinkRecord, ? extends CharSequence> fn, SinkRecord record, String expectedTable) {
        CharSequence actualTable = fn.apply(record);
        Assert.assertEquals(expectedTable, actualTable.toString());
    }

    /** Creates a record carrying only a topic and a key; the schemas and the value are null. */
    private static SinkRecord newSinkRecord(String topic, String key) {
        return new SinkRecord(topic, 0, null, key, null, null, 0);
    }

}
Loading