From 8afb6876ca1ee0c3cb6d73fd2be76177bff6d77a Mon Sep 17 00:00:00 2001 From: Oleksandr Chmyr Date: Tue, 17 Dec 2024 10:21:01 +0100 Subject: [PATCH] Flyttet sending av acknowledgments til Kafka fra ProcessingService til Routes. Lagt til KafkaTestContainer --- ebms-provider/build.gradle.kts | 1 + .../main/kotlin/no/nav/emottak/ebms/Routes.kt | 24 +++++++++++-- .../no/nav/emottak/ebms/kafka/KafkaClient.kt | 34 +++++++++++-------- .../ebms/processing/ProcessingService.kt | 20 ----------- .../no/nav/emottak/ebms/EbmsRouteAsyncIT.kt | 22 ++++++++++++ .../emottak/ebms/kafka/KafkaTestContainer.kt | 33 ++++++++++++++++++ 6 files changed, 98 insertions(+), 36 deletions(-) create mode 100644 ebms-provider/src/test/kotlin/no/nav/emottak/ebms/kafka/KafkaTestContainer.kt diff --git a/ebms-provider/build.gradle.kts b/ebms-provider/build.gradle.kts index ad9385eb..55bc4760 100644 --- a/ebms-provider/build.gradle.kts +++ b/ebms-provider/build.gradle.kts @@ -74,6 +74,7 @@ dependencies { implementation(libs.token.validation.ktor.v2) implementation(testLibs.postgresql) implementation("org.apache.kafka:kafka-clients:3.9.0") + testImplementation("org.testcontainers:kafka:1.19.0") testImplementation(testLibs.mock.oauth2.server) testImplementation(testLibs.ktor.server.test.host) testImplementation(testLibs.junit.jupiter.api) diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/Routes.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/Routes.kt index 607057f7..79812ccc 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/Routes.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/Routes.kt @@ -14,6 +14,7 @@ import io.ktor.server.routing.post import io.micrometer.prometheus.PrometheusMeterRegistry import kotlinx.serialization.Serializable import no.nav.emottak.constants.SMTPHeaders +import no.nav.emottak.ebms.kafka.kafkaClientObject import no.nav.emottak.ebms.model.signer import no.nav.emottak.ebms.processing.ProcessingService import no.nav.emottak.ebms.sendin.SendInService @@ -30,8 +31,10 @@ import no.nav.emottak.message.model.Payload import no.nav.emottak.message.model.PayloadMessage import no.nav.emottak.message.model.PayloadProcessing import no.nav.emottak.message.model.SignatureDetails +import no.nav.emottak.util.getEnvVar import no.nav.emottak.util.marker import no.nav.emottak.util.retrieveLoggableHeaderPairs +import org.apache.kafka.clients.producer.ProducerRecord import java.util.UUID data class PackageRequest( @@ -288,8 +291,25 @@ fun Route.postEbmsAsync(validator: DokumentValidator, processingService: Process return@post } log.info(ebMSDocument.messageHeader().marker(), "Payload Processed, Generating Acknowledgement...") - ebmsMessage.createAcknowledgment().toEbmsDokument().also { - call.respondEbmsDokument(it) + ebmsMessage.createAcknowledgment().also { + try { + log.debug("Kafka test: Sending acknowledgment to queue") + log.debug("Kafka test: Acknowledgment document: {}", it.dokument.toString()) + + val kafkaProducer = kafkaClientObject.createProducer() + val topic = getEnvVar("KAFKA_TOPIC_ACKNOWLEDGMENTS", "team-emottak.smtp.out.ebxml.signal") + log.debug("Kafka test: Acknowledgment topic: {}", topic) + kafkaProducer.send( + ProducerRecord(topic, it.messageId, it.toEbmsDokument().toString()) + ) + kafkaProducer.flush() + kafkaProducer.close() + } catch (e: Exception) { + log.error("Kafka test: Exception while sending acknowledgment to queue", e) + } + log.debug("Kafka test: Acknowledgment sent to queue") + + call.respondEbmsDokument(it.toEbmsDokument()) return@post } } catch (ex: EbmsException) { diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/kafka/KafkaClient.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/kafka/KafkaClient.kt index 11ba483b..19767b51 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/kafka/KafkaClient.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/kafka/KafkaClient.kt @@ -14,6 +14,8 @@ val kafkaClientObject = KafkaClient() class KafkaClient { + val cluster = getEnvVar("NAIS_CLUSTER_NAME", "local") + private val kafkaBrokers = getEnvVar("KAFKA_BROKERS", "http://localhost:9092") private val keystoreLocation = getEnvVar("KAFKA_KEYSTORE_PATH", "") private val keystorePassword = getEnvVar("KAFKA_CREDSTORE_PASSWORD", "") @@ -28,13 +30,15 @@ class KafkaClient { put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name) // Authentication - put("security.protocol", "SSL") - put("ssl.keystore.type", "PKCS12") - put("ssl.keystore.location", keystoreLocation) - put("ssl.keystore.password", keystorePassword) - put("ssl.truststore.type", "JKS") - put("ssl.truststore.location", truststoreLocation) - put("ssl.truststore.password", truststorePassword) + if (cluster in setOf("dev-fss", "prod-fss")) { + put("security.protocol", "SSL") + put("ssl.keystore.type", "PKCS12") + put("ssl.keystore.location", keystoreLocation) + put("ssl.keystore.password", keystorePassword) + put("ssl.truststore.type", "JKS") + put("ssl.truststore.location", truststoreLocation) + put("ssl.truststore.password", truststorePassword) + } // Performance put(ProducerConfig.BUFFER_MEMORY_CONFIG, "16777216") @@ -55,13 +59,15 @@ class KafkaClient { // put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // Authentication - put("security.protocol", "SSL") - put("ssl.keystore.type", "PKCS12") - put("ssl.keystore.location", keystoreLocation) - put("ssl.keystore.password", keystorePassword) - put("ssl.truststore.type", "JKS") - put("ssl.truststore.location", truststoreLocation) - put("ssl.truststore.password", truststorePassword) + if (cluster in setOf("dev-fss", "prod-fss")) { + put("security.protocol", "SSL") + put("ssl.keystore.type", "PKCS12") + put("ssl.keystore.location", keystoreLocation) + put("ssl.keystore.password", keystorePassword) + put("ssl.truststore.type", "JKS") + put("ssl.truststore.location", truststoreLocation) + put("ssl.truststore.password", truststorePassword) + } // Performance put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10") diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/ProcessingService.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/ProcessingService.kt index 10e9f0f0..d2b6864a 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/ProcessingService.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/ProcessingService.kt @@ -6,8 +6,6 @@ import io.ktor.http.HttpStatusCode import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import no.nav.emottak.ebms.PayloadProcessingClient -import no.nav.emottak.ebms.kafka.kafkaClientObject -import no.nav.emottak.ebms.log import no.nav.emottak.ebms.logger import no.nav.emottak.ebms.util.marker import no.nav.emottak.melding.feil.EbmsException @@ -21,8 +19,6 @@ import no.nav.emottak.message.model.PayloadMessage import no.nav.emottak.message.model.PayloadProcessing import no.nav.emottak.message.model.PayloadRequest import no.nav.emottak.message.model.PayloadResponse -import no.nav.emottak.util.getEnvVar -import org.apache.kafka.clients.producer.ProducerRecord class ProcessingService(private val httpClient: PayloadProcessingClient) { @@ -85,22 +81,6 @@ class ProcessingService(private val httpClient: PayloadProcessingClient) { } private fun acknowledgment(acknowledgment: Acknowledgment) { - try { - log.debug("Kafka test: Sending acknowledgment to queue") - log.debug("Kafka test: Acknowledgment document: {}", acknowledgment.dokument.toString()) - acknowledgment.dokument.toString() - val kafkaProducer = kafkaClientObject.createProducer() - val topic = getEnvVar("KAFKA_TOPIC_ACKNOWLEDGMENTS", "team-emottak.smtp.out.ebxml.signal") - log.debug("Kafka test: Acknowledgment topic: {}", topic) - kafkaProducer.send( - ProducerRecord(topic, acknowledgment.messageId, acknowledgment.toEbmsDokument().toString()) - ) - kafkaProducer.flush() - kafkaProducer.close() - } catch (e: Exception) { - log.error("Kafka test: Exception while sending acknowledgment to queue", e) - } - log.debug("Kafka test: Acknowledgment sent to queue") } private fun fail(fail: EbmsFail) { diff --git a/ebms-provider/src/test/kotlin/no/nav/emottak/ebms/EbmsRouteAsyncIT.kt b/ebms-provider/src/test/kotlin/no/nav/emottak/ebms/EbmsRouteAsyncIT.kt index 568aa48c..f95a711a 100644 --- a/ebms-provider/src/test/kotlin/no/nav/emottak/ebms/EbmsRouteAsyncIT.kt +++ b/ebms-provider/src/test/kotlin/no/nav/emottak/ebms/EbmsRouteAsyncIT.kt @@ -9,13 +9,17 @@ import io.mockk.coVerify import io.mockk.every import io.mockk.mockkObject import no.nav.emottak.constants.SMTPHeaders +import no.nav.emottak.ebms.kafka.KafkaTestContainer import no.nav.emottak.ebms.validation.MimeHeaders import no.nav.emottak.ebms.validation.SignaturValidator import no.nav.emottak.message.ebxml.acknowledgment import no.nav.emottak.message.ebxml.messageHeader import no.nav.emottak.message.xml.xmlMarshaller +import no.nav.emottak.util.getEnvVar +import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test import org.xmlsoap.schemas.soap.envelope.Envelope @@ -211,4 +215,22 @@ Pg==""" }, """PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4NCjxuczU6RW52ZWxvcGUgeG1sbnM6bnM1PSJodHRwOi8vc2NoZW1hcy54bWxzb2FwLm9yZy9zb2FwL2VudmVsb3BlLyIgeG1sbnM6bnMxPSJodHRwOi8vd3d3Lm9hc2lzLW9wZW4ub3JnL2NvbW1pdHRlZXMvZWJ4bWwtY3BwYS9zY2hlbWEvY3BwLWNwYS0yXzAueHNkIiB4bWxuczpuczI9Imh0dHA6Ly93d3cudzMub3JnLzE5OTkveGxpbmsiIHhtbG5zOm5zMz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC8wOS94bWxkc2lnIyIgeG1sbnM6bnM0PSJodHRwOi8vd3d3Lm9hc2lzLW9wZW4ub3JnL2NvbW1pdHRlZXMvZWJ4bWwtbXNnL3NjaGVtYS9tc2ctaGVhZGVyLTJfMC54c2QiIHhtbG5zOm5zNz0iaHR0cDovL3d3dy53My5vcmcvMjAwOS94bWxkc2lnMTEjIj4NCiAgICA8bnM1OkhlYWRlcj4NCiAgICAgICAgPG5zNDpNZXNzYWdlSGVhZGVyPg0KICAgICAgICAgICAgPG5zNDpGcm9tPg0KICAgICAgICAgICAgICAgIDxuczQ6UGFydHlJZCBuczQ6dHlwZT0iSEVSIj44MTQxMjUzPC9uczQ6UGFydHlJZD4NCiAgICAgICAgICAgICAgICA8bnM0OlJvbGU+RVJST1JfUkVTUE9OREVSPC9uczQ6Um9sZT4NCiAgICAgICAgICAgIDwvbnM0OkZyb20+DQogICAgICAgICAgICA8bnM0OlRvPg0KICAgICAgICAgICAgICAgIDxuczQ6UGFydHlJZCBuczQ6dHlwZT0iSEVSIj43OTc2ODwvbnM0OlBhcnR5SWQ+DQogICAgICAgICAgICAgICAgPG5zNDpSb2xlPkVSUk9SX1JFQ0VJVkVSPC9uczQ6Um9sZT4NCiAgICAgICAgICAgIDwvbnM0OlRvPg0KICAgICAgICAgICAgPG5zNDpDUEFJZD5uYXY6cWFzczozNTA2NTwvbnM0OkNQQUlkPg0KICAgICAgICAgICAgPG5zNDpDb252ZXJzYXRpb25JZD5iZTE5MmQzYS0zNGI1LTQ0OGEtYTM3NC01ZWFiMDUyNGM3NGQ8L25zNDpDb252ZXJzYXRpb25JZD4NCiAgICAgICAgICAgIDxuczQ6U2VydmljZT51cm46b2FzaXM6bmFtZXM6dGM6ZWJ4bWwtbXNnOnNlcnZpY2U8L25zNDpTZXJ2aWNlPg0KICAgICAgICAgICAgPG5zNDpBY3Rpb24+TWVzc2FnZUVycm9yPC9uczQ6QWN0aW9uPg0KICAgICAgICAgICAgPG5zNDpNZXNzYWdlRGF0YT4NCiAgICAgICAgICAgICAgICA8bnM0Ok1lc3NhZ2VJZD43MTA0YWNmOC0yMWU5LTRlZTctYjg5NC1kNDEzYTAwYTg4ODFfUkVTUE9OU0VfUkVTUE9OU0U8L25zNDpNZXNzYWdlSWQ+DQogICAgICAgICAgICAgICAgPG5zNDpUaW1lc3RhbXA+MjAyMy0xMS0xNFQwOTo1OTowMi40NjErMDE6MDA8L25zNDpUaW1lc3RhbXA+DQogICAgICAgICAgICAgICAgPG5zNDpSZWZUb01lc3NhZ2VJZD43MTA0YWNmOC0yMWU5LTRlZTctYjg5NC1kNDEzYTAwYTg4ODFfUkVTUE9OU0U8L25zNDpSZWZUb01lc3NhZ2VJZD4NCiAgICAgICAgICAgIDwvbnM0Ok1lc3NhZ2VEYXRhPg0KICAgICAgICA8L25zNDpNZXNzYWdlSGVhZGVyPg0KICAgICAgICA8bnM0OkVycm9yTGlzdCBuczQ6aGlnaGVzdFNldmVyaXR5PSJFcnJvciIgbnM0OnZlcnNpb249IjIuMCIgbnM1Om11c3RVbmRlcnN0YW5kPSIxIj4NCiAgICAgICAgICAgIDxuczQ6RXJyb3IgbnM0OmVycm9yQ29kZT0iU2VjdXJpdHlGYWlsdXJlIiBuczQ6aWQ9IkVSUk9SX0lEIiBuczQ6c2V2ZXJpdHk9IkVycm9yIj4NCiAgICAgICAgICAgICAgICA8bnM0OkRlc2NyaXB0aW9uIHhtbDpsYW5nPSJubyI+RmVpbCBzaWduYXR1cmU8L25zNDpEZXNjcmlwdGlvbj4NCiAgICAgICAgICAgIDwvbnM0OkVycm9yPg0KICAgICAgICA8L25zNDpFcnJvckxpc3Q+DQogICAgPC9uczU6SGVhZGVyPg0KPC9uczU6RW52ZWxvcGU+""" ) + + companion object { + @JvmStatic + @BeforeAll + fun setup() { + System.setProperty("KAFKA_BROKERS", KafkaTestContainer.bootstrapServers) + KafkaTestContainer.start() + + val topicName = getEnvVar("KAFKA_TOPIC_ACKNOWLEDGMENTS", "team-emottak.smtp.out.ebxml.signal") + KafkaTestContainer.createTopic(topicName) + } + + @JvmStatic + @AfterAll + fun tearDown() { + KafkaTestContainer.stop() + } + } } diff --git a/ebms-provider/src/test/kotlin/no/nav/emottak/ebms/kafka/KafkaTestContainer.kt b/ebms-provider/src/test/kotlin/no/nav/emottak/ebms/kafka/KafkaTestContainer.kt new file mode 100644 index 00000000..df1ab03a --- /dev/null +++ b/ebms-provider/src/test/kotlin/no/nav/emottak/ebms/kafka/KafkaTestContainer.kt @@ -0,0 +1,33 @@ +package no.nav.emottak.ebms.kafka + +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.NewTopic +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.utility.DockerImageName + +object KafkaTestContainer { + private val kafkaContainer: KafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0")) + + val bootstrapServers: String + get() = kafkaContainer.bootstrapServers + + fun start() { + kafkaContainer.start() + } + + fun stop() { + kafkaContainer.stop() + } + + fun createTopic(topicName: String, partitions: Int = 1, replicationFactor: Short = 1) { + val config = mapOf( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers + ) + + AdminClient.create(config).use { adminClient -> + val newTopic = NewTopic(topicName, partitions, replicationFactor) + adminClient.createTopics(listOf(newTopic)).all().get() + } + } +}