diff --git a/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java b/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
new file mode 100644
index 0000000..6374cf1
--- /dev/null
+++ b/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
@@ -0,0 +1,66 @@
+package io.questdb.kafka;
+
+import io.questdb.client.impl.ConfStringParser;
+import io.questdb.std.Chars;
+import io.questdb.std.str.StringSink;
+
+final class ClientConfUtils {
+    private ClientConfUtils() {
+    }
+
+    static boolean patchConfStr(String confStr, StringSink sink) {
+        int pos = ConfStringParser.of(confStr, sink);
+        if (pos < 0) {
+            sink.clear();
+            sink.put(confStr);
+            return false;
+        }
+
+        boolean isHttpTransport = Chars.equals(sink, "http") || Chars.equals(sink, "https");
+        boolean intervalFlushSetExplicitly = false;
+        boolean flushesDisabled = false;
+        boolean parseError = false;
+        boolean hasAtLeastOneParam = false;
+
+        // disable interval-based flushes
+        // unless they are explicitly set or auto_flush is entirely off
+        // why? the connector has its own mechanism to flush data in a timely manner
+        while (ConfStringParser.hasNext(confStr, pos)) {
+            hasAtLeastOneParam = true;
+            pos = ConfStringParser.nextKey(confStr, pos, sink);
+            if (pos < 0) {
+                parseError = true;
+                break;
+            }
+            if (Chars.equals(sink, "auto_flush_interval")) {
+                intervalFlushSetExplicitly = true;
+                pos = ConfStringParser.value(confStr, pos, sink);
+            } else if (Chars.equals(sink, "auto_flush")) {
+                pos = ConfStringParser.value(confStr, pos, sink);
+                flushesDisabled = Chars.equals(sink, "off");
+            } else {
+                pos = ConfStringParser.value(confStr, pos, sink); // skip other values
+            }
+            if (pos < 0) {
+                parseError = true;
+                break;
+            }
+        }
+        sink.clear();
+        sink.put(confStr);
+        if (!parseError // we don't want to mess with the config if there was a parse error
+                && isHttpTransport // we only want to patch the http transport
+                && !flushesDisabled // if auto-flush is disabled we don't need to do anything
+                && !intervalFlushSetExplicitly // if auto_flush_interval is set explicitly we don't want to override it
+                && hasAtLeastOneParam // no parameter at all is also an error since at least the address should be set; we let the client throw an exception in this case
+        ) {
+            // if everything is ok, we set auto_flush_interval to the max value
+            // this will effectively disable interval-based flushes
+            // and the client will flush data only when the connector tells it to
+            // or if a row count limit is reached
+            sink.put("auto_flush_interval=").put(Integer.MAX_VALUE).put(';');
+        }
+
+        return isHttpTransport;
+    }
+}
diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index 12f9ce1..8695c4f 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -3,11 +3,11 @@
 import io.questdb.client.Sender;
 import io.questdb.cutlass.http.client.HttpClientException;
 import io.questdb.cutlass.line.LineSenderException;
-import io.questdb.cutlass.line.http.LineHttpSender;
 import io.questdb.std.NumericException;
 import io.questdb.std.datetime.DateFormat;
 import io.questdb.std.datetime.microtime.Timestamps;
 import io.questdb.std.datetime.millitime.DateFormatUtils;
+import io.questdb.std.str.StringSink;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.types.Password;
@@ -89,9 +89,9 @@ private Sender createRawSender() {
         }
         if (confStr != null && !confStr.isEmpty()) {
             log.debug("Using client configuration string");
-            Sender s = Sender.fromConfig(confStr);
-            httpTransport = s instanceof LineHttpSender;
-            return s;
+            StringSink sink = new StringSink();
+            httpTransport = ClientConfUtils.patchConfStr(confStr, sink);
+            return Sender.fromConfig(sink);
         }
         log.debug("Using legacy client configuration");
         Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost());
@@ -128,8 +128,8 @@ public void put(Collection collection) {
             if (httpTransport) {
                 log.debug("Received empty collection, let's flush the buffer");
                 // Ok, there are no new records to send. Let's flush! Why?
-                // We do not want locally buffered row to be stuck in the buffer for too long. Increases latency
-                // between the time the record is produced and the time it is visible in QuestDB.
+                // We do not want locally buffered rows to be stuck in the buffer for too long. It increases
+                // latency between the time a record is produced and the time it is visible in QuestDB.
                 // If the local buffer is empty then flushing is a cheap no-op.
                 try {
                     sender.flush();
@@ -140,11 +140,6 @@ public void put(Collection collection) {
                 log.debug("Received empty collection, nothing to do");
             }
             return;
-        } if (httpTransport) {
-            // there are some records to send. good.
-            // let's set a timeout so Kafka Connect will call us again in time
-            // even if there are no new records to send. this gives us a chance to flush the buffer.
-            context.timeout(allowedLag);
         }
 
         if (log.isDebugEnabled()) {
@@ -181,6 +176,13 @@ public void put(Collection collection) {
         } catch (LineSenderException | HttpClientException e) {
             onSenderException(e);
         }
+
+        if (httpTransport) {
+            // we successfully added some rows to the local buffer.
+            // let's set a timeout so Kafka Connect will call us again in time even if there are
+            // no new records to send. this gives us a chance to flush the buffer.
+            context.timeout(allowedLag);
+        }
     }
 
     private void onSenderException(Exception e) {
diff --git a/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java b/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
new file mode 100644
index 0000000..6cfd5d4
--- /dev/null
+++ b/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
@@ -0,0 +1,55 @@
+package io.questdb.kafka;
+
+import io.questdb.std.Chars;
+import io.questdb.std.str.StringSink;
+import org.junit.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class ClientConfUtilsTest {
+
+    @Test
+    public void testHttpTransportIsResolved() {
+        StringSink sink = new StringSink();
+        assertTrue(ClientConfUtils.patchConfStr("http::addr=localhost:9000;", sink));
+        assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink));
+        assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink));
+        assertFalse(ClientConfUtils.patchConfStr("tcp::addr=localhost:9000;", sink));
+        assertFalse(ClientConfUtils.patchConfStr("tcps::addr=localhost:9000;", sink));
+    }
+
+    @Test
+    public void testHttpTransportTimeBasedFlushesDisabledByDefault() {
+        assertConfStringIsPatched("http::addr=localhost:9000;");
+        assertConfStringIsPatched("https::addr=localhost:9000;foo=bar;");
+        assertConfStringIsPatched("https::addr=localhost:9000;auto_flush_rows=1;");
+        assertConfStringIsPatched("https::addr=localhost:9000;auto_flush=on;");
+
+        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar;auto_flush_interval=100;");
+        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar;auto_flush=off;");
+        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar");
+        assertConfStringIsNotPatched("https::addr");
+        assertConfStringIsNotPatched("https");
+        assertConfStringIsNotPatched("tcp::addr=localhost:9000;");
+        assertConfStringIsNotPatched("tcps::addr=localhost:9000;foo=bar;");
+        assertConfStringIsNotPatched("tcps::addr=localhost:9000;auto_flush_rows=1;");
+        assertConfStringIsNotPatched("tcps::addr=localhost:9000;auto_flush=on;");
+        assertConfStringIsNotPatched("unknown::addr=localhost:9000;auto_flush=on;");
+    }
+
+    private static void assertConfStringIsPatched(String confStr) {
+        StringSink sink = new StringSink();
+        ClientConfUtils.patchConfStr(confStr, sink);
+
+        String expected = confStr + "auto_flush_interval=" + Integer.MAX_VALUE + ";";
+        assertTrue(Chars.equals(expected, sink), "Conf string = " + confStr + ", expected = " + expected + ", actual = " + sink);
+    }
+
+    private static void assertConfStringIsNotPatched(String confStr) {
+        StringSink sink = new StringSink();
+        ClientConfUtils.patchConfStr(confStr, sink);
+
+        assertEquals(confStr, sink.toString());
+    }
+
+}
\ No newline at end of file
diff --git a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
index e2f71bd..047a2c2 100644
--- a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
+++ b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
@@ -57,7 +57,7 @@ static Map baseConnectorProps(GenericContainer questDBContain
             props.put("host", ilpIUrl);
         } else {
             int port = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT);
-            confString = "http::addr="+host+":"+ port + ";";
+            confString = "http::addr=" + host + ":" + port + ";";
             props.put("client.conf.string", confString);
         }
         return props;
diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
index f774b05..988c976 100644
--- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
+++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -64,7 +64,7 @@ public final class QuestDBSinkConnectorEmbeddedTest {
 
     @BeforeAll
     public static void createContainer() {
-        questDBContainer = newQuestDbConnector();
+        questDBContainer = newQuestDbContainer();
     }
 
     @AfterAll
@@ -85,7 +85,7 @@ private static String questDBDirectory() {
 
     private static GenericContainer<?> questDBContainer;
 
-    private static GenericContainer<?> newQuestDbConnector() {
+    private static GenericContainer<?> newQuestDbContainer() {
         FixedHostPortGenericContainer<?> selfGenericContainer = new FixedHostPortGenericContainer<>(OFFICIAL_QUESTDB_DOCKER);
         if (httpPort != -1) {
             selfGenericContainer = selfGenericContainer.withFixedExposedPort(httpPort, QuestDBUtils.QUESTDB_HTTP_PORT);
@@ -120,7 +120,6 @@ public void setUp() {
 
         Map<String, String> props = new HashMap<>();
         props.put("connector.client.config.override.policy", "All");
-        props.put("offset.flush.interval.ms", "1000");
         connect = new EmbeddedConnectCluster.Builder()
                 .name("questdb-connect-cluster")
                 .workerProps(props)
@@ -300,7 +299,7 @@ public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throw
         }
 
         // restart QuestDB
-        questDBContainer = newQuestDbConnector();
+        questDBContainer = newQuestDbContainer();
         for (int i = 0; i < 50; i++) {
             connect.kafka().produce(topicName, "key3", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":" + i + "}");
         }
@@ -541,7 +540,7 @@ public void testExactlyOnce_withDedup() throws BrokenBarrierException, Interrupt
 
     private static void restartQuestDB() {
         questDBContainer.stop();
-        questDBContainer = newQuestDbConnector();
+        questDBContainer = newQuestDbContainer();
     }
 
     @ParameterizedTest
diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
index 22e24a4..bf325de 100644
--- a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
+++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
@@ -297,7 +297,7 @@ private static void startKillingRandomContainers(CyclicBarrier barrier) {
     }
 
     private static void startConnector() throws IOException, InterruptedException, URISyntaxException {
-        String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=" + Integer.MAX_VALUE + ";retry_timeout=60000;";
+        String confString = "http::addr=questdb:9000;auto_flush_rows=10000;retry_timeout=60000;";
 
         String payload = "{\"name\":\"my-connector\",\"config\":{" +
                 "\"tasks.max\":\"4\"," +
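Note for reviewers: the snippet below is a minimal, hypothetical sketch of the behaviour introduced by `ClientConfUtils.patchConfStr(...)` above. The `PatchConfStrDemo` class is not part of this change; it assumes the patch is applied and sits in the `io.questdb.kafka` package so it can reach the package-private utility.

```java
package io.questdb.kafka;

import io.questdb.std.str.StringSink;

// Hypothetical demo, not part of the PR. It must live in io.questdb.kafka
// because ClientConfUtils and patchConfStr are package-private.
public final class PatchConfStrDemo {
    public static void main(String[] args) {
        // 1) HTTP transport without an explicit auto_flush_interval: the connector
        //    appends auto_flush_interval=2147483647 (Integer.MAX_VALUE), which
        //    effectively disables the client's interval-based flushes, so flushing
        //    is driven by the connector alone.
        StringSink patched = new StringSink();
        boolean isHttp = ClientConfUtils.patchConfStr("http::addr=localhost:9000;", patched);
        System.out.println(isHttp + " -> " + patched);
        // prints: true -> http::addr=localhost:9000;auto_flush_interval=2147483647;

        // 2) auto_flush_interval set explicitly: the user's value wins and the
        //    config string passes through unchanged.
        StringSink untouched = new StringSink();
        ClientConfUtils.patchConfStr("http::addr=localhost:9000;auto_flush_interval=100;", untouched);
        System.out.println(untouched);
        // prints: http::addr=localhost:9000;auto_flush_interval=100;

        // 3) TCP transport: returns false and the string passes through unchanged;
        //    the interval patching only applies to the HTTP transport.
        StringSink tcp = new StringSink();
        boolean isTcp = ClientConfUtils.patchConfStr("tcp::addr=localhost:9000;", tcp);
        System.out.println(isTcp + " -> " + tcp);
        // prints: false -> tcp::addr=localhost:9000;
    }
}
```

Since `auto_flush_interval` is interpreted in milliseconds, `Integer.MAX_VALUE` (roughly 24.8 days) makes interval-based flushing unreachable in practice; timely delivery is instead ensured by `put()` flushing on empty batches and by the `context.timeout(allowedLag)` call added at the end of `put()`.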