Skip to content

Commit

Permalink
Merge branch 'feature/kakfka-eventhub' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenzo-ingenito committed Dec 16, 2024
2 parents eb59b61 + 251c47b commit ad05625
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 32 deletions.
13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,19 @@
<artifactId>commons-io</artifactId>
<version>2.14.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.microsoft.azure/msal4j -->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>msal4j</artifactId>
<version>1.17.3</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-client-authentication</artifactId>
<version>1.6.15</version>
<scope>compile</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@ConditionalOnProperty(name = "sasl.mechanism", havingValue = "OAUTHBEARER", matchIfMissing = false)
public class KafkaHealthIndicator implements HealthIndicator {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2.CustomAuthenticateCallbackHandler;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.utility.StringUtility;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -51,9 +51,6 @@ public class KafkaProducerCFG {
@Autowired
private KafkaProducerPropertiesCFG kafkaProducerPropCFG;

@Autowired
private Environment env;


/**
* Kafka producer configurazione.
Expand All @@ -65,32 +62,52 @@ public Map<String, Object> producerConfigs() {
InetAddress id = getLocalHost();

props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerPropCFG.getClientId() + "-tx" + "-" + id );
System.out.println("CLIENT_ID_CONFIG:"+kafkaProducerPropCFG.getClientId() + "-tx" + "-" + id);
props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerPropCFG.getRetries());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerPropCFG.getProducerBootstrapServers());
System.out.println("BOOTSTRAP:"+kafkaProducerPropCFG.getProducerBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerPropCFG.getKeySerializer());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerPropCFG.getValueSerializer());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, id + "-" + kafkaProducerPropCFG.getTransactionalId());
System.out.println("TRANSACTION_ID:"+id + "-" + kafkaProducerPropCFG.getTransactionalId());
props.put(ProducerConfig.ACKS_CONFIG,kafkaProducerPropCFG.getAck());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,kafkaProducerPropCFG.getIdempotence());

System.out.println("PROTOCOL:"+kafkaPropCFG.getProtocol());
if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getProtocol())) {
props.put("security.protocol", kafkaPropCFG.getProtocol());
}

System.out.println("MECHANISM:"+kafkaPropCFG.getMechanism());
if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getMechanism())) {
props.put("sasl.mechanism", kafkaPropCFG.getMechanism());
}

System.out.println("CONFIG JAAS:"+kafkaPropCFG.getConfigJaas());
if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getConfigJaas())) {
props.put("sasl.jaas.config", kafkaPropCFG.getConfigJaas());
}

System.out.println("TRUSTORE LOCATION:"+kafkaPropCFG.getTrustoreLocation());
if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getTrustoreLocation())) {
props.put("ssl.truststore.location", kafkaPropCFG.getTrustoreLocation());
}

if (kafkaPropCFG.getTrustorePassword() != null && kafkaPropCFG.getTrustorePassword().length > 0) {
props.put("ssl.truststore.password", String.valueOf(kafkaPropCFG.getTrustorePassword()));
System.out.println("TRUSTORE PWD:"+String.valueOf(kafkaPropCFG.getTrustorePassword()));
}

if(!StringUtility.isNullOrEmpty(env.getProperty("kafka.properties.request.timeout.ms"))) {
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG , env.getProperty("kafka.properties.request.timeout.ms"));
if("OAUTHBEARER".equals(kafkaPropCFG.getMechanism())) {
props.put("sasl.login.callback.handler.class", CustomAuthenticateCallbackHandler.class);
props.put("kafka.oauth.tenantId", kafkaPropCFG.getTenantId());
System.out.println("TENNT ID:"+kafkaPropCFG.getTenantId());
props.put("kafka.oauth.appId", kafkaPropCFG.getAppId());
System.out.println("APP ID:"+kafkaPropCFG.getAppId());
props.put("kafka.oauth.pfxPathName", kafkaPropCFG.getPfxPathName());
System.out.println("PFX PATHNAME:"+kafkaPropCFG.getPfxPathName());
props.put("kafka.oauth.pwd", kafkaPropCFG.getPwd());
System.out.println("PASS:"+kafkaPropCFG.getPwd());
}

return props;
Expand Down Expand Up @@ -139,25 +156,49 @@ public Map<String, Object> producerWithoutTransactionConfigs() {
props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerPropCFG.getClientId()+ "-noTx");
props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerPropCFG.getRetries());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerPropCFG.getProducerBootstrapServers());
System.out.println("BOOTSTRAP:"+kafkaProducerPropCFG.getProducerBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerPropCFG.getKeySerializer());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerPropCFG.getValueSerializer());

if (!StringUtils.isBlank(kafkaPropCFG.getProtocol())) {

System.out.println("PROTOCOL:"+kafkaPropCFG.getProtocol());
if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getProtocol())) {
props.put("security.protocol", kafkaPropCFG.getProtocol());
}
if (!StringUtils.isBlank(kafkaPropCFG.getMechanism())) {

System.out.println("MECHANISM:"+kafkaPropCFG.getMechanism());
if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getMechanism())) {
props.put("sasl.mechanism", kafkaPropCFG.getMechanism());
}
if (!StringUtils.isBlank(kafkaPropCFG.getConfigJaas())) {

System.out.println("CONFIG JAAS:"+kafkaPropCFG.getConfigJaas());
if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getConfigJaas())) {
props.put("sasl.jaas.config", kafkaPropCFG.getConfigJaas());
}
if (!StringUtils.isBlank(kafkaPropCFG.getTrustoreLocation())) {

System.out.println("TRUSTORE LOCATION:"+kafkaPropCFG.getTrustoreLocation());
if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getTrustoreLocation())) {
props.put("ssl.truststore.location", kafkaPropCFG.getTrustoreLocation());
}

if (kafkaPropCFG.getTrustorePassword() != null && kafkaPropCFG.getTrustorePassword().length > 0) {
props.put("ssl.truststore.password", String.valueOf(kafkaPropCFG.getTrustorePassword()));
System.out.println("TRUSTORE PWD:"+String.valueOf(kafkaPropCFG.getTrustorePassword()));
}

if("OAUTHBEARER".equals(kafkaPropCFG.getMechanism())) {
props.put("sasl.login.callback.handler.class", CustomAuthenticateCallbackHandler.class);
props.put("kafka.oauth.tenantId", kafkaPropCFG.getTenantId());
System.out.println("TENNT ID:"+kafkaPropCFG.getTenantId());
props.put("kafka.oauth.appId", kafkaPropCFG.getAppId());
System.out.println("APP ID:"+kafkaPropCFG.getAppId());
props.put("kafka.oauth.pfxPathName", kafkaPropCFG.getPfxPathName());
System.out.println("PFX PATHNAME:"+kafkaPropCFG.getPfxPathName());
props.put("kafka.oauth.pwd", kafkaPropCFG.getPwd());
System.out.println("PASS:"+kafkaPropCFG.getPwd());
}


return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand Down Expand Up @@ -68,17 +69,23 @@ public class KafkaPropertiesCFG {
@Value("${kafka.properties.ssl.truststore.password}")
private transient char[] trustorePassword;

/**
* Enable Ssl flag.
*/
@Value("${kafka.enablessl}")
private boolean enableSSL;
@Value("${kafka.oauth.tenantId}")
private String tenantId;

@Value("${kafka.oauth.appId}")
private String appId;

@Value("${kafka.oauth.pfxPathName}")
private String pfxPathName;

@Value("${kafka.oauth.pwd}")
private String pwd;

@Autowired
private ProfileUtility profileUtility;

@Bean
@ConditionalOnProperty(name = "sasl.mechanism", havingValue = "OAUTHBEARER", matchIfMissing = false)
public AdminClient client() {
Properties configProperties = new Properties();
configProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.

package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;

import com.microsoft.aad.msal4j.ClientCredentialFactory;
import com.microsoft.aad.msal4j.ClientCredentialParameters;
import com.microsoft.aad.msal4j.ConfidentialClientApplication;
import com.microsoft.aad.msal4j.IAuthenticationResult;
import com.microsoft.aad.msal4j.IClientCredential;

import it.finanze.sanita.fse2.ms.gtw.dispatcher.exceptions.BusinessException;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.utility.FileUtility;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler {

private String tenantId;

private String appId;

private String pfxPathName;

private String pwd;

private ConfidentialClientApplication aadClient;
private ClientCredentialParameters aadParameters;

@Override
public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
String bootstrapServer = Arrays.asList(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)).get(0).toString();

bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", "");
URI uri = URI.create("https://" + bootstrapServer);
String sbUri = uri.getScheme() + "://" + uri.getHost();
this.aadParameters =
ClientCredentialParameters.builder(Collections.singleton(sbUri + "/.default"))
.build();
this.tenantId = "https://login.microsoftonline.com/"+ Arrays.asList(configs.get("kafka.oauth.tenantId")).get(0).toString();
this.appId = Arrays.asList(configs.get("kafka.oauth.appId")).get(0).toString();
this.pfxPathName = Arrays.asList(configs.get("kafka.oauth.pfxPathName")).get(0).toString();
this.pwd = Arrays.asList(configs.get("kafka.oauth.pwd")).get(0).toString();

}

public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback callback: callbacks) {
if (callback instanceof OAuthBearerTokenCallback) {
try {
OAuthBearerToken token = getOAuthBearerToken();
OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback;
oauthCallback.token(token);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
} else {
throw new UnsupportedCallbackException(callback);
}
}
}

private OAuthBearerToken getOAuthBearerToken() throws MalformedURLException, InterruptedException, ExecutionException, TimeoutException {
if (this.aadClient == null) {
synchronized(this) {
if (this.aadClient == null) {
IClientCredential credential = null;
try (FileInputStream certificato = new FileInputStream(new File(pfxPathName))) {
credential = ClientCredentialFactory.createFromCertificate(certificato, this.pwd);
} catch(Exception ex) {
log.error("Error while try to crate credential from certificate");
throw new BusinessException(ex);
}
this.aadClient = ConfidentialClientApplication.builder(this.appId, credential)
.authority(this.tenantId)
.build();
}
}
}

IAuthenticationResult authResult = this.aadClient.acquireToken(this.aadParameters).get();
log.info("Token oauth2 acquired");
return new OAuthBearerTokenImp(authResult.accessToken(), authResult.expiresOnDate());
}

public void close() throws KafkaException {
// NOOP
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package it.finanze.sanita.fse2.ms.gtw.dispatcher.config.kafka.oauth2;
import java.util.Date;
import java.util.Set;

import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;


public class OAuthBearerTokenImp implements OAuthBearerToken {
private String token;
private long lifetimeMs;

public OAuthBearerTokenImp(final String token, Date expiresOn) {
this.token = token;
this.lifetimeMs = expiresOn.getTime();
}

@Override
public String value() {
return this.token;
}

@Override
public Set<String> scope() {
return null;
}

@Override
public long lifetimeMs() {
return this.lifetimeMs;
}

@Override
public String principalName() {
return null;
}

@Override
public Long startTimeMs() {
return null;
}
}
8 changes: 4 additions & 4 deletions src/main/resources/application-dev.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ data.mongodb.schema-name=FSE_GTW
#######################################
# KAFKA
#######################################
kafka.bootstrap-servers=localhost:29092
kafka.bootstrap-servers=localhost:9092
spring.sleuth.messaging.kafka.enabled=true
kafka.properties.security.protocol=
kafka.properties.sasl.mechanism=
kafka.properties.sasl.jaas.config=
kafka.properties.security.protocol=SASL_SSL
kafka.properties.sasl.mechanism=OAUTHBEARER
kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
kafka.properties.ssl.truststore.location=
kafka.properties.ssl.truststore.password=

Expand Down
Loading

0 comments on commit ad05625

Please sign in to comment.