Skip to content

Commit

Permalink
Merge branch 'develop' into release
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenzo-ingenito committed Dec 20, 2024
2 parents ac3e0e7 + 8a9d2f6 commit 2b7ccc4
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 54 deletions.
26 changes: 23 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,17 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.8.2</version>
<version>5.2.1</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-core</artifactId>
<version>4.8.2</version>
<version>5.2.1</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>4.8.2</version>
<version>5.2.1</version>
</dependency>

<!-- KAFKA -->
Expand Down Expand Up @@ -183,6 +183,26 @@
<artifactId>commons-io</artifactId>
<version>2.14.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.microsoft.azure/msal4j -->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>msal4j</artifactId>
<version>1.17.3</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-client-authentication</artifactId>
<version>1.6.15</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.codehaus.janino/janino -->
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.1.7</version>
</dependency>


</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@ConditionalOnProperty(name = "sasl.mechanism", havingValue = "OAUTHBEARER", matchIfMissing = false)
public class KafkaHealthIndicator implements HealthIndicator {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2.CustomAuthenticateCallbackHandler;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.utility.StringUtility;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -51,9 +51,6 @@ public class KafkaProducerCFG {
@Autowired
private KafkaProducerPropertiesCFG kafkaProducerPropCFG;

@Autowired
private Environment env;


/**
* Kafka producer configurazione.
Expand All @@ -76,21 +73,30 @@ public Map<String, Object> producerConfigs() {
if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getProtocol())) {
props.put("security.protocol", kafkaPropCFG.getProtocol());
}

if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getMechanism())) {
props.put("sasl.mechanism", kafkaPropCFG.getMechanism());
}

if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getConfigJaas())) {
props.put("sasl.jaas.config", kafkaPropCFG.getConfigJaas());
}

if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getTrustoreLocation())) {
props.put("ssl.truststore.location", kafkaPropCFG.getTrustoreLocation());
}

if (kafkaPropCFG.getTrustorePassword() != null && kafkaPropCFG.getTrustorePassword().length > 0) {
props.put("ssl.truststore.password", String.valueOf(kafkaPropCFG.getTrustorePassword()));
System.out.println("TRUSTORE PWD:"+String.valueOf(kafkaPropCFG.getTrustorePassword()));
}

if(!StringUtility.isNullOrEmpty(env.getProperty("kafka.properties.request.timeout.ms"))) {
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG , env.getProperty("kafka.properties.request.timeout.ms"));
if("OAUTHBEARER".equals(kafkaPropCFG.getMechanism())) {
props.put("sasl.login.callback.handler.class", CustomAuthenticateCallbackHandler.class);
props.put("kafka.oauth.tenantId", kafkaPropCFG.getTenantId());
props.put("kafka.oauth.appId", kafkaPropCFG.getAppId());
props.put("kafka.oauth.pfxPathName", kafkaPropCFG.getPfxPathName());
props.put("kafka.oauth.pwd", kafkaPropCFG.getPwd());
}

return props;
Expand Down Expand Up @@ -142,22 +148,35 @@ public Map<String, Object> producerWithoutTransactionConfigs() {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerPropCFG.getKeySerializer());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerPropCFG.getValueSerializer());

if (!StringUtils.isBlank(kafkaPropCFG.getProtocol())) {
if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getProtocol())) {
props.put("security.protocol", kafkaPropCFG.getProtocol());
}
if (!StringUtils.isBlank(kafkaPropCFG.getMechanism())) {

if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getMechanism())) {
props.put("sasl.mechanism", kafkaPropCFG.getMechanism());
}
if (!StringUtils.isBlank(kafkaPropCFG.getConfigJaas())) {

if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getConfigJaas())) {
props.put("sasl.jaas.config", kafkaPropCFG.getConfigJaas());
}
if (!StringUtils.isBlank(kafkaPropCFG.getTrustoreLocation())) {

if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getTrustoreLocation())) {
props.put("ssl.truststore.location", kafkaPropCFG.getTrustoreLocation());
}

if (kafkaPropCFG.getTrustorePassword() != null && kafkaPropCFG.getTrustorePassword().length > 0) {
props.put("ssl.truststore.password", String.valueOf(kafkaPropCFG.getTrustorePassword()));
}

if("OAUTHBEARER".equals(kafkaPropCFG.getMechanism())) {
props.put("sasl.login.callback.handler.class", CustomAuthenticateCallbackHandler.class);
props.put("kafka.oauth.tenantId", kafkaPropCFG.getTenantId());
props.put("kafka.oauth.appId", kafkaPropCFG.getAppId());
props.put("kafka.oauth.pfxPathName", kafkaPropCFG.getPfxPathName());
props.put("kafka.oauth.pwd", kafkaPropCFG.getPwd());
}


return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand Down Expand Up @@ -68,17 +69,23 @@ public class KafkaPropertiesCFG {
@Value("${kafka.properties.ssl.truststore.password}")
private transient char[] trustorePassword;

/**
* Enable Ssl flag.
*/
@Value("${kafka.enablessl}")
private boolean enableSSL;
@Value("${kafka.oauth.tenantId}")
private String tenantId;

@Value("${kafka.oauth.appId}")
private String appId;

@Value("${kafka.oauth.pfxPathName}")
private String pfxPathName;

@Value("${kafka.oauth.pwd}")
private String pwd;

@Autowired
private ProfileUtility profileUtility;

@Bean
@ConditionalOnProperty(name = "sasl.mechanism", havingValue = "OAUTHBEARER", matchIfMissing = false)
public AdminClient client() {
Properties configProperties = new Properties();
configProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.

package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;

import com.microsoft.aad.msal4j.ClientCredentialFactory;
import com.microsoft.aad.msal4j.ClientCredentialParameters;
import com.microsoft.aad.msal4j.ConfidentialClientApplication;
import com.microsoft.aad.msal4j.IAuthenticationResult;
import com.microsoft.aad.msal4j.IClientCredential;

import it.finanze.sanita.fse2.ms.gtw.dispatcher.exceptions.BusinessException;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler {

private String tenantId;

private String appId;

private String pfxPathName;

private String pwd;

private ConfidentialClientApplication aadClient;
private ClientCredentialParameters aadParameters;

@Override
public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
String bootstrapServer = Arrays.asList(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)).get(0).toString();

bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", "");
URI uri = URI.create("https://" + bootstrapServer);
String sbUri = uri.getScheme() + "://" + uri.getHost();
this.aadParameters =
ClientCredentialParameters.builder(Collections.singleton(sbUri + "/.default"))
.build();
this.tenantId = "https://login.microsoftonline.com/"+ Arrays.asList(configs.get("kafka.oauth.tenantId")).get(0).toString();
this.appId = Arrays.asList(configs.get("kafka.oauth.appId")).get(0).toString();
this.pfxPathName = Arrays.asList(configs.get("kafka.oauth.pfxPathName")).get(0).toString();
this.pwd = Arrays.asList(configs.get("kafka.oauth.pwd")).get(0).toString();

}

public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback callback: callbacks) {
if (callback instanceof OAuthBearerTokenCallback) {
try {
OAuthBearerToken token = getOAuthBearerToken();
OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback;
oauthCallback.token(token);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
} else {
throw new UnsupportedCallbackException(callback);
}
}
}

private OAuthBearerToken getOAuthBearerToken() throws MalformedURLException, InterruptedException, ExecutionException, TimeoutException {
if (this.aadClient == null) {
synchronized(this) {
if (this.aadClient == null) {
IClientCredential credential = null;
try (FileInputStream certificato = new FileInputStream(new File(pfxPathName))) {
credential = ClientCredentialFactory.createFromCertificate(certificato, this.pwd);
} catch(Exception ex) {
log.error("Error while try to crate credential from certificate");
throw new BusinessException(ex);
}
this.aadClient = ConfidentialClientApplication.builder(this.appId, credential)
.authority(this.tenantId)
.build();
}
}
}

IAuthenticationResult authResult = this.aadClient.acquireToken(this.aadParameters).get();
log.info("Token oauth2 acquired");
return new OAuthBearerTokenImp(authResult.accessToken(), authResult.expiresOnDate());
}

public void close() throws KafkaException {
// NOOP
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2;
import java.util.Date;
import java.util.Set;

import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;


public class OAuthBearerTokenImp implements OAuthBearerToken {
private String token;
private long lifetimeMs;

public OAuthBearerTokenImp(final String token, Date expiresOn) {
this.token = token;
this.lifetimeMs = expiresOn.getTime();
}

@Override
public String value() {
return this.token;
}

@Override
public Set<String> scope() {
return null;
}

@Override
public long lifetimeMs() {
return this.lifetimeMs;
}

@Override
public String principalName() {
return null;
}

@Override
public Long startTimeMs() {
return null;
}
}
13 changes: 8 additions & 5 deletions src/main/resources/application-dev.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ data.mongodb.schema-name=FSE_GTW
# KAFKA
#######################################
kafka.bootstrap-servers=localhost:29092
spring.sleuth.messaging.kafka.enabled=true
kafka.properties.security.protocol=
kafka.properties.sasl.mechanism=
kafka.properties.sasl.jaas.config=
spring.sleuth.messaging.kafka.enabled=false
kafka.properties.security.protocol=PLAINTEXT
kafka.properties.sasl.mechanism=PLAINTEXT
kafka.properties.sasl.jaas.config=PLAINTEXT
kafka.properties.ssl.truststore.location=
kafka.properties.ssl.truststore.password=

kafka.oauth.tenantId=
kafka.oauth.appId=
kafka.oauth.pfxPathName=
kafka.oauth.pwd=
#######################################
# Microservices url
#######################################
Expand Down
9 changes: 5 additions & 4 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,19 @@ kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLo
kafka.properties.ssl.truststore.location=/config/resources/security/truststore.jks
kafka.properties.ssl.truststore.password=${TRUST_JKS_PASSWORD}
kafka.properties.request.timeout.ms=
kafka.enablessl=true

kafka.oauth.tenantId=${TENANT_ID}
kafka.oauth.appId=${APP_ID}
kafka.oauth.pfxPathName=${PFX_NAME_RESOURCE_PATH}
kafka.oauth.pwd=${PFX_PASSWORD}
####### KAFKA PRODUCER SETTINGS ################
kafka.producer.client-id=springboot-gtw-dispatcher
kafka.producer.retries=5
kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.producer.transactional.id=gtwdispatcher.tx.
kafka.producer.transactional.id=
kafka.producer.enable.idempotence=true
kafka.producer.ack=all


###### KAFKA CODE ##########
kafka.statusmanager.topic=MDS-SA-0004_FU_001_STATUS
kafka.dispatcher-indexer.base-topic=MDS-SA-0004_FU_001_INI
Expand Down
Loading

0 comments on commit 2b7ccc4

Please sign in to comment.