diff --git a/README.md b/README.md
index 1ab4f6fb..27d98c75 100644
--- a/README.md
+++ b/README.md
@@ -638,24 +638,24 @@ Example:
authorized-kafka-user-password
```
-###### `SCRAM-256`
+###### `SCRAM_256`
Example:
```xml
true
-SCRAM-256
+SCRAM_256
authorized-kafka-usee
authorized-kafka-user-password
```
-###### `SCRAM-512`
+###### `SCRAM_512`
Example:
```xml
true
-SCRAM-512
+SCRAM_512
authorized-kafka-username
authorized-kafka-username-password
```
diff --git a/kafka-connector-project/kafka-connector-utils/build.gradle b/kafka-connector-project/kafka-connector-utils/build.gradle
index ba8494f7..4b52dfbf 100644
--- a/kafka-connector-project/kafka-connector-utils/build.gradle
+++ b/kafka-connector-project/kafka-connector-utils/build.gradle
@@ -40,5 +40,5 @@ task distribuiteConsumer(type: Copy) {
}
clean {
- delete "$rootDir/$consumerDeployDirName"
+ delete "$rootDir/$consumerDeployDirName"
}
diff --git a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka_connector/adapters/config/BrokerAuthenticationConfigs.java b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka_connector/adapters/config/BrokerAuthenticationConfigs.java
index 70e44ccb..8d975fd0 100644
--- a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka_connector/adapters/config/BrokerAuthenticationConfigs.java
+++ b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka_connector/adapters/config/BrokerAuthenticationConfigs.java
@@ -72,7 +72,7 @@ static Properties addAuthentication(ConnectorConfig config) {
NonNullKeyProperties props = new NonNullKeyProperties();
if (config.isAuthenticationEnabled()) {
SaslMechanism mechanism = config.authenticationMechanism();
- props.setProperty(SaslConfigs.SASL_MECHANISM, mechanism.toString());
+ props.setProperty(SaslConfigs.SASL_MECHANISM, mechanism.toProperty());
props.setProperty(SaslConfigs.SASL_JAAS_CONFIG, configuredWith(config));
if (config.isGssapiEnabled()) {
props.setProperty(
diff --git a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka_connector/adapters/config/specs/ConfigTypes.java b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka_connector/adapters/config/specs/ConfigTypes.java
index 9032f51d..9e6a5fb0 100644
--- a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka_connector/adapters/config/specs/ConfigTypes.java
+++ b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka_connector/adapters/config/specs/ConfigTypes.java
@@ -112,6 +112,11 @@ public String loginModule() {
public String loginModule() {
return "org.apache.kafka.common.security.scram.ScramLoginModule";
}
+
+ @Override
+ public String toProperty() {
+ return "SCRAM-SHA-256";
+ }
},
SCRAM_512 {
@@ -119,6 +124,11 @@ public String loginModule() {
public String loginModule() {
return "org.apache.kafka.common.security.scram.ScramLoginModule";
}
+
+ @Override
+ public String toProperty() {
+ return "SCRAM-SHA-512";
+ }
},
GSSAPI {
@@ -135,6 +145,10 @@ public static Set names() {
public String loginModule() {
return "";
}
+
+ public String toProperty() {
+ return toString();
+ }
}
public enum RecordComsumeFrom {
diff --git a/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka_connector/adapters/config/ConnectorConfigTest.java b/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka_connector/adapters/config/ConnectorConfigTest.java
index 89c59879..538d92ed 100644
--- a/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka_connector/adapters/config/ConnectorConfigTest.java
+++ b/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka_connector/adapters/config/ConnectorConfigTest.java
@@ -57,7 +57,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
-import java.util.stream.Stream;
public class ConnectorConfigTest {
@@ -1205,10 +1204,7 @@ public void shouldGetDefaultAuthenticationSettings() {
@Test
public void shouldOverrideAuthenticationSettings() {
// Sasl mechanisms under test
- List mechanisms =
- Stream.of(SaslMechanism.SCRAM_256, SaslMechanism.SCRAM_512)
- .map(Object::toString)
- .toList();
+ List mechanisms = List.of(SaslMechanism.SCRAM_256, SaslMechanism.SCRAM_512);
for (boolean encrypted : List.of(true, false)) {
Map updatedConfig = new HashMap<>(standardParameters());
@@ -1217,13 +1213,14 @@ public void shouldOverrideAuthenticationSettings() {
if (encrypted) {
updatedConfig.putAll(encryptionParameters());
}
- for (String mechanism : mechanisms) {
- updatedConfig.put(BrokerAuthenticationConfigs.SASL_MECHANISM, mechanism);
+ for (SaslMechanism mechanism : mechanisms) {
+ updatedConfig.put(BrokerAuthenticationConfigs.SASL_MECHANISM, mechanism.toString());
ConnectorConfig config =
ConnectorConfig.newConfig(adapterDir.toFile(), updatedConfig);
assertThat(config.isAuthenticationEnabled()).isTrue();
- assertThat(config.authenticationMechanism().toString()).isEqualTo(mechanism);
+ assertThat(config.authenticationMechanism().toString())
+ .isEqualTo(mechanism.toString());
assertThat(config.authenticationUsername()).isEqualTo("sasl-username");
assertThat(config.authenticationPassword()).isEqualTo("sasl-password");
@@ -1235,7 +1232,7 @@ public void shouldOverrideAuthenticationSettings() {
? SecurityProtocol.SASL_SSL.toString()
: SecurityProtocol.SASL_PLAINTEXT.toString(),
SaslConfigs.SASL_MECHANISM,
- mechanism,
+ mechanism.toProperty(),
SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.scram.ScramLoginModule required username='sasl-username' password='sasl-password';");
}