From 0d19abf700ba5cc4dfc4631eab27af62bff89ffc Mon Sep 17 00:00:00 2001 From: Oleksandr Dzhychko Date: Fri, 8 Mar 2024 08:49:18 +0100 Subject: [PATCH 1/4] perf(model-server): introduce and use suspendable transactions --- .../model/server/handlers/HistoryHandler.kt | 15 +++-- .../handlers/KeyValueLikeModelServer.kt | 13 ++-- .../server/handlers/RepositoriesManager.kt | 65 ++++++++++--------- .../model/server/store/IStoreClient.kt | 6 ++ .../model/server/templates/PageWithMenuBar.kt | 1 + .../modelix/modelql/server/ModelQLServer.kt | 2 +- 6 files changed, 61 insertions(+), 41 deletions(-) diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/HistoryHandler.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/HistoryHandler.kt index bac97d6f86..d603baeb07 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/HistoryHandler.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/HistoryHandler.kt @@ -68,6 +68,8 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager: val params = call.request.queryParameters val limit = toInt(params["limit"], 500) val skip = toInt(params["skip"], 0) + val latestVersion = repositoriesManager.getVersion(branch) + checkNotNull(latestVersion) { "Branch not found: $branch" } call.respondHtmlTemplate(PageWithMenuBar("repos/", "../../..")) { headContent { style { @@ -80,7 +82,7 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager: repositoryPageStyle() } bodyContent { - buildRepositoryPage(branch, params["head"], skip, limit) + buildRepositoryPage(branch, latestVersion, params["head"], skip, limit) } } } @@ -105,7 +107,7 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager: } } - fun revert(repositoryAndBranch: BranchReference, from: String?, to: String?, author: String?) { + suspend fun revert(repositoryAndBranch: BranchReference, from: String?, to: String?, author: String?) { val version = repositoriesManager.getVersion(repositoryAndBranch) ?: throw RuntimeException("Branch doesn't exist: $repositoryAndBranch") val branch = OTBranch(PBranch(version.tree, client.idGenerator), client.idGenerator, client.storeCache!!) branch.runWriteT { t -> @@ -160,8 +162,13 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager: } } - private fun FlowContent.buildRepositoryPage(repositoryAndBranch: BranchReference, headHash: String?, skip: Int, limit: Int) { - val latestVersion = repositoriesManager.getVersion(repositoryAndBranch) ?: throw RuntimeException("Branch not found: $repositoryAndBranch") + private fun FlowContent.buildRepositoryPage( + repositoryAndBranch: BranchReference, + latestVersion: CLVersion, + headHash: String?, + skip: Int, + limit: Int, + ) { val headVersion = if (headHash == null || headHash.length == 0) latestVersion else CLVersion(headHash, client.storeCache!!) var rowIndex = 0 h1 { diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt index 2b2b65ca3c..625567a6fe 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt @@ -27,6 +27,7 @@ import io.ktor.server.resources.put import io.ktor.server.response.respondText import io.ktor.server.routing.routing import io.ktor.util.pipeline.PipelineContext +import kotlinx.coroutines.runBlocking import kotlinx.html.br import kotlinx.html.div import kotlinx.html.h1 @@ -48,12 +49,12 @@ import org.modelix.model.lazy.RepositoryId import org.modelix.model.persistent.HashUtil import org.modelix.model.server.store.IStoreClient import org.modelix.model.server.store.pollEntry +import org.modelix.model.server.store.runTransactionSuspendable import org.modelix.model.server.templates.PageWithMenuBar import org.slf4j.LoggerFactory import java.io.IOException import java.util.* import java.util.regex.Pattern -import kotlin.collections.LinkedHashMap val PERMISSION_MODEL_SERVER = "model-server".asResource() val MODEL_SERVER_ENTRY = KeycloakResourceType("model-server-entry", KeycloakScope.READ_WRITE_DELETE) @@ -85,7 +86,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) { // request to initialize it lazily, would make the code less robust. // Each change in the logic of RepositoriesManager#maybeInitAndGetSeverId would need // the special conditions in the affected requests to be updated. - repositoriesManager.maybeInitAndGetSeverId() + runBlocking { repositoriesManager.maybeInitAndGetSeverId() } application.apply { modelServerModule() } @@ -283,7 +284,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) { return result } - protected fun CallContext.putEntries(newEntries: Map) { + protected suspend fun CallContext.putEntries(newEntries: Map) { val referencedKeys: MutableSet = HashSet() for ((key, value) in newEntries) { checkKeyPermission(key, EPermissionType.WRITE) @@ -336,14 +337,14 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) { HashUtil.checkObjectHashes(hashedObjects) - repositoriesManager.client.store.runTransaction { + repositoriesManager.client.store.runTransactionSuspendable { storeClient.putAll(hashedObjects) storeClient.putAll(userDefinedEntries) for ((branch, value) in branchChanges) { if (value == null) { - repositoriesManager.removeBranches(branch.repositoryId, setOf(branch.branchName)) + runBlocking { repositoriesManager.removeBranches(branch.repositoryId, setOf(branch.branchName)) } } else { - repositoriesManager.mergeChanges(branch, value) + runBlocking { repositoriesManager.mergeChanges(branch, value) } } } } diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt index 2cd5436c31..e7bf4c525b 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt @@ -42,12 +42,27 @@ import org.modelix.model.persistent.CPVersion import org.modelix.model.server.store.IStoreClient import org.modelix.model.server.store.LocalModelClient import org.modelix.model.server.store.pollEntry +import org.modelix.model.server.store.runTransactionSuspendable import java.lang.ref.SoftReference import java.util.UUID class RepositoriesManager(val client: LocalModelClient) { init { + fun migrateLegacyRepositoriesList() { + val legacyRepositories = listLegacyRepositories().groupBy { it.repositoryId } + if (legacyRepositories.isNotEmpty()) { + // To not use `runTransactionSuspendable` like everywhere else, + // because this is blocking initialization code anyways. + store.runTransaction { + ensureRepositoriesAreInList(legacyRepositories.keys) + for ((legacyRepository, legacyBranches) in legacyRepositories) { + ensureBranchesAreInList(legacyRepository, legacyBranches.map { it.branchName }.toSet()) + } + } + } + } + migrateLegacyRepositoriesList() } @@ -75,8 +90,8 @@ class RepositoriesManager(val client: LocalModelClient) { * If the server ID was created previously but is only stored under a legacy database key, * it also gets stored under the current and all legacy database keys. */ - fun maybeInitAndGetSeverId(): String { - return store.runTransaction { + suspend fun maybeInitAndGetSeverId(): String { + return store.runTransactionSuspendable { var serverId = store[SERVER_ID_KEY] if (serverId == null) { serverId = store[LEGACY_SERVER_ID_KEY2] @@ -102,9 +117,9 @@ class RepositoriesManager(val client: LocalModelClient) { private fun repositoryExists(repositoryId: RepositoryId) = getRepositories().contains(repositoryId) - fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true): CLVersion { + suspend fun createRepository(repositoryId: RepositoryId, userName: String?, useRoleIds: Boolean = true): CLVersion { var initialVersion: CLVersion? = null - store.runTransaction { + store.runTransactionSuspendable { val masterBranch = repositoryId.getBranchReference() if (repositoryExists(repositoryId)) throw RepositoryAlreadyExistsException(repositoryId.id) val existingRepositories = getRepositories() @@ -160,10 +175,10 @@ class RepositoriesManager(val client: LocalModelClient) { } } - fun removeRepository(repository: RepositoryId): Boolean { - return store.runTransaction { + suspend fun removeRepository(repository: RepositoryId): Boolean { + return store.runTransactionSuspendable { if (!repositoryExists(repository)) { - return@runTransaction false + return@runTransactionSuspendable false } for (branchName in getBranchNames(repository)) { @@ -178,9 +193,9 @@ class RepositoriesManager(val client: LocalModelClient) { } } - fun removeBranches(repository: RepositoryId, branchNames: Set) { + suspend fun removeBranches(repository: RepositoryId, branchNames: Set) { if (branchNames.isEmpty()) return - store.runTransaction { + store.runTransactionSuspendable { val key = branchListKey(repository) val existingBranches = store[key]?.lines()?.toSet() ?: emptySet() val remainingBranches = existingBranches - branchNames @@ -191,11 +206,10 @@ class RepositoriesManager(val client: LocalModelClient) { } } - fun mergeChanges(branch: BranchReference, newVersionHash: String): String { + suspend fun mergeChanges(branch: BranchReference, newVersionHash: String): String { var result: String? = null - store.runTransaction { - val branchKey = branchKey(branch) - val headHash = getVersionHash(branch) + store.runTransactionSuspendable { + val headHash = getVersionHashInsideTransaction(branch) val mergedHash = if (headHash == null) { newVersionHash } else { @@ -217,17 +231,20 @@ class RepositoriesManager(val client: LocalModelClient) { return result!! } - fun getVersion(branch: BranchReference): CLVersion? { + suspend fun getVersion(branch: BranchReference): CLVersion? { return getVersionHash(branch)?.let { CLVersion.loadFromHash(it, client.storeCache) } } - fun getVersionHash(branch: BranchReference): String? { - return store.runTransaction { - store[branchKey(branch)] - ?: store[legacyBranchKey(branch)]?.also { store.put(branchKey(branch), it, true) } + suspend fun getVersionHash(branch: BranchReference): String? { + return store.runTransactionSuspendable { + getVersionHashInsideTransaction(branch) } } + private fun getVersionHashInsideTransaction(branch: BranchReference): String? { + return store[branchKey(branch)] ?: store[legacyBranchKey(branch)]?.also { store.put(branchKey(branch), it, true) } + } + private fun putVersionHash(branch: BranchReference, hash: String?) { store.put(branchKey(branch), hash, false) store.put(legacyBranchKey(branch), hash, false) @@ -295,18 +312,6 @@ class RepositoriesManager(val client: LocalModelClient) { private fun branchListKey(repositoryId: RepositoryId) = "$KEY_PREFIX:repositories:${repositoryId.id}:branches" - fun migrateLegacyRepositoriesList() { - val legacyRepositories = listLegacyRepositories().groupBy { it.repositoryId } - if (legacyRepositories.isNotEmpty()) { - store.runTransaction { - ensureRepositoriesAreInList(legacyRepositories.keys) - for ((legacyRepository, legacyBranches) in legacyRepositories) { - ensureBranchesAreInList(legacyRepository, legacyBranches.map { it.branchName }.toSet()) - } - } - } - } - private fun listLegacyRepositories(): Set { val result: MutableSet = HashSet() val infoVersionHash = client[RepositoryId("info").getBranchReference().getKey()] ?: return emptySet() diff --git a/model-server/src/main/kotlin/org/modelix/model/server/store/IStoreClient.kt b/model-server/src/main/kotlin/org/modelix/model/server/store/IStoreClient.kt index 2869ccfb6d..2f71ec5135 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/store/IStoreClient.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/store/IStoreClient.kt @@ -14,9 +14,11 @@ */ package org.modelix.model.server.store +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeoutOrNull import org.modelix.model.IKeyListener import java.io.File @@ -36,6 +38,10 @@ interface IStoreClient : AutoCloseable { fun runTransaction(body: () -> T): T } +suspend fun IStoreClient.runTransactionSuspendable(body: () -> T): T { + return withContext(Dispatchers.IO) { runTransaction(body) } +} + suspend fun pollEntry(storeClient: IStoreClient, key: String, lastKnownValue: String?): String? { var result: String? = null coroutineScope { diff --git a/model-server/src/main/kotlin/org/modelix/model/server/templates/PageWithMenuBar.kt b/model-server/src/main/kotlin/org/modelix/model/server/templates/PageWithMenuBar.kt index b8b541fa98..7fdb89e5e3 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/templates/PageWithMenuBar.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/templates/PageWithMenuBar.kt @@ -62,6 +62,7 @@ class PageWithMenuBar(val activePage: String, val baseUrl: String) : Template INode?, val handleCall(call, { rootNode to area }, {}) } - suspend fun handleCall(call: ApplicationCall, input: suspend (write: Boolean) -> Pair, afterQueryExecution: () -> Unit = {}) { + suspend fun handleCall(call: ApplicationCall, input: suspend (write: Boolean) -> Pair, afterQueryExecution: suspend () -> Unit = {}) { try { val serializedQuery = call.receiveText() val json = UntypedModelQL.json From 4800396e430cb56d84947965636c505fba11d3dc Mon Sep 17 00:00:00 2001 From: Oleksandr Dzhychko Date: Fri, 8 Mar 2024 08:50:28 +0100 Subject: [PATCH 2/4] perf(model-server): avoid converting object maps to object flows and vice versa --- .../server/handlers/ModelReplicationServer.kt | 15 +-- .../server/handlers/RepositoriesManager.kt | 108 ++++++++++++------ 2 files changed, 77 insertions(+), 46 deletions(-) diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt index 94a1d1e8ca..b04b551826 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/ModelReplicationServer.kt @@ -39,7 +39,6 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEmpty import kotlinx.coroutines.flow.withIndex import kotlinx.coroutines.withContext @@ -61,7 +60,6 @@ import org.modelix.model.operations.OTBranch import org.modelix.model.persistent.HashUtil import org.modelix.model.server.api.v2.VersionDelta import org.modelix.model.server.api.v2.VersionDeltaStream -import org.modelix.model.server.api.v2.toMap import org.modelix.model.server.store.IStoreClient import org.modelix.model.server.store.LocalModelClient import org.modelix.modelql.server.ModelQLServer @@ -231,9 +229,8 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) { val delta = VersionDelta( newVersionHash, lastVersionHash, - objectsMap = repositoriesManager.computeDelta(newVersionHash, lastVersionHash).toMap(), + objectsMap = repositoriesManager.computeDelta(newVersionHash, lastVersionHash).asMap(), ) - delta.checkObjectHashes() send(Json.encodeToString(delta)) lastVersionHash = newVersionHash } @@ -370,16 +367,14 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) { val delta = VersionDelta( versionHash, baseVersionHash, - objectsMap = repositoriesManager.computeDelta(versionHash, baseVersionHash).toMap(), + objectsMap = repositoriesManager.computeDelta(versionHash, baseVersionHash).asMap(), ) - delta.checkObjectHashes() respond(delta) } private suspend fun ApplicationCall.respondDeltaAsObjectStream(versionHash: String, baseVersionHash: String?, plainText: Boolean) { respondTextWriter(contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE) { - repositoriesManager.computeDelta(versionHash, baseVersionHash) - .checkObjectHashes() + repositoriesManager.computeDelta(versionHash, baseVersionHash).asFlow() .flatten() .withSeparator("\n") .onEmpty { emit(versionHash) } @@ -410,7 +405,3 @@ private fun Flow.withSeparator(separator: String) = flow { emit(it) } } - -private fun Flow>.checkObjectHashes(): Flow> { - return onEach { HashUtil.checkObjectHash(it.first, it.second) } -} diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt index e7bf4c525b..226121229a 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt @@ -16,8 +16,8 @@ package org.modelix.model.server.handlers import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.channelFlow -import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.runBlocking import kotlinx.datetime.Clock import org.apache.commons.collections4.map.LRUMap @@ -30,6 +30,7 @@ import org.modelix.model.api.ITree import org.modelix.model.api.IdGeneratorDummy import org.modelix.model.api.PBranch import org.modelix.model.api.runSynchronized +import org.modelix.model.client2.checkObjectHashes import org.modelix.model.lazy.BranchReference import org.modelix.model.lazy.CLTree import org.modelix.model.lazy.CLVersion @@ -39,6 +40,8 @@ import org.modelix.model.lazy.RepositoryId import org.modelix.model.lazy.computeDelta import org.modelix.model.metameta.MetaModelBranch import org.modelix.model.persistent.CPVersion +import org.modelix.model.persistent.HashUtil +import org.modelix.model.server.api.v2.toMap import org.modelix.model.server.store.IStoreClient import org.modelix.model.server.store.LocalModelClient import org.modelix.model.server.store.pollEntry @@ -255,39 +258,12 @@ class RepositoriesManager(val client: LocalModelClient) { ?: throw IllegalStateException("No version found for branch '${branch.branchName}' in repository '${branch.repositoryId}'") } - private val deltaCache = LRUMap, SoftReference>>>(10) - fun computeDelta(versionHash: String, baseVersionHash: String?): Flow> { - if (versionHash == baseVersionHash) return emptyFlow() + private val deltaCache = LRUMap, SoftReference>>(10) + fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData { + if (versionHash == baseVersionHash) return ObjectData.empty if (baseVersionHash == null) { // no need to cache anything if there is no delta computation happening - - return channelFlow { - val version = CLVersion(versionHash, objectStore) - // Use a bulk query to make as few request to the underlying store as possible. - val bulkQuery = objectStore.newBulkQuery() - // It is unsatisfactory that we have to keep already emitted hashes in memory. - // But without changing the underlying model, - // we have to do this to not emit objects more than once. - val seenHashes = mutableSetOf() - fun emitObjects(entry: KVEntryReference<*>) { - if (seenHashes.contains(entry.getHash())) return - seenHashes.add(entry.getHash()) - bulkQuery.get(entry).onSuccess { - val value = checkNotNull(it) { "No value received for ${entry.getHash()}" } - // Use `send` instead of `trySend`, - // because `trySend` fails if the channel capacity is full. - // This might happen if the data is produced faster than consumed. - // A better solution would be to have bulk queries which itself are asynchronous - // but doing that needs more consideration. - runBlocking { channel.send(entry.getHash() to value.serialize()) } - for (referencedEntry in value.getReferencedEntries()) { - emitObjects(referencedEntry) - } - } - } - emitObjects(KVEntryReference(versionHash, CPVersion.DESERIALIZER)) - bulkQuery.process() - } + return allObjectDataAsFlow(versionHash) } return runSynchronized(deltaCache) { @@ -297,9 +273,47 @@ class RepositoriesManager(val client: LocalModelClient) { // SoftReference because deltas can be very large val version = CLVersion(versionHash, client.storeCache) val baseVersion = CLVersion(baseVersionHash, client.storeCache) - version.computeDelta(baseVersion) + val objectsMap = version.computeDelta(baseVersion) + ObjectDataMap(objectsMap) }.also { deltaCache[key] = SoftReference(it) } - }.value.entries.asFlow().map { it.toPair() } + }.value + } + + private fun allObjectDataAsFlow(versionHash: String): ObjectDataFlow { + val hashObjectFlow = channelFlow { + val version = CLVersion(versionHash, objectStore) + // Use a bulk query to make as few request to the underlying store as possible. + val bulkQuery = objectStore.newBulkQuery() + // It is unsatisfactory that we have to keep already emitted hashes in memory. + // But without changing the underlying model, + // we have to do this to not emit objects more than once. + val seenHashes = mutableSetOf() + fun emitObjects(entry: KVEntryReference<*>) { + if (seenHashes.contains(entry.getHash())) return + seenHashes.add(entry.getHash()) + bulkQuery.get(entry).onSuccess { + val value = checkNotNull(it) { "No value received for ${entry.getHash()}" } + // Use `send` instead of `trySend`, + // because `trySend` fails if the channel capacity is full. + // This might happen if the data is produced faster than consumed. + // A better solution would be to have bulk queries which itself are asynchronous + // but doing that needs more consideration. + runBlocking { + // Maybe we should avoid Flow> and use Flow. + // This needs profiling. + channel.send(entry.getHash() to value.serialize()) + } + for (referencedEntry in value.getReferencedEntries()) { + emitObjects(referencedEntry) + } + } + } + emitObjects(KVEntryReference(versionHash, CPVersion.DESERIALIZER)) + bulkQuery.process() + } + val checkedHashObjectFlow = hashObjectFlow.checkObjectHashes() + val objectData = ObjectDataFlow(checkedHashObjectFlow) + return objectData } private fun branchKey(branch: BranchReference): String { @@ -342,3 +356,29 @@ class RepositoriesManager(val client: LocalModelClient) { } class RepositoryAlreadyExistsException(val name: String) : IllegalStateException("Repository '$name' already exists") + +sealed interface ObjectData { + suspend fun asMap(): Map + fun asFlow(): Flow> + + companion object { + val empty = ObjectDataMap(emptyMap()) + } +} + +class ObjectDataMap(private val byHashObjects: Map) : ObjectData { + init { + HashUtil.checkObjectHashes(byHashObjects) + } + override suspend fun asMap(): Map = byHashObjects + override fun asFlow(): Flow> = byHashObjects.entries.asFlow().map { it.toPair() } +} + +class ObjectDataFlow(private val hashObjectFlow: Flow>) : ObjectData { + override suspend fun asMap(): Map = hashObjectFlow.toMap() + override fun asFlow(): Flow> = hashObjectFlow +} + +private fun Flow>.checkObjectHashes(): Flow> { + return onEach { HashUtil.checkObjectHash(it.first, it.second) } +} From 8308e422d4a6932ad6e482e5af243d0c40510a9b Mon Sep 17 00:00:00 2001 From: Oleksandr Dzhychko Date: Fri, 8 Mar 2024 08:51:11 +0100 Subject: [PATCH 3/4] perf(model-server): make version delta computation suspendable Execute non suspendable part outside request threads. --- .../server/handlers/RepositoriesManager.kt | 115 +++++++++++------- 1 file changed, 74 insertions(+), 41 deletions(-) diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt index 226121229a..7420bc4f3d 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt @@ -13,11 +13,15 @@ */ package org.modelix.model.server.handlers +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.datetime.Clock import org.apache.commons.collections4.map.LRUMap @@ -29,8 +33,6 @@ import org.modelix.model.api.IReadTransaction import org.modelix.model.api.ITree import org.modelix.model.api.IdGeneratorDummy import org.modelix.model.api.PBranch -import org.modelix.model.api.runSynchronized -import org.modelix.model.client2.checkObjectHashes import org.modelix.model.lazy.BranchReference import org.modelix.model.lazy.CLTree import org.modelix.model.lazy.CLVersion @@ -46,6 +48,7 @@ import org.modelix.model.server.store.IStoreClient import org.modelix.model.server.store.LocalModelClient import org.modelix.model.server.store.pollEntry import org.modelix.model.server.store.runTransactionSuspendable +import org.slf4j.LoggerFactory import java.lang.ref.SoftReference import java.util.UUID @@ -258,58 +261,52 @@ class RepositoriesManager(val client: LocalModelClient) { ?: throw IllegalStateException("No version found for branch '${branch.branchName}' in repository '${branch.repositoryId}'") } - private val deltaCache = LRUMap, SoftReference>>(10) - fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData { + private val versionDeltaCache = VersionDeltaCache(client.storeCache) + suspend fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData { if (versionHash == baseVersionHash) return ObjectData.empty if (baseVersionHash == null) { // no need to cache anything if there is no delta computation happening return allObjectDataAsFlow(versionHash) } - return runSynchronized(deltaCache) { - val key = versionHash to baseVersionHash - deltaCache.get(key)?.get() ?: lazy { - // lazy { ... } allows to run the computation without locking deltaCache - // SoftReference because deltas can be very large - val version = CLVersion(versionHash, client.storeCache) - val baseVersion = CLVersion(baseVersionHash, client.storeCache) - val objectsMap = version.computeDelta(baseVersion) - ObjectDataMap(objectsMap) - }.also { deltaCache[key] = SoftReference(it) } - }.value + return versionDeltaCache.getOrComputeDelta(versionHash, baseVersionHash) } private fun allObjectDataAsFlow(versionHash: String): ObjectDataFlow { val hashObjectFlow = channelFlow { - val version = CLVersion(versionHash, objectStore) - // Use a bulk query to make as few request to the underlying store as possible. - val bulkQuery = objectStore.newBulkQuery() - // It is unsatisfactory that we have to keep already emitted hashes in memory. - // But without changing the underlying model, - // we have to do this to not emit objects more than once. - val seenHashes = mutableSetOf() - fun emitObjects(entry: KVEntryReference<*>) { - if (seenHashes.contains(entry.getHash())) return - seenHashes.add(entry.getHash()) - bulkQuery.get(entry).onSuccess { - val value = checkNotNull(it) { "No value received for ${entry.getHash()}" } - // Use `send` instead of `trySend`, - // because `trySend` fails if the channel capacity is full. - // This might happen if the data is produced faster than consumed. - // A better solution would be to have bulk queries which itself are asynchronous - // but doing that needs more consideration. - runBlocking { - // Maybe we should avoid Flow> and use Flow. - // This needs profiling. - channel.send(entry.getHash() to value.serialize()) - } - for (referencedEntry in value.getReferencedEntries()) { - emitObjects(referencedEntry) + // Our bulk query is blocking, therefor we explicitly launch it on one of the Dispatchers.IO. + // Without it, the consumer could accidentally start the flow on this thread and block it. + launch(Dispatchers.IO) { + // Use a bulk query to make as few request to the underlying store as possible. + val bulkQuery = objectStore.newBulkQuery() + // It is unsatisfactory that we have to keep already emitted hashes in memory. + // But without changing the underlying model, + // we have to do this to not emit objects more than once. + val seenHashes = mutableSetOf() + fun emitObjects(entry: KVEntryReference<*>) { + if (seenHashes.contains(entry.getHash())) return + seenHashes.add(entry.getHash()) + bulkQuery.get(entry).onSuccess { + val value = checkNotNull(it) { "No value received for ${entry.getHash()}" } + // Use `send` instead of `trySend`, + // because `trySend` fails if the channel capacity is full. + // This might happen if the data is produced faster than consumed. + // A better solution would be to have bulk queries which itself are asynchronous + // but doing that needs more consideration. + runBlocking { + // Maybe we should avoid Flow> and use Flow. + // This needs profiling + channel.send(entry.getHash() to value.serialize()) + } + for (referencedEntry in value.getReferencedEntries()) { + emitObjects(referencedEntry) + } } } + emitObjects(KVEntryReference(versionHash, CPVersion.DESERIALIZER)) + LOG.debug("Starting to bulk query all objects.") + bulkQuery.process() } - emitObjects(KVEntryReference(versionHash, CPVersion.DESERIALIZER)) - bulkQuery.process() } val checkedHashObjectFlow = hashObjectFlow.checkObjectHashes() val objectData = ObjectDataFlow(checkedHashObjectFlow) @@ -347,6 +344,7 @@ class RepositoriesManager(val client: LocalModelClient) { } companion object { + private val LOG = LoggerFactory.getLogger(RepositoriesManager::class.java) const val KEY_PREFIX = ":v2" private const val REPOSITORIES_LIST_KEY = "$KEY_PREFIX:repositories" const val LEGACY_SERVER_ID_KEY = "repositoryId" @@ -382,3 +380,38 @@ class ObjectDataFlow(private val hashObjectFlow: Flow>) : O private fun Flow>.checkObjectHashes(): Flow> { return onEach { HashUtil.checkObjectHash(it.first, it.second) } } + +class VersionDeltaCache(val store: IDeserializingKeyValueStore) { + + companion object { + private val LOG = LoggerFactory.getLogger(VersionDeltaCache::class.java) + } + + private val cacheMap = LRUMap, SoftReference>>(10) + + suspend fun getOrComputeDelta(versionHash: String, baseVersionHash: String): ObjectDataMap { + val deferredDelta = synchronized(cacheMap) { + val key = versionHash to baseVersionHash + val existingDeferredDelta = cacheMap[key]?.get() + if (existingDeferredDelta != null) { + LOG.debug("Version delta found in cache for {}.", key) + existingDeferredDelta + } else { + LOG.debug("Version delta not found in cache for {}.", key) + val version = CLVersion(versionHash, store) + val baseVersion = CLVersion(baseVersionHash, store) + val newDeferredDelta = runBlocking(Dispatchers.IO) { + async { + LOG.debug("Computing for delta for {}.", key) + val result = ObjectDataMap(version.computeDelta(baseVersion)) + LOG.debug("Computed version delta for {}.", key) + result + } + } + cacheMap[key] = SoftReference(newDeferredDelta) + newDeferredDelta + } + } + return deferredDelta.await() + } +} From 79af74883c9e553466a191921e1946617a8a96f3 Mon Sep 17 00:00:00 2001 From: Oleksandr Dzhychko Date: Fri, 8 Mar 2024 09:15:47 +0100 Subject: [PATCH 4/4] perf(model-server): avoid starting suspendable transaction inside suspendable transactions This avoids blocking one additional thread. --- .../handlers/KeyValueLikeModelServer.kt | 20 ++++++-- .../server/handlers/RepositoriesManager.kt | 49 +++++++++++++++---- 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt index 625567a6fe..925c52aa52 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/KeyValueLikeModelServer.kt @@ -112,7 +112,11 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) { if (isHealthy()) { call.respondText(text = "healthy", contentType = ContentType.Text.Plain, status = HttpStatusCode.OK) } else { - call.respondText(text = "not healthy", contentType = ContentType.Text.Plain, status = HttpStatusCode.InternalServerError) + call.respondText( + text = "not healthy", + contentType = ContentType.Text.Plain, + status = HttpStatusCode.InternalServerError, + ) } } get { @@ -317,18 +321,23 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) { HashUtil.isSha256(key) -> { hashedObjects[key] = value ?: throw IllegalArgumentException("No value provided for $key") } + BranchReference.tryParseBranch(key) != null -> { branchChanges[BranchReference.tryParseBranch(key)!!] = value } + key.startsWith(PROTECTED_PREFIX) -> { throw NoPermissionException("Access to keys starting with '$PROTECTED_PREFIX' is only permitted to the model server itself.") } + key.startsWith(RepositoriesManager.KEY_PREFIX) -> { throw NoPermissionException("Access to keys starting with '${RepositoriesManager.KEY_PREFIX}' is only permitted to the model server itself.") } + key == RepositoriesManager.LEGACY_SERVER_ID_KEY || key == RepositoriesManager.LEGACY_SERVER_ID_KEY2 -> { throw NoPermissionException("'$key' is read-only.") } + else -> { userDefinedEntries[key] = value } @@ -342,9 +351,9 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) { storeClient.putAll(userDefinedEntries) for ((branch, value) in branchChanges) { if (value == null) { - runBlocking { repositoriesManager.removeBranches(branch.repositoryId, setOf(branch.branchName)) } + repositoriesManager.removeBranchesBlocking(branch.repositoryId, setOf(branch.branchName)) } else { - runBlocking { repositoriesManager.mergeChanges(branch, value) } + repositoriesManager.mergeChangesBlocking(branch, value) } } } @@ -366,7 +375,10 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) { if (key.startsWith(RepositoriesManager.KEY_PREFIX)) { throw NoPermissionException("Access to keys starting with '${RepositoriesManager.KEY_PREFIX}' is only permitted to the model server itself.") } - if ((key == RepositoriesManager.LEGACY_SERVER_ID_KEY || key == RepositoriesManager.LEGACY_SERVER_ID_KEY2) && type.includes(EPermissionType.WRITE)) { + if ((key == RepositoriesManager.LEGACY_SERVER_ID_KEY || key == RepositoriesManager.LEGACY_SERVER_ID_KEY2) && type.includes( + EPermissionType.WRITE, + ) + ) { throw NoPermissionException("'$key' is read-only.") } if (HashUtil.isSha256(key)) { diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt index 7420bc4f3d..cc2cc639b7 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt @@ -200,8 +200,18 @@ class RepositoriesManager(val client: LocalModelClient) { } suspend fun removeBranches(repository: RepositoryId, branchNames: Set) { + return store.runTransactionSuspendable { + removeBranchesBlocking(repository, branchNames) + } + } + + /** + * Same as [removeBranches] but blocking. + * Caller is expected to execute it outside the request thread. + */ + fun removeBranchesBlocking(repository: RepositoryId, branchNames: Set) { if (branchNames.isEmpty()) return - store.runTransactionSuspendable { + store.runTransaction { val key = branchListKey(repository) val existingBranches = store[key]?.lines()?.toSet() ?: emptySet() val remainingBranches = existingBranches - branchNames @@ -213,9 +223,18 @@ class RepositoriesManager(val client: LocalModelClient) { } suspend fun mergeChanges(branch: BranchReference, newVersionHash: String): String { - var result: String? = null - store.runTransactionSuspendable { - val headHash = getVersionHashInsideTransaction(branch) + return store.runTransactionSuspendable { + mergeChangesBlocking(branch, newVersionHash) + } + } + + /** + * Same as [mergeChanges] but blocking. + * Caller is expected to execute it outside the request thread. + */ + fun mergeChangesBlocking(branch: BranchReference, newVersionHash: String): String { + return store.runTransaction { + val headHash = getVersionHashBlocking(branch) val mergedHash = if (headHash == null) { newVersionHash } else { @@ -232,9 +251,8 @@ class RepositoriesManager(val client: LocalModelClient) { putVersionHash(branch, mergedHash) ensureRepositoriesAreInList(setOf(branch.repositoryId)) ensureBranchesAreInList(branch.repositoryId, setOf(branch.branchName)) - result = mergedHash + mergedHash } - return result!! } suspend fun getVersion(branch: BranchReference): CLVersion? { @@ -243,12 +261,24 @@ class RepositoriesManager(val client: LocalModelClient) { suspend fun getVersionHash(branch: BranchReference): String? { return store.runTransactionSuspendable { - getVersionHashInsideTransaction(branch) + getVersionHashBlocking(branch) } } - private fun getVersionHashInsideTransaction(branch: BranchReference): String? { - return store[branchKey(branch)] ?: store[legacyBranchKey(branch)]?.also { store.put(branchKey(branch), it, true) } + /** + * Same as [getVersionHash] but blocking. + * Caller is expected to execute it outside the request thread. + */ + private fun getVersionHashBlocking(branch: BranchReference): String? { + return store.runTransaction { + store[branchKey(branch)] ?: store[legacyBranchKey(branch)]?.also { + store.put( + branchKey(branch), + it, + true, + ) + } + } } private fun putVersionHash(branch: BranchReference, hash: String?) { @@ -368,6 +398,7 @@ class ObjectDataMap(private val byHashObjects: Map) : ObjectData init { HashUtil.checkObjectHashes(byHashObjects) } + override suspend fun asMap(): Map = byHashObjects override fun asFlow(): Flow> = byHashObjects.entries.asFlow().map { it.toPair() } }