Skip to content

Commit

Permalink
feat: Eventhub
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenzo-ingenito committed Dec 11, 2024
1 parent 6403edf commit 59daed2
Show file tree
Hide file tree
Showing 12 changed files with 354 additions and 94 deletions.
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@
</exclusions>
</dependency>

<!-- https://mvnrepository.com/artifact/com.microsoft.azure/msal4j -->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>msal4j</artifactId>
<version>1.17.3</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-client-authentication</artifactId>
<version>1.6.15</version>
<scope>compile</scope>
</dependency>
</dependencies>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

import it.finanze.sanita.fse2.ms.gtw.statusmanager.config.kafka.oauth2.CustomAuthenticateCallbackHandler;
import it.finanze.sanita.fse2.ms.gtw.statusmanager.utility.StringUtility;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -47,6 +48,10 @@ public class KafkaConsumerCFG {

@Autowired
private KafkaTopicCFG topicCFG;

@Autowired
private KafkaPropertiesCFG kafkaPropsCfg;


/**
* Configurazione consumer.
Expand All @@ -58,7 +63,7 @@ public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaConsumerPropCFG.getClientId());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerPropCFG.getConsumerBootstrapServers());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropsCfg.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerPropCFG.getConsumerGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConsumerPropCFG.getConsumerKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConsumerPropCFG.getConsumerValueDeserializer());
Expand All @@ -70,21 +75,28 @@ public Map<String, Object> consumerConfigs() {
props.put("security.protocol", kafkaConsumerPropCFG.getProtocol());
}

if(!StringUtility.isNullOrEmpty(kafkaConsumerPropCFG.getMechanism())) {
props.put("sasl.mechanism", kafkaConsumerPropCFG.getMechanism());
if(!StringUtility.isNullOrEmpty(kafkaPropsCfg.getMechanism())) {
props.put("sasl.mechanism", kafkaPropsCfg.getMechanism());
}

if(!StringUtility.isNullOrEmpty(kafkaConsumerPropCFG.getConfigJaas())) {
props.put("sasl.jaas.config", kafkaConsumerPropCFG.getConfigJaas());
if(!StringUtility.isNullOrEmpty(kafkaPropsCfg.getConfigJaas())) {
props.put("sasl.jaas.config", kafkaPropsCfg.getConfigJaas());
}

if(!StringUtility.isNullOrEmpty(kafkaConsumerPropCFG.getTrustoreLocation())) {
props.put("ssl.truststore.location", kafkaConsumerPropCFG.getTrustoreLocation());
if (!StringUtility.isNullOrEmpty(kafkaPropsCfg.getTrustoreLocation())) {
props.put("ssl.truststore.location", kafkaPropsCfg.getTrustoreLocation());
}
if (kafkaPropsCfg.getTrustorePassword() != null && kafkaPropsCfg.getTrustorePassword().length > 0) {
props.put("ssl.truststore.password", String.valueOf(kafkaPropsCfg.getTrustorePassword()));
}

if(!StringUtility.isNullOrEmpty(String.valueOf(kafkaConsumerPropCFG.getTrustorePassword()))) {
props.put("ssl.truststore.password", String.valueOf(kafkaConsumerPropCFG.getTrustorePassword()));
if("OAUTHBEARER".equals(kafkaPropsCfg.getMechanism())) {
props.put("sasl.login.callback.handler.class", CustomAuthenticateCallbackHandler.class);
props.put("kafka.oauth.tenantId", kafkaPropsCfg.getTenantId());
props.put("kafka.oauth.clientId", kafkaPropsCfg.getClientId());
props.put("kafka.oauth.pwd", kafkaPropsCfg.getPwd());
}


return props;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ public class KafkaConsumerPropertiesCFG {
@Value("${kafka.consumer.value-deserializer}")
private String consumerValueDeserializer;

/**
* Consumer bootstrap server.
*/
@Value("${kafka.consumer.bootstrap-servers}")
private String consumerBootstrapServers;

/**
* Isolation level.
*/
Expand Down Expand Up @@ -85,28 +79,5 @@ public class KafkaConsumerPropertiesCFG {
@Value("${kafka.properties.security.protocol}")
private String protocol;

/**
* Meccanismo.
*/
@Value("${kafka.properties.sasl.mechanism}")
private String mechanism;

/**
* Config jaas.
*/
@Value("${kafka.properties.sasl.jaas.config}")
private String configJaas;

/**
* Truststore location.
*/
@Value("${kafka.properties.ssl.truststore.location}")
private String trustoreLocation;

/**
* Truststore password.
*/
@Value("${kafka.properties.ssl.truststore.password}")
private transient char[] trustorePassword;


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import it.finanze.sanita.fse2.ms.gtw.statusmanager.config.kafka.oauth2.CustomAuthenticateCallbackHandler;
import it.finanze.sanita.fse2.ms.gtw.statusmanager.utility.StringUtility;

@Configuration
Expand All @@ -34,6 +35,8 @@ public class KafkaProducerCFG {
@Autowired
private KafkaProducerPropertiesCFG kafkaProducerPropCFG;

@Autowired
private KafkaPropertiesCFG kafkaPropCFG;

/**
* Genera configurazione senza transazione.
Expand All @@ -46,29 +49,36 @@ public Map<String, Object> producerWithoutTransactionConfigs() {

props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerPropCFG.getClientId() + "-noTx");
props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerPropCFG.getProducerRetries());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerPropCFG.getProducerBootstrapServers());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropCFG.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerPropCFG.getProducerKeySerializer());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerPropCFG.getProducerValueSerializer());

if(!StringUtility.isNullOrEmpty(kafkaProducerPropCFG.getProtocol())) {
props.put("security.protocol", kafkaProducerPropCFG.getProtocol());
if(!StringUtility.isNullOrEmpty(kafkaPropCFG.getProtocol())) {
props.put("security.protocol", kafkaPropCFG.getProtocol());
}

if(!StringUtility.isNullOrEmpty(kafkaProducerPropCFG.getMechanism())) {
props.put("sasl.mechanism", kafkaProducerPropCFG.getMechanism());
if(!StringUtility.isNullOrEmpty(kafkaPropCFG.getMechanism())) {
props.put("sasl.mechanism", kafkaPropCFG.getMechanism());
}

if(!StringUtility.isNullOrEmpty(kafkaProducerPropCFG.getConfigJaas())) {
props.put("sasl.jaas.config", kafkaProducerPropCFG.getConfigJaas());
if(!StringUtility.isNullOrEmpty(kafkaPropCFG.getConfigJaas())) {
props.put("sasl.jaas.config", kafkaPropCFG.getConfigJaas());
}

if(!StringUtility.isNullOrEmpty(kafkaProducerPropCFG.getTrustoreLocation())) {
props.put("ssl.truststore.location", kafkaProducerPropCFG.getTrustoreLocation());
if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getTrustoreLocation())) {
props.put("ssl.truststore.location", kafkaPropCFG.getTrustoreLocation());
}
if (kafkaPropCFG.getTrustorePassword() != null && kafkaPropCFG.getTrustorePassword().length > 0) {
props.put("ssl.truststore.password", String.valueOf(kafkaPropCFG.getTrustorePassword()));
}

if(!StringUtility.isNullOrEmpty(String.valueOf(kafkaProducerPropCFG.getTrustorePassword()))) {
props.put("ssl.truststore.password", String.valueOf(kafkaProducerPropCFG.getTrustorePassword()));
if("OAUTHBEARER".equals(kafkaPropCFG.getMechanism())) {
props.put("sasl.login.callback.handler.class", CustomAuthenticateCallbackHandler.class);
props.put("kafka.oauth.tenantId", kafkaPropCFG.getTenantId());
props.put("kafka.oauth.clientId", kafkaPropCFG.getClientId());
props.put("kafka.oauth.pwd", kafkaPropCFG.getPwd());
}


return props;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ public class KafkaProducerPropertiesCFG {
@Value("${kafka.producer.value-serializer}")
private String producerValueSerializer;

/**
* Producer bootstrap server.
*/
@Value("${kafka.producer.bootstrap-servers}")
private String producerBootstrapServers;

/**
* Producer trans id.
Expand All @@ -71,34 +66,5 @@ public class KafkaProducerPropertiesCFG {
@Value("${kafka.producer.ack}")
private String producerACK;

/**
* Protocol.
*/
@Value("${kafka.properties.security.protocol}")
private String protocol;

/**
* Meccanismo.
*/
@Value("${kafka.properties.sasl.mechanism}")
private String mechanism;

/**
* Config jass.
*/
@Value("${kafka.properties.sasl.jaas.config}")
private String configJaas;

/**
* Posizione trust store.
*/
@Value("${kafka.properties.ssl.truststore.location}")
private String trustoreLocation;

/**
* Password trust store.
*/
@Value("${kafka.properties.ssl.truststore.password}")
private char[] trustorePassword;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package it.finanze.sanita.fse2.ms.gtw.statusmanager.config.kafka;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;


@Slf4j
@Configuration
@Data
public class KafkaPropertiesCFG {

/**
* Producer bootstrap server.
*/
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${kafka.oauth.tenantId}")
private String tenantId;

@Value("${kafka.oauth.clientId}")
private String clientId;

@Value("${kafka.oauth.pwd}")
private String pwd;

/**
* Protocol.
*/
@Value("${kafka.properties.security.protocol}")
private String protocol;

/**
* Meccanismo.
*/
@Value("${kafka.properties.sasl.mechanism}")
private String mechanism;

/**
* Config jass.
*/
@Value("${kafka.properties.sasl.jaas.config}")
private String configJaas;

/**
* Posizione trust store.
*/
@Value("${kafka.properties.ssl.truststore.location}")
private String trustoreLocation;

/**
* Password trust store.
*/
@Value("${kafka.properties.ssl.truststore.password}")
private char[] trustorePassword;

}
Loading

0 comments on commit 59daed2

Please sign in to comment.