Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
gfinocchiaro committed Feb 22, 2024
1 parent fe94bc6 commit 41e761b
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 68 deletions.
5 changes: 4 additions & 1 deletion src/clients/kafka-producer.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS org.apache.kafka:kafka-clients:3.3.1
//REPOS central,confluent=https://packages.confluent.io/maven
//DEPS org.apache.kafka:kafka-clients:7.5.3-ccs
//DEPS io.confluent:kafka-avro-serializer:7.5.3
//DEPS org.slf4j:slf4j-api:2.0.10
//DEPS org.slf4j:slf4j-reload4j:2.0.10
//DEPS info.picocli:picocli:4.7.5
//SOURCES producer/Producer.java


import producer.Producer;
import picocli.CommandLine;

Expand Down
44 changes: 24 additions & 20 deletions src/clients/producer/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@

package producer;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

import picocli.CommandLine.Option;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.SecureRandom;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
Expand All @@ -44,6 +45,13 @@ public class Producer implements Runnable {
@Option(names = "--topic", description = "The target topic", required = true)
private String topic;

@Option(
names = "--config-path",
description = "The configuration file path",
required = false,
defaultValue = "src/clients/producer/simple-config.properties")
private String configPath;

@Option(
names = "--period",
description = "The interval in ms between two successive executions",
Expand All @@ -55,22 +63,15 @@ public void run() {
// BasicConfigurator.configure();
// Create producer configs
Properties properties = new Properties();
System.out.println(Paths.get(".").toAbsolutePath());
try (InputStream is = Files.newInputStream(Paths.get(this.configPath))) {
properties.load(is);
} catch (IOException e) {
throw new RuntimeException(e);
}

properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.setProperty(
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "secrets/kafka.client.truststore.jks");
// properties.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
// properties.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
// ssl.truststore.password=test1234
// Create and start the producer.
properties.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.setProperty(
SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties); ) {
int key = 0;
Expand All @@ -81,7 +82,7 @@ public void run() {
.mapToObj(Character::toString)
.collect(Collectors.joining());

String keyString = null; // String.valueOf(key++);
String keyString = String.valueOf(key++);
ProducerRecord<String, String> record =
new ProducerRecord<String, String>(this.topic, keyString, message);
producer.send(
Expand All @@ -94,8 +95,11 @@ public void onCompletion(RecordMetadata metadata, Exception e) {
return;
}
System.out.printf(
"Sent record [%s]%n to topic [%s] and partition [%d]%n",
record.value(), record.topic(), record.partition());
"Sent record [key=%s,value=%s]%n to topic [%s]]%n",
record.key(),
record.value(),
record.topic(),
record.partition());
}
});
TimeUnit.MILLISECONDS.sleep(this.periodMs);
Expand Down
3 changes: 3 additions & 0 deletions src/clients/producer/avro-config.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081
2 changes: 2 additions & 0 deletions src/clients/producer/simple-config.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
29 changes: 16 additions & 13 deletions src/connector/dist/adapters.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@
<param name="record.extraction.error.strategy">FORCE_UNSUBSCRIPTION</param>
-->

<param name="encryption.enabled">false</param>
<param name="encryption.enabled">false</param>

<!-- TOPIC MAPPING SECTION -->

<!-- Define a "sample" item-template, which is simply made of the "sample" item name to be used by the Lightstreamer Client subscription. -->
Expand All @@ -89,7 +86,7 @@
<param name="map.sample-topic.to">item-template.sample1,item-template.sample2</param>
<!--<param name="map.sample-topic2.to">item-template.sample2</param>-->

<param name="encryption.enable">true</param>
<!-- <param name="encryption.enable">true</param> -->
<!--<param name="encryption.security.protocol">SSL</param>-->
<!-- <param name="encryption.truststore.path">secrets/kafka.client.truststore.jks1</param>
<param name="encryption.truststore.password">password</param> -->
Expand Down Expand Up @@ -123,28 +120,30 @@
<adapter_class>com.lightstreamer.kafka_connector.adapters.ConnectorDataAdapter</adapter_class>

<!-- The Kafka cluster address -->
<param name="bootstrap.servers">pkc-z9doz.eu-west-1.aws.confluent.cloud:9092</param>
<!--<param name="bootstrap.servers">pkc-z9doz.eu-west-1.aws.confluent.cloud:9092</param>-->
<param name="bootstrap.servers">localhost:9092</param>

<param name="group.id">test</param>

<param name="enabled">true</param> -->
<param name="enabled">true</param>

<!-- TOPIC MAPPING SECTION -->

<!-- Define a "sample" item-template, which is simply made of the "sample" item name to be used by the Lightstreamer Client subscription. -->
<!-- <param name="item-template.sample1">sample-#{partition=PARTITION}</param>
<param name="item-template.sample2">sample</param> -->
<param name="item-template.sample2">sample-#{key=KEY}</param>
<param name="item-template.sample1">sample-#{key=KEY}</param>
<param name="item-template.sample2">sample-#{value=VALUE}</param>

<!-- Map the Kafka topic "sample-topic" to the previous defined "sample" item template. -->
<param name="map.topic_0.to">item-template.sample1,item-template.sample2</param>
<param name="map.avro-topic-1.to">item-template.sample1</param>

<param name="value.evaluator.type">AVRO</param>
<param name="value.evaluator.schema.registry">AVRO</param>
<param name="value.evaluator.schema.registry.enable">enabled</param>

<param name="schema.registry.url">https://localhost:8081</param>

<param name="schema.registry.encryption.truststore.path">secrets/kafka.client.truststore.jks</param>
<!--<param name="schema.registry.encryption.truststore.path">secrets/kafka.client.truststore.jks</param>
<param name="schema.registry.encryption.truststore.password">password</param>
<param name="schema.registry.encryption.truststore.password">password</param>
Expand All @@ -155,11 +154,15 @@
<param name="schema.registry.basic.authentication.enabled">true</param>
<param name="schema.registry.basic.authentication.username">true</param>
<param name="schema.registry.basic.authentication.password">true</param>
-->


<!--
<param name="encryption.enabled">true</param>
<!-- <param name="encryption.truststore.path">secrets/kafka.client.truststore.jks1</param>
<param name="encryption.truststore.password">password</param> -->
<param name="encryption.truststore.path">secrets/kafka.client.truststore.jks1</param>
<param name="encryption.truststore.password">password</param>
-->


<!-- <param name="authentication.enabled">true</param>
<param name="authentication.username">R5QP6BKBEHPRHEI5</param>
Expand All @@ -182,7 +185,7 @@
<!-- Extraction of the record offset mapped to the field "offset". -->
<param name="field.offset">#{OFFSET}</param>

<!-- </data_provider> -->
</data_provider>

</adapters_conf>

Original file line number Diff line number Diff line change
Expand Up @@ -276,29 +276,25 @@ public ConnectorConfig(Map<String, String> configs) {

@Override
protected final void postValidate() throws ConfigException {
if (getKeyEvaluator().equals(EvaluatorType.AVRO)) {
if (!isSchemaRegistryEnabledForKey()) {
checkAvroSchemaConfig(true);
checkAvroSchemaConfig(false);
}

private void checkAvroSchemaConfig(boolean isKey) {
String schemaPathKey = isKey ? KEY_EVALUATOR_SCHEMA_PATH : VALUE_EVALUATOR_SCHEMA_PATH;
String evaluatorKey = isKey ? KEY_EVALUATOR_TYPE : VALUE_EVALUATOR_TYPE;
String schemaEnabledKey =
isKey
? KEY_EVALUATOR_SCHEMA_REGISTRY_ENABLE
: VALUE_EVALUATOR_SCHEMA_REGISTRY_ENABLE;
if (getEvaluator(evaluatorKey).equals(EvaluatorType.AVRO)) {
if (!getBoolean(schemaEnabledKey)) {
try {
getFile(KEY_EVALUATOR_SCHEMA_PATH, true);
getFile(schemaPathKey, true);
} catch (ConfigException ce) {
throw new ConfigException(
"Specify a valid value either for [%s] or [%s]"
.formatted(
KEY_EVALUATOR_SCHEMA_PATH, SchemaRegistryConfigs.URL));
}
}
}

if (getValueEvaluator().equals(EvaluatorType.AVRO)) {
if (!isSchemaRegistryEnabledForValue()) {
try {
getFile(VALUE_EVALUATOR_SCHEMA_PATH, true);
} catch (ConfigException ce) {
throw new ConfigException(
"Specify a valid value either for [%s] or [%s]"
.formatted(
VALUE_EVALUATOR_SCHEMA_PATH,
SchemaRegistryConfigs.URL));
.formatted(schemaPathKey, schemaEnabledKey));
}
}
}
Expand Down Expand Up @@ -371,6 +367,10 @@ public final EvaluatorType getValueEvaluator() {
return EvaluatorType.valueOf(get(VALUE_EVALUATOR_TYPE, EVALUATOR, false));
}

public final EvaluatorType getEvaluator(String configKey) {
return EvaluatorType.valueOf(get(configKey, EVALUATOR, false));
}

public final RecordErrorHandlingStrategy getRecordExtractionErrorHandlingStrategy() {
return RecordErrorHandlingStrategy.valueOf(
get(RECORD_EXTRACTION_ERROR_HANDLING_STRATEGY, ERROR_STRATEGY, false));
Expand Down Expand Up @@ -662,7 +662,7 @@ public String schemaRegistrySslProvider() {

public boolean isSchemaRegistryHostNameVerificationEnabled() {
checkSchemaRegistryEncryptionEnabled();
return getBoolean(SchemaRegistryConfigs.ENABLE_HOSTNAME_VERIFICATION);
return getBoolean(SchemaRegistryConfigs.HOSTNAME_VERIFICATION_ENANLE);
}

public String schemaRegistryKeyPassword() {
Expand All @@ -672,13 +672,14 @@ public String schemaRegistryKeyPassword() {

public boolean isSchemaRegistryKeystoreEnabled() {
checkSchemaRegistryEncryptionEnabled();
return getBoolean(SchemaRegistryConfigs.ENABLE_MTLS);
return getBoolean(SchemaRegistryConfigs.KEYSTORE_ENABLE);
}

private void checkSchemaRegistryKeystoreEnabled() {
if (!isSchemaRegistryKeystoreEnabled()) {
throw new ConfigException(
"Parameter [%s] is not enabled".formatted(SchemaRegistryConfigs.ENABLE_MTLS));
"Parameter [%s] is not enabled"
.formatted(SchemaRegistryConfigs.KEYSTORE_ENABLE));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public class SchemaRegistryConfigs {
public static final String TRUSTSTORE_PATH = nse(TlsConfigs.TRUSTSTORE_PATH);
public static final String TRUSTSTORE_PASSWORD = nse(TlsConfigs.TRUSTSTORE_PASSWORD);

public static final String ENABLE_MTLS = nse(TlsConfigs.KESYTORE_ENABLE);
public static final String KEYSTORE_ENABLE = nse(TlsConfigs.KESYTORE_ENABLE);

public static final String ENABLE_HOSTNAME_VERIFICATION =
public static final String HOSTNAME_VERIFICATION_ENANLE =
nse(TlsConfigs.HOSTNAME_VERIFICATION_ENABLE);

public static final String SSL_CIPHER_SUITES = nse(TlsConfigs.SSL_CIPHER_SUITES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@
public class KeystoreConfigs {

public static String KEYSTORE_TYPE = "keystore.type";

public static String KEYSTORE_PATH = "keystore.path";

public static String KEYSTORE_PASSWORD = "keystore.password";

public static String KEY_PASSWORD = "keystore.key.password";

private static ConfigsSpec CONFIG_SPEC;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ public void shouldFailDueToMissingSchemaPathForAvro() {
ConfigException.class, () -> ConnectorConfigProvider.minimalWith(configs));
assertThat(ce.getMessage())
.isEqualTo(
"Specify a valid value either for [key.evaluator.schema.path] or [schema.registry.url]");
"Specify a valid value either for [key.evaluator.schema.path] or [key.evaluator.schema.registry.enable]");

Map<String, String> configs2 = new HashMap<>();
configs2.put(ConnectorConfig.VALUE_EVALUATOR_TYPE, "AVRO");
Expand All @@ -652,7 +652,7 @@ public void shouldFailDueToMissingSchemaPathForAvro() {
ConfigException.class, () -> ConnectorConfigProvider.minimalWith(configs2));
assertThat(ce.getMessage())
.isEqualTo(
"Specify a valid value either for [value.evaluator.schema.path] or [schema.registry.url]");
"Specify a valid value either for [value.evaluator.schema.path] or [value.evaluator.schema.registry.enable]");
}

@Test
Expand Down Expand Up @@ -1555,7 +1555,7 @@ public void shouldOverrideSchemaRegistryEncryptionSettings() {
updatedConfig.put(
SchemaRegistryConfigs.SSL_CIPHER_SUITES,
"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA");
updatedConfig.put(SchemaRegistryConfigs.ENABLE_HOSTNAME_VERIFICATION, "true");
updatedConfig.put(SchemaRegistryConfigs.HOSTNAME_VERIFICATION_ENANLE, "true");

ConnectorConfig config = ConnectorConfig.newConfig(adapterDir.toFile(), updatedConfig);

Expand Down Expand Up @@ -1600,7 +1600,7 @@ public void shouldGetDefaultSchemaRegistryKeystoreSettings() {
Map<String, String> updatedConfig = new HashMap<>(standardParameters());
updatedConfig.put(ConnectorConfig.VALUE_EVALUATOR_SCHEMA_REGISTRY_ENABLE, "true");
updatedConfig.put(SchemaRegistryConfigs.URL, "https://localhost:8080");
updatedConfig.put(SchemaRegistryConfigs.ENABLE_MTLS, "true");
updatedConfig.put(SchemaRegistryConfigs.KEYSTORE_ENABLE, "true");
updatedConfig.put(
SchemaRegistryConfigs.KEYSTORE_PATH, keyStoreFile.getFileName().toString());
updatedConfig.put(SchemaRegistryConfigs.KEYSTORE_PASSWORD, "keystore-password");
Expand Down Expand Up @@ -1631,7 +1631,7 @@ public void shouldOverrideSchemaRegistryKeystoreSettings() {
Map<String, String> updatedConfig = new HashMap<>(standardParameters());
updatedConfig.put(ConnectorConfig.VALUE_EVALUATOR_SCHEMA_REGISTRY_ENABLE, "true");
updatedConfig.put(SchemaRegistryConfigs.URL, "https://localhost:8080");
updatedConfig.put(SchemaRegistryConfigs.ENABLE_MTLS, "true");
updatedConfig.put(SchemaRegistryConfigs.KEYSTORE_ENABLE, "true");
updatedConfig.put(SchemaRegistryConfigs.KEYSTORE_TYPE, "PKCS12");
updatedConfig.put(
SchemaRegistryConfigs.KEYSTORE_PATH, keyStoreFile.getFileName().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ void shouldReturnConfigSpec() {
assertThat(trustStorePassword.type()).isEqualTo(ConfType.TEXT);

ConfParameter enableHostNameVerification =
configSpec.getParameter(SchemaRegistryConfigs.ENABLE_HOSTNAME_VERIFICATION);
configSpec.getParameter(SchemaRegistryConfigs.HOSTNAME_VERIFICATION_ENANLE);
assertThat(enableHostNameVerification.name())
.isEqualTo(SchemaRegistryConfigs.ENABLE_HOSTNAME_VERIFICATION);
.isEqualTo(SchemaRegistryConfigs.HOSTNAME_VERIFICATION_ENANLE);
assertThat(enableHostNameVerification.required()).isFalse();
assertThat(enableHostNameVerification.multiple()).isFalse();
assertThat(enableHostNameVerification.mutable()).isTrue();
Expand Down

0 comments on commit 41e761b

Please sign in to comment.