From e12daacea0adce9958a2ef3269b64d8e1311183d Mon Sep 17 00:00:00 2001 From: Mikael Bjerga Date: Mon, 23 Dec 2024 12:45:58 +0100 Subject: [PATCH 1/2] Bruk ObjectRiver i FeilLytter --- .../inntektsmelding/feilbehandler/App.kt | 2 +- .../feilbehandler/river/FeilLytter.kt | 169 +++++++++++------- .../feilbehandler/river/FeilLytterTest.kt | 2 +- 3 files changed, 103 insertions(+), 70 deletions(-) diff --git a/apps/feil-behandler/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/App.kt b/apps/feil-behandler/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/App.kt index 61fd2e63e..c4a2eea53 100644 --- a/apps/feil-behandler/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/App.kt +++ b/apps/feil-behandler/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/App.kt @@ -39,7 +39,7 @@ fun RapidsConnection.createFeilLytter(database: Database): RapidsConnection = fun RapidsConnection.createFeilLytter(repository: PostgresBakgrunnsjobbRepository): RapidsConnection = also { - FeilLytter(it, repository) + FeilLytter(repository).connect(it) val bgService = BakgrunnsjobbService(repository) bgService.registrer(FeilProsessor(it)) bgService.startAsync(true) diff --git a/apps/feil-behandler/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/river/FeilLytter.kt b/apps/feil-behandler/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/river/FeilLytter.kt index 073105a3a..1b9a514cf 100644 --- a/apps/feil-behandler/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/river/FeilLytter.kt +++ b/apps/feil-behandler/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/river/FeilLytter.kt @@ -1,37 +1,42 @@ package no.nav.helsearbeidsgiver.inntektsmelding.feilbehandler.river -import com.github.navikt.tbd_libs.rapids_and_rivers.JsonMessage -import com.github.navikt.tbd_libs.rapids_and_rivers.River -import com.github.navikt.tbd_libs.rapids_and_rivers_api.MessageContext -import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection import kotlinx.serialization.json.JsonElement import no.nav.hag.utils.bakgrunnsjobb.Bakgrunnsjobb import no.nav.hag.utils.bakgrunnsjobb.BakgrunnsjobbRepository import no.nav.hag.utils.bakgrunnsjobb.BakgrunnsjobbStatus +import no.nav.helsearbeidsgiver.felles.BehovType import no.nav.helsearbeidsgiver.felles.EventName import no.nav.helsearbeidsgiver.felles.Key +import no.nav.helsearbeidsgiver.felles.json.les +import no.nav.helsearbeidsgiver.felles.json.lesOrNull import no.nav.helsearbeidsgiver.felles.json.toJson import no.nav.helsearbeidsgiver.felles.json.toMap +import no.nav.helsearbeidsgiver.felles.json.toPretty import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail +import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver +import no.nav.helsearbeidsgiver.felles.utils.Log import no.nav.helsearbeidsgiver.inntektsmelding.feilbehandler.prosessor.FeilProsessor import no.nav.helsearbeidsgiver.utils.json.fromJson import no.nav.helsearbeidsgiver.utils.json.parseJson +import no.nav.helsearbeidsgiver.utils.json.serializer.UuidSerializer import no.nav.helsearbeidsgiver.utils.json.toJson -import no.nav.helsearbeidsgiver.utils.json.toPretty +import no.nav.helsearbeidsgiver.utils.log.logger import no.nav.helsearbeidsgiver.utils.log.sikkerLogger -import java.sql.SQLException import java.util.UUID +data class Melding( + val fail: Fail, +) + class FeilLytter( - rapidsConnection: RapidsConnection, private val repository: BakgrunnsjobbRepository, -) : River.PacketListener { - private val jobbType = FeilProsessor.JOB_TYPE - +) : ObjectRiver() { + private val logger = logger() private val sikkerLogger = sikkerLogger() + private val jobbType = FeilProsessor.JOB_TYPE private val eventerSomHaandteres = - listOf( + setOf( EventName.FORESPOERSEL_MOTTATT, EventName.FORESPOERSEL_BESVART, EventName.FORESPOERSEL_FORKASTET, @@ -44,45 +49,28 @@ class FeilLytter( EventName.SELVBESTEMT_IM_LAGRET, ) - init { - sikkerLogger.info("Starter applikasjon - lytter på innkommende feil!") - River(rapidsConnection) - .apply { - validate { msg -> - msg.demandKey(Key.FAIL.toString()) - } - }.register(this) - } + override fun les(json: Map): Melding = + Melding( + fail = Key.FAIL.les(Fail.serializer(), json), + ) - override fun onPacket( - packet: JsonMessage, - context: MessageContext, - ) { - sikkerLogger.info("Mottok feil: ${packet.toJson().parseJson().toPretty()}") - val fail = - packet - .toJson() - .parseJson() - .toMap()[Key.FAIL] - ?.runCatching { - fromJson(Fail.serializer()) - }?.getOrNull() - - if (fail == null) { - sikkerLogger.warn("Kunne ikke parse feil-objekt, ignorerer...") - return - } + override fun Melding.haandter(json: Map): Map? { + logger.info("Mottok feil.") + sikkerLogger.info("Mottok feil.\n${json.toPretty()}") if (eventSkalHaandteres(fail.utloesendeMelding)) { // slå opp transaksjonID. Hvis den finnes, kan det være en annen feilende melding i samme transaksjon: Lagre i så fall // med egen id. Denne id vil så sendes med som ny transaksjonID ved rekjøring. - val jobbId = fail.kontekstId - val eksisterendeJobb = repository.getById(jobbId) + val eksisterendeJobb = repository.getById(fail.kontekstId) when { // Første gang denne flyten feiler eksisterendeJobb == null -> { - sikkerLogger.info("Lagrer mottatt pakke!") + "Lagrer mottatt pakke.".also { + logger.info(it) + sikkerLogger.info(it) + } + lagre( Bakgrunnsjobb( uuid = fail.kontekstId, @@ -100,54 +88,99 @@ class FeilLytter( // Feil i flyt som tidligere har opplevd annen type feil else -> { - val nyTransaksjonId = UUID.randomUUID() - val utloesendeMeldingMedNyTransaksjonId = fail.utloesendeMelding.plus(Key.KONTEKST_ID to nyTransaksjonId.toJson()) + val nyKontekstId = UUID.randomUUID() + val utloesendeMeldingMedNyKontekstId = fail.utloesendeMelding.plus(Key.KONTEKST_ID to nyKontekstId.toJson()) - sikkerLogger.info("ID $jobbId finnes fra før med annen utløsende melding. Lagrer en ny jobb på ID '$nyTransaksjonId'.") + "ID '${eksisterendeJobb.uuid}' finnes fra før med annen utløsende melding. Lagrer en ny jobb på ID '$nyKontekstId'.".also { + logger.info(it) + sikkerLogger.info(it) + } lagre( Bakgrunnsjobb( - uuid = nyTransaksjonId, + uuid = nyKontekstId, type = jobbType, - data = utloesendeMeldingMedNyTransaksjonId.toJson().toString(), + data = utloesendeMeldingMedNyKontekstId.toJson().toString(), maksAntallForsoek = 10, ), ) } } } + + return null } - private fun oppdater(jobb: Bakgrunnsjobb) { - // Dette må gjøres her fordi jobbene er asynkrone og bakgrunnsjobbService ikke får vite at jobben feiler i disse tilfellene - // BakgrunnsjobbService finnVentende() tar heller ikke hensyn til forsøk, kun status på jobben! - if (jobb.forsoek > jobb.maksAntallForsoek) { - jobb.status = BakgrunnsjobbStatus.STOPPET - sikkerLogger.error("Maks forsøk nådd, stopper jobb med id ${jobb.uuid} permanent!") - } else { - jobb.status = BakgrunnsjobbStatus.FEILET - } - try { - repository.update(jobb) - sikkerLogger.info("Oppdaterte eksisterende jobb med id ${jobb.uuid}") - } catch (ex: SQLException) { - sikkerLogger.error("Oppdatering av jobb med id ${jobb.uuid} feilet!", ex) + override fun Melding.haandterFeil( + json: Map, + error: Throwable, + ): Map? { + "Klarte ikke håndtere fail.".also { + logger.error(it) + sikkerLogger.error(it, error) } + return null } - private fun lagre(jobb: Bakgrunnsjobb) { - try { - repository.save(jobb) - sikkerLogger.info("Lagret ny jobb med id ${jobb.uuid}") - } catch (ex: SQLException) { - sikkerLogger.error("Lagring av jobb med id ${jobb.uuid} feilet!", ex) - } + override fun Melding.loggfelt(): Map { + val eventName = Key.EVENT_NAME.lesOrNull(EventName.serializer(), fail.utloesendeMelding) + val kontekstId = Key.KONTEKST_ID.lesOrNull(UuidSerializer, fail.utloesendeMelding) + val behovType = Key.BEHOV.lesOrNull(BehovType.serializer(), fail.utloesendeMelding) + + val data = fail.utloesendeMelding[Key.DATA]?.toMap().orEmpty() + val forespoerselId = Key.FORESPOERSEL_ID.lesOrNull(UuidSerializer, data) + val selvbestemtId = Key.SELVBESTEMT_ID.lesOrNull(UuidSerializer, data) + + return listOf( + Log.klasse(this@FeilLytter), + eventName?.let(Log::event), + kontekstId?.let(Log::transaksjonId), + behovType?.let(Log::behov), + forespoerselId?.let(Log::forespoerselId), + selvbestemtId?.let(Log::selvbestemtId), + ).mapNotNull { it } + .toMap() } private fun eventSkalHaandteres(utloesendeMelding: Map): Boolean { val eventFraMelding = utloesendeMelding[Key.EVENT_NAME]?.fromJson(EventName.serializer()) val skalHaandteres = eventerSomHaandteres.contains(eventFraMelding) - sikkerLogger.info("Event: $eventFraMelding skal håndteres: $skalHaandteres") + + "Event '$eventFraMelding' skal håndteres: '$skalHaandteres'.".also { + logger.info(it) + sikkerLogger.info(it) + } + return skalHaandteres } + + private fun lagre(jobb: Bakgrunnsjobb) { + "Lagrer ny jobb med ID '${jobb.uuid}'.".also { + logger.info(it) + sikkerLogger.info(it) + } + repository.save(jobb) + } + + private fun oppdater(jobb: Bakgrunnsjobb) { + // Dette må gjøres her fordi jobbene er asynkrone og bakgrunnsjobbService ikke får vite at jobben feiler i disse tilfellene + // BakgrunnsjobbService finnVentende() tar heller ikke hensyn til forsøk, kun status på jobben! + jobb.status = + if (jobb.forsoek > jobb.maksAntallForsoek) { + "Maks forsøk nådd, stopper jobb med ID '${jobb.uuid}' permanent!".also { + logger.error(it) + sikkerLogger.error(it) + } + BakgrunnsjobbStatus.STOPPET + } else { + BakgrunnsjobbStatus.FEILET + } + + "Oppdaterer eksisterende jobb med ID '${jobb.uuid}'.".also { + logger.info(it) + sikkerLogger.info(it) + } + + repository.update(jobb) + } } diff --git a/apps/feil-behandler/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/river/FeilLytterTest.kt b/apps/feil-behandler/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/river/FeilLytterTest.kt index 63c356891..065ac32cb 100644 --- a/apps/feil-behandler/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/river/FeilLytterTest.kt +++ b/apps/feil-behandler/src/test/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/river/FeilLytterTest.kt @@ -30,7 +30,7 @@ class FeilLytterTest : val rapid = TestRapid() val repository = MockBakgrunnsjobbRepository() - FeilLytter(rapid, repository) + FeilLytter(repository).connect(rapid) afterTest { repository.deleteAll() From 54d02c72ba08b467596308f734b2393e23e5d81c Mon Sep 17 00:00:00 2001 From: Mikael Bjerga Date: Mon, 23 Dec 2024 16:42:42 +0100 Subject: [PATCH 2/2] Logg penere json --- .../feilbehandler/prosessor/FeilProsessor.kt | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/apps/feil-behandler/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/prosessor/FeilProsessor.kt b/apps/feil-behandler/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/prosessor/FeilProsessor.kt index dd31b5f10..81644c18e 100644 --- a/apps/feil-behandler/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/prosessor/FeilProsessor.kt +++ b/apps/feil-behandler/src/main/kotlin/no/nav/helsearbeidsgiver/inntektsmelding/feilbehandler/prosessor/FeilProsessor.kt @@ -3,11 +3,17 @@ package no.nav.helsearbeidsgiver.inntektsmelding.feilbehandler.prosessor import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection import no.nav.hag.utils.bakgrunnsjobb.Bakgrunnsjobb import no.nav.hag.utils.bakgrunnsjobb.BakgrunnsjobbProsesserer +import no.nav.helsearbeidsgiver.utils.json.parseJson +import no.nav.helsearbeidsgiver.utils.json.toPretty +import no.nav.helsearbeidsgiver.utils.log.logger import no.nav.helsearbeidsgiver.utils.log.sikkerLogger class FeilProsessor( private val rapid: RapidsConnection, ) : BakgrunnsjobbProsesserer { + private val logger = logger() + private val sikkerLogger = sikkerLogger() + override val type: String get() = JOB_TYPE @@ -15,11 +21,12 @@ class FeilProsessor( const val JOB_TYPE = "kafka-retry-message" } - private val sikkerLogger = sikkerLogger() - override fun prosesser(jobb: Bakgrunnsjobb) { - sikkerLogger.info("Prosesserer jobb - rekjører melding med id ${jobb.uuid}") - sikkerLogger.debug("Sender melding: ${jobb.data}") + "Prosesserer jobb - rekjører melding med ID '${jobb.uuid}'.".also { + logger.info(it) + sikkerLogger.info(it) + } + sikkerLogger.debug("Sender melding.\n${jobb.data.parseJson().toPretty()}") rapid.publish(jobb.data) } }