Skip to content

Commit

Permalink
Automatically set the client.id consumer property when connected to C…
Browse files Browse the repository at this point in the history
…onfluent Cloud (#11)
  • Loading branch information
gfinocchiaro authored Sep 5, 2024
1 parent 0a74982 commit 2ceda1f
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.FETCH_MAX_BYTES_CONFIG;
Expand Down Expand Up @@ -58,7 +59,9 @@
import com.lightstreamer.kafka.common.config.FieldConfigs;
import com.lightstreamer.kafka.common.config.TopicConfigurations.ItemTemplateConfigs;
import com.lightstreamer.kafka.common.config.TopicConfigurations.TopicMappingConfig;
import com.lightstreamer.kafka.common.utils.Split;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.io.File;
Expand All @@ -71,6 +74,8 @@

public final class ConnectorConfig extends AbstractConfig {

static final String LIGHSTREAMER_CLIENT_ID = "cwc|5795fea5-2ddf-41c7-b44c-c6cb0982d7b|";

public static final String ENABLE = "enable";

public static final String DATA_ADAPTER_NAME = "data_provider.name";
Expand Down Expand Up @@ -112,6 +117,8 @@ public final class ConnectorConfig extends AbstractConfig {
// Kafka consumer specific settings
private static final String CONNECTOR_PREFIX = "consumer.";
public static final String RECORD_CONSUME_FROM = "record.consume.from";
public static final String CONSUMER_CLIENT_ID =
CONNECTOR_PREFIX + CommonClientConfigs.CLIENT_ID_CONFIG;
public static final String CONSUMER_ENABLE_AUTO_COMMIT_CONFIG =
CONNECTOR_PREFIX + ENABLE_AUTO_COMMIT_CONFIG;
public static final String CONSUMER_FETCH_MIN_BYTES_CONFIG =
Expand Down Expand Up @@ -215,6 +222,27 @@ public final class ConnectorConfig extends AbstractConfig {
false,
CONSUME_FROM,
defaultValue(RecordComsumeFrom.LATEST.toString()))
.add(
CONSUMER_CLIENT_ID,
false,
false,
TEXT,
false,
defaultValue(
params -> {
String hostList = params.get(BOOTSTRAP_SERVERS);
if (hostList == null) {
return "";
}
if (Split.byComma(hostList).stream()
.flatMap(s -> Split.asPair(s, ':').stream())
.map(p -> p.key())
.allMatch(
s -> s.endsWith(".confluent.cloud"))) {
return LIGHSTREAMER_CLIENT_ID;
}
return "";
}))
.add(
CONSUMER_ENABLE_AUTO_COMMIT_CONFIG,
false,
Expand Down Expand Up @@ -319,6 +347,7 @@ private Properties initProps() {
NonNullKeyProperties properties = new NonNullKeyProperties();
properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, getHostsList(BOOTSTRAP_SERVERS));
properties.setProperty(GROUP_ID_CONFIG, getText(GROUP_ID));
properties.setProperty(CLIENT_ID_CONFIG, getText(CONSUMER_CLIENT_ID));
properties.setProperty(METADATA_MAX_AGE_CONFIG, getInt(CONSUMER_METADATA_MAX_AGE_CONFIG));
properties.setProperty(AUTO_OFFSET_RESET_CONFIG, getRecordConsumeFrom().toPropertyValue());
properties.setProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ public static Optional<Pair> asPair(String splittable) {
return Optional.empty();
}

public static Optional<Pair> asPair(String splittable, char separator) {
List<String> tokens = bySeparator(separator, splittable);
if (tokens.size() == 2) {
String key = tokens.get(0);
String value = tokens.get(1);
if (!(key.isBlank() || value.isBlank())) {
return Optional.of(new Pair(key, value));
}
}
return Optional.empty();
}

public static List<String> byComma(String input) {
return by(COMMA_WITH_WHITESPACE, input);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
Expand All @@ -59,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;

public class ConnectorConfigTest {

Expand Down Expand Up @@ -249,6 +251,14 @@ public void shouldReturnConfigSpec() {
assertThat(enableAutoCommit.defaultValue()).isEqualTo("false");
assertThat(enableAutoCommit.type()).isEqualTo(ConfType.BOOL);

ConfParameter clientId = configSpec.getParameter(ConnectorConfig.CONSUMER_CLIENT_ID);
assertThat(clientId.name()).isEqualTo(ConnectorConfig.CONSUMER_CLIENT_ID);
assertThat(clientId.required()).isFalse();
assertThat(clientId.multiple()).isFalse();
assertThat(clientId.mutable()).isFalse();
assertThat(clientId.defaultValue()).isEqualTo("");
assertThat(clientId.type()).isEqualTo(ConfType.TEXT);

ConfParameter consumeEventsFrom =
configSpec.getParameter(ConnectorConfig.RECORD_CONSUME_FROM);
assertThat(consumeEventsFrom.name()).isEqualTo(ConnectorConfig.RECORD_CONSUME_FROM);
Expand Down Expand Up @@ -291,6 +301,7 @@ private Map<String, String> standardParameters() {
standardParams.put(ConnectorConfig.ITEM_INFO_FIELD, "INFO_FIELD");
standardParams.put(ConnectorConfig.ADAPTERS_CONF_ID, "KAFKA");
standardParams.put(ConnectorConfig.DATA_ADAPTER_NAME, "CONNECTOR");
standardParams.put(ConnectorConfig.CONSUMER_CLIENT_ID, "a.client.id"); // Unmodifiable
standardParams.put(ConnectorConfig.CONSUMER_FETCH_MAX_BYTES_CONFIG, "100");
standardParams.put(ConnectorConfig.CONSUMER_FETCH_MAX_WAIT_MS_CONFIG, "200");
standardParams.put(ConnectorConfig.CONSUMER_FETCH_MIN_BYTES_CONFIG, "300");
Expand Down Expand Up @@ -463,6 +474,8 @@ public void shouldRetrieveBaseConsumerProperties() {
.containsAtLeast(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"server:8080,server:8081",
ConsumerConfig.CLIENT_ID_CONFIG,
"",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"latest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
Expand Down Expand Up @@ -491,6 +504,48 @@ public void shouldRetrieveBaseConsumerProperties() {
.startsWith("KAFKA-CONNECTOR-");
}

static Stream<String> confluentCloudHostList() {
return Stream.of(
"abc-57rr02.mycloudrovider1.confluent.cloud:9092",
"def-437seq1.mycloudrovider2.confluent.cloud:9092,lopc-32wwg15.mycloudrovider2.confluent.cloud:9092");
}

@ParameterizedTest
@MethodSource("confluentCloudHostList")
public void shouldRetrieveLightstreamreClientIdWhenConnectedToConfluentClod(String hostList) {
Map<String, String> updatedConfig = new HashMap<>(standardParameters());
updatedConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hostList);
ConnectorConfig config = ConnectorConfig.newConfig(adapterDir.toFile(), updatedConfig);
Properties baseConsumerProps = config.baseConsumerProps();
assertThat(baseConsumerProps)
.containsAtLeast(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
hostList,
ConsumerConfig.CLIENT_ID_CONFIG,
ConnectorConfig.LIGHSTREAMER_CLIENT_ID);
}

static Stream<String> partialConfluentCloudHostList() {
return Stream.of(
"def-437seq1.mycloudrovider2.my.com:9092,lopc-32wwg15.mycloudrovider2.confluent.cloud1:9092");
}

@ParameterizedTest
@MethodSource("partialConfluentCloudHostList")
public void shouldNonRetrieveLightstreamreClientIdWhenNotAllHostConnectedToConfluentClod(
String hostList) {
Map<String, String> updatedConfig = new HashMap<>(standardParameters());
updatedConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hostList);
ConnectorConfig config = ConnectorConfig.newConfig(adapterDir.toFile(), updatedConfig);
Properties baseConsumerProps = config.baseConsumerProps();
assertThat(baseConsumerProps)
.containsAtLeast(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
hostList,
ConsumerConfig.CLIENT_ID_CONFIG,
"");
}

@Test
public void shouldExtendBaseConsumerProperties() {
ConnectorConfig config =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,23 @@ void shouldReturnPair(String splittable) {
assertThat(Split.asPair(splittable)).hasValue(new Pair("a", "b"));
}

@ParameterizedTest
@ValueSource(strings = {"a-b", " a-b ", "a- b ", " a-b", " a - b "})
void shouldReturnPairWithNonDefaultSeparator(String splittable) {
assertThat(Split.asPair(splittable, '-')).hasValue(new Pair("a", "b"));
}

@ParameterizedTest
@NullAndEmptySource
@ValueSource(strings = {"a", "a:", " :b ", ": b ", ":", " : "})
void shouldReturnEmptyPair(String splittable) {
assertThat(Split.asPair(splittable)).isEmpty();
}

@ParameterizedTest
@NullAndEmptySource
@ValueSource(strings = {"a", "a-", " -b ", "- b ", "-", " - "})
void shouldReturnEmptyPairWithNonDefaultSeparator(String splittable) {
assertThat(Split.asPair(splittable, '-')).isEmpty();
}
}

0 comments on commit 2ceda1f

Please sign in to comment.