Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bruk ObjectRiver i FeilLytter #816

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fun RapidsConnection.createFeilLytter(database: Database): RapidsConnection =

fun RapidsConnection.createFeilLytter(repository: PostgresBakgrunnsjobbRepository): RapidsConnection =
also {
FeilLytter(it, repository)
FeilLytter(repository).connect(it)
val bgService = BakgrunnsjobbService(repository)
bgService.registrer(FeilProsessor(it))
bgService.startAsync(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,30 @@ package no.nav.helsearbeidsgiver.inntektsmelding.feilbehandler.prosessor
import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection
import no.nav.hag.utils.bakgrunnsjobb.Bakgrunnsjobb
import no.nav.hag.utils.bakgrunnsjobb.BakgrunnsjobbProsesserer
import no.nav.helsearbeidsgiver.utils.json.parseJson
import no.nav.helsearbeidsgiver.utils.json.toPretty
import no.nav.helsearbeidsgiver.utils.log.logger
import no.nav.helsearbeidsgiver.utils.log.sikkerLogger

class FeilProsessor(
private val rapid: RapidsConnection,
) : BakgrunnsjobbProsesserer {
private val logger = logger()
private val sikkerLogger = sikkerLogger()

override val type: String
get() = JOB_TYPE

companion object {
const val JOB_TYPE = "kafka-retry-message"
}

private val sikkerLogger = sikkerLogger()

override fun prosesser(jobb: Bakgrunnsjobb) {
sikkerLogger.info("Prosesserer jobb - rekjører melding med id ${jobb.uuid}")
sikkerLogger.debug("Sender melding: ${jobb.data}")
"Prosesserer jobb - rekjører melding med ID '${jobb.uuid}'.".also {
logger.info(it)
sikkerLogger.info(it)
}
sikkerLogger.debug("Sender melding.\n${jobb.data.parseJson().toPretty()}")
rapid.publish(jobb.data)
}
}
Original file line number Diff line number Diff line change
@@ -1,37 +1,42 @@
package no.nav.helsearbeidsgiver.inntektsmelding.feilbehandler.river

import com.github.navikt.tbd_libs.rapids_and_rivers.JsonMessage
import com.github.navikt.tbd_libs.rapids_and_rivers.River
import com.github.navikt.tbd_libs.rapids_and_rivers_api.MessageContext
import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection
import kotlinx.serialization.json.JsonElement
import no.nav.hag.utils.bakgrunnsjobb.Bakgrunnsjobb
import no.nav.hag.utils.bakgrunnsjobb.BakgrunnsjobbRepository
import no.nav.hag.utils.bakgrunnsjobb.BakgrunnsjobbStatus
import no.nav.helsearbeidsgiver.felles.BehovType
import no.nav.helsearbeidsgiver.felles.EventName
import no.nav.helsearbeidsgiver.felles.Key
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.lesOrNull
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.json.toPretty
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
import no.nav.helsearbeidsgiver.inntektsmelding.feilbehandler.prosessor.FeilProsessor
import no.nav.helsearbeidsgiver.utils.json.fromJson
import no.nav.helsearbeidsgiver.utils.json.parseJson
import no.nav.helsearbeidsgiver.utils.json.serializer.UuidSerializer
import no.nav.helsearbeidsgiver.utils.json.toJson
import no.nav.helsearbeidsgiver.utils.json.toPretty
import no.nav.helsearbeidsgiver.utils.log.logger
import no.nav.helsearbeidsgiver.utils.log.sikkerLogger
import java.sql.SQLException
import java.util.UUID

data class Melding(
val fail: Fail,
)

class FeilLytter(
rapidsConnection: RapidsConnection,
private val repository: BakgrunnsjobbRepository,
) : River.PacketListener {
private val jobbType = FeilProsessor.JOB_TYPE

) : ObjectRiver<Melding>() {
private val logger = logger()
private val sikkerLogger = sikkerLogger()

private val jobbType = FeilProsessor.JOB_TYPE
private val eventerSomHaandteres =
listOf(
setOf(
EventName.FORESPOERSEL_MOTTATT,
EventName.FORESPOERSEL_BESVART,
EventName.FORESPOERSEL_FORKASTET,
Expand All @@ -44,45 +49,28 @@ class FeilLytter(
EventName.SELVBESTEMT_IM_LAGRET,
)

init {
sikkerLogger.info("Starter applikasjon - lytter på innkommende feil!")
River(rapidsConnection)
.apply {
validate { msg ->
msg.demandKey(Key.FAIL.toString())
}
}.register(this)
}
override fun les(json: Map<Key, JsonElement>): Melding =
Melding(
fail = Key.FAIL.les(Fail.serializer(), json),
)

override fun onPacket(
packet: JsonMessage,
context: MessageContext,
) {
sikkerLogger.info("Mottok feil: ${packet.toJson().parseJson().toPretty()}")
val fail =
packet
.toJson()
.parseJson()
.toMap()[Key.FAIL]
?.runCatching {
fromJson(Fail.serializer())
}?.getOrNull()

if (fail == null) {
sikkerLogger.warn("Kunne ikke parse feil-objekt, ignorerer...")
return
}
override fun Melding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement>? {
logger.info("Mottok feil.")
sikkerLogger.info("Mottok feil.\n${json.toPretty()}")

if (eventSkalHaandteres(fail.utloesendeMelding)) {
// slå opp transaksjonID. Hvis den finnes, kan det være en annen feilende melding i samme transaksjon: Lagre i så fall
// med egen id. Denne id vil så sendes med som ny transaksjonID ved rekjøring.
val jobbId = fail.kontekstId
val eksisterendeJobb = repository.getById(jobbId)
val eksisterendeJobb = repository.getById(fail.kontekstId)

when {
// Første gang denne flyten feiler
eksisterendeJobb == null -> {
sikkerLogger.info("Lagrer mottatt pakke!")
"Lagrer mottatt pakke.".also {
logger.info(it)
sikkerLogger.info(it)
}

lagre(
Bakgrunnsjobb(
uuid = fail.kontekstId,
Expand All @@ -100,54 +88,99 @@ class FeilLytter(

// Feil i flyt som tidligere har opplevd annen type feil
else -> {
val nyTransaksjonId = UUID.randomUUID()
val utloesendeMeldingMedNyTransaksjonId = fail.utloesendeMelding.plus(Key.KONTEKST_ID to nyTransaksjonId.toJson())
val nyKontekstId = UUID.randomUUID()
val utloesendeMeldingMedNyKontekstId = fail.utloesendeMelding.plus(Key.KONTEKST_ID to nyKontekstId.toJson())

sikkerLogger.info("ID $jobbId finnes fra før med annen utløsende melding. Lagrer en ny jobb på ID '$nyTransaksjonId'.")
"ID '${eksisterendeJobb.uuid}' finnes fra før med annen utløsende melding. Lagrer en ny jobb på ID '$nyKontekstId'.".also {
logger.info(it)
sikkerLogger.info(it)
}

lagre(
Bakgrunnsjobb(
uuid = nyTransaksjonId,
uuid = nyKontekstId,
type = jobbType,
data = utloesendeMeldingMedNyTransaksjonId.toJson().toString(),
data = utloesendeMeldingMedNyKontekstId.toJson().toString(),
maksAntallForsoek = 10,
),
)
}
}
}

return null
}

private fun oppdater(jobb: Bakgrunnsjobb) {
// Dette må gjøres her fordi jobbene er asynkrone og bakgrunnsjobbService ikke får vite at jobben feiler i disse tilfellene
// BakgrunnsjobbService finnVentende() tar heller ikke hensyn til forsøk, kun status på jobben!
if (jobb.forsoek > jobb.maksAntallForsoek) {
jobb.status = BakgrunnsjobbStatus.STOPPET
sikkerLogger.error("Maks forsøk nådd, stopper jobb med id ${jobb.uuid} permanent!")
} else {
jobb.status = BakgrunnsjobbStatus.FEILET
}
try {
repository.update(jobb)
sikkerLogger.info("Oppdaterte eksisterende jobb med id ${jobb.uuid}")
} catch (ex: SQLException) {
sikkerLogger.error("Oppdatering av jobb med id ${jobb.uuid} feilet!", ex)
override fun Melding.haandterFeil(
json: Map<Key, JsonElement>,
error: Throwable,
): Map<Key, JsonElement>? {
"Klarte ikke håndtere fail.".also {
logger.error(it)
sikkerLogger.error(it, error)
}
return null
}

private fun lagre(jobb: Bakgrunnsjobb) {
try {
repository.save(jobb)
sikkerLogger.info("Lagret ny jobb med id ${jobb.uuid}")
} catch (ex: SQLException) {
sikkerLogger.error("Lagring av jobb med id ${jobb.uuid} feilet!", ex)
}
override fun Melding.loggfelt(): Map<String, String> {
val eventName = Key.EVENT_NAME.lesOrNull(EventName.serializer(), fail.utloesendeMelding)
val kontekstId = Key.KONTEKST_ID.lesOrNull(UuidSerializer, fail.utloesendeMelding)
val behovType = Key.BEHOV.lesOrNull(BehovType.serializer(), fail.utloesendeMelding)

val data = fail.utloesendeMelding[Key.DATA]?.toMap().orEmpty()
val forespoerselId = Key.FORESPOERSEL_ID.lesOrNull(UuidSerializer, data)
val selvbestemtId = Key.SELVBESTEMT_ID.lesOrNull(UuidSerializer, data)

return listOf(
Log.klasse(this@FeilLytter),
eventName?.let(Log::event),
kontekstId?.let(Log::transaksjonId),
behovType?.let(Log::behov),
forespoerselId?.let(Log::forespoerselId),
selvbestemtId?.let(Log::selvbestemtId),
).mapNotNull { it }
.toMap()
}

private fun eventSkalHaandteres(utloesendeMelding: Map<Key, JsonElement>): Boolean {
val eventFraMelding = utloesendeMelding[Key.EVENT_NAME]?.fromJson(EventName.serializer())
val skalHaandteres = eventerSomHaandteres.contains(eventFraMelding)
sikkerLogger.info("Event: $eventFraMelding skal håndteres: $skalHaandteres")

"Event '$eventFraMelding' skal håndteres: '$skalHaandteres'.".also {
logger.info(it)
sikkerLogger.info(it)
}

return skalHaandteres
}

private fun lagre(jobb: Bakgrunnsjobb) {
"Lagrer ny jobb med ID '${jobb.uuid}'.".also {
logger.info(it)
sikkerLogger.info(it)
}
repository.save(jobb)
}

private fun oppdater(jobb: Bakgrunnsjobb) {
// Dette må gjøres her fordi jobbene er asynkrone og bakgrunnsjobbService ikke får vite at jobben feiler i disse tilfellene
// BakgrunnsjobbService finnVentende() tar heller ikke hensyn til forsøk, kun status på jobben!
jobb.status =
if (jobb.forsoek > jobb.maksAntallForsoek) {
"Maks forsøk nådd, stopper jobb med ID '${jobb.uuid}' permanent!".also {
logger.error(it)
sikkerLogger.error(it)
}
BakgrunnsjobbStatus.STOPPET
} else {
BakgrunnsjobbStatus.FEILET
}

"Oppdaterer eksisterende jobb med ID '${jobb.uuid}'.".also {
logger.info(it)
sikkerLogger.info(it)
}

repository.update(jobb)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class FeilLytterTest :
val rapid = TestRapid()
val repository = MockBakgrunnsjobbRepository()

FeilLytter(rapid, repository)
FeilLytter(repository).connect(rapid)

afterTest {
repository.deleteAll()
Expand Down
Loading