From cafe85cecb6ff8aa7f92e937e0d69c9935cb87a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Cio=C5=82ecki?= Date: Thu, 12 Dec 2024 08:02:20 +0100 Subject: [PATCH] Fix: Fetching job config user data from Flikn --- .../engine/flink/api/NkGlobalParameters.scala | 10 ++++++--- .../engine/management/FlinkRestManager.scala | 21 ++++++++++++++++--- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala index fabab3aec76..39016fd5173 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala @@ -84,7 +84,7 @@ object NkGlobalParameters { def encode(parameters: NkGlobalParameters): Map[String, String] = { def encodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = { - map.map { case (key, value) => s"$prefix$key" -> value } + map.map { case (key, value) => s"$prefix.$key" -> value } } val baseProperties = Map[String, String]( @@ -100,9 +100,11 @@ object NkGlobalParameters { val configMap = parameters.configParameters .map(ConfigGlobalParametersToMapEncoder.encode) .getOrElse(Map.empty) + val namespaceTagsMap = parameters.namespaceParameters .map(p => encodeWithKeyPrefix(p.tags, namespaceTagsMapPrefix)) .getOrElse(Map.empty) + val additionalInformationMap = encodeWithKeyPrefix(parameters.additionalInformation, additionalInformationMapPrefix) @@ -112,8 +114,8 @@ object NkGlobalParameters { def decode(map: Map[String, String]): Option[NkGlobalParameters] = { def decodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = { map.view - .filter { case (key, _) => key.startsWith(prefix) } - .map { case (key, value) => key.stripPrefix(prefix) -> value } + .filter { case (key, _) => key.startsWith(s"$prefix.") } + .map { case (key, value) => key.stripPrefix(s"$prefix.") -> value } .toMap } @@ -130,10 +132,12 @@ object NkGlobalParameters { val buildInfoOpt = map.get("buildInfo") val configParameters = ConfigGlobalParametersToMapEncoder.decode(map) + val namespaceTags = { val namespaceTagsMap = decodeWithKeyPrefix(map, namespaceTagsMapPrefix) if (namespaceTagsMap.isEmpty) None else Some(NamespaceMetricsTags(namespaceTagsMap)) } + val additionalInformation = decodeWithKeyPrefix(map, additionalInformationMapPrefix) for { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala index 2d74cc8861f..377b345d151 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala @@ -1,6 +1,7 @@ package pl.touk.nussknacker.engine.management import com.typesafe.scalalogging.LazyLogging +import io.circe.Json import org.apache.flink.api.common.{JobID, JobStatus} import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment._ @@ -158,13 +159,27 @@ class FlinkRestManager( private def withParsedJobConfig(jobId: String, name: ProcessName): Future[Option[ParsedJobConfig]] = { client.getJobConfig(jobId).map { executionConfig => val userConfig = executionConfig.`user-config` + for { version <- userConfig.get("versionId").flatMap(_.asString).map(_.toLong).map(VersionId(_)) user <- userConfig.get("user").map(_.asString.getOrElse("")) modelVersion = userConfig.get("modelVersion").flatMap(_.asString).map(_.toInt) - processId = ProcessId(userConfig.get("processId").flatMap(_.asString).map(_.toLong).getOrElse(-1L)) - labels = userConfig.get("labels").flatMap(_.asArray).map(_.toList.flatMap(_.asString)).toList.flatten - deploymentId = userConfig.get("deploymentId").flatMap(_.asString).map(DeploymentId(_)) + processId = ProcessId( + userConfig.get("processId").flatMap(_.asString).map(_.toLong).getOrElse(-1L) + ) + labels = userConfig + .get("labels") + .flatMap(_.asArray) + .map(_.toList.flatMap(_.asString)) + .toList + .flatten + + // Back compatibility + deploymentId = userConfig + .get("deploymentId") + .orElse(userConfig.get("additionalInformation.deploymentId")) + .flatMap(_.asString) + .map(DeploymentId(_)) } yield { val versionDetails = ProcessVersion(version, name, processId, labels, user, modelVersion) ParsedJobConfig(versionDetails, deploymentId)