Skip to content

Commit

Permalink
validate client configuration string is not used together with the le…
Browse files Browse the repository at this point in the history
…gacy client config options
  • Loading branch information
jerrinot committed Apr 5, 2024
1 parent 4761d5b commit b5d72eb
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 3 deletions.
41 changes: 39 additions & 2 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,46 @@ public ConfigDef config() {
public Config validate(Map<String, String> connectorConfigs) {
String s = connectorConfigs.get(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG);
if (Boolean.parseBoolean(s) && connectorConfigs.get(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG) != null) {
throw new IllegalArgumentException("Cannot use " + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG
+ " with " + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG +". These options are mutually exclusive.");
throw new IllegalArgumentException("Cannot use '" + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG
+ "' with '" + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG +"'. These options are mutually exclusive.");
}

validateClientConfiguration(connectorConfigs);
return super.validate(connectorConfigs);
}

private static void validateClientConfiguration(Map<String, String> connectorConfigs) {
String host = connectorConfigs.get(QuestDBSinkConnectorConfig.HOST_CONFIG);
String confString = connectorConfigs.get(QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG);
String envConfString = System.getenv("QDB_CLIENT_CONF");

// cannot set client configuration string via both explicit config and environment variable
if (confString != null && envConfString != null) {
throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' or QDB_CLIENT_CONF environment variable must be set. They cannot be used together.");
}

if (confString == null && envConfString == null) {
if (host == null) {
throw new IllegalArgumentException("Either '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' or '" + QuestDBSinkConnectorConfig.HOST_CONFIG + "' must be set.");
}
return; // configuration string is not used, nothing else to validate
}

// configuration string is used, let's validate no other client configuration is set
if (host != null) {
throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.HOST_CONFIG + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set.");
}
if (connectorConfigs.get(QuestDBSinkConnectorConfig.TLS) != null) {
throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.TLS + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set.");
}
if (connectorConfigs.get(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG) != null) {
throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set.");
}
if (connectorConfigs.get(QuestDBSinkConnectorConfig.TOKEN) != null) {
throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.TOKEN + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set.");
}
if (connectorConfigs.get(QuestDBSinkConnectorConfig.USERNAME) != null) {
throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.USERNAME + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,43 @@

public class QuestDBSinkConnectorConfigTest {

@Test
public void testClientConfigurationStringCannotBeCombinedWithExplicitClientConfig() {
assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.HOST_CONFIG, "localhost");
assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.USERNAME, "joe");
assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TOKEN, "secret");
assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TLS, "true");
assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TLS, "false");
assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "default");
assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "insecure");
}

@Test
public void testEitherHostOrClientConfigStringMustBeSet() {
Map<String, String> config = baseConnectorProps();
QuestDBSinkConnector connector = new QuestDBSinkConnector();
try {
connector.validate(config);
fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertEquals("Either 'client.conf.string' or 'host' must be set.", e.getMessage());
}
}

private void assertCannotBeSetTogetherWithConfigString(String configKey, String configValue) {
Map<String, String> config = baseConnectorProps();
config.put(QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG, "http::addr=localhost;");
config.put(configKey, configValue);

QuestDBSinkConnector connector = new QuestDBSinkConnector();
try {
connector.validate(config);
fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertEquals("Only one of '" + configKey + "' or 'client.conf.string' must be set.", e.getMessage());
}
}

@Test
public void testTimeunitsValidator() {
ConfigDef conf = QuestDBSinkConnectorConfig.conf();
Expand Down Expand Up @@ -52,6 +89,7 @@ public void testTimeunitsRecommender() {
public void testTlsConfig() {
ConfigDef confDef = QuestDBSinkConnectorConfig.conf();
Map<String, String> config = baseConnectorProps();
config.put("client.conf.string", "http::addr=localhost;tls=true");
config.put("tls", "true");
QuestDBSinkConnectorConfig sinkConnectorConfig = new QuestDBSinkConnectorConfig(confDef, config);

Expand Down Expand Up @@ -80,18 +118,21 @@ public void testTlsValidationModeValidation() {
public void testExplicitTablenameValidation() {
ConfigDef confDef = QuestDBSinkConnectorConfig.conf();
Map<String, String> config = baseConnectorProps();
config.put("client.conf.string", "http::addr=localhost;tls=true");
// positive case I - valid explicit table name
ConfigValue configValue = confDef.validate(config).stream().filter(c -> c.name().equals(QuestDBSinkConnectorConfig.TABLE_CONFIG)).findFirst().get();
assertTrue(configValue.errorMessages().isEmpty());

// positive case II - missing explicit table name
config = baseConnectorProps();
config.put("client.conf.string", "http::addr=localhost;tls=true");
config.remove(QuestDBSinkConnectorConfig.TABLE_CONFIG);
configValue = confDef.validate(config).stream().filter(c -> c.name().equals(QuestDBSinkConnectorConfig.TABLE_CONFIG)).findFirst().get();
assertTrue(configValue.errorMessages().isEmpty());

// negative case - invalid characters in explicit table name
config = baseConnectorProps();
config.put("client.conf.string", "http::addr=localhost;tls=true");
config.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "not?valid");
configValue = confDef.validate(config).stream().filter(c -> c.name().equals(QuestDBSinkConnectorConfig.TABLE_CONFIG)).findFirst().get();
assertEquals(1, configValue.errorMessages().size());
Expand All @@ -104,7 +145,6 @@ private Map<String, String> baseConnectorProps() {
props.put("topics", "myTopic");
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
props.put("host", "localhost");
return props;
}
}

0 comments on commit b5d72eb

Please sign in to comment.