Skip to content

Commit

Permalink
Fix: Fetching user config data from Flink (#7320)
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki authored Dec 12, 2024
1 parent 3ec9d3e commit 62af6bd
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ object NkGlobalParameters {

def encode(parameters: NkGlobalParameters): Map[String, String] = {
def encodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = {
map.map { case (key, value) => s"$prefix$key" -> value }
map.map { case (key, value) => s"$prefix.$key" -> value }
}

val baseProperties = Map[String, String](
Expand All @@ -100,9 +100,11 @@ object NkGlobalParameters {
val configMap = parameters.configParameters
.map(ConfigGlobalParametersToMapEncoder.encode)
.getOrElse(Map.empty)

val namespaceTagsMap = parameters.namespaceParameters
.map(p => encodeWithKeyPrefix(p.tags, namespaceTagsMapPrefix))
.getOrElse(Map.empty)

val additionalInformationMap =
encodeWithKeyPrefix(parameters.additionalInformation, additionalInformationMapPrefix)

Expand All @@ -112,8 +114,8 @@ object NkGlobalParameters {
def decode(map: Map[String, String]): Option[NkGlobalParameters] = {
def decodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = {
map.view
.filter { case (key, _) => key.startsWith(prefix) }
.map { case (key, value) => key.stripPrefix(prefix) -> value }
.filter { case (key, _) => key.startsWith(s"$prefix.") }
.map { case (key, value) => key.stripPrefix(s"$prefix.") -> value }
.toMap
}

Expand All @@ -130,10 +132,12 @@ object NkGlobalParameters {
val buildInfoOpt = map.get("buildInfo")

val configParameters = ConfigGlobalParametersToMapEncoder.decode(map)

val namespaceTags = {
val namespaceTagsMap = decodeWithKeyPrefix(map, namespaceTagsMapPrefix)
if (namespaceTagsMap.isEmpty) None else Some(NamespaceMetricsTags(namespaceTagsMap))
}

val additionalInformation = decodeWithKeyPrefix(map, additionalInformationMapPrefix)

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,21 @@ class FlinkRestManager(
private def withParsedJobConfig(jobId: String, name: ProcessName): Future[Option[ParsedJobConfig]] = {
client.getJobConfig(jobId).map { executionConfig =>
val userConfig = executionConfig.`user-config`

for {
version <- userConfig.get("versionId").flatMap(_.asString).map(_.toLong).map(VersionId(_))
user <- userConfig.get("user").map(_.asString.getOrElse(""))
modelVersion = userConfig.get("modelVersion").flatMap(_.asString).map(_.toInt)
processId = ProcessId(userConfig.get("processId").flatMap(_.asString).map(_.toLong).getOrElse(-1L))
labels = userConfig.get("labels").flatMap(_.asArray).map(_.toList.flatMap(_.asString)).toList.flatten
deploymentId = userConfig.get("deploymentId").flatMap(_.asString).map(DeploymentId(_))
processId = ProcessId(
userConfig.get("processId").flatMap(_.asString).map(_.toLong).getOrElse(-1L)
)
labels = userConfig
.get("labels")
.flatMap(_.asArray)
.map(_.toList.flatMap(_.asString))
.toList
.flatten
deploymentId = userConfig.get("additionalInformation.deploymentId").flatMap(_.asString).map(DeploymentId(_))
} yield {
val versionDetails = ProcessVersion(version, name, processId, labels, user, modelVersion)
ParsedJobConfig(versionDetails, deploymentId)
Expand Down

0 comments on commit 62af6bd

Please sign in to comment.