Skip to content

Commit

Permalink
Fixed: decoding / encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki committed Dec 12, 2024
1 parent 4ab6fe4 commit 86533d3
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,11 @@ object NkGlobalParameters {
val configMap = parameters.configParameters
.map(ConfigGlobalParametersToMapEncoder.encode)
.getOrElse(Map.empty)

val namespaceTagsMap = parameters.namespaceParameters
.map(p => encodeWithKeyPrefix(p.tags, namespaceTagsMapPrefix))
.getOrElse(Map.empty)

val additionalInformationMap =
encodeWithKeyPrefix(parameters.additionalInformation, additionalInformationMapPrefix)

Expand All @@ -112,8 +114,8 @@ object NkGlobalParameters {
def decode(map: Map[String, String]): Option[NkGlobalParameters] = {
def decodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = {
map.view
.filter { case (key, _) => key.startsWith(prefix) }
.map { case (key, value) => key.stripPrefix(prefix) -> value }
.filter { case (key, _) => key.startsWith(s"$prefix.") }
.map { case (key, value) => key.stripPrefix(s"$prefix.") -> value }
.toMap
}

Expand All @@ -130,10 +132,12 @@ object NkGlobalParameters {
val buildInfoOpt = map.get("buildInfo")

val configParameters = ConfigGlobalParametersToMapEncoder.decode(map)

val namespaceTags = {
val namespaceTagsMap = decodeWithKeyPrefix(map, namespaceTagsMapPrefix)
if (namespaceTagsMap.isEmpty) None else Some(NamespaceMetricsTags(namespaceTagsMap))
}

val additionalInformation = decodeWithKeyPrefix(map, additionalInformationMapPrefix)

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,14 @@ class FlinkRestManager(
val userConfig = executionConfig.`user-config`

for {
version <- userConfig.get("additionalInformation.versionId").flatMap(_.asString).map(_.toLong).map(VersionId(_))
user <- userConfig.get("additionalInformation.user").map(_.asString.getOrElse(""))
modelVersion = userConfig.get("additionalInformation.modelVersion").flatMap(_.asString).map(_.toInt)
version <- userConfig.get("versionId").flatMap(_.asString).map(_.toLong).map(VersionId(_))
user <- userConfig.get("user").map(_.asString.getOrElse(""))
modelVersion = userConfig.get("modelVersion").flatMap(_.asString).map(_.toInt)
processId = ProcessId(
userConfig.get("additionalInformation.processId").flatMap(_.asString).map(_.toLong).getOrElse(-1L)
userConfig.get("processId").flatMap(_.asString).map(_.toLong).getOrElse(-1L)
)
labels = userConfig
.get("additionalInformation.labels")
.get("labels")
.flatMap(_.asArray)
.map(_.toList.flatMap(_.asString))
.toList
Expand Down

0 comments on commit 86533d3

Please sign in to comment.