From c50b1501ba6a1cea46288308947879f7c2045770 Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Sun, 7 Apr 2024 15:17:34 +0200
Subject: [PATCH] feat: disable interval-based auto-flushes by default

the connector has its own mechanism to flush on inactivity, so we don't
need the client to do this as well.

we disable interval-based flushes only when they are not set explicitly:
if a user sets an explicit flush interval, we can assume they know what
they are doing.
---
 .../io/questdb/kafka/ClientConfUtils.java     | 66 +++++++++++++++++++
 .../io/questdb/kafka/QuestDBSinkTask.java     | 24 +++----
 .../io/questdb/kafka/ClientConfUtilsTest.java | 55 ++++++++++++++++
 .../io/questdb/kafka/ConnectTestUtils.java    |  2 +-
 .../QuestDBSinkConnectorEmbeddedTest.java     |  9 ++-
 .../java/io/questdb/kafka/ExactlyOnceIT.java  |  2 +-
 6 files changed, 140 insertions(+), 18 deletions(-)
 create mode 100644 connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
 create mode 100644 connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java

diff --git a/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java b/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
new file mode 100644
index 0000000..6374cf1
--- /dev/null
+++ b/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
@@ -0,0 +1,66 @@
+package io.questdb.kafka;
+
+import io.questdb.client.impl.ConfStringParser;
+import io.questdb.std.Chars;
+import io.questdb.std.str.StringSink;
+
+final class ClientConfUtils {
+    private ClientConfUtils() {
+    }
+
+    static boolean patchConfStr(String confStr, StringSink sink) {
+        int pos = ConfStringParser.of(confStr, sink);
+        if (pos < 0) {
+            sink.clear();
+            sink.put(confStr);
+            return false;
+        }
+
+        boolean isHttpTransport = Chars.equals(sink, "http") || Chars.equals(sink, "https");
+        boolean intervalFlushSetExplicitly = false;
+        boolean flushesDisabled = false;
+        boolean parseError = false;
+        boolean hasAtLeastOneParam = false;
+
+        // disable interval-based flushes
+        // unless they are set explicitly or auto_flush is entirely off
+        // why? the connector has its own mechanism to flush data in a timely manner
+        while (ConfStringParser.hasNext(confStr, pos)) {
+            hasAtLeastOneParam = true;
+            pos = ConfStringParser.nextKey(confStr, pos, sink);
+            if (pos < 0) {
+                parseError = true;
+                break;
+            }
+            if (Chars.equals(sink, "auto_flush_interval")) {
+                intervalFlushSetExplicitly = true;
+                pos = ConfStringParser.value(confStr, pos, sink);
+            } else if (Chars.equals(sink, "auto_flush")) {
+                pos = ConfStringParser.value(confStr, pos, sink);
+                flushesDisabled = Chars.equals(sink, "off");
+            } else {
+                pos = ConfStringParser.value(confStr, pos, sink); // skip other values
+            }
+            if (pos < 0) {
+                parseError = true;
+                break;
+            }
+        }
+        sink.clear();
+        sink.put(confStr);
+        if (!parseError // we don't want to mess with the config if there was a parse error
+                && isHttpTransport // we only want to patch the http transport
+                && !flushesDisabled // if auto-flush is disabled entirely we don't need to do anything
+                && !intervalFlushSetExplicitly // if auto_flush_interval is set explicitly we don't want to override it
+                && hasAtLeastOneParam // no parameters is also an error, since at least the address should be set; we let the client throw an exception in this case
+        ) {
+            // if everything is ok, we set auto_flush_interval to the maximum value.
+            // this effectively disables interval-based flushes
+            // and the connector will flush data only when it is told to do so by Kafka Connect
+            // or when a row-count limit is reached
+            sink.put("auto_flush_interval=").put(Integer.MAX_VALUE).put(';');
+        }
+
+        return isHttpTransport;
+    }
+}
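
Note for reviewers: a minimal usage sketch of the new helper. The wrapper class name (PatchConfStrExample) and the literal config strings are illustrative only; the expected outputs follow the behaviour asserted in ClientConfUtilsTest further below.

package io.questdb.kafka;

import io.questdb.std.str.StringSink;

// Illustrative sketch only -- not part of the patch.
public class PatchConfStrExample {
    public static void main(String[] args) {
        StringSink sink = new StringSink();

        // HTTP transport, no explicit interval: the helper appends auto_flush_interval=Integer.MAX_VALUE,
        // which effectively disables the client's interval-based auto-flush.
        boolean http = ClientConfUtils.patchConfStr("http::addr=localhost:9000;", sink);
        System.out.println(http + " -> " + sink);
        // prints: true -> http::addr=localhost:9000;auto_flush_interval=2147483647;

        // An explicitly configured interval is respected and the string is left untouched.
        ClientConfUtils.patchConfStr("http::addr=localhost:9000;auto_flush_interval=500;", sink);
        System.out.println(sink);
        // prints: http::addr=localhost:9000;auto_flush_interval=500;

        // TCP transport is never patched and the helper reports it is not HTTP.
        boolean tcp = ClientConfUtils.patchConfStr("tcp::addr=localhost:9009;", sink);
        System.out.println(tcp + " -> " + sink);
        // prints: false -> tcp::addr=localhost:9009;
    }
}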
diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index 12f9ce1..8695c4f 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -3,11 +3,11 @@
 import io.questdb.client.Sender;
 import io.questdb.cutlass.http.client.HttpClientException;
 import io.questdb.cutlass.line.LineSenderException;
-import io.questdb.cutlass.line.http.LineHttpSender;
 import io.questdb.std.NumericException;
 import io.questdb.std.datetime.DateFormat;
 import io.questdb.std.datetime.microtime.Timestamps;
 import io.questdb.std.datetime.millitime.DateFormatUtils;
+import io.questdb.std.str.StringSink;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.types.Password;
@@ -89,9 +89,9 @@ private Sender createRawSender() {
         }
         if (confStr != null && !confStr.isEmpty()) {
             log.debug("Using client configuration string");
-            Sender s = Sender.fromConfig(confStr);
-            httpTransport = s instanceof LineHttpSender;
-            return s;
+            StringSink sink = new StringSink();
+            httpTransport = ClientConfUtils.patchConfStr(confStr, sink);
+            return Sender.fromConfig(sink);
         }
         log.debug("Using legacy client configuration");
         Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost());
@@ -128,8 +128,8 @@ public void put(Collection collection) {
             if (httpTransport) {
                 log.debug("Received empty collection, let's flush the buffer");
                 // Ok, there are no new records to send. Let's flush! Why?
-                // We do not want locally buffered row to be stuck in the buffer for too long. Increases latency
-                // between the time the record is produced and the time it is visible in QuestDB.
+                // We do not want locally buffered rows to be stuck in the buffer for too long. It increases
+                // latency between the time the record is produced and the time it is visible in QuestDB.
                 // If the local buffer is empty then flushing is a cheap no-op.
                 try {
                     sender.flush();
@@ -140,11 +140,6 @@ public void put(Collection collection) {
                 log.debug("Received empty collection, nothing to do");
             }
             return;
-        } if (httpTransport) {
-            // there are some records to send. good.
-            // let's set a timeout so Kafka Connect will call us again in time
-            // even if there are no new records to send. this gives us a chance to flush the buffer.
-            context.timeout(allowedLag);
         }
 
         if (log.isDebugEnabled()) {
@@ -181,6 +176,13 @@ public void put(Collection collection) {
         } catch (LineSenderException | HttpClientException e) {
             onSenderException(e);
         }
+
+        if (httpTransport) {
+            // we successfully added some rows to the local buffer.
+            // let's set a timeout so Kafka Connect will call us again in time even if there are
+            // no new records to send. this gives us a chance to flush the buffer.
+            context.timeout(allowedLag);
+        }
     }
 
     private void onSenderException(Exception e) {
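
Note for reviewers: with this change the task arms the timeout only after rows were actually buffered. Below is a condensed, illustrative sketch of the callback flow it relies on; the class name and the bufferRows/flushBuffer methods are made up for illustration, while context.timeout() and the allowedLag field correspond to the real task (the 1000 ms default here is an assumption).

package io.questdb.kafka;

import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Illustrative sketch only -- not part of the patch.
public abstract class InactivityFlushSketch extends SinkTask {
    private final long allowedLag = 1000; // millis; assumed value, the real task derives this from its config

    @Override
    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            // Kafka Connect called us again after the timeout below expired:
            // flush whatever sits in the local buffer (a cheap no-op if it is empty).
            flushBuffer();
            return;
        }
        bufferRows(records);
        // Rows were buffered: ask Kafka Connect to call put() again within allowedLag
        // even if no new records arrive, so buffered rows cannot get stuck.
        context.timeout(allowedLag);
    }

    abstract void bufferRows(Collection<SinkRecord> records);
    abstract void flushBuffer();
}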
diff --git a/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java b/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
new file mode 100644
index 0000000..6cfd5d4
--- /dev/null
+++ b/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
@@ -0,0 +1,55 @@
+package io.questdb.kafka;
+
+import io.questdb.std.Chars;
+import io.questdb.std.str.StringSink;
+import org.junit.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class ClientConfUtilsTest {
+
+    @Test
+    public void testHttpTransportIsResolved() {
+        StringSink sink = new StringSink();
+        assertTrue(ClientConfUtils.patchConfStr("http::addr=localhost:9000;", sink));
+        assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink));
+        assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink));
+        assertFalse(ClientConfUtils.patchConfStr("tcp::addr=localhost:9000;", sink));
+        assertFalse(ClientConfUtils.patchConfStr("tcps::addr=localhost:9000;", sink));
+    }
+
+    @Test
+    public void testHttpTransportTimeBasedFlushesDisabledByDefault() {
+        assertConfStringIsPatched("http::addr=localhost:9000;");
+        assertConfStringIsPatched("https::addr=localhost:9000;foo=bar;");
+        assertConfStringIsPatched("https::addr=localhost:9000;auto_flush_rows=1;");
+        assertConfStringIsPatched("https::addr=localhost:9000;auto_flush=on;");
+
+        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar;auto_flush_interval=100;");
+        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar;auto_flush=off;");
+        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar");
+        assertConfStringIsNotPatched("https::addr");
+        assertConfStringIsNotPatched("https");
+        assertConfStringIsNotPatched("tcp::addr=localhost:9000;");
+        assertConfStringIsNotPatched("tcps::addr=localhost:9000;foo=bar;");
+        assertConfStringIsNotPatched("tcps::addr=localhost:9000;auto_flush_rows=1;");
+        assertConfStringIsNotPatched("tcps::addr=localhost:9000;auto_flush=on;");
+        assertConfStringIsNotPatched("unknown::addr=localhost:9000;auto_flush=on;");
+    }
+
+    private static void assertConfStringIsPatched(String confStr) {
+        StringSink sink = new StringSink();
+        ClientConfUtils.patchConfStr(confStr, sink);
+
+        String expected = confStr + "auto_flush_interval=" + Integer.MAX_VALUE + ";";
+        assertTrue(Chars.equals(expected, sink), "Conf string = " + confStr + ", expected = " + expected + ", actual = " + sink);
+    }
+
+    private static void assertConfStringIsNotPatched(String confStr) {
+        StringSink sink = new StringSink();
+        ClientConfUtils.patchConfStr(confStr, sink);
+
+        assertEquals(confStr, sink.toString());
+    }
+
+}
\ No newline at end of file
diff --git a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
index e2f71bd..047a2c2 100644
--- a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
+++ b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
@@ -57,7 +57,7 @@ static Map baseConnectorProps(GenericContainer questDBContain
             props.put("host", ilpIUrl);
         } else {
             int port = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT);
-            confString = "http::addr="+host+":"+ port + ";";
+            confString = "http::addr=" + host + ":" + port + ";";
             props.put("client.conf.string", confString);
         }
         return props;
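
Note for reviewers: a hedged sketch of connector properties that exercise the new default. The wrapper class, topic name, and property values are illustrative; "client.conf.string" matches the property used by ConnectTestUtils above, and the connector class name is an assumption. With this patch, a plain HTTP conf string like the one below is internally rewritten to end with auto_flush_interval=2147483647, so flushing is driven by the connector's inactivity handling and by auto_flush_rows.

package io.questdb.kafka;

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only -- not part of the patch.
public class ConnectorPropsExample {
    static Map<String, String> exampleSinkProps() {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "io.questdb.kafka.QuestDBSinkConnector"); // assumed connector class name
        props.put("topics", "example-topic");
        // no auto_flush_interval here: the connector now disables interval-based flushes itself
        props.put("client.conf.string", "http::addr=localhost:9000;auto_flush_rows=10000;");
        return props;
    }
}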
diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
index f774b05..988c976 100644
--- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
+++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -64,7 +64,7 @@ public final class QuestDBSinkConnectorEmbeddedTest {
 
     @BeforeAll
     public static void createContainer() {
-        questDBContainer = newQuestDbConnector();
+        questDBContainer = newQuestDbContainer();
     }
 
     @AfterAll
@@ -85,7 +85,7 @@ private static String questDBDirectory() {
     private static GenericContainer questDBContainer;
 
-    private static GenericContainer newQuestDbConnector() {
+    private static GenericContainer newQuestDbContainer() {
         FixedHostPortGenericContainer selfGenericContainer = new FixedHostPortGenericContainer<>(OFFICIAL_QUESTDB_DOCKER);
         if (httpPort != -1) {
             selfGenericContainer = selfGenericContainer.withFixedExposedPort(httpPort, QuestDBUtils.QUESTDB_HTTP_PORT);
         }
@@ -120,7 +120,6 @@ public void setUp() {
 
         Map props = new HashMap<>();
         props.put("connector.client.config.override.policy", "All");
-        props.put("offset.flush.interval.ms", "1000");
         connect = new EmbeddedConnectCluster.Builder()
                 .name("questdb-connect-cluster")
                 .workerProps(props)
@@ -300,7 +299,7 @@ public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throw
         }
 
         // restart QuestDB
-        questDBContainer = newQuestDbConnector();
+        questDBContainer = newQuestDbContainer();
         for (int i = 0; i < 50; i++) {
             connect.kafka().produce(topicName, "key3", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":" + i + "}");
         }
@@ -541,7 +540,7 @@ public void testExactlyOnce_withDedup() throws BrokenBarrierException, Interrupt
 
     private static void restartQuestDB() {
        questDBContainer.stop();
-        questDBContainer = newQuestDbConnector();
+        questDBContainer = newQuestDbContainer();
     }
 
     @ParameterizedTest
diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
index 22e24a4..bf325de 100644
--- a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
+++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
@@ -297,7 +297,7 @@ private static void startKillingRandomContainers(CyclicBarrier barrier) {
     }
 
     private static void startConnector() throws IOException, InterruptedException, URISyntaxException {
-        String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=" + Integer.MAX_VALUE + ";retry_timeout=60000;";
+        String confString = "http::addr=questdb:9000;auto_flush_rows=10000;retry_timeout=60000;";
 
         String payload = "{\"name\":\"my-connector\",\"config\":{" +
                 "\"tasks.max\":\"4\"," +