From e158fc174b3d36bc843a6f31d25ec41252af1229 Mon Sep 17 00:00:00 2001 From: Thomas Burnett Date: Thu, 19 Dec 2024 14:19:04 +0100 Subject: [PATCH] Acknowledge offset --- ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt | 9 ++++++--- .../no/nav/emottak/ebms/processing/SignalProcessor.kt | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt index b1bde453..13cf7ef3 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt @@ -30,7 +30,6 @@ import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.awaitCancellation -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.take import kotlinx.coroutines.launch import net.logstash.logback.marker.Markers @@ -95,8 +94,12 @@ fun main() = SuspendApp { KafkaReceiver(receiverSettings) .receive(kafkaConfig.incomingSignalTopic) .take(10) - .map { it.key() to it.value() } - .collect(signalProcessor::processSignal) + .collect { record -> + signalProcessor.processSignal(record.key(), record.value()) + record.offset.acknowledge().also { + log.debug("Acknowledged topic offset ${record.offset()}") + } + } } } diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/SignalProcessor.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/SignalProcessor.kt index 76ed779e..bca516da 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/SignalProcessor.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/SignalProcessor.kt @@ -4,7 +4,7 @@ import no.nav.emottak.ebms.log class SignalProcessor { - fun processSignal(signal: Pair) { - log.info("Got signal with reference <${signal.first}> and content: ${String(signal.second)}") + fun processSignal(reference: String, content: ByteArray) { + log.info("Got signal with reference <$reference> and content: ${String(content)}") } }