Skip to content

Commit

Permalink
feat: decrease latency when some rows are locally buffered
Browse files Browse the repository at this point in the history
we instruct Kafka Connect to call us within a reasonable time even
when there are no new messages to be delivered. this gives us a chance
to flush whatever we have buffered locally. this means users no
longer have to explicitly decrease offset.commit.interval.ms.
  • Loading branch information
jerrinot committed Apr 6, 2024
1 parent f0350f5 commit cabd273
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 8 deletions.
21 changes: 20 additions & 1 deletion connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,28 @@ private Sender createSender() {
@Override
public void put(Collection<SinkRecord> collection) {
if (collection.isEmpty()) {
log.debug("Received empty collection, ignoring");
if (httpTransport) {
log.debug("Received empty collection, let's flush the buffer");
// Ok, there are no new records to send. Let's flush! Why?
// We do not want locally buffered row to be stuck in the buffer for too long. Increases latency
// between the time the record is produced and the time it is visible in QuestDB.
// If the local buffer is empty then flushing is a cheap no-op.
try {
sender.flush();
} catch (LineSenderException | HttpClientException e) {
onSenderException(e);
}
} else {
log.debug("Received empty collection, nothing to do");
}
return;
} if (httpTransport) {
// there are some records to send. good.
// let's set a timeout so Kafka Connect will call us again in time
// even if there are no new records to send. this gives us a chance to flush the buffer.
context.timeout(1000);
}

if (log.isDebugEnabled()) {
SinkRecord record = collection.iterator().next();
log.debug("Received {} records. First record kafka coordinates:({}-{}-{}). ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class QuestDBSinkConnectorIT {
.withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("QDB_CLIENT_CONF", "http::addr=questdb;auto_flush_rows=1;")
.withEnv("QDB_CLIENT_CONF", "http::addr=questdb;auto_flush=off;") // intentionally disabled auto-flush
.withNetwork(network)
.withExposedPorts(8083)
.withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/usr/share/java/kafka/questdb-connector.jar")
Expand Down Expand Up @@ -97,6 +97,7 @@ public void test() throws Exception {
.with("key.converter", "org.apache.kafka.connect.storage.StringConverter")
.with("value.converter", "org.apache.kafka.connect.storage.StringConverter")
.with("topics", topicName);
// ilp client conf string set via environment variable

connectContainer.registerConnector("my-connector", connector);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,11 @@ public class DebeziumIT {
private final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:7.4.0")
.withNetwork(network)
.withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT)
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))
.withEnv("QDB_CAIRO_COMMIT_LAG", "100")
.withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI");
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")));


private ConnectorConfiguration newQuestSinkBaseConfig(String questTableName) {
String confString = "http::addr=" + questDBContainer.getNetworkAliases().get(0) + ":9000;auto_flush_rows=1000;";
String confString = "http::addr=" + questDBContainer.getNetworkAliases().get(0) + ":9000;";
return ConnectorConfiguration.create()
.with("connector.class", QuestDBSinkConnector.class.getName())
.with("client.conf.string", confString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ private static GenericContainer<?> newConnectContainer(int id) {
.withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka0:9092")
.withEnv("CONNECT_GROUP_ID", "test")
.withEnv("CONNECT_OFFSET_FLUSH_INTERVAL_MS", "5000")
.withEnv("CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY", "All")
.withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-storage-topic")
.withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "connect-config-topic")
.withEnv("CONNECT_STATUS_STORAGE_TOPIC", "connect-status-topic")
Expand Down Expand Up @@ -298,7 +297,7 @@ private static void startKillingRandomContainers(CyclicBarrier barrier) {
}

private static void startConnector() throws IOException, InterruptedException, URISyntaxException {
String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=1200000;retry_timeout=60000;";
String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=" + Integer.MAX_VALUE + ";retry_timeout=60000;";

String payload = "{\"name\":\"my-connector\",\"config\":{" +
"\"tasks.max\":\"4\"," +
Expand Down

0 comments on commit cabd273

Please sign in to comment.