Skip to content

Commit

Permalink
Gjort KafkaClien singeltone
Browse files Browse the repository at this point in the history
  • Loading branch information
OleksandrChmyrNAV committed Dec 4, 2024
1 parent 39bba0d commit e818317
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
8 changes: 3 additions & 5 deletions ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import net.logstash.logback.marker.Markers
import no.nav.emottak.constants.SMTPHeaders
import no.nav.emottak.ebms.kafka.KafkaClient
import no.nav.emottak.ebms.kafka.kafkaClientObject
import no.nav.emottak.ebms.processing.ProcessingService
import no.nav.emottak.ebms.sendin.SendInService
import no.nav.emottak.ebms.validation.DokumentValidator
Expand Down Expand Up @@ -53,8 +53,7 @@ fun Application.ebmsProviderModule() {
val validator = DokumentValidator(cpaClient)

val processingClient = PayloadProcessingClient(scopedAuthHttpClient(EBMS_PAYLOAD_SCOPE))
val kafkaClient = KafkaClient()
val processing = ProcessingService(processingClient, kafkaClient)
val processing = ProcessingService(processingClient)

val sendInClient = SendInClient(scopedAuthHttpClient(EBMS_SEND_IN_SCOPE))
val sendInService = SendInService(sendInClient)
Expand All @@ -74,8 +73,7 @@ fun Application.ebmsProviderModule() {
get("/kafkatest") {
log.debug("Kafka test: start")
try {
val client = KafkaClient()
val consumer = client.createConsumer()
val consumer = kafkaClientObject.createConsumer()
val topic = "ebxml-acknowledgments"

consumer.subscribe(listOf(topic))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.*

val kafkaClientObject = KafkaClient()

class KafkaClient {

private val kafkaBrokers = getEnvVar("KAFKA_BROKERS", "http://localhost:9092")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import no.nav.emottak.ebms.PayloadProcessingClient
import no.nav.emottak.ebms.kafka.KafkaClient
import no.nav.emottak.ebms.kafka.kafkaClientObject
import no.nav.emottak.ebms.log
import no.nav.emottak.ebms.logger
import no.nav.emottak.ebms.util.marker
Expand All @@ -24,7 +24,7 @@ import no.nav.emottak.message.model.PayloadResponse
import no.nav.emottak.util.getEnvVar
import org.apache.kafka.clients.producer.ProducerRecord

class ProcessingService(private val httpClient: PayloadProcessingClient, private val kafkaClient: KafkaClient) {
class ProcessingService(private val httpClient: PayloadProcessingClient) {

private suspend fun processMessage(
payloadMessage: PayloadMessage,
Expand Down Expand Up @@ -89,7 +89,7 @@ class ProcessingService(private val httpClient: PayloadProcessingClient, private
log.debug("Sending acknowledgment to queue")
log.debug("Acknowledgment document: {}", acknowledgment.dokument.toString())
acknowledgment.dokument.toString()
val kafkaProducer = kafkaClient.createProducer()
val kafkaProducer = kafkaClientObject.createProducer()
val topic = getEnvVar("KAFKA_TOPIC_ACKNOWLEDGMENTS", "emottak-acknowledgments")
kafkaProducer.send(
ProducerRecord(topic, acknowledgment.messageId, acknowledgment.toEbmsDokument().toString())
Expand Down

0 comments on commit e818317

Please sign in to comment.