Skip to content

Commit

Permalink
Flyttet sending av acknowledgments til Kafka fra ProcessingService ti…
Browse files Browse the repository at this point in the history
…l Routes.

Lagt til KafkaTestContainer
  • Loading branch information
OleksandrChmyrNAV committed Dec 17, 2024
1 parent 41ed359 commit 8afb687
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 36 deletions.
1 change: 1 addition & 0 deletions ebms-provider/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ dependencies {
implementation(libs.token.validation.ktor.v2)
implementation(testLibs.postgresql)
implementation("org.apache.kafka:kafka-clients:3.9.0")
testImplementation("org.testcontainers:kafka:1.19.0")
testImplementation(testLibs.mock.oauth2.server)
testImplementation(testLibs.ktor.server.test.host)
testImplementation(testLibs.junit.jupiter.api)
Expand Down
24 changes: 22 additions & 2 deletions ebms-provider/src/main/kotlin/no/nav/emottak/ebms/Routes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import io.ktor.server.routing.post
import io.micrometer.prometheus.PrometheusMeterRegistry
import kotlinx.serialization.Serializable
import no.nav.emottak.constants.SMTPHeaders
import no.nav.emottak.ebms.kafka.kafkaClientObject
import no.nav.emottak.ebms.model.signer
import no.nav.emottak.ebms.processing.ProcessingService
import no.nav.emottak.ebms.sendin.SendInService
Expand All @@ -30,8 +31,10 @@ import no.nav.emottak.message.model.Payload
import no.nav.emottak.message.model.PayloadMessage
import no.nav.emottak.message.model.PayloadProcessing
import no.nav.emottak.message.model.SignatureDetails
import no.nav.emottak.util.getEnvVar
import no.nav.emottak.util.marker
import no.nav.emottak.util.retrieveLoggableHeaderPairs
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.UUID

data class PackageRequest(
Expand Down Expand Up @@ -288,8 +291,25 @@ fun Route.postEbmsAsync(validator: DokumentValidator, processingService: Process
return@post
}
log.info(ebMSDocument.messageHeader().marker(), "Payload Processed, Generating Acknowledgement...")
ebmsMessage.createAcknowledgment().toEbmsDokument().also {
call.respondEbmsDokument(it)
ebmsMessage.createAcknowledgment().also {
try {
log.debug("Kafka test: Sending acknowledgment to queue")
log.debug("Kafka test: Acknowledgment document: {}", it.dokument.toString())

val kafkaProducer = kafkaClientObject.createProducer()
val topic = getEnvVar("KAFKA_TOPIC_ACKNOWLEDGMENTS", "team-emottak.smtp.out.ebxml.signal")
log.debug("Kafka test: Acknowledgment topic: {}", topic)
kafkaProducer.send(
ProducerRecord(topic, it.messageId, it.toEbmsDokument().toString())
)
kafkaProducer.flush()
kafkaProducer.close()
} catch (e: Exception) {
log.error("Kafka test: Exception while sending acknowledgment to queue", e)
}
log.debug("Kafka test: Acknowledgment sent to queue")

call.respondEbmsDokument(it.toEbmsDokument())
return@post
}
} catch (ex: EbmsException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ val kafkaClientObject = KafkaClient()

class KafkaClient {

val cluster = getEnvVar("NAIS_CLUSTER_NAME", "local")

private val kafkaBrokers = getEnvVar("KAFKA_BROKERS", "http://localhost:9092")
private val keystoreLocation = getEnvVar("KAFKA_KEYSTORE_PATH", "")
private val keystorePassword = getEnvVar("KAFKA_CREDSTORE_PASSWORD", "")
Expand All @@ -28,13 +30,15 @@ class KafkaClient {
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)

// Authentication
put("security.protocol", "SSL")
put("ssl.keystore.type", "PKCS12")
put("ssl.keystore.location", keystoreLocation)
put("ssl.keystore.password", keystorePassword)
put("ssl.truststore.type", "JKS")
put("ssl.truststore.location", truststoreLocation)
put("ssl.truststore.password", truststorePassword)
if (cluster in setOf("dev-fss", "prod-fss")) {
put("security.protocol", "SSL")
put("ssl.keystore.type", "PKCS12")
put("ssl.keystore.location", keystoreLocation)
put("ssl.keystore.password", keystorePassword)
put("ssl.truststore.type", "JKS")
put("ssl.truststore.location", truststoreLocation)
put("ssl.truststore.password", truststorePassword)
}

// Performance
put(ProducerConfig.BUFFER_MEMORY_CONFIG, "16777216")
Expand All @@ -55,13 +59,15 @@ class KafkaClient {
// put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

// Authentication
put("security.protocol", "SSL")
put("ssl.keystore.type", "PKCS12")
put("ssl.keystore.location", keystoreLocation)
put("ssl.keystore.password", keystorePassword)
put("ssl.truststore.type", "JKS")
put("ssl.truststore.location", truststoreLocation)
put("ssl.truststore.password", truststorePassword)
if (cluster in setOf("dev-fss", "prod-fss")) {
put("security.protocol", "SSL")
put("ssl.keystore.type", "PKCS12")
put("ssl.keystore.location", keystoreLocation)
put("ssl.keystore.password", keystorePassword)
put("ssl.truststore.type", "JKS")
put("ssl.truststore.location", truststoreLocation)
put("ssl.truststore.password", truststorePassword)
}

// Performance
put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import no.nav.emottak.ebms.PayloadProcessingClient
import no.nav.emottak.ebms.kafka.kafkaClientObject
import no.nav.emottak.ebms.log
import no.nav.emottak.ebms.logger
import no.nav.emottak.ebms.util.marker
import no.nav.emottak.melding.feil.EbmsException
Expand All @@ -21,8 +19,6 @@ import no.nav.emottak.message.model.PayloadMessage
import no.nav.emottak.message.model.PayloadProcessing
import no.nav.emottak.message.model.PayloadRequest
import no.nav.emottak.message.model.PayloadResponse
import no.nav.emottak.util.getEnvVar
import org.apache.kafka.clients.producer.ProducerRecord

class ProcessingService(private val httpClient: PayloadProcessingClient) {

Expand Down Expand Up @@ -85,22 +81,6 @@ class ProcessingService(private val httpClient: PayloadProcessingClient) {
}

private fun acknowledgment(acknowledgment: Acknowledgment) {
try {
log.debug("Kafka test: Sending acknowledgment to queue")
log.debug("Kafka test: Acknowledgment document: {}", acknowledgment.dokument.toString())
acknowledgment.dokument.toString()
val kafkaProducer = kafkaClientObject.createProducer()
val topic = getEnvVar("KAFKA_TOPIC_ACKNOWLEDGMENTS", "team-emottak.smtp.out.ebxml.signal")
log.debug("Kafka test: Acknowledgment topic: {}", topic)
kafkaProducer.send(
ProducerRecord(topic, acknowledgment.messageId, acknowledgment.toEbmsDokument().toString())
)
kafkaProducer.flush()
kafkaProducer.close()
} catch (e: Exception) {
log.error("Kafka test: Exception while sending acknowledgment to queue", e)
}
log.debug("Kafka test: Acknowledgment sent to queue")
}

private fun fail(fail: EbmsFail) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ import io.mockk.coVerify
import io.mockk.every
import io.mockk.mockkObject
import no.nav.emottak.constants.SMTPHeaders
import no.nav.emottak.ebms.kafka.KafkaTestContainer
import no.nav.emottak.ebms.validation.MimeHeaders
import no.nav.emottak.ebms.validation.SignaturValidator
import no.nav.emottak.message.ebxml.acknowledgment
import no.nav.emottak.message.ebxml.messageHeader
import no.nav.emottak.message.xml.xmlMarshaller
import no.nav.emottak.util.getEnvVar
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.xmlsoap.schemas.soap.envelope.Envelope

Expand Down Expand Up @@ -211,4 +215,22 @@ Pg=="""
},
"""PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4NCjxuczU6RW52ZWxvcGUgeG1sbnM6bnM1PSJodHRwOi8vc2NoZW1hcy54bWxzb2FwLm9yZy9zb2FwL2VudmVsb3BlLyIgeG1sbnM6bnMxPSJodHRwOi8vd3d3Lm9hc2lzLW9wZW4ub3JnL2NvbW1pdHRlZXMvZWJ4bWwtY3BwYS9zY2hlbWEvY3BwLWNwYS0yXzAueHNkIiB4bWxuczpuczI9Imh0dHA6Ly93d3cudzMub3JnLzE5OTkveGxpbmsiIHhtbG5zOm5zMz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC8wOS94bWxkc2lnIyIgeG1sbnM6bnM0PSJodHRwOi8vd3d3Lm9hc2lzLW9wZW4ub3JnL2NvbW1pdHRlZXMvZWJ4bWwtbXNnL3NjaGVtYS9tc2ctaGVhZGVyLTJfMC54c2QiIHhtbG5zOm5zNz0iaHR0cDovL3d3dy53My5vcmcvMjAwOS94bWxkc2lnMTEjIj4NCiAgICA8bnM1OkhlYWRlcj4NCiAgICAgICAgPG5zNDpNZXNzYWdlSGVhZGVyPg0KICAgICAgICAgICAgPG5zNDpGcm9tPg0KICAgICAgICAgICAgICAgIDxuczQ6UGFydHlJZCBuczQ6dHlwZT0iSEVSIj44MTQxMjUzPC9uczQ6UGFydHlJZD4NCiAgICAgICAgICAgICAgICA8bnM0OlJvbGU+RVJST1JfUkVTUE9OREVSPC9uczQ6Um9sZT4NCiAgICAgICAgICAgIDwvbnM0OkZyb20+DQogICAgICAgICAgICA8bnM0OlRvPg0KICAgICAgICAgICAgICAgIDxuczQ6UGFydHlJZCBuczQ6dHlwZT0iSEVSIj43OTc2ODwvbnM0OlBhcnR5SWQ+DQogICAgICAgICAgICAgICAgPG5zNDpSb2xlPkVSUk9SX1JFQ0VJVkVSPC9uczQ6Um9sZT4NCiAgICAgICAgICAgIDwvbnM0OlRvPg0KICAgICAgICAgICAgPG5zNDpDUEFJZD5uYXY6cWFzczozNTA2NTwvbnM0OkNQQUlkPg0KICAgICAgICAgICAgPG5zNDpDb252ZXJzYXRpb25JZD5iZTE5MmQzYS0zNGI1LTQ0OGEtYTM3NC01ZWFiMDUyNGM3NGQ8L25zNDpDb252ZXJzYXRpb25JZD4NCiAgICAgICAgICAgIDxuczQ6U2VydmljZT51cm46b2FzaXM6bmFtZXM6dGM6ZWJ4bWwtbXNnOnNlcnZpY2U8L25zNDpTZXJ2aWNlPg0KICAgICAgICAgICAgPG5zNDpBY3Rpb24+TWVzc2FnZUVycm9yPC9uczQ6QWN0aW9uPg0KICAgICAgICAgICAgPG5zNDpNZXNzYWdlRGF0YT4NCiAgICAgICAgICAgICAgICA8bnM0Ok1lc3NhZ2VJZD43MTA0YWNmOC0yMWU5LTRlZTctYjg5NC1kNDEzYTAwYTg4ODFfUkVTUE9OU0VfUkVTUE9OU0U8L25zNDpNZXNzYWdlSWQ+DQogICAgICAgICAgICAgICAgPG5zNDpUaW1lc3RhbXA+MjAyMy0xMS0xNFQwOTo1OTowMi40NjErMDE6MDA8L25zNDpUaW1lc3RhbXA+DQogICAgICAgICAgICAgICAgPG5zNDpSZWZUb01lc3NhZ2VJZD43MTA0YWNmOC0yMWU5LTRlZTctYjg5NC1kNDEzYTAwYTg4ODFfUkVTUE9OU0U8L25zNDpSZWZUb01lc3NhZ2VJZD4NCiAgICAgICAgICAgIDwvbnM0Ok1lc3NhZ2VEYXRhPg0KICAgICAgICA8L25zNDpNZXNzYWdlSGVhZGVyPg0KICAgICAgICA8bnM0OkVycm9yTGlzdCBuczQ6aGlnaGVzdFNldmVyaXR5PSJFcnJvciIgbnM0OnZlcnNpb249IjIuMCIgbnM1Om11c3RVbmRlcnN0YW5kPSIxIj4NCiAgICAgICAgICAgIDxuczQ6RXJyb3IgbnM0OmVycm9yQ29kZT0iU2VjdXJpdHlGYWlsdXJlIiBuczQ6aWQ9IkVSUk9SX0lEIiBuczQ6c2V2ZXJpdHk9IkVycm9yIj4NCiAgICAgICAgICAgICAgICA8bnM0OkRlc2NyaXB0aW9uIHhtbDpsYW5nPSJubyI+RmVpbCBzaWduYXR1cmU8L25zNDpEZXNjcmlwdGlvbj4NCiAgICAgICAgICAgIDwvbnM0OkVycm9yPg0KICAgICAgICA8L25zNDpFcnJvckxpc3Q+DQogICAgPC9uczU6SGVhZGVyPg0KPC9uczU6RW52ZWxvcGU+"""
)

companion object {
@JvmStatic
@BeforeAll
fun setup() {
System.setProperty("KAFKA_BROKERS", KafkaTestContainer.bootstrapServers)
KafkaTestContainer.start()

val topicName = getEnvVar("KAFKA_TOPIC_ACKNOWLEDGMENTS", "team-emottak.smtp.out.ebxml.signal")
KafkaTestContainer.createTopic(topicName)
}

@JvmStatic
@AfterAll
fun tearDown() {
KafkaTestContainer.stop()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package no.nav.emottak.ebms.kafka

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

object KafkaTestContainer {
private val kafkaContainer: KafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"))

val bootstrapServers: String
get() = kafkaContainer.bootstrapServers

fun start() {
kafkaContainer.start()
}

fun stop() {
kafkaContainer.stop()
}

fun createTopic(topicName: String, partitions: Int = 1, replicationFactor: Short = 1) {
val config = mapOf(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers
)

AdminClient.create(config).use { adminClient ->
val newTopic = NewTopic(topicName, partitions, replicationFactor)
adminClient.createTopics(listOf(newTopic)).all().get()
}
}
}

0 comments on commit 8afb687

Please sign in to comment.