diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt index a6b38316..377fa01c 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt @@ -7,6 +7,7 @@ import arrow.continuations.SuspendApp import arrow.continuations.ktor.server import arrow.core.raise.result import arrow.fx.coroutines.resourceScope +import arrow.resilience.Schedule import dev.reformator.stacktracedecoroutinator.runtime.DecoroutinatorRuntime import io.ktor.serialization.kotlinx.json.json import io.ktor.server.application.Application @@ -36,6 +37,7 @@ import no.nav.emottak.util.isProdEnv import org.slf4j.LoggerFactory import java.time.Duration import java.time.Instant +import kotlin.time.Duration.Companion.seconds import kotlin.time.toKotlinDuration val log = LoggerFactory.getLogger("no.nav.emottak.ebms.App") @@ -57,7 +59,8 @@ fun main() = SuspendApp { if (getEnvVar("ASYNC_RECEIVER", "true").toBoolean()) { log.debug("Starting signal message receiver") - SignalReceiver(config.kafka).schedule() + val signalReceiver = SignalReceiver(config.kafka) + scheduleSignalReceiver(signalReceiver) } awaitCancellation() @@ -65,6 +68,11 @@ fun main() = SuspendApp { } } +suspend fun scheduleSignalReceiver(signalReceiver: SignalReceiver, interval: kotlin.time.Duration = 30.seconds) = + Schedule + .spaced(interval) + .repeat(signalReceiver::processMessages) + fun Application.ebmsProviderModule() { val cpaClient = CpaRepoClient(defaultHttpClient()) val validator = DokumentValidator(cpaClient) diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/consumer/SignalReceiver.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/consumer/SignalReceiver.kt index 4b73c2e0..a12c6d35 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/consumer/SignalReceiver.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/consumer/SignalReceiver.kt @@ -1,6 +1,5 @@ package no.nav.emottak.ebms.consumer -import arrow.resilience.Schedule import io.github.nomisRev.kafka.map import io.github.nomisRev.kafka.receiver.AutoOffsetReset import io.github.nomisRev.kafka.receiver.KafkaReceiver @@ -12,8 +11,6 @@ import no.nav.emottak.ebms.configuration.toProperties import no.nav.emottak.ebms.log import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.serialization.StringDeserializer -import kotlin.time.Duration -import kotlin.time.Duration.Companion.seconds class SignalReceiver( private val kafkaConfig: Kafka @@ -28,17 +25,14 @@ class SignalReceiver( properties = kafkaConfig.toProperties() ) - suspend fun schedule(interval: Duration = 30.seconds) = - Schedule - .spaced(interval) - .repeat(this::processMessages) - - private suspend fun processMessages() = + suspend fun processMessages() { + log.debug("Receiving signal messages from ${kafkaConfig.incomingSignalTopic}") KafkaReceiver(settings) .receive(kafkaConfig.incomingSignalTopic) .take(10) .map { it.key() to it.value() } .collect(::processSignal) + } private fun processSignal(signal: Pair) { log.info("Got signal with reference <${signal.first.value}> and content: ${String(signal.second.value)}")