Skip to content

Commit

Permalink
expose TLS validation mode
Browse files Browse the repository at this point in the history
also add a TLS test
  • Loading branch information
jerrinot committed Nov 3, 2023
1 parent 2f573ca commit 42d3c8b
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.connect.errors.ConnectException;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;

public final class QuestDBSinkConnectorConfig extends AbstractConfig {
Expand Down Expand Up @@ -61,6 +58,9 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
public static final String TLS = "tls";
public static final String TLS_DOC = "Use TLS for connecting to QuestDB";

public static final String TLS_VALIDATION_MODE_CONFIG = "tls.validation.mode";
public static final String TLS_VALIDATION_MODE_DOC = "TLS validation mode. Possible values: default, insecure";

public static final String RETRY_BACKOFF_MS = "retry.backoff.ms";
private static final String RETRY_BACKOFF_MS_DOC = "The time in milliseconds to wait following an error before a retry attempt is made";

Expand Down Expand Up @@ -109,13 +109,18 @@ public static ConfigDef conf() {
.define(TIMESTAMP_FORMAT, Type.STRING, DEFAULT_TIMESTAMP_FORMAT, TimestampFormatValidator.INSTANCE, Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
.define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC)
.define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC)
.define(DEDUPLICATION_REWIND_CONFIG, Type.LONG, 0, Importance.MEDIUM, DEDUPLICATION_REWIND_DOC);
.define(DEDUPLICATION_REWIND_CONFIG, Type.LONG, 0, Importance.MEDIUM, DEDUPLICATION_REWIND_DOC)
.define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC);
}

public long getDeduplicationRewindOffset() {
return getLong(DEDUPLICATION_REWIND_CONFIG);
}

public String getTlsValidationMode() {
return getString(TLS_VALIDATION_MODE_CONFIG).toLowerCase(Locale.ENGLISH);
}

public String getHost() {
return getString(HOST_CONFIG);
}
Expand Down
3 changes: 3 additions & 0 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ private Sender createSender() {
Sender.LineSenderBuilder builder = Sender.builder().address(config.getHost());
if (config.isTls()) {
builder.enableTls();
if ("insecure".equals(config.getTlsValidationMode())) {
builder.advancedTls().disableCertificateValidation();
}
}
if (config.getToken() != null) {
String username = config.getUsername();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@

import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.*;

public class QuestDBSinkConnectorConfigTest {

Expand Down Expand Up @@ -50,6 +48,34 @@ public void testTimeunitsRecommender() {
assertEquals(Arrays.asList("auto", "millis", "micros", "nanos"), objects);
}

@Test
public void testTlsConfig() {
ConfigDef confDef = QuestDBSinkConnectorConfig.conf();
Map<String, String> config = baseConnectorProps();
config.put("tls", "true");
QuestDBSinkConnectorConfig sinkConnectorConfig = new QuestDBSinkConnectorConfig(confDef, config);

assertTrue(sinkConnectorConfig.isTls());
}

@Test
public void testTlsValidationModeValidation() {
ConfigDef conf = QuestDBSinkConnectorConfig.conf();
ConfigDef.ConfigKey configKey = conf.configKeys().get(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG);

// positive cases
configKey.validator.ensureValid(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "default");
configKey.validator.ensureValid(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "insecure");

// negative cases
try {
configKey.validator.ensureValid(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "foo");
fail("Expected ConfigException");
} catch (ConfigException e) {
assertEquals("Invalid value foo for configuration tls.validation.mode: String must be one of: default, insecure", e.getMessage());
}
}

@Test
public void testExplicitTablenameValidation() {
ConfigDef confDef = QuestDBSinkConnectorConfig.conf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
Expand All @@ -33,19 +38,40 @@ public class QuestDBSinkConnectorEmbeddedAuthTest {
private static final String TEST_USER_TOKEN = "UvuVb1USHGRRT08gEnwN2zGZrvM4MsLQ5brgF6SVkAw=";
private static final String TEST_USER_NAME = "testUser1";

private final static Network network = Network.newNetwork();

@Container
private static final GenericContainer<?> questDBContainer = newQuestDbConnector();
@Container
private static GenericContainer<?> questDBContainer = newQuestDbConnector();
private static final GenericContainer<?> tlsProxy = newTlsProxyContainer();

private static GenericContainer<?> newQuestDbConnector() {
FixedHostPortGenericContainer<?> container = new FixedHostPortGenericContainer<>("questdb/questdb:7.3");
FixedHostPortGenericContainer<?> container = new FixedHostPortGenericContainer<>("questdb/questdb:7.3.3");
container.addExposedPort(QuestDBUtils.QUESTDB_HTTP_PORT);
container.addExposedPort(QuestDBUtils.QUESTDB_ILP_PORT);
container.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*server-main enjoy.*"));
container.withCopyFileToContainer(MountableFile.forClasspathResource("/authDb.txt"), "/var/lib/questdb/conf/authDb.txt");
container.withEnv("QDB_LINE_TCP_AUTH_DB_PATH", "conf/authDb.txt");
container.withCopyFileToContainer(MountableFile.forClasspathResource("/authDb.txt"), "/var/lib/questdb/conf/authDb.txt")
.withEnv("QDB_LINE_TCP_AUTH_DB_PATH", "conf/authDb.txt")
.withNetwork(network)
.withNetworkAliases("questdb");
return container.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")));
}

public static GenericContainer<?> newTlsProxyContainer() {
return new GenericContainer<>("hitch")
.withCommand("--backend=[questdb]:9009 --write-proxy-v2=off")
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("hitch")))
.withExposedPorts(443)
.withNetwork(network)
.dependsOn(questDBContainer)
.waitingFor(new HostPortWaitStrategy().forPorts(443));
}

@AfterAll
public static void stopTLS() {
tlsProxy.stop();
}

@BeforeEach
public void setUp() {
topicName = ConnectTestUtils.newTopicName();
Expand All @@ -60,13 +86,26 @@ public void setUp() {
connect.start();
}

@Test
public void testSmoke() {
@AfterEach
public void tearDown() {
connect.stop();
}

@ParameterizedTest(name = "useTls = {0}")
@ValueSource(booleans = {false, true})
public void testSmoke(boolean useTls) {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName);
props.put(QuestDBSinkConnectorConfig.USERNAME, TEST_USER_NAME);
props.put(QuestDBSinkConnectorConfig.TOKEN, TEST_USER_TOKEN);

if (useTls) {
props.put(QuestDBSinkConnectorConfig.TLS, "true");
props.put(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "insecure");
// override the host to point to the TLS proxy
props.put("host", "localhost:" + tlsProxy.getMappedPort(443));
}

connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
Schema schema = SchemaBuilder.struct().name("com.example.Person")
Expand Down

0 comments on commit 42d3c8b

Please sign in to comment.