Skip to content

Commit

Permalink
Acknowledge offset
Browse files Browse the repository at this point in the history
  • Loading branch information
thburnett committed Dec 19, 2024
1 parent a37fd71 commit e158fc1
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
9 changes: 6 additions & 3 deletions ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import net.logstash.logback.marker.Markers
Expand Down Expand Up @@ -95,8 +94,12 @@ fun main() = SuspendApp {
KafkaReceiver(receiverSettings)
.receive(kafkaConfig.incomingSignalTopic)
.take(10)
.map { it.key() to it.value() }
.collect(signalProcessor::processSignal)
.collect { record ->
signalProcessor.processSignal(record.key(), record.value())
record.offset.acknowledge().also {
log.debug("Acknowledged topic offset ${record.offset()}")
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import no.nav.emottak.ebms.log

class SignalProcessor {

fun processSignal(signal: Pair<String, ByteArray>) {
log.info("Got signal with reference <${signal.first}> and content: ${String(signal.second)}")
fun processSignal(reference: String, content: ByteArray) {
log.info("Got signal with reference <$reference> and content: ${String(content)}")
}
}

0 comments on commit e158fc1

Please sign in to comment.