Skip to content

Commit

Permalink
feat: disable interval-based auto-flushes by default
Browse files Browse the repository at this point in the history
the connector has its own mechanism to flush on inactivity,
we don't need the client to do this as well.

we disable it only when it's not set explicitly. since if a user
sets an explicit flush interval then we can assume they know
what they are doing.
  • Loading branch information
jerrinot committed Apr 7, 2024
1 parent 0c08aac commit c50b150
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 18 deletions.
66 changes: 66 additions & 0 deletions connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.questdb.kafka;

import io.questdb.client.impl.ConfStringParser;
import io.questdb.std.Chars;
import io.questdb.std.str.StringSink;

final class ClientConfUtils {
private ClientConfUtils() {
}

static boolean patchConfStr(String confStr, StringSink sink) {
int pos = ConfStringParser.of(confStr, sink);
if (pos < 0) {
sink.clear();
sink.put(confStr);
return false;
}

boolean isHttpTransport = Chars.equals(sink, "http") || Chars.equals(sink, "https");
boolean intervalFlushSetExplicitly = false;
boolean flushesDisabled = false;
boolean parseError = false;
boolean hasAtLeastOneParam = false;

// disable interval based flushes
// unless they are explicitly set or auto_flush is entirely off
// why? the connector has its own mechanism to flush data in a timely manner
while (ConfStringParser.hasNext(confStr, pos)) {
hasAtLeastOneParam = true;
pos = ConfStringParser.nextKey(confStr, pos, sink);
if (pos < 0) {
parseError = true;
break;
}
if (Chars.equals(sink, "auto_flush_interval")) {
intervalFlushSetExplicitly = true;
pos = ConfStringParser.value(confStr, pos, sink);
} else if (Chars.equals(sink, "auto_flush")) {
pos = ConfStringParser.value(confStr, pos, sink);
flushesDisabled = Chars.equals(sink, "off");
} else {
pos = ConfStringParser.value(confStr, pos, sink); // skip other values
}
if (pos < 0) {
parseError = true;
break;
}
}
sink.clear();
sink.put(confStr);
if (!parseError // we don't want to mess with the config if there was a parse error
&& isHttpTransport // we only want to patch http transport
&& !flushesDisabled // if auto-flush is disabled we don't need to do anything
&& !intervalFlushSetExplicitly // if auto_flush_interval is set explicitly we don't want to override it
&& hasAtLeastOneParam // no parameter is also an error since at least address should be set. we let client throw exception in this case
) {
// if everything is ok, we set auto_flush_interval to max value
// this will effectively disable interval based flushes
// and the connector will flush data only when it is told to do so by Connector
// or if a row count limit is reached
sink.put("auto_flush_interval=").put(Integer.MAX_VALUE).put(';');
}

return isHttpTransport;
}
}
24 changes: 13 additions & 11 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import io.questdb.client.Sender;
import io.questdb.cutlass.http.client.HttpClientException;
import io.questdb.cutlass.line.LineSenderException;
import io.questdb.cutlass.line.http.LineHttpSender;
import io.questdb.std.NumericException;
import io.questdb.std.datetime.DateFormat;
import io.questdb.std.datetime.microtime.Timestamps;
import io.questdb.std.datetime.millitime.DateFormatUtils;
import io.questdb.std.str.StringSink;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.types.Password;
Expand Down Expand Up @@ -89,9 +89,9 @@ private Sender createRawSender() {
}
if (confStr != null && !confStr.isEmpty()) {
log.debug("Using client configuration string");
Sender s = Sender.fromConfig(confStr);
httpTransport = s instanceof LineHttpSender;
return s;
StringSink sink = new StringSink();
httpTransport = ClientConfUtils.patchConfStr(confStr, sink);
return Sender.fromConfig(sink);
}
log.debug("Using legacy client configuration");
Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost());
Expand Down Expand Up @@ -128,8 +128,8 @@ public void put(Collection<SinkRecord> collection) {
if (httpTransport) {
log.debug("Received empty collection, let's flush the buffer");
// Ok, there are no new records to send. Let's flush! Why?
// We do not want locally buffered row to be stuck in the buffer for too long. Increases latency
// between the time the record is produced and the time it is visible in QuestDB.
// We do not want locally buffered row to be stuck in the buffer for too long. It increases
// latency between the time the record is produced and the time it is visible in QuestDB.
// If the local buffer is empty then flushing is a cheap no-op.
try {
sender.flush();
Expand All @@ -140,11 +140,6 @@ public void put(Collection<SinkRecord> collection) {
log.debug("Received empty collection, nothing to do");
}
return;
} if (httpTransport) {
// there are some records to send. good.
// let's set a timeout so Kafka Connect will call us again in time
// even if there are no new records to send. this gives us a chance to flush the buffer.
context.timeout(allowedLag);
}

if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -181,6 +176,13 @@ public void put(Collection<SinkRecord> collection) {
} catch (LineSenderException | HttpClientException e) {
onSenderException(e);
}

if (httpTransport) {
// we successfully added some rows to the local buffer.
// let's set a timeout so Kafka Connect will call us again in time even if there are
// no new records to send. this gives us a chance to flush the buffer.
context.timeout(allowedLag);
}
}

private void onSenderException(Exception e) {
Expand Down
55 changes: 55 additions & 0 deletions connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.questdb.kafka;

import io.questdb.std.Chars;
import io.questdb.std.str.StringSink;
import org.junit.Test;

import static org.junit.jupiter.api.Assertions.*;

public class ClientConfUtilsTest {

@Test
public void testHttpTransportIsResolved() {
StringSink sink = new StringSink();
assertTrue(ClientConfUtils.patchConfStr("http::addr=localhost:9000;", sink));
assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink));
assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink));
assertFalse(ClientConfUtils.patchConfStr("tcp::addr=localhost:9000;", sink));
assertFalse(ClientConfUtils.patchConfStr("tcps::addr=localhost:9000;", sink));
}

@Test
public void testHttpTransportTimeBasedFlushesDisabledByDefault() {
assertConfStringIsPatched("http::addr=localhost:9000;");
assertConfStringIsPatched("https::addr=localhost:9000;foo=bar;");
assertConfStringIsPatched("https::addr=localhost:9000;auto_flush_rows=1;");
assertConfStringIsPatched("https::addr=localhost:9000;auto_flush=on;");

assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar;auto_flush_interval=100;");
assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar;auto_flush=off;");
assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar");
assertConfStringIsNotPatched("https::addr");
assertConfStringIsNotPatched("https");
assertConfStringIsNotPatched("tcp::addr=localhost:9000;");
assertConfStringIsNotPatched("tcps::addr=localhost:9000;foo=bar;");
assertConfStringIsNotPatched("tcps::addr=localhost:9000;auto_flush_rows=1;");
assertConfStringIsNotPatched("tcps::addr=localhost:9000;auto_flush=on;");
assertConfStringIsNotPatched("unknown::addr=localhost:9000;auto_flush=on;");
}

private static void assertConfStringIsPatched(String confStr) {
StringSink sink = new StringSink();
ClientConfUtils.patchConfStr(confStr, sink);

String expected = confStr + "auto_flush_interval=" + Integer.MAX_VALUE + ";";
assertTrue(Chars.equals(expected, sink), "Conf string = " + confStr + ", expected = " + expected + ", actual = " + sink);
}

private static void assertConfStringIsNotPatched(String confStr) {
StringSink sink = new StringSink();
ClientConfUtils.patchConfStr(confStr, sink);

assertEquals(confStr, sink.toString());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static Map<String, String> baseConnectorProps(GenericContainer<?> questDBContain
props.put("host", ilpIUrl);
} else {
int port = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT);
confString = "http::addr="+host+":"+ port + ";";
confString = "http::addr=" + host + ":" + port + ";";
props.put("client.conf.string", confString);
}
return props;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public final class QuestDBSinkConnectorEmbeddedTest {

@BeforeAll
public static void createContainer() {
questDBContainer = newQuestDbConnector();
questDBContainer = newQuestDbContainer();
}

@AfterAll
Expand All @@ -85,7 +85,7 @@ private static String questDBDirectory() {

private static GenericContainer<?> questDBContainer;

private static GenericContainer<?> newQuestDbConnector() {
private static GenericContainer<?> newQuestDbContainer() {
FixedHostPortGenericContainer<?> selfGenericContainer = new FixedHostPortGenericContainer<>(OFFICIAL_QUESTDB_DOCKER);
if (httpPort != -1) {
selfGenericContainer = selfGenericContainer.withFixedExposedPort(httpPort, QuestDBUtils.QUESTDB_HTTP_PORT);
Expand Down Expand Up @@ -120,7 +120,6 @@ public void setUp() {

Map<String, String> props = new HashMap<>();
props.put("connector.client.config.override.policy", "All");
props.put("offset.flush.interval.ms", "1000");
connect = new EmbeddedConnectCluster.Builder()
.name("questdb-connect-cluster")
.workerProps(props)
Expand Down Expand Up @@ -300,7 +299,7 @@ public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throw
}

// restart QuestDB
questDBContainer = newQuestDbConnector();
questDBContainer = newQuestDbContainer();
for (int i = 0; i < 50; i++) {
connect.kafka().produce(topicName, "key3", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":" + i + "}");
}
Expand Down Expand Up @@ -541,7 +540,7 @@ public void testExactlyOnce_withDedup() throws BrokenBarrierException, Interrupt

private static void restartQuestDB() {
questDBContainer.stop();
questDBContainer = newQuestDbConnector();
questDBContainer = newQuestDbContainer();
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private static void startKillingRandomContainers(CyclicBarrier barrier) {
}

private static void startConnector() throws IOException, InterruptedException, URISyntaxException {
String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=" + Integer.MAX_VALUE + ";retry_timeout=60000;";
String confString = "http::addr=questdb:9000;auto_flush_rows=10000;retry_timeout=60000;";

String payload = "{\"name\":\"my-connector\",\"config\":{" +
"\"tasks.max\":\"4\"," +
Expand Down

0 comments on commit c50b150

Please sign in to comment.