Skip to content

Commit

Permalink
use flushing parameters from client.conf.string
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrinot committed May 24, 2024
1 parent 7b387b6 commit c639737
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import io.questdb.std.str.StringSink;
import org.apache.kafka.common.config.ConfigException;

import java.util.concurrent.TimeUnit;

final class ClientConfUtils {
private ClientConfUtils() {
}
Expand Down Expand Up @@ -53,7 +55,7 @@ static boolean patchConfStr(String confStr, StringSink sink, FlushConfig flushCo
throw new ConfigException("QuestDB Kafka connector cannot have auto_flush_interval disabled");
}
try {
flushConfig.autoFlushNanos = Numbers.parseLong(tmpSink);
flushConfig.autoFlushNanos = TimeUnit.MILLISECONDS.toNanos(Numbers.parseLong(tmpSink));
} catch (NumericException e) {
throw new ConfigException("Invalid auto_flush_interval value [auto_flush_interval=" + tmpSink + ']');
}
Expand Down
2 changes: 1 addition & 1 deletion connector/src/main/java/io/questdb/kafka/FlushConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.util.concurrent.TimeUnit;

class FlushConfig {
final class FlushConfig {
int autoFlushRows;
long autoFlushNanos;

Expand Down
19 changes: 10 additions & 9 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public final class QuestDBSinkTask extends SinkTask {
private static final char STRUCT_FIELD_SEPARATOR = '_';
private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key";
private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value";
private static final long FLUSH_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);

private static final Logger log = LoggerFactory.getLogger(QuestDBSinkTask.class);
private Sender sender;
Expand All @@ -46,8 +45,7 @@ public final class QuestDBSinkTask extends SinkTask {
private int allowedLag;
private long nextFlushNanos;
private int pendingRows;
private final int maxPendingRows = 75_000;
private FlushConfig flushConfig = new FlushConfig();
private final FlushConfig flushConfig = new FlushConfig();

@Override
public String version() {
Expand Down Expand Up @@ -84,7 +82,7 @@ public void start(Map<String, String> map) {
this.kafkaTimestampsEnabled = config.isDesignatedTimestampKafkaNative();
this.timestampUnits = config.getTimestampUnitsOrNull();
this.allowedLag = config.getAllowedLag();
this.nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
}

private Sender createRawSender() {
Expand All @@ -98,9 +96,12 @@ private Sender createRawSender() {
log.debug("Using client configuration string");
StringSink sink = new StringSink();
httpTransport = ClientConfUtils.patchConfStr(confStr, sink, flushConfig);
if (!httpTransport) {
log.info("Using TCP transport, consider using HTTP transport for improved fault tolerance and error handling");
}
return Sender.fromConfig(sink);
}
log.debug("Using legacy client configuration");
log.warn("Configuration options 'host', 'tsl', 'token' and 'username' are deprecated and will be removed in the future. Use 'client.conf.string' instead. See: https://questdb.io/docs/third-party-tools/kafka/questdb-kafka/#configuration-options");
Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost());
if (config.isTls()) {
builder.enableTls();
Expand Down Expand Up @@ -159,9 +160,9 @@ public void put(Collection<SinkRecord> collection) {
}

if (httpTransport) {
if (pendingRows >= maxPendingRows) {
if (pendingRows >= flushConfig.autoFlushRows) {
log.debug("Flushing data to QuestDB due to auto_flush_rows limit [pending-rows={}, max-pending-rows={}]",
pendingRows, maxPendingRows);
pendingRows, flushConfig.autoFlushRows);
flushAndResetCounters();
} else {
long remainingNanos = nextFlushNanos - System.nanoTime();
Expand Down Expand Up @@ -205,7 +206,7 @@ private void flushAndResetCounters() {
log.debug("Flushing data to QuestDB");
try {
sender.flush();
nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
pendingRows = 0;
} catch (LineSenderException | HttpClientException e) {
onSenderException(e);
Expand All @@ -215,7 +216,7 @@ private void flushAndResetCounters() {
private void onSenderException(Exception e) {
if (httpTransport) {
closeSenderSilently();
nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
pendingRows = 0;
throw new ConnectException("Failed to send data to QuestDB", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ private static void assertConfStringIsPatched(String confStr, String expectedPat
ClientConfUtils.patchConfStr(confStr, sink, flushConfig);

Assert.assertEquals(expectedPatchedConfStr, sink.toString());
Assert.assertEquals(expectedMaxPendingRows, flushConfig.autoFlushRows);
Assert.assertEquals(expectedFlushNanos, flushConfig.autoFlushNanos);
}

private static void assertConfStringIsNotPatched(String confStr) {
Expand Down
10 changes: 5 additions & 5 deletions connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ static Map<String, String> baseConnectorProps(GenericContainer<?> questDBContain
props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());

String confString;
if (!useHttp) {
String ilpIUrl = host + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT);
props.put("host", ilpIUrl);
} else {
if (useHttp) {
int port = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT);
confString = "http::addr=" + host + ":" + port + ";";
confString = "http::addr=" + host + ":" + port + ";auto_flush_interval=20000;";
props.put("client.conf.string", confString);
} else {
String ilpIUrl = host + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT);
props.put("host", ilpIUrl);
}
return props;
}
Expand Down

0 comments on commit c639737

Please sign in to comment.