diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java index be12cfe..44f21ec 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java @@ -64,6 +64,12 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig { public static final String CONFIGURATION_STRING_CONFIG = "client.conf.string"; public static final String CONFIGURATION_STRING_DOC = "Configuration string for QuestDB client"; + public static final String ALLOWED_LAG_CONFIG = "allowed.lag"; + public static final String ALLOWED_LAG_DOC = "The maximum lag in milliseconds allowed for the connector to keep buffered data in memory " + + "if there are no new records in Kafka topics. Higher lag allows more batching and improves throughput, but increases the time " + + "it takes to detect new data in Kafka topics. Low lag reduces the time it takes to detect new data in Kafka topics, but may " + + "reduce throughput. The default value is 1000 ms."; + public static final String RETRY_BACKOFF_MS = "retry.backoff.ms"; private static final String RETRY_BACKOFF_MS_DOC = "The time in milliseconds to wait following an error before a retry attempt is made"; @@ -104,13 +110,18 @@ public static ConfigDef conf() { .define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC) .define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC) .define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC) - .define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC); + .define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC) + .define(ALLOWED_LAG_CONFIG, Type.INT, 1000, ConfigDef.Range.between(1, Integer.MAX_VALUE), Importance.LOW, ALLOWED_LAG_DOC); } public Password getConfigurationString() { return getPassword(CONFIGURATION_STRING_CONFIG); } + public int getAllowedLag() { + return getInt(ALLOWED_LAG_CONFIG); + } + public String getTlsValidationMode() { return getString(TLS_VALIDATION_MODE_CONFIG).toLowerCase(Locale.ENGLISH); }
diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index 8542bb5..12f9ce1 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -42,6 +42,7 @@ public final class QuestDBSinkTask extends SinkTask { private DateFormat dataFormat; private boolean kafkaTimestampsEnabled; private boolean httpTransport; + private int allowedLag; @Override public String version() { @@ -76,6 +77,7 @@ public void start(Map map) { this.timestampColumnName = config.getDesignatedTimestampColumnName(); this.kafkaTimestampsEnabled = config.isDesignatedTimestampKafkaNative(); this.timestampUnits = config.getTimestampUnitsOrNull(); + this.allowedLag = config.getAllowedLag(); } private Sender createRawSender() { @@ -123,9 +125,28 @@ private Sender createSender() { @Override public void put(Collection collection) { if (collection.isEmpty()) { - log.debug("Received empty collection, ignoring"); + if (httpTransport) { + log.debug("Received empty collection, let's flush the buffer"); + // Ok, there are no new records to send. Let's flush! Why? + // We do not want locally buffered rows to be stuck in the buffer for too long. Increases latency + // between the time the record is produced and the time it is visible in QuestDB. + // If the local buffer is empty then flushing is a cheap no-op. + try { + sender.flush(); + } catch (LineSenderException | HttpClientException e) { + onSenderException(e); + } + } else { + log.debug("Received empty collection, nothing to do"); + } return; + } if (httpTransport) { + // there are some records to send. good. + // let's set a timeout so Kafka Connect will call us again in time + // even if there are no new records to send. this gives us a chance to flush the buffer. + context.timeout(allowedLag); } + if (log.isDebugEnabled()) { SinkRecord record = collection.iterator().next(); log.debug("Received {} records. First record kafka coordinates:({}-{}-{}). ",
diff --git a/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java b/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java index fae941b..f4b67c7 100644 --- a/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java +++ b/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java @@ -66,7 +66,7 @@ public class QuestDBSinkConnectorIT { .withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1") .withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1") .withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1") - .withEnv("QDB_CLIENT_CONF", "http::addr=questdb;auto_flush_rows=1;") + .withEnv("QDB_CLIENT_CONF", "http::addr=questdb;auto_flush=off;") // intentionally disabled auto-flush .withNetwork(network) .withExposedPorts(8083) .withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/usr/share/java/kafka/questdb-connector.jar") @@ -97,6 +97,7 @@ public void test() throws Exception { .with("key.converter", "org.apache.kafka.connect.storage.StringConverter") .with("value.converter", "org.apache.kafka.connect.storage.StringConverter") .with("topics", topicName); + // ilp client conf string set via environment variable connectContainer.registerConnector("my-connector", connector);
diff --git a/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java b/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java index 17ecb7c..327a3a5 100644 --- a/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java +++ b/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java @@ -70,13 +70,11 @@ public class DebeziumIT { private final GenericContainer questDBContainer = new GenericContainer<>("questdb/questdb:7.4.0") .withNetwork(network) .withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT) - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))) - .withEnv("QDB_CAIRO_COMMIT_LAG", "100") - .withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI"); + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))); private ConnectorConfiguration newQuestSinkBaseConfig(String questTableName) { - String confString = "http::addr=" + questDBContainer.getNetworkAliases().get(0) + ":9000;auto_flush_rows=1000;"; + String confString = "http::addr=" + questDBContainer.getNetworkAliases().get(0) + ":9000;"; return ConnectorConfiguration.create() .with("connector.class", QuestDBSinkConnector.class.getName()) .with("client.conf.string", confString)
diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java index 009ac11..22e24a4 100644 --- a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java +++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java @@ -172,7 +172,6 @@ private static GenericContainer newConnectContainer(int id) { .withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka0:9092") .withEnv("CONNECT_GROUP_ID", "test") .withEnv("CONNECT_OFFSET_FLUSH_INTERVAL_MS", "5000") - .withEnv("CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY", "All") .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-storage-topic") .withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "connect-config-topic") .withEnv("CONNECT_STATUS_STORAGE_TOPIC", "connect-status-topic") @@ -298,7 +297,7 @@ private static void startKillingRandomContainers(CyclicBarrier barrier) { } private static void startConnector() throws IOException, InterruptedException, URISyntaxException { - String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=1200000;retry_timeout=60000;"; + String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=" + Integer.MAX_VALUE + ";retry_timeout=60000;"; String payload = "{\"name\":\"my-connector\",\"config\":{" + "\"tasks.max\":\"4\"," +