From c6397379d25b4b9233190409aa657f8c32a3df2c Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Fri, 24 May 2024 13:14:07 +0200
Subject: [PATCH] use flushing parameters from client.conf.string

---
 .../io/questdb/kafka/ClientConfUtils.java     |  4 +++-
 .../java/io/questdb/kafka/FlushConfig.java    |  2 +-
 .../io/questdb/kafka/QuestDBSinkTask.java     | 19 ++++++++++---------
 .../io/questdb/kafka/ClientConfUtilsTest.java |  2 ++
 .../io/questdb/kafka/ConnectTestUtils.java    | 10 +++++-----
 5 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java b/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
index 65f0f1d..17846f7 100644
--- a/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
+++ b/connector/src/main/java/io/questdb/kafka/ClientConfUtils.java
@@ -8,6 +8,8 @@
 import io.questdb.std.str.StringSink;
 import org.apache.kafka.common.config.ConfigException;
 
+import java.util.concurrent.TimeUnit;
+
 final class ClientConfUtils {
     private ClientConfUtils() {
     }
@@ -53,7 +55,7 @@ static boolean patchConfStr(String confStr, StringSink sink, FlushConfig flushCo
                 throw new ConfigException("QuestDB Kafka connector cannot have auto_flush_interval disabled");
             }
             try {
-                flushConfig.autoFlushNanos = Numbers.parseLong(tmpSink);
+                flushConfig.autoFlushNanos = TimeUnit.MILLISECONDS.toNanos(Numbers.parseLong(tmpSink));
             } catch (NumericException e) {
                 throw new ConfigException("Invalid auto_flush_interval value [auto_flush_interval=" + tmpSink + ']');
             }
diff --git a/connector/src/main/java/io/questdb/kafka/FlushConfig.java b/connector/src/main/java/io/questdb/kafka/FlushConfig.java
index 0670a81..190469d 100644
--- a/connector/src/main/java/io/questdb/kafka/FlushConfig.java
+++ b/connector/src/main/java/io/questdb/kafka/FlushConfig.java
@@ -2,7 +2,7 @@
 
 import java.util.concurrent.TimeUnit;
 
-class FlushConfig {
+final class FlushConfig {
     int autoFlushRows;
     long autoFlushNanos;
 
diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index 4ea4b49..789140c 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -28,7 +28,6 @@ public final class QuestDBSinkTask extends SinkTask {
     private static final char STRUCT_FIELD_SEPARATOR = '_';
     private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key";
     private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value";
-    private static final long FLUSH_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
     private static final Logger log = LoggerFactory.getLogger(QuestDBSinkTask.class);
 
     private Sender sender;
@@ -46,8 +45,7 @@ public final class QuestDBSinkTask extends SinkTask {
     private int allowedLag;
     private long nextFlushNanos;
    private int pendingRows;
-    private final int maxPendingRows = 75_000;
-    private FlushConfig flushConfig = new FlushConfig();
+    private final FlushConfig flushConfig = new FlushConfig();
 
     @Override
     public String version() {
@@ -84,7 +82,7 @@ public void start(Map<String, String> map) {
         this.kafkaTimestampsEnabled = config.isDesignatedTimestampKafkaNative();
         this.timestampUnits = config.getTimestampUnitsOrNull();
         this.allowedLag = config.getAllowedLag();
-        this.nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
+        this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
     }
 
     private Sender createRawSender() {
@@ -98,9 +96,12 @@ private Sender createRawSender() {
             log.debug("Using client configuration string");
             StringSink sink = new StringSink();
             httpTransport = ClientConfUtils.patchConfStr(confStr, sink, flushConfig);
+            if (!httpTransport) {
+                log.info("Using TCP transport, consider using HTTP transport for improved fault tolerance and error handling");
+            }
             return Sender.fromConfig(sink);
         }
-        log.debug("Using legacy client configuration");
+        log.warn("Configuration options 'host', 'tsl', 'token' and 'username' are deprecated and will be removed in the future. Use 'client.conf.string' instead. See: https://questdb.io/docs/third-party-tools/kafka/questdb-kafka/#configuration-options");
         Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost());
         if (config.isTls()) {
             builder.enableTls();
@@ -159,9 +160,9 @@ public void put(Collection<SinkRecord> collection) {
         }
 
         if (httpTransport) {
-            if (pendingRows >= maxPendingRows) {
+            if (pendingRows >= flushConfig.autoFlushRows) {
                 log.debug("Flushing data to QuestDB due to auto_flush_rows limit [pending-rows={}, max-pending-rows={}]",
-                        pendingRows, maxPendingRows);
+                        pendingRows, flushConfig.autoFlushRows);
                 flushAndResetCounters();
             } else {
                 long remainingNanos = nextFlushNanos - System.nanoTime();
@@ -205,7 +206,7 @@ private void flushAndResetCounters() {
         log.debug("Flushing data to QuestDB");
         try {
             sender.flush();
-            nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
+            nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
             pendingRows = 0;
         } catch (LineSenderException | HttpClientException e) {
             onSenderException(e);
         }
@@ -215,7 +216,7 @@ private void onSenderException(Exception e) {
         if (httpTransport) {
             closeSenderSilently();
-            nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
+            nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
             pendingRows = 0;
             throw new ConnectException("Failed to send data to QuestDB", e);
         }
diff --git a/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java b/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
index 8cfbcdd..8a122d5 100644
--- a/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
+++ b/connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java
@@ -65,6 +65,8 @@ private static void assertConfStringIsPatched(String confStr, String expectedPat
         ClientConfUtils.patchConfStr(confStr, sink, flushConfig);
 
         Assert.assertEquals(expectedPatchedConfStr, sink.toString());
+        Assert.assertEquals(expectedMaxPendingRows, flushConfig.autoFlushRows);
+        Assert.assertEquals(expectedFlushNanos, flushConfig.autoFlushNanos);
     }
 
     private static void assertConfStringIsNotPatched(String confStr) {
diff --git a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
index 047a2c2..bda04e6 100644
--- a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
+++ b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
@@ -52,13 +52,13 @@ static Map baseConnectorProps(GenericContainer questDBContain
         props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
 
         String confString;
-        if (!useHttp) {
-            String ilpIUrl = host + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT);
-            props.put("host", ilpIUrl);
-        } else {
+        if (useHttp) {
             int port = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT);
-            confString = "http::addr=" + host + ":" + port + ";";
+            confString = "http::addr=" + host + ":" + port + ";auto_flush_interval=20000;";
             props.put("client.conf.string", confString);
+        } else {
+            String ilpIUrl = host + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT);
+            props.put("host", ilpIUrl);
         }
         return props;
     }
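
For illustration, a minimal sketch of how the patched ClientConfUtils.patchConfStr() hands flush parameters to the connector. Only patchConfStr(), FlushConfig and StringSink come from the code above; the class name, the localhost address and the parameter values are made up for the example, and the sketch sits in package io.questdb.kafka because both helper classes are package-private.

    package io.questdb.kafka;

    import io.questdb.std.str.StringSink;

    // Illustrative sketch only: feeds an example client.conf.string through the
    // patched ClientConfUtils and prints the flush parameters it extracts.
    public class FlushConfigSketch {
        public static void main(String[] args) {
            FlushConfig flushConfig = new FlushConfig();
            StringSink sink = new StringSink();

            // example values: auto_flush_interval is milliseconds, auto_flush_rows is a row count
            String confStr = "http::addr=localhost:9000;auto_flush_rows=1000;auto_flush_interval=500;";
            boolean httpTransport = ClientConfUtils.patchConfStr(confStr, sink, flushConfig);

            System.out.println(httpTransport);              // true for an http:: configuration string
            System.out.println(flushConfig.autoFlushRows);  // 1000, replaces the old maxPendingRows constant
            System.out.println(flushConfig.autoFlushNanos); // 500 ms converted to nanoseconds
            System.out.println(sink);                       // patched configuration string passed to Sender.fromConfig()
        }
    }

With this in place the connector no longer hard-codes a 1-second flush interval and a 75,000-row limit; both values come from client.conf.string, as the QuestDBSinkTask changes above show.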