Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: decrease latency when some rows are locally buffered #17

Merged
merged 2 commits into from
Apr 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
public static final String CONFIGURATION_STRING_CONFIG = "client.conf.string";
public static final String CONFIGURATION_STRING_DOC = "Configuration string for QuestDB client";

// Maximum time buffered rows may sit in the local sender buffer while the
// source topics are idle; traded off against batching throughput (see doc text).
public static final String ALLOWED_LAG_CONFIG = "allowed.lag";
public static final String ALLOWED_LAG_DOC = "The maximum lag in milliseconds allowed for the connector to keep buffered data in memory " +
        "if there are no new records in Kafka topics. Higher lag allows more batching and improves throughput, but increases the time " +
        "it takes to detect new data in Kafka topics. Low lag reduces the time it takes to detect new data in Kafka topics, but may " +
        "reduce throughput. The default value is 1000 ms.";

public static final String RETRY_BACKOFF_MS = "retry.backoff.ms";
private static final String RETRY_BACKOFF_MS_DOC = "The time in milliseconds to wait following an error before a retry attempt is made";

Expand Down Expand Up @@ -104,13 +110,18 @@ public static ConfigDef conf() {
.define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC)
.define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC)
.define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC)
.define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC);
.define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC)
.define(ALLOWED_LAG_CONFIG, Type.INT, 1000, ConfigDef.Range.between(1, Integer.MAX_VALUE), Importance.LOW, ALLOWED_LAG_DOC);
}

/**
 * Returns the raw QuestDB client configuration string ({@code client.conf.string}).
 * Exposed as a {@link Password} so the value is masked in logs and config dumps.
 * May be {@code null} when the property was not set (its default is {@code null}).
 */
public Password getConfigurationString() {
    final Password confString = getPassword(CONFIGURATION_STRING_CONFIG);
    return confString;
}

/**
 * Returns the configured {@code allowed.lag} value in milliseconds — how long
 * locally buffered rows may wait before a flush when topics are idle.
 * Defaults to 1000 ms and is constrained to a positive int by the ConfigDef range.
 */
public int getAllowedLag() {
    final int lagMillis = getInt(ALLOWED_LAG_CONFIG);
    return lagMillis;
}

/**
 * Returns the TLS validation mode ({@code "default"} or {@code "insecure"}),
 * normalized to lower case with {@link Locale#ENGLISH} so the comparison is
 * locale-independent (e.g. immune to the Turkish dotless-i casing rule).
 */
public String getTlsValidationMode() {
    final String configuredMode = getString(TLS_VALIDATION_MODE_CONFIG);
    return configuredMode.toLowerCase(Locale.ENGLISH);
}
Expand Down
23 changes: 22 additions & 1 deletion connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class QuestDBSinkTask extends SinkTask {
private DateFormat dataFormat;
private boolean kafkaTimestampsEnabled;
private boolean httpTransport;
private int allowedLag;

@Override
public String version() {
Expand Down Expand Up @@ -76,6 +77,7 @@ public void start(Map<String, String> map) {
this.timestampColumnName = config.getDesignatedTimestampColumnName();
this.kafkaTimestampsEnabled = config.isDesignatedTimestampKafkaNative();
this.timestampUnits = config.getTimestampUnitsOrNull();
this.allowedLag = config.getAllowedLag();
}

private Sender createRawSender() {
Expand Down Expand Up @@ -123,9 +125,28 @@ private Sender createSender() {
@Override
public void put(Collection<SinkRecord> collection) {
if (collection.isEmpty()) {
log.debug("Received empty collection, ignoring");
if (httpTransport) {
log.debug("Received empty collection, let's flush the buffer");
// Ok, there are no new records to send. Let's flush! Why?
// We do not want locally buffered row to be stuck in the buffer for too long. Increases latency
// between the time the record is produced and the time it is visible in QuestDB.
// If the local buffer is empty then flushing is a cheap no-op.
try {
sender.flush();
} catch (LineSenderException | HttpClientException e) {
onSenderException(e);
}
} else {
log.debug("Received empty collection, nothing to do");
}
return;
} if (httpTransport) {
// there are some records to send. good.
// let's set a timeout so Kafka Connect will call us again in time
// even if there are no new records to send. this gives us a chance to flush the buffer.
context.timeout(allowedLag);
}

if (log.isDebugEnabled()) {
SinkRecord record = collection.iterator().next();
log.debug("Received {} records. First record kafka coordinates:({}-{}-{}). ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class QuestDBSinkConnectorIT {
.withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("QDB_CLIENT_CONF", "http::addr=questdb;auto_flush_rows=1;")
.withEnv("QDB_CLIENT_CONF", "http::addr=questdb;auto_flush=off;") // intentionally disabled auto-flush
.withNetwork(network)
.withExposedPorts(8083)
.withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/usr/share/java/kafka/questdb-connector.jar")
Expand Down Expand Up @@ -97,6 +97,7 @@ public void test() throws Exception {
.with("key.converter", "org.apache.kafka.connect.storage.StringConverter")
.with("value.converter", "org.apache.kafka.connect.storage.StringConverter")
.with("topics", topicName);
// ilp client conf string set via environment variable

connectContainer.registerConnector("my-connector", connector);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,11 @@ public class DebeziumIT {
private final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:7.4.0")
.withNetwork(network)
.withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT)
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))
.withEnv("QDB_CAIRO_COMMIT_LAG", "100")
.withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI");
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")));


private ConnectorConfiguration newQuestSinkBaseConfig(String questTableName) {
String confString = "http::addr=" + questDBContainer.getNetworkAliases().get(0) + ":9000;auto_flush_rows=1000;";
String confString = "http::addr=" + questDBContainer.getNetworkAliases().get(0) + ":9000;";
return ConnectorConfiguration.create()
.with("connector.class", QuestDBSinkConnector.class.getName())
.with("client.conf.string", confString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ private static GenericContainer<?> newConnectContainer(int id) {
.withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka0:9092")
.withEnv("CONNECT_GROUP_ID", "test")
.withEnv("CONNECT_OFFSET_FLUSH_INTERVAL_MS", "5000")
.withEnv("CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY", "All")
.withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-storage-topic")
.withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "connect-config-topic")
.withEnv("CONNECT_STATUS_STORAGE_TOPIC", "connect-status-topic")
Expand Down Expand Up @@ -298,7 +297,7 @@ private static void startKillingRandomContainers(CyclicBarrier barrier) {
}

private static void startConnector() throws IOException, InterruptedException, URISyntaxException {
String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=1200000;retry_timeout=60000;";
String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=" + Integer.MAX_VALUE + ";retry_timeout=60000;";

String payload = "{\"name\":\"my-connector\",\"config\":{" +
"\"tasks.max\":\"4\"," +
Expand Down
Loading