Skip to content

Commit

Permalink
Rewrite to cte, return id on save, null on duplicate
Browse files Browse the repository at this point in the history
  • Loading branch information
jacob-meidell committed Nov 21, 2023
1 parent a96914a commit f5fcb8a
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class PersongrunnlagKafkaListener(
deserialize<PersongrunnlagMeldingKafka>(consumerRecord.value()).also { persongrunnlagMelding ->
Mdc.scopedMdc(persongrunnlagMelding.correlationId) { _ ->
Mdc.scopedMdc(persongrunnlagMelding.innlesingId) { _ ->
persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(innhold = persongrunnlagMelding)
)
log.info("Melding prosessert")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,45 +28,35 @@ class PersongrunnlagRepo(
) {
private val log: Logger = LoggerFactory.getLogger(this::class.java)

fun persist(melding: PersongrunnlagMelding.Lest): PersongrunnlagMelding.Mottatt {
/**
* @return Id til rad som ble lagret i databasen - null dersom raden eksisterte fra før (duplikat)
*/
fun lagre(melding: PersongrunnlagMelding.Lest): UUID? {
val keyHolder = GeneratedKeyHolder()
val res = jdbcTemplate.update(
"""insert into melding (melding, correlation_id, innlesing_id, opprettet) values (to_jsonb(:melding::jsonb), :correlation_id, :innlesing_id, :opprettet::timestamptz) on conflict on constraint unique_correlation_innlesing do nothing""",
jdbcTemplate.update(
//language=postgres-psql
"""
|with pg as (insert into melding (melding, correlation_id, innlesing_id, opprettet)
|values (to_jsonb(:melding::jsonb), :correlation_id, :innlesing_id, :opprettet::timestamptz)
|on conflict on constraint unique_correlation_innlesing do nothing
|returning id, to_jsonb(:status::jsonb) as status, to_jsonb(:statushistorikk::jsonb) as statushistorikk)
|insert into melding_status (id, status, statushistorikk)
|select id, status, statushistorikk from pg where id is not null
"""
.trimMargin(),
MapSqlParameterSource(
mapOf<String, Any>(
"melding" to serialize(melding.innhold),
"correlation_id" to melding.correlationId.toString(),
"innlesing_id" to melding.innlesingId.toString(),
"opprettet" to melding.opprettet.toString(),
"status" to serialize(melding.status),
"statushistorikk" to melding.statushistorikk.serializeList(),
),
),
keyHolder
)
return when (res) {
1 -> {
jdbcTemplate.update(
"""insert into melding_status (id, status, statushistorikk) values (:id, to_jsonb(:status::jsonb), to_jsonb(:statushistorikk::jsonb))""",
MapSqlParameterSource(
mapOf<String, Any>(
"id" to keyHolder.keys!!["id"] as UUID,
"status" to serialize(melding.status),
"statushistorikk" to melding.statushistorikk.serializeList(),
),
),
)
find(keyHolder.keys!!["id"] as UUID)
}

0 -> {
log.info("Rad med correlation_id:${melding.correlationId} for innlesing_id:${melding.innlesingId} eksisterer i databasen - hopper over duplikat")
find(melding.correlationId, melding.innlesingId)
}

else -> {
throw RuntimeException("Uventet verdi: $res fra insert-statement")
}
}

return keyHolder.keys?.get("id")?.let { it as UUID }
}

fun updateStatus(melding: PersongrunnlagMelding.Mottatt) {
Expand All @@ -92,21 +82,6 @@ class PersongrunnlagRepo(
).single()
}

private fun find(correlationId: CorrelationId, innlesingId: InnlesingId): PersongrunnlagMelding.Mottatt {
return jdbcTemplate.query(
"""select id from melding where correlation_id = :cid and innlesing_id = :iid""",
mapOf<String, Any>(
"cid" to correlationId.toString(),
"iid" to innlesingId.toString(),
),
ResultSetExtractor { rs ->
if (rs.next()) find(UUID.fromString(rs.getString("id"))) else throw RuntimeException(
"Could not extract resultset"
)
}
)!!
}

/**
* Utformet for å være mekanismen som tilrettelegger for at flere podder kan prosessere data i paralell.
* "select for update skip locked" sørger for at raden som leses av en connection (pod) ikke vil plukkes opp av en
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ProsesseringsParallellitetTest : SpringContextTest.NoKafka() {
inner class GodskrivOpptjening {
@Test
fun `finnNesteUprosesserte låser raden slik at den ikke plukkes opp av andre connections`() {
persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -121,7 +121,7 @@ class ProsesseringsParallellitetTest : SpringContextTest.NoKafka() {
inner class Omsorgsarbeid {
@Test
fun `finnNesteUprosesserte låser raden slik at den ikke plukkes opp av andre connections`() {
persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -174,7 +174,7 @@ class ProsesseringsParallellitetTest : SpringContextTest.NoKafka() {
inner class Oppgave {
@Test
fun `finnNesteUprosesserte låser raden slik at den ikke plukkes opp av andre connections`() {
persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class BrevProsesseringTest(
given(clock.instant()).willReturn(Instant.now().plus(10, ChronoUnit.DAYS))
willAnswer { true }.given(gyldigOpptjeningår).erGyldig(2020)

val (behandling, brev) = persongrunnlagRepo.persist(
val (behandling, brev) = persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -239,7 +239,7 @@ class BrevProsesseringTest(
given(clock.instant()).willReturn(Instant.now().plus(10, ChronoUnit.DAYS))
willAnswer { true }.given(gyldigOpptjeningår).erGyldig(2020)

val (behandling, brev) = persongrunnlagRepo.persist(
val (behandling, brev) = persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ internal class BrevopprettelseTest : SpringContextTest.NoKafka() {
)
)

persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -169,7 +169,7 @@ internal class BrevopprettelseTest : SpringContextTest.NoKafka() {
)
)

persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -261,7 +261,7 @@ internal class BrevopprettelseTest : SpringContextTest.NoKafka() {
)
)

persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -328,7 +328,7 @@ internal class BrevopprettelseTest : SpringContextTest.NoKafka() {
)
)

persongrunnlagRepo.persist(
persongrunnlagRepo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {
*/
given(clock.instant()).willReturn(Instant.now().plus(10, ChronoUnit.DAYS))

val melding = repo.persist(
val innlesingId = InnlesingId.generate()
val correlationId = CorrelationId.generate()

repo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand All @@ -112,8 +115,8 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {
),
),
rådata = Rådata(),
innlesingId = InnlesingId.generate(),
correlationId = CorrelationId.generate(),
innlesingId = innlesingId,
correlationId = correlationId,
)
),
)
Expand All @@ -122,8 +125,8 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {
godskrivOpptjeningRepo.finnNesteUprosesserte()!!.also {
assertInstanceOf(GodskrivOpptjening.Status.Klar::class.java, it.status)
assertEquals(behandling.id, it.behandlingId)
assertEquals(melding.correlationId, it.correlationId)
assertEquals(melding.innlesingId, it.innlesingId)
assertEquals(correlationId, it.correlationId)
assertEquals(innlesingId, it.innlesingId)
assertEquals(behandling.omsorgsyter, it.omsorgsyter)
}

Expand Down Expand Up @@ -172,7 +175,7 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {

willAnswer { true }.given(gyldigOpptjeningår).erGyldig(2020)

val melding = repo.persist(
val melding = repo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand Down Expand Up @@ -212,7 +215,7 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {

assertInstanceOf(
GodskrivOpptjening.Persistent::class.java,
godskrivOpptjeningRepo.findForMelding(melding.id).single()
godskrivOpptjeningRepo.findForMelding(melding!!).single()
).also {
assertInstanceOf(GodskrivOpptjening.Status.Ferdig::class.java, it.status)
}
Expand All @@ -235,7 +238,10 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {
*/
given(clock.instant()).willReturn(Instant.now().plus(10, ChronoUnit.DAYS))

val melding = repo.persist(
val innlesingId = InnlesingId.generate()
val correlationId = CorrelationId.generate()

val melding = repo.lagre(
PersongrunnlagMelding.Lest(
innhold = PersongrunnlagMeldingKafka(
omsorgsyter = "12345678910",
Expand All @@ -257,8 +263,8 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {
),
),
rådata = Rådata(),
innlesingId = InnlesingId.generate(),
correlationId = CorrelationId.generate(),
innlesingId = innlesingId,
correlationId = correlationId,
)
),
)
Expand All @@ -268,8 +274,8 @@ class GodskrivOpptjeningServiceTest : SpringContextTest.NoKafka() {
godskrivOpptjeningRepo.finnNesteUprosesserte()!!.also {
assertInstanceOf(GodskrivOpptjening.Status.Klar::class.java, it.status)
assertEquals(behandling.id, it.behandlingId)
assertEquals(melding.correlationId, it.correlationId)
assertEquals(melding.innlesingId, it.innlesingId)
assertEquals(correlationId, it.correlationId)
assertEquals(innlesingId, it.innlesingId)
assertEquals(behandling.omsorgsyter, it.omsorgsyter)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ object StatusServiceTest {
}

private fun lagOgLagreMelding(opprettet: Instant = now()): PersongrunnlagMelding.Mottatt {
val melding = personGrunnlagMelding(opprettet)
return personGrunnlagRepo.persist(melding)
return lagreOgHent(personGrunnlagMelding(opprettet))
}

private fun endreStatusTilFeilet(mottatt: PersongrunnlagMelding.Mottatt): PersongrunnlagMelding.Mottatt {
Expand Down Expand Up @@ -207,7 +206,7 @@ object StatusServiceTest {
@Order(2)
fun testOK() {
val melding = personGrunnlagMelding(now())
personGrunnlagRepo.persist(melding)
personGrunnlagRepo.lagre(melding)
val status = statusService.checkStatus()
assertThat(status).isEqualTo(ApplicationStatus.OK)
}
Expand All @@ -216,7 +215,7 @@ object StatusServiceTest {
@Order(3)
fun testForGammeMelding() {
val melding = personGrunnlagMelding(700.daysAgo)
personGrunnlagRepo.persist(melding)
personGrunnlagRepo.lagre(melding)
val status = statusService.checkStatus()
assertThat(status).isEqualTo(ApplicationStatus.Feil("Siste melding er for gammel"))
}
Expand All @@ -243,7 +242,7 @@ object StatusServiceTest {
@Order(6)
fun testGammelOppgaveIkkeFerdig() {
val melding = personGrunnlagMelding(now())
val mottatt = personGrunnlagRepo.persist(melding)
val mottatt = lagreOgHent(melding)

val uuid = UUID.randomUUID()

Expand All @@ -265,7 +264,7 @@ object StatusServiceTest {
val uuid1 = UUID.randomUUID()

val melding = personGrunnlagMelding(now())
val mottatt = personGrunnlagRepo.persist(melding)
val mottatt = lagreOgHent(melding)

lagreDummyBehandling(uuid1, mottatt)
lagreDummyOppgave(uuid1, mottatt, now())
Expand Down Expand Up @@ -297,4 +296,8 @@ object StatusServiceTest {
jdbcTemplate.queryForList("select * from brev_status", emptyMap<String, Any>())
.forEach { println(" brev_status $it") }
}

private fun lagreOgHent(melding: PersongrunnlagMelding.Lest): PersongrunnlagMelding.Mottatt {
return personGrunnlagRepo.lagre(melding).let { personGrunnlagRepo.find(it!!) }
}
}
Loading

0 comments on commit f5fcb8a

Please sign in to comment.