Skip to content

Commit

Permalink
support for loading conf string from env. variables
Browse files Browse the repository at this point in the history
also: mark conf string as secret so it's not logged
  • Loading branch information
jerrinot committed Apr 4, 2024
1 parent 8c7087f commit 4761d5b
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ public static ConfigDef conf() {
.define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC)
.define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC)
.define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC)
.define(CONFIGURATION_STRING_CONFIG, Type.STRING, null, Importance.HIGH, CONFIGURATION_STRING_DOC);
.define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC);
}

public String getConfigurationString() {
return getString(CONFIGURATION_STRING_CONFIG);
public Password getConfigurationString() {
return getPassword(CONFIGURATION_STRING_CONFIG);
}

public String getTlsValidationMode() {
Expand Down
10 changes: 8 additions & 2 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.questdb.std.datetime.millitime.DateFormatUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.errors.ConnectException;
Expand Down Expand Up @@ -79,13 +80,18 @@ public void start(Map<String, String> map) {

private Sender createRawSender() {
log.debug("Creating a new sender");
String confStr = config.getConfigurationString();
Password confStrSecret = config.getConfigurationString();
String confStr = confStrSecret == null ? null : confStrSecret.value();
if (confStr == null || confStr.isEmpty()) {
confStr = System.getenv("QDB_CLIENT_CONF");
}
if (confStr != null && !confStr.isEmpty()) {
log.debug("Using configuration string: {}", confStr);
log.debug("Using client configuration string");
Sender s = Sender.fromConfig(confStr);
httpTransport = s instanceof LineHttpSender;
return s;
}
log.debug("Using legacy client configuration");
Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost());
if (config.isTls()) {
builder.enableTls();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class QuestDBSinkConnectorIT {
@Container
private static final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:7.4.0")
.withNetwork(network)
.withNetworkAliases("questdb")
.withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT)
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")));

Expand All @@ -65,6 +66,7 @@ public class QuestDBSinkConnectorIT {
.withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1")
.withEnv("QDB_CLIENT_CONF", "http::addr=questdb;auto_flush_rows=1;")
.withNetwork(network)
.withExposedPorts(8083)
.withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/usr/share/java/kafka/questdb-connector.jar")
Expand Down Expand Up @@ -94,8 +96,7 @@ public void test() throws Exception {
.with("tasks.max", "1")
.with("key.converter", "org.apache.kafka.connect.storage.StringConverter")
.with("value.converter", "org.apache.kafka.connect.storage.StringConverter")
.with("topics", topicName)
.with("client.conf.string", "http::addr=" + questDBContainer.getNetworkAliases().get(0) + ":" + QuestDBUtils.QUESTDB_HTTP_PORT + ";auto_flush_rows=1");
.with("topics", topicName);

connectContainer.registerConnector("my-connector", connector);

Expand Down

0 comments on commit 4761d5b

Please sign in to comment.