Skip to content

Commit

Permalink
Moving schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
thburnett committed Dec 18, 2024
1 parent e8548c9 commit 73eaeba
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
10 changes: 9 additions & 1 deletion ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import arrow.continuations.SuspendApp
import arrow.continuations.ktor.server
import arrow.core.raise.result
import arrow.fx.coroutines.resourceScope
import arrow.resilience.Schedule
import dev.reformator.stacktracedecoroutinator.runtime.DecoroutinatorRuntime
import io.ktor.serialization.kotlinx.json.json
import io.ktor.server.application.Application
Expand Down Expand Up @@ -36,6 +37,7 @@ import no.nav.emottak.util.isProdEnv
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Instant
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toKotlinDuration

val log = LoggerFactory.getLogger("no.nav.emottak.ebms.App")
Expand All @@ -57,14 +59,20 @@ fun main() = SuspendApp {

if (getEnvVar("ASYNC_RECEIVER", "true").toBoolean()) {
log.debug("Starting signal message receiver")
SignalReceiver(config.kafka).schedule()
val signalReceiver = SignalReceiver(config.kafka)
scheduleSignalReceiver(signalReceiver)
}

awaitCancellation()
}
}
}

suspend fun scheduleSignalReceiver(signalReceiver: SignalReceiver, interval: kotlin.time.Duration = 30.seconds) =
Schedule
.spaced<Unit>(interval)
.repeat(signalReceiver::processMessages)

fun Application.ebmsProviderModule() {
val cpaClient = CpaRepoClient(defaultHttpClient())
val validator = DokumentValidator(cpaClient)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package no.nav.emottak.ebms.consumer

import arrow.resilience.Schedule
import io.github.nomisRev.kafka.map
import io.github.nomisRev.kafka.receiver.AutoOffsetReset
import io.github.nomisRev.kafka.receiver.KafkaReceiver
Expand All @@ -12,8 +11,6 @@ import no.nav.emottak.ebms.configuration.toProperties
import no.nav.emottak.ebms.log
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

class SignalReceiver(
private val kafkaConfig: Kafka
Expand All @@ -28,17 +25,14 @@ class SignalReceiver(
properties = kafkaConfig.toProperties()
)

suspend fun schedule(interval: Duration = 30.seconds) =
Schedule
.spaced<Unit>(interval)
.repeat(this::processMessages)

private suspend fun processMessages() =
suspend fun processMessages() {
log.debug("Receiving signal messages from ${kafkaConfig.incomingSignalTopic}")
KafkaReceiver(settings)
.receive(kafkaConfig.incomingSignalTopic)
.take(10)
.map { it.key() to it.value() }
.collect(::processSignal)
}

private fun processSignal(signal: Pair<Reference, Content>) {
log.info("Got signal with reference <${signal.first.value}> and content: ${String(signal.second.value)}")
Expand Down

0 comments on commit 73eaeba

Please sign in to comment.