diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnector.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnector.java index 66fc4a6..93f6f10 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnector.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnector.java @@ -49,9 +49,46 @@ public ConfigDef config() { public Config validate(Map connectorConfigs) { String s = connectorConfigs.get(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG); if (Boolean.parseBoolean(s) && connectorConfigs.get(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG) != null) { - throw new IllegalArgumentException("Cannot use " + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG - + " with " + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG +". These options are mutually exclusive."); + throw new IllegalArgumentException("Cannot use '" + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG + + "' with '" + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG +"'. These options are mutually exclusive."); } + + validateClientConfiguration(connectorConfigs); return super.validate(connectorConfigs); } + + private static void validateClientConfiguration(Map connectorConfigs) { + String host = connectorConfigs.get(QuestDBSinkConnectorConfig.HOST_CONFIG); + String confString = connectorConfigs.get(QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG); + String envConfString = System.getenv("QDB_CLIENT_CONF"); + + // cannot set client configuration string via both explicit config and environment variable + if (confString != null && envConfString != null) { + throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' or QDB_CLIENT_CONF environment variable must be set. They cannot be used together."); + } + + if (confString == null && envConfString == null) { + if (host == null) { + throw new IllegalArgumentException("Either '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' or '" + QuestDBSinkConnectorConfig.HOST_CONFIG + "' must be set."); + } + return; // configuration string is not used, nothing else to validate + } + + // configuration string is used, let's validate no other client configuration is set + if (host != null) { + throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.HOST_CONFIG + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set."); + } + if (connectorConfigs.get(QuestDBSinkConnectorConfig.TLS) != null) { + throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.TLS + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set."); + } + if (connectorConfigs.get(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG) != null) { + throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set."); + } + if (connectorConfigs.get(QuestDBSinkConnectorConfig.TOKEN) != null) { + throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.TOKEN + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set."); + } + if (connectorConfigs.get(QuestDBSinkConnectorConfig.USERNAME) != null) { + throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.USERNAME + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set."); + } + } } diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java index 4ce761b..b253a5e 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java @@ -19,6 +19,43 @@ public class QuestDBSinkConnectorConfigTest { + @Test + public void testClientConfigurationStringCannotBeCombinedWithExplicitClientConfig() { + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.HOST_CONFIG, "localhost"); + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.USERNAME, "joe"); + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TOKEN, "secret"); + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TLS, "true"); + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TLS, "false"); + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "default"); + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "insecure"); + } + + @Test + public void testEitherHostOrClientConfigStringMustBeSet() { + Map config = baseConnectorProps(); + QuestDBSinkConnector connector = new QuestDBSinkConnector(); + try { + connector.validate(config); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals("Either 'client.conf.string' or 'host' must be set.", e.getMessage()); + } + } + + private void assertCannotBeSetTogetherWithConfigString(String configKey, String configValue) { + Map config = baseConnectorProps(); + config.put(QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG, "http::addr=localhost;"); + config.put(configKey, configValue); + + QuestDBSinkConnector connector = new QuestDBSinkConnector(); + try { + connector.validate(config); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals("Only one of '" + configKey + "' or 'client.conf.string' must be set.", e.getMessage()); + } + } + @Test public void testTimeunitsValidator() { ConfigDef conf = QuestDBSinkConnectorConfig.conf(); @@ -52,6 +89,7 @@ public void testTimeunitsRecommender() { public void testTlsConfig() { ConfigDef confDef = QuestDBSinkConnectorConfig.conf(); Map config = baseConnectorProps(); + config.put("client.conf.string", "http::addr=localhost;tls=true"); config.put("tls", "true"); QuestDBSinkConnectorConfig sinkConnectorConfig = new QuestDBSinkConnectorConfig(confDef, config); @@ -80,18 +118,21 @@ public void testTlsValidationModeValidation() { public void testExplicitTablenameValidation() { ConfigDef confDef = QuestDBSinkConnectorConfig.conf(); Map config = baseConnectorProps(); + config.put("client.conf.string", "http::addr=localhost;tls=true"); // positive case I - valid explicit table name ConfigValue configValue = confDef.validate(config).stream().filter(c -> c.name().equals(QuestDBSinkConnectorConfig.TABLE_CONFIG)).findFirst().get(); assertTrue(configValue.errorMessages().isEmpty()); // positive case II - missing explicit table name config = baseConnectorProps(); + config.put("client.conf.string", "http::addr=localhost;tls=true"); config.remove(QuestDBSinkConnectorConfig.TABLE_CONFIG); configValue = confDef.validate(config).stream().filter(c -> c.name().equals(QuestDBSinkConnectorConfig.TABLE_CONFIG)).findFirst().get(); assertTrue(configValue.errorMessages().isEmpty()); // negative case - invalid characters in explicit table name config = baseConnectorProps(); + config.put("client.conf.string", "http::addr=localhost;tls=true"); config.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "not?valid"); configValue = confDef.validate(config).stream().filter(c -> c.name().equals(QuestDBSinkConnectorConfig.TABLE_CONFIG)).findFirst().get(); assertEquals(1, configValue.errorMessages().size()); @@ -104,7 +145,6 @@ private Map baseConnectorProps() { props.put("topics", "myTopic"); props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); - props.put("host", "localhost"); return props; } } \ No newline at end of file diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index e3c6158..bbcaddc 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -617,7 +617,7 @@ public void testKafkaNativeTimestampsAndExplicitDesignatedFieldTimestampMutually connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); fail("Expected ConnectException"); } catch (ConnectException e) { - assertThat(e.getMessage(), containsString("timestamp.field.name with timestamp.kafka.native")); + assertThat(e.getMessage(), containsString("'timestamp.field.name' with 'timestamp.kafka.native'")); } }