From 59daed243e790818e10bc950eed98f195f1fa6a1 Mon Sep 17 00:00:00 2001 From: vincenzo-ingenito Date: Wed, 11 Dec 2024 15:38:38 +0000 Subject: [PATCH] feat: Eventhub --- pom.xml | 12 ++ .../config/kafka/KafkaConsumerCFG.java | 30 +++-- .../kafka/KafkaConsumerPropertiesCFG.java | 31 +---- .../config/kafka/KafkaProducerCFG.java | 32 ++++-- .../kafka/KafkaProducerPropertiesCFG.java | 34 ------ .../config/kafka/KafkaPropertiesCFG.java | 60 ++++++++++ .../CustomAuthenticateCallbackHandler.java | 107 ++++++++++++++++++ .../kafka/oauth2/OAuthBearerTokenImp.java | 41 +++++++ .../service/IKafkaReceiverSRV.java | 2 +- .../service/impl/KafkaReceiverSRV.java | 12 +- .../statusmanager/utility/FileUtility.java | 81 +++++++++++++ src/main/resources/application.properties | 6 +- 12 files changed, 354 insertions(+), 94 deletions(-) create mode 100644 src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaPropertiesCFG.java create mode 100644 src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/oauth2/CustomAuthenticateCallbackHandler.java create mode 100644 src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/oauth2/OAuthBearerTokenImp.java create mode 100644 src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/utility/FileUtility.java diff --git a/pom.xml b/pom.xml index 75b7204..a52e87c 100644 --- a/pom.xml +++ b/pom.xml @@ -163,6 +163,18 @@ + + + com.microsoft.azure + msal4j + 1.17.3 + + + com.microsoft.azure + azure-client-authentication + 1.6.15 + compile + diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaConsumerCFG.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaConsumerCFG.java index 63efa24..6bb00a4 100644 --- a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaConsumerCFG.java +++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaConsumerCFG.java @@ -32,6 +32,7 @@ import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.util.backoff.FixedBackOff; +import it.finanze.sanita.fse2.ms.gtw.statusmanager.config.kafka.oauth2.CustomAuthenticateCallbackHandler; import it.finanze.sanita.fse2.ms.gtw.statusmanager.utility.StringUtility; import lombok.extern.slf4j.Slf4j; @@ -47,6 +48,10 @@ public class KafkaConsumerCFG { @Autowired private KafkaTopicCFG topicCFG; + + @Autowired + private KafkaPropertiesCFG kafkaPropsCfg; + /** * Configurazione consumer. @@ -58,7 +63,7 @@ public Map consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaConsumerPropCFG.getClientId()); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerPropCFG.getConsumerBootstrapServers()); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropsCfg.getBootstrapServers()); props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerPropCFG.getConsumerGroupId()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConsumerPropCFG.getConsumerKeyDeserializer()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConsumerPropCFG.getConsumerValueDeserializer()); @@ -70,21 +75,28 @@ public Map consumerConfigs() { props.put("security.protocol", kafkaConsumerPropCFG.getProtocol()); } - if(!StringUtility.isNullOrEmpty(kafkaConsumerPropCFG.getMechanism())) { - props.put("sasl.mechanism", kafkaConsumerPropCFG.getMechanism()); + if(!StringUtility.isNullOrEmpty(kafkaPropsCfg.getMechanism())) { + props.put("sasl.mechanism", kafkaPropsCfg.getMechanism()); } - if(!StringUtility.isNullOrEmpty(kafkaConsumerPropCFG.getConfigJaas())) { - props.put("sasl.jaas.config", kafkaConsumerPropCFG.getConfigJaas()); + if(!StringUtility.isNullOrEmpty(kafkaPropsCfg.getConfigJaas())) { + props.put("sasl.jaas.config", kafkaPropsCfg.getConfigJaas()); } - if(!StringUtility.isNullOrEmpty(kafkaConsumerPropCFG.getTrustoreLocation())) { - props.put("ssl.truststore.location", kafkaConsumerPropCFG.getTrustoreLocation()); + if (!StringUtility.isNullOrEmpty(kafkaPropsCfg.getTrustoreLocation())) { + props.put("ssl.truststore.location", kafkaPropsCfg.getTrustoreLocation()); + } + if (kafkaPropsCfg.getTrustorePassword() != null && kafkaPropsCfg.getTrustorePassword().length > 0) { + props.put("ssl.truststore.password", String.valueOf(kafkaPropsCfg.getTrustorePassword())); } - if(!StringUtility.isNullOrEmpty(String.valueOf(kafkaConsumerPropCFG.getTrustorePassword()))) { - props.put("ssl.truststore.password", String.valueOf(kafkaConsumerPropCFG.getTrustorePassword())); + if("OAUTHBEARER".equals(kafkaPropsCfg.getMechanism())) { + props.put("sasl.login.callback.handler.class", CustomAuthenticateCallbackHandler.class); + props.put("kafka.oauth.tenantId", kafkaPropsCfg.getTenantId()); + props.put("kafka.oauth.clientId", kafkaPropsCfg.getClientId()); + props.put("kafka.oauth.pwd", kafkaPropsCfg.getPwd()); } + return props; } diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaConsumerPropertiesCFG.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaConsumerPropertiesCFG.java index 774e9e0..43ac877 100644 --- a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaConsumerPropertiesCFG.java +++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaConsumerPropertiesCFG.java @@ -49,12 +49,6 @@ public class KafkaConsumerPropertiesCFG { @Value("${kafka.consumer.value-deserializer}") private String consumerValueDeserializer; - /** - * Consumer bootstrap server. - */ - @Value("${kafka.consumer.bootstrap-servers}") - private String consumerBootstrapServers; - /** * Isolation level. */ @@ -85,28 +79,5 @@ public class KafkaConsumerPropertiesCFG { @Value("${kafka.properties.security.protocol}") private String protocol; - /** - * Meccanismo. - */ - @Value("${kafka.properties.sasl.mechanism}") - private String mechanism; - - /** - * Config jaas. - */ - @Value("${kafka.properties.sasl.jaas.config}") - private String configJaas; - - /** - * Truststore location. - */ - @Value("${kafka.properties.ssl.truststore.location}") - private String trustoreLocation; - - /** - * Truststore password. - */ - @Value("${kafka.properties.ssl.truststore.password}") - private transient char[] trustorePassword; - + } diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaProducerCFG.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaProducerCFG.java index 5bdacc0..a4b851e 100644 --- a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaProducerCFG.java +++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaProducerCFG.java @@ -23,6 +23,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import it.finanze.sanita.fse2.ms.gtw.statusmanager.config.kafka.oauth2.CustomAuthenticateCallbackHandler; import it.finanze.sanita.fse2.ms.gtw.statusmanager.utility.StringUtility; @Configuration @@ -34,6 +35,8 @@ public class KafkaProducerCFG { @Autowired private KafkaProducerPropertiesCFG kafkaProducerPropCFG; + @Autowired + private KafkaPropertiesCFG kafkaPropCFG; /** * Genera configurazione senza transazione. @@ -46,29 +49,36 @@ public Map producerWithoutTransactionConfigs() { props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerPropCFG.getClientId() + "-noTx"); props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerPropCFG.getProducerRetries()); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerPropCFG.getProducerBootstrapServers()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropCFG.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerPropCFG.getProducerKeySerializer()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerPropCFG.getProducerValueSerializer()); - if(!StringUtility.isNullOrEmpty(kafkaProducerPropCFG.getProtocol())) { - props.put("security.protocol", kafkaProducerPropCFG.getProtocol()); + if(!StringUtility.isNullOrEmpty(kafkaPropCFG.getProtocol())) { + props.put("security.protocol", kafkaPropCFG.getProtocol()); } - if(!StringUtility.isNullOrEmpty(kafkaProducerPropCFG.getMechanism())) { - props.put("sasl.mechanism", kafkaProducerPropCFG.getMechanism()); + if(!StringUtility.isNullOrEmpty(kafkaPropCFG.getMechanism())) { + props.put("sasl.mechanism", kafkaPropCFG.getMechanism()); } - if(!StringUtility.isNullOrEmpty(kafkaProducerPropCFG.getConfigJaas())) { - props.put("sasl.jaas.config", kafkaProducerPropCFG.getConfigJaas()); + if(!StringUtility.isNullOrEmpty(kafkaPropCFG.getConfigJaas())) { + props.put("sasl.jaas.config", kafkaPropCFG.getConfigJaas()); } - if(!StringUtility.isNullOrEmpty(kafkaProducerPropCFG.getTrustoreLocation())) { - props.put("ssl.truststore.location", kafkaProducerPropCFG.getTrustoreLocation()); + if (!StringUtility.isNullOrEmpty(kafkaPropCFG.getTrustoreLocation())) { + props.put("ssl.truststore.location", kafkaPropCFG.getTrustoreLocation()); + } + if (kafkaPropCFG.getTrustorePassword() != null && kafkaPropCFG.getTrustorePassword().length > 0) { + props.put("ssl.truststore.password", String.valueOf(kafkaPropCFG.getTrustorePassword())); } - if(!StringUtility.isNullOrEmpty(String.valueOf(kafkaProducerPropCFG.getTrustorePassword()))) { - props.put("ssl.truststore.password", String.valueOf(kafkaProducerPropCFG.getTrustorePassword())); + if("OAUTHBEARER".equals(kafkaPropCFG.getMechanism())) { + props.put("sasl.login.callback.handler.class", CustomAuthenticateCallbackHandler.class); + props.put("kafka.oauth.tenantId", kafkaPropCFG.getTenantId()); + props.put("kafka.oauth.clientId", kafkaPropCFG.getClientId()); + props.put("kafka.oauth.pwd", kafkaPropCFG.getPwd()); } + return props; } diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaProducerPropertiesCFG.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaProducerPropertiesCFG.java index a9cd288..f5c628b 100644 --- a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaProducerPropertiesCFG.java +++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaProducerPropertiesCFG.java @@ -47,11 +47,6 @@ public class KafkaProducerPropertiesCFG { @Value("${kafka.producer.value-serializer}") private String producerValueSerializer; - /** - * Producer bootstrap server. - */ - @Value("${kafka.producer.bootstrap-servers}") - private String producerBootstrapServers; /** * Producer trans id. @@ -71,34 +66,5 @@ public class KafkaProducerPropertiesCFG { @Value("${kafka.producer.ack}") private String producerACK; - /** - * Protocol. - */ - @Value("${kafka.properties.security.protocol}") - private String protocol; - /** - * Meccanismo. - */ - @Value("${kafka.properties.sasl.mechanism}") - private String mechanism; - - /** - * Config jass. - */ - @Value("${kafka.properties.sasl.jaas.config}") - private String configJaas; - - /** - * Posizione trust store. - */ - @Value("${kafka.properties.ssl.truststore.location}") - private String trustoreLocation; - - /** - * Password trust store. - */ - @Value("${kafka.properties.ssl.truststore.password}") - private char[] trustorePassword; - } diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaPropertiesCFG.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaPropertiesCFG.java new file mode 100644 index 0000000..84f25a3 --- /dev/null +++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/KafkaPropertiesCFG.java @@ -0,0 +1,60 @@ +package it.finanze.sanita.fse2.ms.gtw.statusmanager.config.kafka; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +@Configuration +@Data +public class KafkaPropertiesCFG { + + /** + * Producer bootstrap server. + */ + @Value("${kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("${kafka.oauth.tenantId}") + private String tenantId; + + @Value("${kafka.oauth.clientId}") + private String clientId; + + @Value("${kafka.oauth.pwd}") + private String pwd; + + /** + * Protocol. + */ + @Value("${kafka.properties.security.protocol}") + private String protocol; + + /** + * Meccanismo. + */ + @Value("${kafka.properties.sasl.mechanism}") + private String mechanism; + + /** + * Config jass. + */ + @Value("${kafka.properties.sasl.jaas.config}") + private String configJaas; + + /** + * Posizione trust store. + */ + @Value("${kafka.properties.ssl.truststore.location}") + private String trustoreLocation; + + /** + * Password trust store. + */ + @Value("${kafka.properties.ssl.truststore.password}") + private char[] trustorePassword; + +} diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/oauth2/CustomAuthenticateCallbackHandler.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/oauth2/CustomAuthenticateCallbackHandler.java new file mode 100644 index 0000000..016d15e --- /dev/null +++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/oauth2/CustomAuthenticateCallbackHandler.java @@ -0,0 +1,107 @@ +//Copyright (c) Microsoft Corporation. All rights reserved. +//Licensed under the MIT License. + +package it.finanze.sanita.fse2.ms.gtw.statusmanager.config.kafka.oauth2; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; + +import com.microsoft.aad.msal4j.ClientCredentialFactory; +import com.microsoft.aad.msal4j.ClientCredentialParameters; +import com.microsoft.aad.msal4j.ConfidentialClientApplication; +import com.microsoft.aad.msal4j.IAuthenticationResult; +import com.microsoft.aad.msal4j.IClientCredential; + +import it.finanze.sanita.fse2.ms.gtw.statusmanager.utility.FileUtility; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler { + + private String tenantId; + + private String clientId; + + private String pwd; + + private ConfidentialClientApplication aadClient; + private ClientCredentialParameters aadParameters; + + @Override + public void configure(Map configs, String mechanism, List jaasConfigEntries) { + String bootstrapServer = Arrays.asList(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)).get(0).toString(); + + bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", ""); + URI uri = URI.create("https://" + bootstrapServer); + String sbUri = uri.getScheme() + "://" + uri.getHost(); + this.aadParameters = + ClientCredentialParameters.builder(Collections.singleton(sbUri + "/.default")) + .build(); + this.tenantId = "https://login.microsoftonline.com/"+ Arrays.asList(configs.get("kafka.oauth.tenantId")).get(0).toString(); + this.clientId = Arrays.asList(configs.get("kafka.oauth.clientId")).get(0).toString(); + this.pwd = Arrays.asList(configs.get("kafka.oauth.pwd")).get(0).toString(); + + } + + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { + for (Callback callback: callbacks) { + if (callback instanceof OAuthBearerTokenCallback) { + try { + OAuthBearerToken token = getOAuthBearerToken(); + OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback; + oauthCallback.token(token); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + e.printStackTrace(); + } + } else { + throw new UnsupportedCallbackException(callback); + } + } + } + + private OAuthBearerToken getOAuthBearerToken() throws MalformedURLException, InterruptedException, ExecutionException, TimeoutException { + if (this.aadClient == null) { + synchronized(this) { + if (this.aadClient == null) { + IClientCredential credential = null; + try{ + InputStream certificato = new ByteArrayInputStream(FileUtility.getFileFromInternalResources("client_FSD-SA-0005.pfx")); + credential = ClientCredentialFactory.createFromCertificate(certificato, this.pwd); + }catch(Exception ex) { + System.out.println("Stop"); + } + this.aadClient = ConfidentialClientApplication.builder(this.clientId, credential) + .authority(this.tenantId) + .build(); + } + } + } + + IAuthenticationResult authResult = this.aadClient.acquireToken(this.aadParameters).get(); + log.info("Token oauth2 acquired"); + return new OAuthBearerTokenImp(authResult.accessToken(), authResult.expiresOnDate()); + } + + public void close() throws KafkaException { + // NOOP + } +} \ No newline at end of file diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/oauth2/OAuthBearerTokenImp.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/oauth2/OAuthBearerTokenImp.java new file mode 100644 index 0000000..3260709 --- /dev/null +++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/config/kafka/oauth2/OAuthBearerTokenImp.java @@ -0,0 +1,41 @@ +package it.finanze.sanita.fse2.ms.gtw.statusmanager.config.kafka.oauth2; +import java.util.Date; +import java.util.Set; + +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; + + +public class OAuthBearerTokenImp implements OAuthBearerToken { + private String token; + private long lifetimeMs; + + public OAuthBearerTokenImp(final String token, Date expiresOn) { + this.token = token; + this.lifetimeMs = expiresOn.getTime(); + } + + @Override + public String value() { + return this.token; + } + + @Override + public Set scope() { + return null; + } + + @Override + public long lifetimeMs() { + return this.lifetimeMs; + } + + @Override + public String principalName() { + return null; + } + + @Override + public Long startTimeMs() { + return null; + } +} diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/service/IKafkaReceiverSRV.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/service/IKafkaReceiverSRV.java index 073d85d..7752f99 100644 --- a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/service/IKafkaReceiverSRV.java +++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/service/IKafkaReceiverSRV.java @@ -23,6 +23,6 @@ public interface IKafkaReceiverSRV { */ void listenerGtw(ConsumerRecord cr, MessageHeaders messageHeaders); - void listenerEds(ConsumerRecord cr, MessageHeaders messageHeaders); +// void listenerEds(ConsumerRecord cr, MessageHeaders messageHeaders); } \ No newline at end of file diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/service/impl/KafkaReceiverSRV.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/service/impl/KafkaReceiverSRV.java index 194cd0f..941d9d4 100644 --- a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/service/impl/KafkaReceiverSRV.java +++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/service/impl/KafkaReceiverSRV.java @@ -36,12 +36,12 @@ public void listenerGtw(final ConsumerRecord cr, final MessageHe abstractListener(cr); } - @Override - @KafkaListener(topics = "#{'${kafka.statusmanager.eds.topic}'}", clientIdPrefix = "#{'${kafka.client-eds-id}'}", containerFactory = "kafkaListenerDeadLetterContainerFactoryEds", autoStartup = "${event.topic.auto.start}", groupId = "#{'${kafka.consumer.group-id}'}") - public void listenerEds(final ConsumerRecord cr, final MessageHeaders messageHeaders) { - log.info("EDS LISTENER - Consuming transaction event - Message received with key {}", cr.key()); - abstractListener(cr); - } +// @Override +// @KafkaListener(topics = "#{'${kafka.statusmanager.eds.topic}'}", clientIdPrefix = "#{'${kafka.client-eds-id}'}", containerFactory = "kafkaListenerDeadLetterContainerFactoryEds", autoStartup = "${event.topic.auto.start}", groupId = "#{'${kafka.consumer.group-id}'}") +// public void listenerEds(final ConsumerRecord cr, final MessageHeaders messageHeaders) { +// log.info("EDS LISTENER - Consuming transaction event - Message received with key {}", cr.key()); +// abstractListener(cr); +// } private void abstractListener(ConsumerRecord cr) { try { diff --git a/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/utility/FileUtility.java b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/utility/FileUtility.java new file mode 100644 index 0000000..0f69d35 --- /dev/null +++ b/src/main/java/it/finanze/sanita/fse2/ms/gtw/statusmanager/utility/FileUtility.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + * + * Copyright (C) 2023 Ministero della Salute + * + * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License along with this program. If not, see . + */ +package it.finanze.sanita.fse2.ms.gtw.statusmanager.utility; + +import java.io.ByteArrayOutputStream; +import java.io.InputStream; + +import it.finanze.sanita.fse2.ms.gtw.statusmanager.exceptions.BusinessException; +import lombok.extern.slf4j.Slf4j; + +/** + * The Class FileUtils. + * + * Utility to manager file. + */ +@Slf4j +public final class FileUtility { + + /** + * Max size chunk. + */ + private static final int CHUNK_SIZE = 16384; + + /** + * Constructor. + */ + private FileUtility() { + } + + /** + * Metodo per il recupero del contenuto di un file dalla folder interna "/src/main/resources". + * + * @param filename nome del file + * @return contenuto del file + */ + public static byte[] getFileFromInternalResources(final String filename) { + byte[] b = null; + try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(filename)) { + b = getByteFromInputStream(is); + } catch (Exception e) { + log.error("FILE UTILS getFileFromInternalResources(): Errore in fase di recupero del contenuto di un file dalla folder '/src/main/resources'. ", e); + } + return b; + } + + /** + * Recupero contenuto file da input stream. + * + * @param is + * input stream + * @return contenuto file + */ + private static byte[] getByteFromInputStream(final InputStream is) { + byte[] b; + try { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + int nRead; + byte[] data = new byte[CHUNK_SIZE]; + + while ((nRead = is.read(data, 0, data.length)) != -1) { + buffer.write(data, 0, nRead); + } + buffer.flush(); + b = buffer.toByteArray(); + } catch (Exception e) { + log.error("Errore durante il trasform da InputStream a byte[]: ", e); + throw new BusinessException(e); + } + return b; + } + +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index a9974bb..f530389 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -51,7 +51,6 @@ kafka.properties.ssl.truststore.password=${TRUST_JKS_PASSWORD} kafka.client-id=fse-notifier-client kafka.client-eds-id=fse-notifier-client-eds kafka.producer.retries=5 -kafka.producer.bootstrap-servers=${KAFKA_HOST}:${KAFKA_PORT1},${KAFKA_HOST}:${KAFKA_PORT2},${KAFKA_HOST}:${KAFKA_PORT3} kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer kafka.producer.value-serializer= org.apache.kafka.common.serialization.StringSerializer kafka.producer.transactional.id=genericLibertyExample.tx. @@ -62,7 +61,6 @@ kafka.producer.ack=all #######KAFKA CONSUMER SETTING################ kafka.consumer.group-id=fse-notifier kafka.consumer.group-id-eds=fse-notifier-eds -kafka.consumer.bootstrap-servers=${KAFKA_HOST}:${KAFKA_PORT1},${KAFKA_HOST}:${KAFKA_PORT2},${KAFKA_HOST}:${KAFKA_PORT3} kafka.consumer.key-deserializer= org.apache.kafka.common.serialization.StringDeserializer kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer kafka.consumer.auto-offset-reset=earliest @@ -74,7 +72,9 @@ kafka.consumer.auto-commit=false event.topic.auto.start=true kafka.statusmanager.topic=MDS-SA-0004_FU_001_STATUS kafka.statusmanager.eds.topic=MDS-SA-0004_FU_002_STATUS - +kafka.oauth.tenantId=${TENANT_ID} +kafka.oauth.appId=${APP_ID} +kafka.oauth.pwd=${PWD} ####### KAFKA DEAD LETTER ##################### kafka.statusmanager.deadletter.topic=MDS-SA-0004_FU_001_STATUS-DLT kafka.statusmanager.eds.deadletter.topic=MDS-SA-0004_FU_002_STATUS-DLT