diff --git a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/config/ConnectorConfig.java b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/config/ConnectorConfig.java index bc6276d7..5153e1f3 100644 --- a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/config/ConnectorConfig.java +++ b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/config/ConnectorConfig.java @@ -29,6 +29,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.FETCH_MAX_BYTES_CONFIG; @@ -58,7 +59,9 @@ import com.lightstreamer.kafka.common.config.FieldConfigs; import com.lightstreamer.kafka.common.config.TopicConfigurations.ItemTemplateConfigs; import com.lightstreamer.kafka.common.config.TopicConfigurations.TopicMappingConfig; +import com.lightstreamer.kafka.common.utils.Split; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClientConfig; import java.io.File; @@ -71,6 +74,8 @@ public final class ConnectorConfig extends AbstractConfig { + static final String LIGHSTREAMER_CLIENT_ID = "cwc|5795fea5-2ddf-41c7-b44c-c6cb0982d7b|"; + public static final String ENABLE = "enable"; public static final String DATA_ADAPTER_NAME = "data_provider.name"; @@ -112,6 +117,8 @@ public final class ConnectorConfig extends AbstractConfig { // Kafka consumer specific settings private static final String CONNECTOR_PREFIX = "consumer."; public static final String 
RECORD_CONSUME_FROM = "record.consume.from"; + public static final String CONSUMER_CLIENT_ID = + CONNECTOR_PREFIX + CommonClientConfigs.CLIENT_ID_CONFIG; public static final String CONSUMER_ENABLE_AUTO_COMMIT_CONFIG = CONNECTOR_PREFIX + ENABLE_AUTO_COMMIT_CONFIG; public static final String CONSUMER_FETCH_MIN_BYTES_CONFIG = @@ -215,6 +222,27 @@ public final class ConnectorConfig extends AbstractConfig { false, CONSUME_FROM, defaultValue(RecordComsumeFrom.LATEST.toString())) + .add( + CONSUMER_CLIENT_ID, + false, + false, + TEXT, + false, + defaultValue( + params -> { + String hostList = params.get(BOOTSTRAP_SERVERS); + if (hostList == null) { + return ""; + } + if (Split.byComma(hostList).stream() + .flatMap(s -> Split.asPair(s, ':').stream()) + .map(p -> p.key()) + .allMatch( + s -> s.endsWith(".confluent.cloud"))) { + return LIGHSTREAMER_CLIENT_ID; + } + return ""; + })) .add( CONSUMER_ENABLE_AUTO_COMMIT_CONFIG, false, @@ -319,6 +347,7 @@ private Properties initProps() { NonNullKeyProperties properties = new NonNullKeyProperties(); properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, getHostsList(BOOTSTRAP_SERVERS)); properties.setProperty(GROUP_ID_CONFIG, getText(GROUP_ID)); + properties.setProperty(CLIENT_ID_CONFIG, getText(CONSUMER_CLIENT_ID)); properties.setProperty(METADATA_MAX_AGE_CONFIG, getInt(CONSUMER_METADATA_MAX_AGE_CONFIG)); properties.setProperty(AUTO_OFFSET_RESET_CONFIG, getRecordConsumeFrom().toPropertyValue()); properties.setProperty( diff --git a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/common/utils/Split.java b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/common/utils/Split.java index a279adf5..938a0c94 100644 --- a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/common/utils/Split.java +++ b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/common/utils/Split.java @@ -46,6 +46,18 @@ public static Optional asPair(String 
splittable) { return Optional.empty(); } + public static Optional asPair(String splittable, char separator) { + List tokens = bySeparator(separator, splittable); + if (tokens.size() == 2) { + String key = tokens.get(0); + String value = tokens.get(1); + if (!(key.isBlank() || value.isBlank())) { + return Optional.of(new Pair(key, value)); + } + } + return Optional.empty(); + } + public static List byComma(String input) { return by(COMMA_WITH_WHITESPACE, input); } diff --git a/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/adapters/config/ConnectorConfigTest.java b/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/adapters/config/ConnectorConfigTest.java index 0a0c172b..19e82b37 100644 --- a/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/adapters/config/ConnectorConfigTest.java +++ b/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/adapters/config/ConnectorConfigTest.java @@ -49,6 +49,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; @@ -59,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.stream.Stream; public class ConnectorConfigTest { @@ -249,6 +251,14 @@ public void shouldReturnConfigSpec() { assertThat(enableAutoCommit.defaultValue()).isEqualTo("false"); assertThat(enableAutoCommit.type()).isEqualTo(ConfType.BOOL); + ConfParameter clientId = configSpec.getParameter(ConnectorConfig.CONSUMER_CLIENT_ID); + assertThat(clientId.name()).isEqualTo(ConnectorConfig.CONSUMER_CLIENT_ID); + assertThat(clientId.required()).isFalse(); + assertThat(clientId.multiple()).isFalse(); + assertThat(clientId.mutable()).isFalse(); + assertThat(clientId.defaultValue()).isEqualTo(""); + 
assertThat(clientId.type()).isEqualTo(ConfType.TEXT); + ConfParameter consumeEventsFrom = configSpec.getParameter(ConnectorConfig.RECORD_CONSUME_FROM); assertThat(consumeEventsFrom.name()).isEqualTo(ConnectorConfig.RECORD_CONSUME_FROM); @@ -291,6 +301,7 @@ private Map standardParameters() { standardParams.put(ConnectorConfig.ITEM_INFO_FIELD, "INFO_FIELD"); standardParams.put(ConnectorConfig.ADAPTERS_CONF_ID, "KAFKA"); standardParams.put(ConnectorConfig.DATA_ADAPTER_NAME, "CONNECTOR"); + standardParams.put(ConnectorConfig.CONSUMER_CLIENT_ID, "a.client.id"); // Unmodifiable standardParams.put(ConnectorConfig.CONSUMER_FETCH_MAX_BYTES_CONFIG, "100"); standardParams.put(ConnectorConfig.CONSUMER_FETCH_MAX_WAIT_MS_CONFIG, "200"); standardParams.put(ConnectorConfig.CONSUMER_FETCH_MIN_BYTES_CONFIG, "300"); @@ -463,6 +474,8 @@ public void shouldRetrieveBaseConsumerProperties() { .containsAtLeast( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server:8080,server:8081", + ConsumerConfig.CLIENT_ID_CONFIG, + "", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, @@ -491,6 +504,48 @@ public void shouldRetrieveBaseConsumerProperties() { .startsWith("KAFKA-CONNECTOR-"); } + static Stream confluentCloudHostList() { + return Stream.of( + "abc-57rr02.mycloudprovider1.confluent.cloud:9092", + "def-437seq1.mycloudprovider2.confluent.cloud:9092,lopc-32wwg15.mycloudprovider2.confluent.cloud:9092"); + } + + @ParameterizedTest + @MethodSource("confluentCloudHostList") + public void shouldRetrieveLightstreamerClientIdWhenConnectedToConfluentCloud(String hostList) { + Map updatedConfig = new HashMap<>(standardParameters()); + updatedConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hostList); + ConnectorConfig config = ConnectorConfig.newConfig(adapterDir.toFile(), updatedConfig); + Properties baseConsumerProps = config.baseConsumerProps(); + assertThat(baseConsumerProps) + .containsAtLeast( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + hostList, + 
ConsumerConfig.CLIENT_ID_CONFIG, + ConnectorConfig.LIGHSTREAMER_CLIENT_ID); + } + + static Stream partialConfluentCloudHostList() { + return Stream.of( + "def-437seq1.mycloudprovider2.my.com:9092,lopc-32wwg15.mycloudprovider2.confluent.cloud1:9092"); + } + + @ParameterizedTest + @MethodSource("partialConfluentCloudHostList") + public void shouldNotRetrieveLightstreamerClientIdWhenNotAllHostsConnectedToConfluentCloud( + String hostList) { + Map updatedConfig = new HashMap<>(standardParameters()); + updatedConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hostList); + ConnectorConfig config = ConnectorConfig.newConfig(adapterDir.toFile(), updatedConfig); + Properties baseConsumerProps = config.baseConsumerProps(); + assertThat(baseConsumerProps) + .containsAtLeast( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + hostList, + ConsumerConfig.CLIENT_ID_CONFIG, + ""); + } + @Test public void shouldExtendBaseConsumerProperties() { ConnectorConfig config = diff --git a/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/common/utils/SplitTest.java b/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/common/utils/SplitTest.java index abb365de..0e9aa2a0 100644 --- a/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/common/utils/SplitTest.java +++ b/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/common/utils/SplitTest.java @@ -94,10 +94,23 @@ void shouldReturnPair(String splittable) { assertThat(Split.asPair(splittable)).hasValue(new Pair("a", "b")); } + @ParameterizedTest + @ValueSource(strings = {"a-b", " a-b ", "a- b ", " a-b", " a - b "}) + void shouldReturnPairWithNonDefaultSeparator(String splittable) { + assertThat(Split.asPair(splittable, '-')).hasValue(new Pair("a", "b")); + } + @ParameterizedTest @NullAndEmptySource @ValueSource(strings = {"a", "a:", " :b ", ": b ", ":", " : "}) void shouldReturnEmptyPair(String splittable) { 
assertThat(Split.asPair(splittable)).isEmpty(); } + + @ParameterizedTest + @NullAndEmptySource + @ValueSource(strings = {"a", "a-", " -b ", "- b ", "-", " - "}) + void shouldReturnEmptyPairWithNonDefaultSeparator(String splittable) { + assertThat(Split.asPair(splittable, '-')).isEmpty(); + } }