Dev/cpa sync fix (#36)
* Remove unused "upsert" header when upserting CPA

* Rework and refactor CpaSync

Includes the same functionality, except that it:
 - Reads the CPA ID from the file content, not from the filename.
 - Throws an exception and cancels the sync when duplicate CPA IDs are found on NFS.
 - Skips the upsert if the DB entry is newer than the NFS entry (see the sketch below).
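A minimal sketch of the timestamp rule from the last bullet, mirroring getLastModified and shouldUpsertCpa from the diff below; the example values are illustrative only:

    import java.time.Instant
    import java.time.temporal.ChronoUnit

    // Mirrors getLastModified: NFS mtime in seconds -> ISO-8601 string truncated to seconds.
    fun getLastModified(mTimeInSeconds: Long): String =
        Instant.ofEpochSecond(mTimeInSeconds).truncatedTo(ChronoUnit.SECONDS).toString()

    // Mirrors shouldUpsertCpa: upsert when the CPA is missing from the DB or the NFS copy is newer.
    fun shouldUpsertCpa(nfsTimestamp: String, dbTimestamp: String?): Boolean {
        if (dbTimestamp == null) return true
        return Instant.parse(nfsTimestamp) > Instant.parse(dbTimestamp)
    }

    fun main() {
        println(shouldUpsertCpa(getLastModified(1724745600), null))              // true: not in the DB yet
        println(shouldUpsertCpa("2024-08-27T08:00:00Z", "2024-08-26T08:00:00Z")) // true: NFS entry is newer
        println(shouldUpsertCpa("2024-08-26T08:00:00Z", "2024-08-27T08:00:00Z")) // false: DB entry is newer
    }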
GardOS authored Aug 27, 2024
1 parent 2cb7f40 commit 67745be
Showing 4 changed files with 881 additions and 243 deletions.
6 changes: 2 additions & 4 deletions smtp-listeners/src/main/kotlin/no/nav/emottak/HttpClients.kt
@@ -33,7 +33,6 @@ import no.nav.emottak.smtp.log
 import java.net.InetSocketAddress
 import java.net.Proxy
 import java.net.URL
-import java.time.Instant
 
 val URL_CPA_REPO_BASE = getEnvVar("URL_CPA_REPO", "http://cpa-repo.team-emottak.svc.nais.local")
 val URL_CPA_REPO_PUT = "$URL_CPA_REPO_BASE/cpa".also { log.info("CPA REPO PUT URL: [$it]") }
@@ -120,11 +119,10 @@ suspend fun HttpClient.getCPATimestamps()
         this.get(URL_CPA_REPO_TIMESTAMPS).bodyAsText()
     )
 
-suspend fun HttpClient.putCPAinCPARepo(cpaFile: String, lastModified: Instant) =
+suspend fun HttpClient.putCPAinCPARepo(cpaFile: String, lastModified: String) =
     this.post(URL_CPA_REPO_PUT) {
         headers {
-            header("updated_date", lastModified.toString())
-            header("upsert", "true") // Upsert can probably always be used (?)
+            header("updated_date", lastModified)
         }
         setBody(cpaFile)
     }
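A hypothetical call site for the changed upload helper, assuming an already configured Ktor HttpClient; the wrapper function uploadCpa and the mtime argument are illustrative, not part of the commit:

    import io.ktor.client.HttpClient
    import java.time.Instant
    import java.time.temporal.ChronoUnit

    // Sketch only: lastModified is now passed as a pre-formatted ISO-8601 string,
    // and putCPAinCPARepo no longer sends the "upsert" header.
    suspend fun uploadCpa(client: HttpClient, cpaFileContent: String, mTimeInSeconds: Long) {
        val lastModified = Instant.ofEpochSecond(mTimeInSeconds)
            .truncatedTo(ChronoUnit.SECONDS)
            .toString() // e.g. "2024-08-27T08:00:00Z"
        client.putCPAinCPARepo(cpaFileContent, lastModified)
    }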
@@ -12,100 +12,93 @@ import org.slf4j.LoggerFactory
 import java.time.Instant
 import java.time.temporal.ChronoUnit
 
-class CpaSyncService(private val cpaRepoClient: HttpClient, private val nfsConnector: NFSConnector) {
+data class NfsCpa(val id: String, val timestamp: String, val content: String)
+
+class CpaSyncService(private val cpaRepoClient: HttpClient, private val nfsConnector: NFSConnector) {
     private val log: Logger = LoggerFactory.getLogger("no.nav.emottak.smtp.cpasync")
 
     suspend fun sync() {
         return runCatching {
-            val cpaTimestamps = cpaRepoClient.getCPATimestamps()
-            processAndSyncEntries(cpaTimestamps)
+            val dbCpaMap = cpaRepoClient.getCPATimestamps()
+            val nfsCpaMap = getNfsCpaMap()
+            upsertFreshCpa(nfsCpaMap, dbCpaMap)
+            deleteStaleCpa(nfsCpaMap.keys, dbCpaMap)
         }.onFailure {
             logFailure(it)
         }.getOrThrow()
     }
 
-    private suspend fun processAndSyncEntries(cpaTimestamps: Map<String, String>) {
-        nfsConnector.use { connector ->
-            val staleCpaTimestamps = connector.folder().asSequence()
+    internal fun getNfsCpaMap(): Map<String, NfsCpa> {
+        return nfsConnector.use { connector ->
+            connector.folder().asSequence()
                 .filter { entry -> isXmlFileEntry(entry) }
-                .fold(cpaTimestamps) { accumulatedCpaTimestamps, entry ->
-                    val filename = entry.filename
-                    val lastModified = getLastModifiedInstant(entry.attrs.mTime.toLong())
-                    val shouldSkip = shouldSkipFile(filename, lastModified, accumulatedCpaTimestamps)
-                    if (!shouldSkip) {
-                        runCatching {
-                            log.info("Fetching file $filename")
-                            val cpaFileContent = connector.file("/outbound/cpa/$filename").use {
-                                String(it.readAllBytes())
-                            }
-                            log.info("Uploading $filename")
-                            cpaRepoClient.putCPAinCPARepo(cpaFileContent, lastModified)
-                        }.onFailure {
-                            log.error("Error uploading $filename to cpa-repo: ${it.message}", it)
-                        }
-                    }
-
-                    filterStaleCpaTimestamps(filename, lastModified, accumulatedCpaTimestamps)
+                .fold(mutableMapOf()) { accumulator, nfsCpaFile ->
+                    val nfsCpa = getNfsCpa(connector, nfsCpaFile)
+
+                    val existingEntry = accumulator.put(nfsCpa.id, nfsCpa)
+                    require(existingEntry == null) { "NFS contains duplicate CPA IDs. Aborting sync." }
+
+                    accumulator
                 }
-
-            deleteStaleCpaEntries(staleCpaTimestamps)
         }
     }
 
-    internal fun getLastModifiedInstant(mTimeInSeconds: Long): Instant {
-        return Instant.ofEpochSecond(mTimeInSeconds).truncatedTo(ChronoUnit.SECONDS)
-    }
-
-    private fun isXmlFileEntry(entry: ChannelSftp.LsEntry) = if (!entry.filename.endsWith(".xml")) {
-        log.warn("${entry.filename} is ignored")
-        false
-    } else {
-        true
-    }
-
-    private fun filterStaleCpaTimestamps(
-        filename: String,
-        lastModified: Instant,
-        cpaTimestamps: Map<String, String>
-    ): Map<String, String> {
-        return cpaTimestamps.filter { (cpaId, timestamp) -> isStaleCpa(cpaId, filename, timestamp, lastModified) }
-    }
-
-    private fun isStaleCpa(
-        cpaId: String,
-        filename: String,
-        timestamp: String,
-        lastModified: Instant
-    ): Boolean {
-        val formattedCpaId = cpaId.replace(":", ".") + ".xml"
-        return if (filename == formattedCpaId) {
-            if (timestamp != lastModified.toString()) {
-                log.info("$filename has different timestamp, should be updated")
-            }
-            false
-        } else {
-            true // the file will be deleted
-        }
-    }
-
-    internal fun shouldSkipFile(
-        filename: String,
-        lastModified: Instant,
-        cpaTimestamps: Map<String, String>
-    ): Boolean {
-        return cpaTimestamps
-            .filterKeys { cpaId -> filename == cpaId.replace(":", ".") + ".xml" }
-            .filterValues { timestamp -> lastModified.toString() == timestamp }
-            .ifEmpty {
-                log.info("Could not find matching timestamp for file $filename with lastModified timestamp $lastModified")
-                return false
-            }.any()
-    }
-
-    internal suspend fun deleteStaleCpaEntries(cpaTimestamps: Map<String, String>) {
-        cpaTimestamps.forEach { (cpaId) ->
-            cpaRepoClient.deleteCPAinCPARepo(cpaId)
+    internal fun isXmlFileEntry(entry: ChannelSftp.LsEntry): Boolean {
+        if (entry.filename.endsWith(".xml")) {
+            return true
+        }
+        log.warn("${entry.filename} is ignored. Invalid file ending")
+        return false
+    }
+
+    internal fun getNfsCpa(connector: NFSConnector, nfsCpaFile: ChannelSftp.LsEntry): NfsCpa {
+        val timestamp = getLastModified(nfsCpaFile.attrs.mTime.toLong())
+        val cpaContent = fetchNfsCpaContent(connector, nfsCpaFile)
+        val cpaId = getCpaIdFromCpaContent(cpaContent)
+        require(cpaId != null) {
+            "Regex to find CPA ID in file ${nfsCpaFile.filename} did not find any match. " +
+                "File corrupted or wrongful regex. Aborting sync."
+        }
+
+        return NfsCpa(cpaId, timestamp, cpaContent)
+    }
+
+    private fun fetchNfsCpaContent(nfsConnector: NFSConnector, nfsCpaFile: ChannelSftp.LsEntry): String {
+        return nfsConnector.file("/outbound/cpa/${nfsCpaFile.filename}").use {
+            String(it.readAllBytes())
+        }
+    }
+
+    private fun getCpaIdFromCpaContent(cpaContent: String): String? {
+        return Regex("cppa:cpaid=\"(?<cpaId>.+?)\"")
+            .find(cpaContent)?.groups?.get("cpaId")?.value
+    }
+
+    internal fun getLastModified(mTimeInSeconds: Long): String {
+        return Instant.ofEpochSecond(mTimeInSeconds).truncatedTo(ChronoUnit.SECONDS).toString()
+    }
+
+    private suspend fun upsertFreshCpa(nfsCpaMap: Map<String, NfsCpa>, dbCpaMap: Map<String, String>) {
+        nfsCpaMap.forEach { entry ->
+            if (shouldUpsertCpa(entry.value.timestamp, dbCpaMap[entry.key])) {
+                log.info("Upserting new/modified CPA: ${entry.key} - ${entry.value.timestamp}")
+                cpaRepoClient.putCPAinCPARepo(entry.value.content, entry.value.timestamp)
+            } else {
+                log.info("Skipping upsert for unmodified CPA: ${entry.key} - ${entry.value.timestamp}")
+            }
+        }
+    }
+
+    internal fun shouldUpsertCpa(nfsTimestamp: String, dbTimestamp: String?): Boolean {
+        if (dbTimestamp == null) return true
+        return Instant.parse(nfsTimestamp) > Instant.parse(dbTimestamp)
+    }
+
+    private suspend fun deleteStaleCpa(nfsCpaIds: Set<String>, dbCpaMap: Map<String, String>) {
+        val staleCpa = dbCpaMap - nfsCpaIds
+        staleCpa.forEach { entry ->
+            log.info("Deleting stale entry: ${entry.key} - ${entry.value}")
+            cpaRepoClient.deleteCPAinCPARepo(entry.key)
         }
     }
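For illustration, a standalone sketch of the new ID extraction used by getCpaIdFromCpaContent; the XML fragment and its cpaid value are invented stand-ins for a real CPA file:

    // Illustrative only: the CPA ID now comes from the file content, not from the filename.
    fun main() {
        val cpaContent = """<cppa:CollaborationProtocolAgreement cppa:cpaid="example:cpa:123">"""
        val cpaId = Regex("cppa:cpaid=\"(?<cpaId>.+?)\"")
            .find(cpaContent)?.groups?.get("cpaId")?.value
        println(cpaId) // prints example:cpa:123
    }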
