Skip to content

Commit

Permalink
Merge pull request #563 from modelix/fix/compute-delta-outside-reques…
Browse files Browse the repository at this point in the history
…t-thread

fix(model-server): do not execute long-running repository operation on request threads
  • Loading branch information
odzhychko authored Mar 11, 2024
2 parents d560134 + 79af748 commit 45f7d53
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
val params = call.request.queryParameters
val limit = toInt(params["limit"], 500)
val skip = toInt(params["skip"], 0)
val latestVersion = repositoriesManager.getVersion(branch)
checkNotNull(latestVersion) { "Branch not found: $branch" }
call.respondHtmlTemplate(PageWithMenuBar("repos/", "../../..")) {
headContent {
style {
Expand All @@ -80,7 +82,7 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
repositoryPageStyle()
}
bodyContent {
buildRepositoryPage(branch, params["head"], skip, limit)
buildRepositoryPage(branch, latestVersion, params["head"], skip, limit)
}
}
}
Expand All @@ -105,7 +107,7 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
}
}

fun revert(repositoryAndBranch: BranchReference, from: String?, to: String?, author: String?) {
suspend fun revert(repositoryAndBranch: BranchReference, from: String?, to: String?, author: String?) {
val version = repositoriesManager.getVersion(repositoryAndBranch) ?: throw RuntimeException("Branch doesn't exist: $repositoryAndBranch")
val branch = OTBranch(PBranch(version.tree, client.idGenerator), client.idGenerator, client.storeCache!!)
branch.runWriteT { t ->
Expand Down Expand Up @@ -160,8 +162,13 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
}
}

private fun FlowContent.buildRepositoryPage(repositoryAndBranch: BranchReference, headHash: String?, skip: Int, limit: Int) {
val latestVersion = repositoriesManager.getVersion(repositoryAndBranch) ?: throw RuntimeException("Branch not found: $repositoryAndBranch")
private fun FlowContent.buildRepositoryPage(
repositoryAndBranch: BranchReference,
latestVersion: CLVersion,
headHash: String?,
skip: Int,
limit: Int,
) {
val headVersion = if (headHash == null || headHash.length == 0) latestVersion else CLVersion(headHash, client.storeCache!!)
var rowIndex = 0
h1 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.ktor.server.resources.put
import io.ktor.server.response.respondText
import io.ktor.server.routing.routing
import io.ktor.util.pipeline.PipelineContext
import kotlinx.coroutines.runBlocking
import kotlinx.html.br
import kotlinx.html.div
import kotlinx.html.h1
Expand All @@ -48,12 +49,12 @@ import org.modelix.model.lazy.RepositoryId
import org.modelix.model.persistent.HashUtil
import org.modelix.model.server.store.IStoreClient
import org.modelix.model.server.store.pollEntry
import org.modelix.model.server.store.runTransactionSuspendable
import org.modelix.model.server.templates.PageWithMenuBar
import org.slf4j.LoggerFactory
import java.io.IOException
import java.util.*
import java.util.regex.Pattern
import kotlin.collections.LinkedHashMap

val PERMISSION_MODEL_SERVER = "model-server".asResource()
val MODEL_SERVER_ENTRY = KeycloakResourceType("model-server-entry", KeycloakScope.READ_WRITE_DELETE)
Expand Down Expand Up @@ -85,7 +86,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
// request to initialize it lazily, would make the code less robust.
// Each change in the logic of RepositoriesManager#maybeInitAndGetSeverId would need
// the special conditions in the affected requests to be updated.
repositoriesManager.maybeInitAndGetSeverId()
runBlocking { repositoriesManager.maybeInitAndGetSeverId() }
application.apply {
modelServerModule()
}
Expand All @@ -111,7 +112,11 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
if (isHealthy()) {
call.respondText(text = "healthy", contentType = ContentType.Text.Plain, status = HttpStatusCode.OK)
} else {
call.respondText(text = "not healthy", contentType = ContentType.Text.Plain, status = HttpStatusCode.InternalServerError)
call.respondText(
text = "not healthy",
contentType = ContentType.Text.Plain,
status = HttpStatusCode.InternalServerError,
)
}
}
get<Paths.getHeaders> {
Expand Down Expand Up @@ -283,7 +288,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
return result
}

protected fun CallContext.putEntries(newEntries: Map<String, String?>) {
protected suspend fun CallContext.putEntries(newEntries: Map<String, String?>) {
val referencedKeys: MutableSet<String> = HashSet()
for ((key, value) in newEntries) {
checkKeyPermission(key, EPermissionType.WRITE)
Expand Down Expand Up @@ -316,18 +321,23 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
HashUtil.isSha256(key) -> {
hashedObjects[key] = value ?: throw IllegalArgumentException("No value provided for $key")
}

BranchReference.tryParseBranch(key) != null -> {
branchChanges[BranchReference.tryParseBranch(key)!!] = value
}

key.startsWith(PROTECTED_PREFIX) -> {
throw NoPermissionException("Access to keys starting with '$PROTECTED_PREFIX' is only permitted to the model server itself.")
}

key.startsWith(RepositoriesManager.KEY_PREFIX) -> {
throw NoPermissionException("Access to keys starting with '${RepositoriesManager.KEY_PREFIX}' is only permitted to the model server itself.")
}

key == RepositoriesManager.LEGACY_SERVER_ID_KEY || key == RepositoriesManager.LEGACY_SERVER_ID_KEY2 -> {
throw NoPermissionException("'$key' is read-only.")
}

else -> {
userDefinedEntries[key] = value
}
Expand All @@ -336,14 +346,14 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {

HashUtil.checkObjectHashes(hashedObjects)

repositoriesManager.client.store.runTransaction {
repositoriesManager.client.store.runTransactionSuspendable {
storeClient.putAll(hashedObjects)
storeClient.putAll(userDefinedEntries)
for ((branch, value) in branchChanges) {
if (value == null) {
repositoriesManager.removeBranches(branch.repositoryId, setOf(branch.branchName))
repositoriesManager.removeBranchesBlocking(branch.repositoryId, setOf(branch.branchName))
} else {
repositoriesManager.mergeChanges(branch, value)
repositoriesManager.mergeChangesBlocking(branch, value)
}
}
}
Expand All @@ -365,7 +375,10 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
if (key.startsWith(RepositoriesManager.KEY_PREFIX)) {
throw NoPermissionException("Access to keys starting with '${RepositoriesManager.KEY_PREFIX}' is only permitted to the model server itself.")
}
if ((key == RepositoriesManager.LEGACY_SERVER_ID_KEY || key == RepositoriesManager.LEGACY_SERVER_ID_KEY2) && type.includes(EPermissionType.WRITE)) {
if ((key == RepositoriesManager.LEGACY_SERVER_ID_KEY || key == RepositoriesManager.LEGACY_SERVER_ID_KEY2) && type.includes(
EPermissionType.WRITE,
)
) {
throw NoPermissionException("'$key' is read-only.")
}
if (HashUtil.isSha256(key)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onEmpty
import kotlinx.coroutines.flow.withIndex
import kotlinx.coroutines.withContext
Expand All @@ -61,7 +60,6 @@ import org.modelix.model.operations.OTBranch
import org.modelix.model.persistent.HashUtil
import org.modelix.model.server.api.v2.VersionDelta
import org.modelix.model.server.api.v2.VersionDeltaStream
import org.modelix.model.server.api.v2.toMap
import org.modelix.model.server.store.IStoreClient
import org.modelix.model.server.store.LocalModelClient
import org.modelix.modelql.server.ModelQLServer
Expand Down Expand Up @@ -231,9 +229,8 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
val delta = VersionDelta(
newVersionHash,
lastVersionHash,
objectsMap = repositoriesManager.computeDelta(newVersionHash, lastVersionHash).toMap(),
objectsMap = repositoriesManager.computeDelta(newVersionHash, lastVersionHash).asMap(),
)
delta.checkObjectHashes()
send(Json.encodeToString(delta))
lastVersionHash = newVersionHash
}
Expand Down Expand Up @@ -370,16 +367,14 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
val delta = VersionDelta(
versionHash,
baseVersionHash,
objectsMap = repositoriesManager.computeDelta(versionHash, baseVersionHash).toMap(),
objectsMap = repositoriesManager.computeDelta(versionHash, baseVersionHash).asMap(),
)
delta.checkObjectHashes()
respond(delta)
}

private suspend fun ApplicationCall.respondDeltaAsObjectStream(versionHash: String, baseVersionHash: String?, plainText: Boolean) {
respondTextWriter(contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE) {
repositoriesManager.computeDelta(versionHash, baseVersionHash)
.checkObjectHashes()
repositoriesManager.computeDelta(versionHash, baseVersionHash).asFlow()
.flatten()
.withSeparator("\n")
.onEmpty { emit(versionHash) }
Expand Down Expand Up @@ -410,7 +405,3 @@ private fun Flow<String>.withSeparator(separator: String) = flow {
emit(it)
}
}

private fun <V : String?> Flow<Pair<String, V>>.checkObjectHashes(): Flow<Pair<String, V>> {
return onEach { HashUtil.checkObjectHash(it.first, it.second) }
}
Loading

0 comments on commit 45f7d53

Please sign in to comment.