diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt index b1bde453..13cf7ef3 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt @@ -30,7 +30,6 @@ import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.awaitCancellation -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.take import kotlinx.coroutines.launch import net.logstash.logback.marker.Markers @@ -95,8 +94,12 @@ fun main() = SuspendApp { KafkaReceiver(receiverSettings) .receive(kafkaConfig.incomingSignalTopic) .take(10) - .map { it.key() to it.value() } - .collect(signalProcessor::processSignal) + .collect { record -> + signalProcessor.processSignal(record.key(), record.value()) + record.offset.acknowledge().also { + log.debug("Acknowledged topic offset ${record.offset()}") + } + } } } diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/SignalProcessor.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/SignalProcessor.kt index 76ed779e..bca516da 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/SignalProcessor.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/SignalProcessor.kt @@ -4,7 +4,7 @@ import no.nav.emottak.ebms.log class SignalProcessor { - fun processSignal(signal: Pair) { - log.info("Got signal with reference <${signal.first}> and content: ${String(signal.second)}") + fun processSignal(reference: String, content: ByteArray) { + log.info("Got signal with reference <$reference> and content: ${String(content)}") } }