Skip to content

Commit

Permalink
Merge pull request #11 from navikt/dedup_pg_melding
Browse files Browse the repository at this point in the history
Dedup meldinger
  • Loading branch information
jacob-meidell authored Nov 23, 2023
2 parents 9a554de + d739754 commit 971f435
Show file tree
Hide file tree
Showing 13 changed files with 220 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class PersongrunnlagKafkaListener(
deserialize<PersongrunnlagMeldingKafka>(consumerRecord.value()).also { persongrunnlagMelding ->
Mdc.scopedMdc(persongrunnlagMelding.correlationId) { _ ->
Mdc.scopedMdc(persongrunnlagMelding.innlesingId) { _ ->
persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(innhold = persongrunnlagMelding)
)
log.info("Melding prosessert")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import no.nav.pensjon.opptjening.omsorgsopptjening.felles.deserialize
import no.nav.pensjon.opptjening.omsorgsopptjening.felles.deserializeList
import no.nav.pensjon.opptjening.omsorgsopptjening.felles.serialize
import no.nav.pensjon.opptjening.omsorgsopptjening.felles.serializeList
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.jdbc.core.RowMapper
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
Expand All @@ -21,42 +23,41 @@ class PersongrunnlagRepo(
private val jdbcTemplate: NamedParameterJdbcTemplate,
private val clock: Clock = Clock.systemUTC()
) {
private val log: Logger = LoggerFactory.getLogger(this::class.java)

fun persist(melding: PersongrunnlagMelding.Lest): PersongrunnlagMelding.Mottatt {
/**
* @return Id til rad som ble lagret i databasen - null dersom raden eksisterte fra før (duplikat)
*/
fun lagre(melding: PersongrunnlagMelding.Lest): UUID? {
val keyHolder = GeneratedKeyHolder()
jdbcTemplate.update(
"""insert into melding (melding, correlation_id, innlesing_id, opprettet) values (to_jsonb(:melding::jsonb), :correlation_id, :innlesing_id, :opprettet::timestamptz)""",
//language=postgres-psql
"""
|with pg as (insert into melding (melding, correlation_id, innlesing_id, opprettet)
|values (to_jsonb(:melding::jsonb), :correlation_id, :innlesing_id, :opprettet::timestamptz)
|on conflict on constraint unique_correlation_innlesing do nothing
|returning id, to_jsonb(:status::jsonb) as status, to_jsonb(:statushistorikk::jsonb) as statushistorikk, :statusType as statusType, :karanteneTil::timestamptz as karanteneTil)
|insert into melding_status (id, status, statushistorikk, status_type, karantene_til)
|select id, status, statushistorikk, statusType, karanteneTil from pg where id is not null
|returning (select id from pg)
"""
.trimMargin(),
MapSqlParameterSource(
mapOf<String, Any>(
mapOf<String, Any?>(
"melding" to serialize(melding.innhold),
"correlation_id" to melding.correlationId.toString(),
"innlesing_id" to melding.innlesingId.toString(),
"opprettet" to melding.opprettet.toString(),
),
),
keyHolder
)
jdbcTemplate.update(
"""insert into melding_status (id, status, statushistorikk, status_type, karantene_til) values (:id, to_jsonb(:status::jsonb), to_jsonb(:statushistorikk::jsonb),:statusType, :karanteneTil)""",
MapSqlParameterSource(
mapOf<String, Any?>(
"id" to keyHolder.keys!!["id"] as UUID,
"status" to serialize(melding.status),
"statusType" to when(melding.status) {
is PersongrunnlagMelding.Status.Feilet -> "Feilet"
is PersongrunnlagMelding.Status.Ferdig -> "Ferdig"
is PersongrunnlagMelding.Status.Klar -> "Klar"
is PersongrunnlagMelding.Status.Retry -> "Retry"
},
"karanteneTil" to when (val m = melding.status) {
is PersongrunnlagMelding.Status.Retry -> m.karanteneTil
else -> null
},
"statusType" to "Klar",
"karanteneTil" to null,
"statushistorikk" to melding.statushistorikk.serializeList(),
),
),
keyHolder
)
return find(keyHolder.keys!!["id"] as UUID)
return keyHolder.getKeyAs(UUID::class.java)
.also { if (it == null) log.info("Ingen primærnøkkel returnert fra insert, meldingen med correlationId:${melding.correlationId}, innlesingId:${melding.innlesingId} er et duplikat") }
}

fun updateStatus(melding: PersongrunnlagMelding.Mottatt) {
Expand All @@ -66,7 +67,7 @@ class PersongrunnlagRepo(
mapOf<String, Any?>(
"id" to melding.id,
"status" to serialize(melding.status),
"statusType" to when(melding.status) {
"statusType" to when (melding.status) {
is PersongrunnlagMelding.Status.Feilet -> "Feilet"
is PersongrunnlagMelding.Status.Ferdig -> "Ferdig"
is PersongrunnlagMelding.Status.Klar -> "Klar"
Expand Down Expand Up @@ -188,8 +189,7 @@ class PersongrunnlagRepo(
}
}

internal class SimpleStringMapper : RowMapper<String>
{
internal class SimpleStringMapper : RowMapper<String> {
override fun mapRow(rs: ResultSet, rowNum: Int): String {
return rs.toString()
}
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/db/migration/V16__dedup_melding.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table melding add constraint unique_correlation_innlesing unique (correlation_id, innlesing_id);
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ProsesseringsParallellitetTest : SpringContextTest.NoKafka() {
inner class GodskrivOpptjening {
@Test
fun `finnNesteUprosesserte låser raden slik at den ikke plukkes opp av andre connections`() {
persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -121,7 +121,7 @@ class ProsesseringsParallellitetTest : SpringContextTest.NoKafka() {
inner class Omsorgsarbeid {
@Test
fun `finnNesteUprosesserte låser raden slik at den ikke plukkes opp av andre connections`() {
persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -174,7 +174,7 @@ class ProsesseringsParallellitetTest : SpringContextTest.NoKafka() {
inner class Oppgave {
@Test
fun `finnNesteUprosesserte låser raden slik at den ikke plukkes opp av andre connections`() {
persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class BrevProsesseringTest(
given(clock.instant()).willReturn(Instant.now().plus(10, ChronoUnit.DAYS))
willAnswer { true }.given(gyldigOpptjeningår).erGyldig(2020)

val (behandling, brev) = persongrunnlagRepo.persist(
val (behandling, brev) = persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -239,7 +239,7 @@ class BrevProsesseringTest(
given(clock.instant()).willReturn(Instant.now().plus(10, ChronoUnit.DAYS))
willAnswer { true }.given(gyldigOpptjeningår).erGyldig(2020)

val (behandling, brev) = persongrunnlagRepo.persist(
val (behandling, brev) = persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ internal class BrevopprettelseTest : SpringContextTest.NoKafka() {
)
)

persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -169,7 +169,7 @@ internal class BrevopprettelseTest : SpringContextTest.NoKafka() {
)
)

persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -261,7 +261,7 @@ internal class BrevopprettelseTest : SpringContextTest.NoKafka() {
)
)

persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -328,7 +328,7 @@ internal class BrevopprettelseTest : SpringContextTest.NoKafka() {
)
)

persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {
*/
given(clock.instant()).willReturn(Instant.now().plus(10, ChronoUnit.DAYS))

val melding = repo.persist(
val innlesingId = InnlesingId.generate()
val correlationId = CorrelationId.generate()

repo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand All @@ -112,8 +115,8 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {
),
),
rådata = Rådata(),
innlesingId = InnlesingId.generate(),
correlationId = CorrelationId.generate(),
innlesingId = innlesingId,
correlationId = correlationId,
)
),
)
Expand All @@ -122,8 +125,8 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {
godskrivOpptjeningRepo.finnNesteUprosesserte()!!.also {
assertInstanceOf(GodskrivOpptjening.Status.Klar::class.java, it.status)
assertEquals(behandling.id, it.behandlingId)
assertEquals(melding.correlationId, it.correlationId)
assertEquals(melding.innlesingId, it.innlesingId)
assertEquals(correlationId, it.correlationId)
assertEquals(innlesingId, it.innlesingId)
assertEquals(behandling.omsorgsyter, it.omsorgsyter)
}

Expand Down Expand Up @@ -172,7 +175,7 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {

willAnswer { true }.given(gyldigOpptjeningår).erGyldig(2020)

val melding = repo.persist(
val melding = repo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -212,7 +215,7 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {

assertInstanceOf(
GodskrivOpptjening.Persistent::class.java,
godskrivOpptjeningRepo.findForMelding(melding.id).single()
godskrivOpptjeningRepo.findForMelding(melding!!).single()
).also {
assertInstanceOf(GodskrivOpptjening.Status.Ferdig::class.java, it.status)
}
Expand All @@ -235,7 +238,10 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {
*/
given(clock.instant()).willReturn(Instant.now().plus(10, ChronoUnit.DAYS))

val melding = repo.persist(
val innlesingId = InnlesingId.generate()
val correlationId = CorrelationId.generate()

val melding = repo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand All @@ -257,8 +263,8 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {
),
),
rådata = Rådata(),
innlesingId = InnlesingId.generate(),
correlationId = CorrelationId.generate(),
innlesingId = innlesingId,
correlationId = correlationId,
)
),
)
Expand All @@ -268,8 +274,8 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {
godskrivOpptjeningRepo.finnNesteUprosesserte()!!.also {
assertInstanceOf(GodskrivOpptjening.Status.Klar::class.java, it.status)
assertEquals(behandling.id, it.behandlingId)
assertEquals(melding.correlationId, it.correlationId)
assertEquals(melding.innlesingId, it.innlesingId)
assertEquals(correlationId, it.correlationId)
assertEquals(innlesingId, it.innlesingId)
assertEquals(behandling.omsorgsyter, it.omsorgsyter)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ object StatusServiceTest {
}

private fun lagOgLagreMelding(opprettet: Instant = now()): PersongrunnlagMelding.Mottatt {
val melding = personGrunnlagMelding(opprettet)
return personGrunnlagRepo.persist(melding)
return lagreOgHent(personGrunnlagMelding(opprettet))
}

private fun endreStatusTilFeilet(mottatt: PersongrunnlagMelding.Mottatt): PersongrunnlagMelding.Mottatt {
Expand Down Expand Up @@ -207,7 +206,7 @@ object StatusServiceTest {
@Order(2)
fun testOK() {
val melding = personGrunnlagMelding(now())
personGrunnlagRepo.persist(melding)
personGrunnlagRepo.lagre(melding)
val status = statusService.checkStatus()
assertThat(status).isEqualTo(ApplicationStatus.OK)
}
Expand All @@ -216,7 +215,7 @@ object StatusServiceTest {
@Order(3)
fun testForGammeMelding() {
val melding = personGrunnlagMelding(700.daysAgo)
personGrunnlagRepo.persist(melding)
personGrunnlagRepo.lagre(melding)
val status = statusService.checkStatus()
assertThat(status).isEqualTo(ApplicationStatus.Feil("Siste melding er for gammel"))
}
Expand All @@ -243,7 +242,7 @@ object StatusServiceTest {
@Order(6)
fun testGammelOppgaveIkkeFerdig() {
val melding = personGrunnlagMelding(now())
val mottatt = personGrunnlagRepo.persist(melding)
val mottatt = lagreOgHent(melding)

val uuid = UUID.randomUUID()

Expand All @@ -265,7 +264,7 @@ object StatusServiceTest {
val uuid1 = UUID.randomUUID()

val melding = personGrunnlagMelding(now())
val mottatt = personGrunnlagRepo.persist(melding)
val mottatt = lagreOgHent(melding)

lagreDummyBehandling(uuid1, mottatt)
lagreDummyOppgave(uuid1, mottatt, now())
Expand Down Expand Up @@ -297,4 +296,8 @@ object StatusServiceTest {
jdbcTemplate.queryForList("select * from brev_status", emptyMap<String, Any>())
.forEach { println(" brev_status $it") }
}

private fun lagreOgHent(melding: PersongrunnlagMelding.Lest): PersongrunnlagMelding.Mottatt {
return personGrunnlagRepo.lagre(melding).let { personGrunnlagRepo.find(it!!) }
}
}
Loading

0 comments on commit 971f435

Please sign in to comment.