Skip to content

Commit

Permalink
configurable lag
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrinot committed Apr 6, 2024
1 parent cabd273 commit df38d66
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
public static final String CONFIGURATION_STRING_CONFIG = "client.conf.string";
public static final String CONFIGURATION_STRING_DOC = "Configuration string for QuestDB client";

public static final String ALLOWED_LAG_CONFIG = "allowed.lag";
public static final String ALLOWED_LAG_DOC = "The maximum lag in milliseconds allowed for the connector to keep buffered data in memory " +
"if there are no new records in Kafka topics. Higher lag allows more batching and improves throughput, but increase the time " +
"it takes to detect new data in Kafka topics. Low lag reduces the time it takes to detect new data in Kafka topics, but may " +
"reduce throughput. The default value is 1000 ms.";

public static final String RETRY_BACKOFF_MS = "retry.backoff.ms";
private static final String RETRY_BACKOFF_MS_DOC = "The time in milliseconds to wait following an error before a retry attempt is made";

Expand Down Expand Up @@ -104,13 +110,18 @@ public static ConfigDef conf() {
.define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC)
.define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC)
.define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC)
.define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC);
.define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC)
.define(ALLOWED_LAG_CONFIG, Type.INT, 1000, ConfigDef.Range.between(1, Integer.MAX_VALUE), Importance.LOW, ALLOWED_LAG_DOC);
}

public Password getConfigurationString() {
return getPassword(CONFIGURATION_STRING_CONFIG);
}

public int getAllowedLag() {
return getInt(ALLOWED_LAG_CONFIG);
}

public String getTlsValidationMode() {
return getString(TLS_VALIDATION_MODE_CONFIG).toLowerCase(Locale.ENGLISH);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class QuestDBSinkTask extends SinkTask {
private DateFormat dataFormat;
private boolean kafkaTimestampsEnabled;
private boolean httpTransport;
private int allowedLag;

@Override
public String version() {
Expand Down Expand Up @@ -76,6 +77,7 @@ public void start(Map<String, String> map) {
this.timestampColumnName = config.getDesignatedTimestampColumnName();
this.kafkaTimestampsEnabled = config.isDesignatedTimestampKafkaNative();
this.timestampUnits = config.getTimestampUnitsOrNull();
this.allowedLag = config.getAllowedLag();
}

private Sender createRawSender() {
Expand Down Expand Up @@ -142,7 +144,7 @@ public void put(Collection<SinkRecord> collection) {
// there are some records to send. good.
// let's set a timeout so Kafka Connect will call us again in time
// even if there are no new records to send. this gives us a chance to flush the buffer.
context.timeout(1000);
context.timeout(allowedLag);
}

if (log.isDebugEnabled()) {
Expand Down

0 comments on commit df38d66

Please sign in to comment.