feat: disable interval-based auto-flushes by default #18

Merged 1 commit on Apr 7, 2024
66 changes: 66 additions & 0 deletions connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
@@ -0,0 +1,66 @@
package io.questdb.kafka;

import io.questdb.client.impl.ConfStringParser;
import io.questdb.std.Chars;
import io.questdb.std.str.StringSink;

final class ClientConfUtils {
    private ClientConfUtils() {
    }

    static boolean patchConfStr(String confStr, StringSink sink) {
        int pos = ConfStringParser.of(confStr, sink);
        if (pos < 0) {
            sink.clear();
            sink.put(confStr);
            return false;
        }

        boolean isHttpTransport = Chars.equals(sink, "http") || Chars.equals(sink, "https");
        boolean intervalFlushSetExplicitly = false;
        boolean flushesDisabled = false;
        boolean parseError = false;
        boolean hasAtLeastOneParam = false;

        // Disable interval-based flushes unless they are explicitly set
        // or auto_flush is entirely off.
        // Why? The connector has its own mechanism to flush data in a timely manner.
        while (ConfStringParser.hasNext(confStr, pos)) {
            hasAtLeastOneParam = true;
            pos = ConfStringParser.nextKey(confStr, pos, sink);
            if (pos < 0) {
                parseError = true;
                break;
            }
            if (Chars.equals(sink, "auto_flush_interval")) {
                intervalFlushSetExplicitly = true;
                pos = ConfStringParser.value(confStr, pos, sink);
            } else if (Chars.equals(sink, "auto_flush")) {
                pos = ConfStringParser.value(confStr, pos, sink);
                flushesDisabled = Chars.equals(sink, "off");
            } else {
                pos = ConfStringParser.value(confStr, pos, sink); // skip other values
            }
            if (pos < 0) {
                parseError = true;
                break;
            }
        }
        sink.clear();
        sink.put(confStr);
        if (!parseError // we don't want to mess with the config if there was a parse error
                && isHttpTransport // we only want to patch the HTTP transport
                && !flushesDisabled // if auto-flush is disabled we don't need to do anything
                && !intervalFlushSetExplicitly // if auto_flush_interval is set explicitly we don't want to override it
                && hasAtLeastOneParam // no parameters at all is also an error, since at least the address must be set; we let the client throw in this case
        ) {
            // Everything checks out: set auto_flush_interval to the maximum value.
            // This effectively disables interval-based flushes, so the connector
            // flushes only when Kafka Connect tells it to or when the row-count limit is reached.
            sink.put("auto_flush_interval=").put(Integer.MAX_VALUE).put(';');
        }

        return isHttpTransport;
    }
}
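A quick usage sketch (not part of the commit; the expected values mirror ClientConfUtilsTest further down):

// Hypothetical caller. patchConfStr() always rewrites the sink, so it can be reused across calls.
StringSink sink = new StringSink();

boolean isHttp = ClientConfUtils.patchConfStr("http::addr=localhost:9000;", sink);
// isHttp == true
// sink now holds "http::addr=localhost:9000;auto_flush_interval=2147483647;"

boolean isTcp = ClientConfUtils.patchConfStr("tcp::addr=localhost:9000;", sink);
// isTcp == false
// sink now holds "tcp::addr=localhost:9000;" -- non-HTTP transports pass through unchanged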
24 changes: 13 additions & 11 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -3,11 +3,11 @@
 import io.questdb.client.Sender;
 import io.questdb.cutlass.http.client.HttpClientException;
 import io.questdb.cutlass.line.LineSenderException;
-import io.questdb.cutlass.line.http.LineHttpSender;
 import io.questdb.std.NumericException;
 import io.questdb.std.datetime.DateFormat;
 import io.questdb.std.datetime.microtime.Timestamps;
 import io.questdb.std.datetime.millitime.DateFormatUtils;
+import io.questdb.std.str.StringSink;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.types.Password;
@@ -89,9 +89,9 @@ private Sender createRawSender() {
         }
         if (confStr != null && !confStr.isEmpty()) {
             log.debug("Using client configuration string");
-            Sender s = Sender.fromConfig(confStr);
-            httpTransport = s instanceof LineHttpSender;
-            return s;
+            StringSink sink = new StringSink();
+            httpTransport = ClientConfUtils.patchConfStr(confStr, sink);
+            return Sender.fromConfig(sink);
         }
         log.debug("Using legacy client configuration");
         Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost());
@@ -128,8 +128,8 @@ public void put(Collection<SinkRecord> collection) {
         if (httpTransport) {
             log.debug("Received empty collection, let's flush the buffer");
             // Ok, there are no new records to send. Let's flush! Why?
-            // We do not want locally buffered row to be stuck in the buffer for too long. Increases latency
-            // between the time the record is produced and the time it is visible in QuestDB.
+            // We do not want locally buffered rows to be stuck in the buffer for too long. It increases
+            // latency between the time a record is produced and the time it is visible in QuestDB.
             // If the local buffer is empty then flushing is a cheap no-op.
             try {
                 sender.flush();
@@ -140,11 +140,6 @@ public void put(Collection<SinkRecord> collection) {
log.debug("Received empty collection, nothing to do");
}
return;
} if (httpTransport) {
// there are some records to send. good.
// let's set a timeout so Kafka Connect will call us again in time
// even if there are no new records to send. this gives us a chance to flush the buffer.
context.timeout(allowedLag);
}

if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -181,6 +176,13 @@ public void put(Collection<SinkRecord> collection) {
         } catch (LineSenderException | HttpClientException e) {
             onSenderException(e);
         }
+
+        if (httpTransport) {
+            // we successfully added some rows to the local buffer.
+            // let's set a timeout so Kafka Connect will call us again in time even if there are
+            // no new records to send. this gives us a chance to flush the buffer.
+            context.timeout(allowedLag);
+        }
     }
 
     private void onSenderException(Exception e) {
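Taken together, the hunks above reduce put() to the following shape (a simplified sketch, not the literal method body; record buffering and error handling are elided):

// Sketch of the new control flow in QuestDBSinkTask.put(). Assumes the standard
// Kafka Connect SinkTask API, where `context` is the inherited SinkTaskContext.
@Override
public void put(Collection<SinkRecord> collection) {
    if (collection.isEmpty()) {
        if (httpTransport) {
            // flush rows buffered by an earlier call; a cheap no-op if the buffer is empty
            sender.flush();
        }
        return;
    }

    // ... append the records to the sender's local buffer ...

    if (httpTransport) {
        // rows were buffered: ask Kafka Connect to call put() again within allowedLag ms
        // even if no new records arrive, so the buffer cannot go stale
        context.timeout(allowedLag);
    }
}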
55 changes: 55 additions & 0 deletions connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
@@ -0,0 +1,55 @@
package io.questdb.kafka;

import io.questdb.std.Chars;
import io.questdb.std.str.StringSink;
import org.junit.Test;

import static org.junit.jupiter.api.Assertions.*;

public class ClientConfUtilsTest {

    @Test
    public void testHttpTransportIsResolved() {
        StringSink sink = new StringSink();
        assertTrue(ClientConfUtils.patchConfStr("http::addr=localhost:9000;", sink));
        assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink));
        assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink));
        assertFalse(ClientConfUtils.patchConfStr("tcp::addr=localhost:9000;", sink));
        assertFalse(ClientConfUtils.patchConfStr("tcps::addr=localhost:9000;", sink));
    }

    @Test
    public void testHttpTransportTimeBasedFlushesDisabledByDefault() {
        assertConfStringIsPatched("http::addr=localhost:9000;");
        assertConfStringIsPatched("https::addr=localhost:9000;foo=bar;");
        assertConfStringIsPatched("https::addr=localhost:9000;auto_flush_rows=1;");
        assertConfStringIsPatched("https::addr=localhost:9000;auto_flush=on;");

        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar;auto_flush_interval=100;");
        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar;auto_flush=off;");
        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar");
        assertConfStringIsNotPatched("https::addr");
        assertConfStringIsNotPatched("https");
        assertConfStringIsNotPatched("tcp::addr=localhost:9000;");
        assertConfStringIsNotPatched("tcps::addr=localhost:9000;foo=bar;");
        assertConfStringIsNotPatched("tcps::addr=localhost:9000;auto_flush_rows=1;");
        assertConfStringIsNotPatched("tcps::addr=localhost:9000;auto_flush=on;");
        assertConfStringIsNotPatched("unknown::addr=localhost:9000;auto_flush=on;");
    }

    private static void assertConfStringIsPatched(String confStr) {
        StringSink sink = new StringSink();
        ClientConfUtils.patchConfStr(confStr, sink);

        String expected = confStr + "auto_flush_interval=" + Integer.MAX_VALUE + ";";
        assertTrue(Chars.equals(expected, sink), "Conf string = " + confStr + ", expected = " + expected + ", actual = " + sink);
    }

    private static void assertConfStringIsNotPatched(String confStr) {
        StringSink sink = new StringSink();
        ClientConfUtils.patchConfStr(confStr, sink);

        assertEquals(confStr, sink.toString());
    }
}
@@ -57,7 +57,7 @@ static Map<String, String> baseConnectorProps(GenericContainer<?> questDBContainer
             props.put("host", ilpIUrl);
         } else {
             int port = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT);
-            confString = "http::addr="+host+":"+ port + ";";
+            confString = "http::addr=" + host + ":" + port + ";";
             props.put("client.conf.string", confString);
         }
         return props;
@@ -64,7 +64,7 @@ public final class QuestDBSinkConnectorEmbeddedTest {
 
     @BeforeAll
     public static void createContainer() {
-        questDBContainer = newQuestDbConnector();
+        questDBContainer = newQuestDbContainer();
     }
 
     @AfterAll
@@ -85,7 +85,7 @@ private static String questDBDirectory() {
 
     private static GenericContainer<?> questDBContainer;
 
-    private static GenericContainer<?> newQuestDbConnector() {
+    private static GenericContainer<?> newQuestDbContainer() {
         FixedHostPortGenericContainer<?> selfGenericContainer = new FixedHostPortGenericContainer<>(OFFICIAL_QUESTDB_DOCKER);
         if (httpPort != -1) {
             selfGenericContainer = selfGenericContainer.withFixedExposedPort(httpPort, QuestDBUtils.QUESTDB_HTTP_PORT);
@@ -120,7 +120,6 @@ public void setUp() {
 
         Map<String, String> props = new HashMap<>();
         props.put("connector.client.config.override.policy", "All");
-        props.put("offset.flush.interval.ms", "1000");
         connect = new EmbeddedConnectCluster.Builder()
                 .name("questdb-connect-cluster")
                 .workerProps(props)
@@ -300,7 +299,7 @@ public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throw
         }
 
         // restart QuestDB
-        questDBContainer = newQuestDbConnector();
+        questDBContainer = newQuestDbContainer();
         for (int i = 0; i < 50; i++) {
             connect.kafka().produce(topicName, "key3", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":" + i + "}");
         }
@@ -541,7 +540,7 @@ public void testExactlyOnce_withDedup() throws BrokenBarrierException, Interrupt
 
     private static void restartQuestDB() {
         questDBContainer.stop();
-        questDBContainer = newQuestDbConnector();
+        questDBContainer = newQuestDbContainer();
     }
 
     @ParameterizedTest
@@ -297,7 +297,7 @@ private static void startKillingRandomContainers(CyclicBarrier barrier) {
     }
 
     private static void startConnector() throws IOException, InterruptedException, URISyntaxException {
-        String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=" + Integer.MAX_VALUE + ";retry_timeout=60000;";
+        String confString = "http::addr=questdb:9000;auto_flush_rows=10000;retry_timeout=60000;";
 
         String payload = "{\"name\":\"my-connector\",\"config\":{" +
                 "\"tasks.max\":\"4\"," +