Skip to content

Commit

Permalink
Merge pull request #90 from navikt/MELDEPLIKT-345
Browse files Browse the repository at this point in the history
Meldeplikt 345
  • Loading branch information
igorweber authored Oct 26, 2022
2 parents b53e9b7 + 3a60edc commit b0a2982
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,155 +2,128 @@ package no.nav.meldeplikt.meldekortservice.config

import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.plugins.observer.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.http.content.*
import io.ktor.util.*
import io.ktor.utils.io.*
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.runBlocking
import no.nav.meldeplikt.meldekortservice.model.database.KallLogg
import no.nav.meldeplikt.meldekortservice.service.DBService
import no.nav.meldeplikt.meldekortservice.utils.*
import no.nav.meldeplikt.meldekortservice.utils.JOURNALPOST_PATH
import no.nav.meldeplikt.meldekortservice.utils.defaultLog
import no.nav.meldeplikt.meldekortservice.utils.getCallId
import no.nav.meldeplikt.meldekortservice.utils.headersToString
import java.time.Instant
import java.time.LocalDateTime
import kotlin.coroutines.CoroutineContext

class OutgoingCallLoggingPlugin(config: OCDLPConfig) {

val dbService: DBService = config.dbs
val kallLoggIdAttr = AttributeKey<Long>("kallLoggId")

class OCDLPConfig {
lateinit var dbs: DBService
}

companion object Plugin : HttpClientPlugin<OCDLPConfig, OutgoingCallLoggingPlugin> {
override val key: AttributeKey<OutgoingCallLoggingPlugin> = AttributeKey("OutgoingCallLoggingPlugin")

override fun prepare(block: OCDLPConfig.() -> Unit): OutgoingCallLoggingPlugin {
val config = OCDLPConfig().apply(block)

return OutgoingCallLoggingPlugin(config)
}

override fun install(plugin: OutgoingCallLoggingPlugin, scope: HttpClient) {

var startTime = LocalDateTime.now()
var kallTid = Instant.now().toEpochMilli()
var responseBody = ""

scope.requestPipeline.intercept(HttpRequestPipeline.State) {
// Det er mulig at vi sender request før vi får noe request fra meldekort-api
// F.eks for å hente noe config eller lignende
// Det betyr at vi kkke har noe callId ennå og da må vi generere den
val callId = getCallId()
startTime = LocalDateTime.now()
kallTid = Instant.now().toEpochMilli()
context.headers.append(HttpHeaders.XRequestId, callId)
class OutgoingCallLoggingPlugin(val dbService: DBService) {

fun intercept(httpClient: HttpClient) {
httpClient.plugin(HttpSend).intercept { requestBuilder ->
// Prepare
val callId = getCallId()
val startTime = LocalDateTime.now()
val kallTid = Instant.now().toEpochMilli()

requestBuilder.headers.append(HttpHeaders.XRequestId, callId)

// Execute call
val originalCall = execute(requestBuilder)

// Save data
val request = originalCall.request
val response = originalCall.response

val responseBody = response.bodyAsText(Charsets.UTF_8)

try {
defaultDbService.lagreKallLogg(
KallLogg(
korrelasjonId = callId,
tidspunkt = startTime,
type = "REST",
kallRetning = "UT",
method = request.method.value,
operation = request.url.encodedPath,
status = response.status.value,
kallTid = Instant.now().toEpochMilli() - kallTid,
request = buildRequest(requestBuilder.executionContext, request),
response = buildResponse(response, responseBody),
logginfo = ""
)
)
} catch (e: Exception) {
defaultLog.error("Kunne ikke lagre kall logg", e)
}

scope.responsePipeline.intercept(HttpResponsePipeline.Receive) { (type, content) ->
if (content !is ByteReadChannel) return@intercept
// Response content can be read only once. Wrap the call with the content we have read
originalCall.wrapWithContent(ByteReadChannel(responseBody.toByteArray()))
}
}

val byteArray = ByteArray(content.availableForRead)
content.readAvailable(byteArray)
val result = ByteReadChannel(byteArray)
val responseContainer = HttpResponseContainer(type, result)
responseBody = String(byteArray, context.response.charset() ?: Charsets.UTF_8)
private fun buildRequest(coroutineContext: CoroutineContext, request: HttpRequest): String {
return StringBuilder().apply {
appendLine("${request.method.value} ${request.url.protocol.name}://${request.url.hostWithPort}${request.url.fullPath}")

proceedWith(responseContainer)
request.headers.forEach { header, values ->
appendLine("$header: ${headersToString(values)}")
}

scope.responsePipeline.intercept(HttpResponsePipeline.After) {
val callId = getCallId()
val request = context.request
val response = context.response

try {
val kallLoggId = plugin.dbService.lagreKallLogg(
KallLogg(
korrelasjonId = callId,
tidspunkt = startTime,
type = "REST",
kallRetning = "UT",
method = request.method.value,
operation = request.url.encodedPath,
status = response.status.value,
kallTid = Instant.now().toEpochMilli() - kallTid,
request = buildRequest(context.coroutineContext, request),
response = buildResponse(response, responseBody),
logginfo = ""
// empty line before body as in HTTP request
appendLine()

if (request.url.encodedPath == JOURNALPOST_PATH) {
appendLine("JOURNALPOST")
} else {
when (request.content) {
is OutgoingContent.ByteArrayContent -> {
append(
String(
(request.content as OutgoingContent.ByteArrayContent).bytes(),
Charsets.UTF_8
)
)
)
response.call.attributes.put(plugin.kallLoggIdAttr, kallLoggId)
} catch (e: Exception) {
defaultLog.error("Kunne ikke lagre kall logg", e)
}
}
}

private fun buildRequest(coroutineContext: CoroutineContext, request: HttpRequest): String {
return StringBuilder().apply {
appendLine("${request.method.value} ${request.url.protocol.name}://${request.url.hostWithPort}${request.url.fullPath}")

request.headers.forEach { header, values ->
appendLine("$header: ${headersToString(values)}")
}
}
is OutgoingContent.WriteChannelContent -> {
val buffer = StringBuilder()
val channel = ByteChannel(true)

// empty line before body as in HTTP request
appendLine()

if (request.url.encodedPath == JOURNALPOST_PATH) {
appendLine("JOURNALPOST")
} else {
when (request.content) {
is OutgoingContent.ByteArrayContent -> {
append(
String(
(request.content as OutgoingContent.ByteArrayContent).bytes(),
Charsets.UTF_8
)
)
}
is OutgoingContent.WriteChannelContent -> {
val buffer = StringBuilder()
val channel = ByteChannel(true)

runBlocking {
GlobalScope.writer(coroutineContext, autoFlush = true) {
(request.content as OutgoingContent.WriteChannelContent).writeTo(channel)
}

while (!channel.isClosedForRead) {
channel.readUTF8LineTo(buffer)
}
runBlocking {
GlobalScope.writer(coroutineContext, autoFlush = true) {
(request.content as OutgoingContent.WriteChannelContent).writeTo(channel)
}

appendLine(buffer.toString())
}
else -> {
appendLine(request.content)
while (!channel.isClosedForRead) {
channel.readUTF8LineTo(buffer)
}
}

appendLine(buffer.toString())
}
else -> {
appendLine(request.content)
}
}
}.toString()
}
}
}.toString()
}

private fun buildResponse(response: HttpResponse, responseBody: String): String {
return StringBuilder().apply {
appendLine("${response.version} ${response.status.value} ${response.status.description}")
private fun buildResponse(response: HttpResponse, responseBody: String): String {
return StringBuilder().apply {
appendLine("${response.version} ${response.status.value} ${response.status.description}")

response.headers.forEach { header, values ->
appendLine("$header: ${headersToString(values)}")
}
response.headers.forEach { header, values ->
appendLine("$header: ${headersToString(values)}")
}

// empty line before body as in HTTP response
appendLine()
// empty line before body as in HTTP response
appendLine()

appendLine(responseBody)
}.toString()
}
appendLine(responseBody)
}.toString()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ fun HttpClientConfig<*>.defaultHttpClientConfig() {
requestTimeoutMillis = 10000 // required for an HTTP call: from sending a request to receiving a response
socketTimeoutMillis = 10000 // of inactivity between two data packets when exchanging data with a server
}
install(OutgoingCallLoggingPlugin) {
dbs = defaultDbService
install("OutgoingCallInterceptor") {
OutgoingCallLoggingPlugin(defaultDbService).intercept(this)
}
expectSuccess = false
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
BEGIN
BEGIN
DBMS_SCHEDULER.DROP_JOB(job_name => 'delete_old_kall_logg_partitions');
EXCEPTION
WHEN OTHERS THEN
NULL; -- Ignore exception
END;

DBMS_SCHEDULER.CREATE_JOB(
job_name => 'delete_old_kall_logg_partitions',
job_type => 'STORED_PROCEDURE',
job_action => 'del_old_kall_logg_partitions',
start_date => TIMESTAMP '2022-10-01 05:00:00',
repeat_interval => 'FREQ=DAILY;BYHOUR=05;BYMINUTE=00',
enabled => true
);
END;
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,3 @@ BEGIN
END IF;
END LOOP;
END;
/
BEGIN
DBMS_SCHEDULER.CREATE_JOB (
job_name => 'delete_old_kall_logg_partitions',
job_type => 'STORED_PROCEDURE',
job_action => 'del_old_kall_logg_partitions',
start_date => TIMESTAMP '2022-10-01 05:00:00',
repeat_interval => 'FREQ=DAILY;BYHOUR=05;BYMINUTE=00',
enabled => true);
END;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Created and left empty intentionally
Loading

0 comments on commit b0a2982

Please sign in to comment.