diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java index 73d0710..0ee98e1 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java @@ -9,10 +9,7 @@ import org.apache.kafka.common.config.types.Password; import org.apache.kafka.connect.errors.ConnectException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; public final class QuestDBSinkConnectorConfig extends AbstractConfig { @@ -61,6 +58,9 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig { public static final String TLS = "tls"; public static final String TLS_DOC = "Use TLS for connecting to QuestDB"; + public static final String TLS_VALIDATION_MODE_CONFIG = "tls.validation.mode"; + public static final String TLS_VALIDATION_MODE_DOC = "TLS validation mode. Possible values: default, insecure"; + public static final String RETRY_BACKOFF_MS = "retry.backoff.ms"; private static final String RETRY_BACKOFF_MS_DOC = "The time in milliseconds to wait following an error before a retry attempt is made"; @@ -109,13 +109,18 @@ public static ConfigDef conf() { .define(TIMESTAMP_FORMAT, Type.STRING, DEFAULT_TIMESTAMP_FORMAT, TimestampFormatValidator.INSTANCE, Importance.MEDIUM, TIMESTAMP_FORMAT_DOC) .define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC) .define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC) - .define(DEDUPLICATION_REWIND_CONFIG, Type.LONG, 0, Importance.MEDIUM, DEDUPLICATION_REWIND_DOC); + .define(DEDUPLICATION_REWIND_CONFIG, Type.LONG, 0, Importance.MEDIUM, DEDUPLICATION_REWIND_DOC) + .define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC); } public long getDeduplicationRewindOffset() { return getLong(DEDUPLICATION_REWIND_CONFIG); } + public String getTlsValidationMode() { + return getString(TLS_VALIDATION_MODE_CONFIG).toLowerCase(Locale.ENGLISH); + } + public String getHost() { return getString(HOST_CONFIG); } diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index e7ad8b1..7028c96 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -97,6 +97,9 @@ private Sender createSender() { Sender.LineSenderBuilder builder = Sender.builder().address(config.getHost()); if (config.isTls()) { builder.enableTls(); + if ("insecure".equals(config.getTlsValidationMode())) { + builder.advancedTls().disableCertificateValidation(); + } } if (config.getToken() != null) { String username = config.getUsername(); diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java index f1190c0..4ce761b 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java @@ -15,9 +15,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.api.Assertions.*; public class QuestDBSinkConnectorConfigTest { @@ -50,6 +48,34 @@ public void testTimeunitsRecommender() { assertEquals(Arrays.asList("auto", "millis", "micros", "nanos"), objects); } + @Test + public void testTlsConfig() { + ConfigDef confDef = QuestDBSinkConnectorConfig.conf(); + Map config = baseConnectorProps(); + config.put("tls", "true"); + QuestDBSinkConnectorConfig sinkConnectorConfig = new QuestDBSinkConnectorConfig(confDef, config); + + assertTrue(sinkConnectorConfig.isTls()); + } + + @Test + public void testTlsValidationModeValidation() { + ConfigDef conf = QuestDBSinkConnectorConfig.conf(); + ConfigDef.ConfigKey configKey = conf.configKeys().get(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG); + + // positive cases + configKey.validator.ensureValid(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "default"); + configKey.validator.ensureValid(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "insecure"); + + // negative cases + try { + configKey.validator.ensureValid(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "foo"); + fail("Expected ConfigException"); + } catch (ConfigException e) { + assertEquals("Invalid value foo for configuration tls.validation.mode: String must be one of: default, insecure", e.getMessage()); + } + } + @Test public void testExplicitTablenameValidation() { ConfigDef confDef = QuestDBSinkConnectorConfig.conf(); diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java index dc991c9..9eb6037 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java @@ -8,12 +8,17 @@ import org.apache.kafka.connect.storage.ConverterConfig; import org.apache.kafka.connect.storage.ConverterType; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.LoggerFactory; import org.testcontainers.containers.FixedHostPortGenericContainer; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -33,19 +38,40 @@ public class QuestDBSinkConnectorEmbeddedAuthTest { private static final String TEST_USER_TOKEN = "UvuVb1USHGRRT08gEnwN2zGZrvM4MsLQ5brgF6SVkAw="; private static final String TEST_USER_NAME = "testUser1"; + private final static Network network = Network.newNetwork(); + + @Container + private static final GenericContainer questDBContainer = newQuestDbConnector(); @Container - private static GenericContainer questDBContainer = newQuestDbConnector(); + private static final GenericContainer tlsProxy = newTlsProxyContainer(); private static GenericContainer newQuestDbConnector() { - FixedHostPortGenericContainer container = new FixedHostPortGenericContainer<>("questdb/questdb:7.3"); + FixedHostPortGenericContainer container = new FixedHostPortGenericContainer<>("questdb/questdb:7.3.3"); container.addExposedPort(QuestDBUtils.QUESTDB_HTTP_PORT); container.addExposedPort(QuestDBUtils.QUESTDB_ILP_PORT); container.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*server-main enjoy.*")); - container.withCopyFileToContainer(MountableFile.forClasspathResource("/authDb.txt"), "/var/lib/questdb/conf/authDb.txt"); - container.withEnv("QDB_LINE_TCP_AUTH_DB_PATH", "conf/authDb.txt"); + container.withCopyFileToContainer(MountableFile.forClasspathResource("/authDb.txt"), "/var/lib/questdb/conf/authDb.txt") + .withEnv("QDB_LINE_TCP_AUTH_DB_PATH", "conf/authDb.txt") + .withNetwork(network) + .withNetworkAliases("questdb"); return container.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))); } + public static GenericContainer newTlsProxyContainer() { + return new GenericContainer<>("hitch") + .withCommand("--backend=[questdb]:9009 --write-proxy-v2=off") + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("hitch"))) + .withExposedPorts(443) + .withNetwork(network) + .dependsOn(questDBContainer) + .waitingFor(new HostPortWaitStrategy().forPorts(443)); + } + + @AfterAll + public static void stopTLS() { + tlsProxy.stop(); + } + @BeforeEach public void setUp() { topicName = ConnectTestUtils.newTopicName(); @@ -60,13 +86,26 @@ public void setUp() { connect.start(); } - @Test - public void testSmoke() { + @AfterEach + public void tearDown() { + connect.stop(); + } + + @ParameterizedTest(name = "useTls = {0}") + @ValueSource(booleans = {false, true}) + public void testSmoke(boolean useTls) { connect.kafka().createTopic(topicName, 1); Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); props.put(QuestDBSinkConnectorConfig.USERNAME, TEST_USER_NAME); props.put(QuestDBSinkConnectorConfig.TOKEN, TEST_USER_TOKEN); + if (useTls) { + props.put(QuestDBSinkConnectorConfig.TLS, "true"); + props.put(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "insecure"); + // override the host to point to the TLS proxy + props.put("host", "localhost:" + tlsProxy.getMappedPort(443)); + } + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person")