Skip to content

Commit

Permalink
Simple error logging
Browse files Browse the repository at this point in the history
  • Loading branch information
thburnett committed Dec 18, 2024
1 parent 73eaeba commit 56b919b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 32 deletions.
19 changes: 18 additions & 1 deletion ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import arrow.core.raise.result
import arrow.fx.coroutines.resourceScope
import arrow.resilience.Schedule
import dev.reformator.stacktracedecoroutinator.runtime.DecoroutinatorRuntime
import io.github.nomisRev.kafka.receiver.AutoOffsetReset
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import io.ktor.serialization.kotlinx.json.json
import io.ktor.server.application.Application
import io.ktor.server.application.call
Expand All @@ -28,12 +31,15 @@ import kotlinx.coroutines.awaitCancellation
import net.logstash.logback.marker.Markers
import no.nav.emottak.constants.SMTPHeaders
import no.nav.emottak.ebms.configuration.config
import no.nav.emottak.ebms.configuration.toProperties
import no.nav.emottak.ebms.consumer.SignalReceiver
import no.nav.emottak.ebms.processing.ProcessingService
import no.nav.emottak.ebms.sendin.SendInService
import no.nav.emottak.ebms.validation.DokumentValidator
import no.nav.emottak.util.getEnvVar
import no.nav.emottak.util.isProdEnv
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Instant
Expand All @@ -59,7 +65,18 @@ fun main() = SuspendApp {

if (getEnvVar("ASYNC_RECEIVER", "true").toBoolean()) {
log.debug("Starting signal message receiver")
val signalReceiver = SignalReceiver(config.kafka)
val kafkaConfig = config.kafka
val receiverSettings: ReceiverSettings<String, ByteArray> =
ReceiverSettings(
bootstrapServers = kafkaConfig.bootstrapServers,
keyDeserializer = StringDeserializer(),
valueDeserializer = ByteArrayDeserializer(),
groupId = kafkaConfig.groupId,
autoOffsetReset = AutoOffsetReset.Earliest, // TODO set this to something else
properties = kafkaConfig.toProperties()
)
val kafkaReceiver = KafkaReceiver(receiverSettings)
val signalReceiver = SignalReceiver(kafkaReceiver, kafkaConfig.incomingSignalTopic)
scheduleSignalReceiver(signalReceiver)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,46 +1,32 @@
package no.nav.emottak.ebms.consumer

import io.github.nomisRev.kafka.map
import io.github.nomisRev.kafka.receiver.AutoOffsetReset
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import kotlinx.coroutines.flow.count
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import no.nav.emottak.ebms.configuration.Kafka
import no.nav.emottak.ebms.configuration.toProperties
import no.nav.emottak.ebms.log
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer

class SignalReceiver(
private val kafkaConfig: Kafka
private val kafkaReceiver: KafkaReceiver<String, ByteArray>,
private val kafkaSignalTopic: String
) {
private val settings: ReceiverSettings<Reference, Content> =
ReceiverSettings(
bootstrapServers = kafkaConfig.bootstrapServers,
keyDeserializer = StringDeserializer().map(::Reference),
valueDeserializer = ByteArrayDeserializer().map(::Content),
groupId = kafkaConfig.groupId,
autoOffsetReset = AutoOffsetReset.Earliest, // TODO set this to something else
properties = kafkaConfig.toProperties()
)

suspend fun processMessages() {
log.debug("Receiving signal messages from ${kafkaConfig.incomingSignalTopic}")
KafkaReceiver(settings)
.receive(kafkaConfig.incomingSignalTopic)
.take(10)
.map { it.key() to it.value() }
.collect(::processSignal)
log.debug("Receiving signal messages from $kafkaSignalTopic")
runCatching {
kafkaReceiver
.receive(kafkaSignalTopic).also {
log.info("Received signal messages (${it.count()})")
}
.take(10)
.map { it.key() to it.value() }
.collect(this::processSignal)
}.onFailure {
log.error("Error receiving signal messages", it)
}
}

private fun processSignal(signal: Pair<Reference, Content>) {
log.info("Got signal with reference <${signal.first.value}> and content: ${String(signal.second.value)}")
private fun processSignal(signal: Pair<String, ByteArray>) {
log.info("Got signal with reference <${signal.first}> and content: ${String(signal.second)}")
}
}

@JvmInline
value class Reference(val value: String)

@JvmInline
value class Content(val value: ByteArray)

0 comments on commit 56b919b

Please sign in to comment.