[NU-1790] Bump Flink to 1.19.1 (#6805)
bump flink to 1.19.1
mslabek authored Sep 20, 2024
1 parent 838f5ce commit 627c288
Showing 50 changed files with 379 additions and 82 deletions.
@@ -1,7 +1,8 @@
 package pl.touk.nussknacker.engine.benchmarks.serialization
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import com.github.ghik.silencer.silent
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -21,6 +22,7 @@ class SerializationBenchmarkSetup[T](
 
   private val data = new ByteArrayOutputStream(10 * 1024)
 
+  @silent("deprecated")
   private val serializer = typeInfo.createSerializer(config)
 
   {
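Most changes in this commit follow one pattern: Flink 1.19 deprecates several `ExecutionConfig`-based entry points, notably `TypeInformation.createSerializer(ExecutionConfig)` in favour of a `SerializerConfig`-based variant, and the code keeps the old overloads for now, suppressing the warnings with the silencer annotation seen above. A minimal sketch of the pattern, assuming the silencer compiler plugin is enabled (the `SerializerFactory` object and its method are illustrative, not from this commit):

```scala
import com.github.ghik.silencer.silent
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer

object SerializerFactory {

  // @silent suppresses only warnings whose message matches the given pattern,
  // so unrelated warnings in the annotated scope still surface
  @silent("deprecated")
  def serializerFor[T](typeInfo: TypeInformation[T], config: ExecutionConfig): TypeSerializer[T] =
    typeInfo.createSerializer(config)
}
```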
@@ -2,7 +2,6 @@ package pl.touk.nussknacker.engine.benchmarks.serialization.avro
 
 import com.typesafe.config.ConfigFactory
 import org.apache.avro.generic.GenericData
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.openjdk.jmh.annotations._
 import pl.touk.nussknacker.engine.benchmarks.serialization.SerializationBenchmarkSetup
4 changes: 2 additions & 2 deletions build.sbt
@@ -280,8 +280,8 @@ lazy val commonSettings =
 // Note: when updating check versions in 'flink*V' below, because some libraries must be fixed at versions provided
 // by Flink, or jobs may fail in runtime when Flink is run with 'classloader.resolve-order: parent-first'.
 // You can find versions provided by Flink in it's lib/flink-dist-*.jar/META-INF/DEPENDENCIES file.
-val flinkV = "1.18.1"
-val flinkConnectorKafkaV = "3.1.0-1.18"
+val flinkV = "1.19.1"
+val flinkConnectorKafkaV = "3.2.0-1.19"
 val flinkCommonsLang3V = "3.12.0"
 val flinkCommonsTextV = "1.10.0"
 val flinkCommonsIOV = "2.11.0"
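The Kafka connector is versioned independently of Flink core: the artifact version pairs a connector release with the Flink minor it targets, so `3.2.0-1.19` means connector 3.2.0 built against Flink 1.19. A sketch of how such version values are typically consumed further down a build, with module names and scopes as illustrative assumptions rather than lines from this commit:

```scala
libraryDependencies ++= Seq(
  // core Flink stays Provided: the cluster's flink-dist supplies it at runtime
  "org.apache.flink" % "flink-streaming-java"  % flinkV % Provided,
  // externalized connectors ship with the job jar and follow their own releases
  "org.apache.flink" % "flink-connector-kafka" % flinkConnectorKafkaV
)
```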
1 change: 1 addition & 0 deletions docs/Changelog.md
@@ -53,6 +53,7 @@
 * [#6176](https://github.com/TouK/nussknacker/pull/6176) Update most dependencies to latest versions, most important ones:
   * Jackson 2.15.4 -> 2.17.2
   * cats 2.10 -> 2.12
+* [#6805](https://github.com/TouK/nussknacker/pull/6805) Support for Flink 1.19.1
 
 ## 1.17
 
@@ -7,6 +7,9 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters
 import pl.touk.nussknacker.engine.api.ProcessVersion
 import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
+import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}
+import pl.touk.nussknacker.engine.flink.api.NkGlobalParameters.NkGlobalParametersToMapEncoder
+
 
 import _root_.java.util
 import scala.jdk.CollectionConverters._
@@ -23,19 +26,9 @@ case class NkGlobalParameters(

   // here we decide which configuration properties should be shown in REST API etc.
   // NOTE: some of the information is used in FlinkRestManager - any changes here should be reflected there
-  override def toMap: util.Map[String, String] = {
-
-    val baseProperties = Map[String, String](
-      "buildInfo" -> buildInfo,
-      "versionId" -> processVersion.versionId.value.toString,
-      "processId" -> processVersion.processId.value.toString,
-      "modelVersion" -> processVersion.modelVersion.map(_.toString).orNull,
-      "user" -> processVersion.user
-    )
-    val configMap = baseProperties ++ additionalInformation
+  override def toMap: util.Map[String, String] =
     // we wrap in HashMap because .asJava creates not-serializable map in 2.11
-    new util.HashMap(configMap.filterNot(_._2 == null).asJava)
-  }
+    new util.HashMap(NkGlobalParametersToMapEncoder.encode(this).filterNot(_._2 == null).asJava)
 
 }

@@ -84,8 +77,106 @@ object NkGlobalParameters {
     ec.setGlobalJobParameters(globalParameters)
   }
 
-  def readFromContext(ec: ExecutionConfig): Option[NkGlobalParameters] = Option(ec.getGlobalJobParameters).collect {
-    case a: NkGlobalParameters => a
-  }
+  def readFromContext(ec: ExecutionConfig): Option[NkGlobalParameters] =
+    NkGlobalParametersToMapEncoder.decode(ec.getGlobalJobParameters.toMap.asScala.toMap)
+
+  private object NkGlobalParametersToMapEncoder {
+
+    def encode(parameters: NkGlobalParameters): Map[String, String] = {
+      def encodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = {
+        map.map { case (key, value) => s"$prefix$key" -> value }
+      }
+
+      val baseProperties = Map[String, String](
+        "buildInfo" -> parameters.buildInfo,
+        "versionId" -> parameters.processVersion.versionId.value.toString,
+        "processId" -> parameters.processVersion.processId.value.toString,
+        "modelVersion" -> parameters.processVersion.modelVersion.map(_.toString).orNull,
+        "user" -> parameters.processVersion.user,
+        "processName" -> parameters.processVersion.processName.value
+      )
+
+      val configMap = parameters.configParameters
+        .map(ConfigGlobalParametersToMapEncoder.encode)
+        .getOrElse(Map.empty)
+      val namespaceTagsMap = parameters.namespaceParameters
+        .map(p => encodeWithKeyPrefix(p.tags, namespaceTagsMapPrefix))
+        .getOrElse(Map.empty)
+      val additionalInformationMap =
+        encodeWithKeyPrefix(parameters.additionalInformation, additionalInformationMapPrefix)
+
+      baseProperties ++ additionalInformationMap ++ configMap ++ namespaceTagsMap
+    }
+
+    def decode(map: Map[String, String]): Option[NkGlobalParameters] = {
+      def decodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = {
+        map.view
+          .filter { case (key, _) => key.startsWith(prefix) }
+          .map { case (key, value) => key.stripPrefix(prefix) -> value }
+          .toMap
+      }
+
+      val processVersionOpt = for {
+        versionId <- map.get("versionId").map(v => VersionId(v.toLong))
+        processId <- map.get("processId").map(pid => ProcessId(pid.toLong))
+        processName <- map.get("processName").map(ProcessName(_))
+        user <- map.get("user")
+      } yield {
+        val modelVersion = map.get("modelVersion").map(_.toInt)
+        ProcessVersion(versionId, processName, processId, user, modelVersion)
+      }
+      val buildInfoOpt = map.get("buildInfo")
+
+      val configParameters = ConfigGlobalParametersToMapEncoder.decode(map)
+      val namespaceTags = {
+        val namespaceTagsMap = decodeWithKeyPrefix(map, namespaceTagsMapPrefix)
+        if (namespaceTagsMap.isEmpty) None else Some(NamespaceMetricsTags(namespaceTagsMap))
+      }
+      val additionalInformation = decodeWithKeyPrefix(map, additionalInformationMapPrefix)
+
+      for {
+        processVersion <- processVersionOpt
+        buildInfo <- buildInfoOpt
+      } yield NkGlobalParameters(buildInfo, processVersion, configParameters, namespaceTags, additionalInformation)
+    }
+
+    private object ConfigGlobalParametersToMapEncoder {
+
+      def encode(params: ConfigGlobalParameters): Map[String, String] = {
+        Map(
+          s"$prefix.explicitUidInStatefulOperators" -> params.explicitUidInStatefulOperators
+            .map(_.toString)
+            .orNull,
+          s"$prefix.useIOMonadInInterpreter" -> params.useIOMonadInInterpreter
+            .map(_.toString)
+            .orNull,
+          s"$prefix.forceSyncInterpretationForSyncScenarioPart" -> params.forceSyncInterpretationForSyncScenarioPart
+            .map(_.toString)
+            .orNull
+        )
+      }
+
+      def decode(map: Map[String, String]): Option[ConfigGlobalParameters] = {
+        val mapContainsConfigGlobalParams = map.view.exists { case (key, _) => key.startsWith(prefix) }
+        if (mapContainsConfigGlobalParams) {
+          Some(
+            ConfigGlobalParameters(
+              explicitUidInStatefulOperators = map.get(s"$prefix.explicitUidInStatefulOperators").map(_.toBoolean),
+              useIOMonadInInterpreter = map.get(s"$prefix.useIOMonadInInterpreter").map(_.toBoolean),
+              forceSyncInterpretationForSyncScenarioPart =
                map.get(s"$prefix.forceSyncInterpretationForSyncScenarioPart").map(_.toBoolean)
+            )
+          )
+        } else {
+          None
+        }
+      }
+
+      private val prefix = "configParameters"
+    }
+
+    private val namespaceTagsMapPrefix = "namespaceTags"
+    private val additionalInformationMapPrefix = "additionalInformation"
+  }
+
 }
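This is the behavioural core of the commit: `NkGlobalParameters` used to be read back from `ExecutionConfig.getGlobalJobParameters` by a class-identity pattern match, which assumes the original instance is still there. It is now round-tripped through the flat `Map[String, String]` that `GlobalJobParameters.toMap` exposes, with the nested parts (config parameters, namespace tags, additional information) kept apart by key prefixes. A minimal sketch of that prefix-encoding idea in isolation (helper names mirror the diff, the values are made up):

```scala
// flatten a nested map into a shared flat map by prefixing its keys
def encodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] =
  map.map { case (key, value) => s"$prefix$key" -> value }

// recover only the entries that belong to this prefix, stripping it again
def decodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] =
  map.collect { case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value }

val flat = Map("buildInfo" -> "some-build") ++ encodeWithKeyPrefix(Map("env" -> "prod"), "namespaceTags")
assert(decodeWithKeyPrefix(flat, "namespaceTags") == Map("env" -> "prod"))
```

One consequence visible in `decode`: a map only yields `Some(NkGlobalParameters(...))` when the process-version fields and `buildInfo` are all present, so job parameters written by other producers simply decode to `None`.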
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.flink.api.datastream
 
+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -10,6 +11,7 @@ object DataStreamImplicits {

   implicit class DataStreamExtension[T](stream: DataStream[T]) {
 
+    @silent("deprecated")
     def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = {
       val cleanFun = stream.getExecutionEnvironment.clean(fun)
       val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
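For context, `mapWithState` threads per-key state through an otherwise plain map function, materialising `TypeInformation` evidence for both the result and the state type. A hypothetical usage sketch (the keyed `events` stream and the counting logic are assumptions, not part of this diff):

```scala
// emit a running count per key, keeping the count in keyed state
val counts: DataStream[Long] = events.mapWithState[Long, Long] { (event, state) =>
  val seenSoFar = state.getOrElse(0L) + 1 // state is None for the first event of a key
  (seenSoFar, Some(seenSoFar))            // (value to emit, updated state)
}
```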
@@ -31,6 +31,8 @@ trait TypeInformationDetection extends Serializable {

   def forType[T](typingResult: TypingResult): TypeInformation[T]
 
+  def priority: Int
+
 }
 
 object TypeInformationDetection {
@@ -50,11 +52,7 @@
s"Classloader: ${printClassloaderDebugDetails(classloader)}. " +
s"Ensure that your classpath is correctly configured, flinkExecutor.jar is probably missing"
)
case moreThanOne =>
throw new IllegalStateException(
s"More than one ${classOf[TypeInformationDetection].getSimpleName} implementations on the classpath: $moreThanOne. " +
s"Classloader: ${printClassloaderDebugDetails(classloader)}"
)
case moreThanOne => moreThanOne.maxBy(_.priority)
}
}
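Where multiple `TypeInformationDetection` implementations on the classpath used to be a hard `IllegalStateException`, the new `priority` member lets the most specific one win. A minimal sketch of that selection rule with the same trait shape (the concrete objects and priority values are illustrative):

```scala
trait TypeInformationDetection { def priority: Int }

// e.g. a base implementation shipped in flinkExecutor.jar ...
object DefaultDetection extends TypeInformationDetection { val priority = 0 }
// ... which a more specialised module can now override without a classpath error
object SpecializedDetection extends TypeInformationDetection { val priority = 100 }

val discovered = List[TypeInformationDetection](DefaultDetection, SpecializedDetection)
val chosen = discovered match {
  case single :: Nil => single
  case moreThanOne   => moreThanOne.maxBy(_.priority) // highest priority wins; ties are unspecified
}
assert(chosen == SpecializedDetection)
```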

@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.flink.typeinformation
 
+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -10,6 +11,7 @@ import scala.reflect.{ClassTag, classTag}
 class ConcreteCaseClassTypeInfo[T <: Product](cls: Class[T], fields: List[(String, TypeInformation[_])])
     extends CaseClassTypeInfo[T](cls, Array.empty, fields.map(_._2), fields.map(_._1)) {
 
+  @silent("deprecated")
   override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = {
     new ScalaCaseClassSerializer[T](cls, fields.map(_._2.createSerializer(config)).toArray)
   }
@@ -1,6 +1,7 @@
 package pl.touk.nussknacker.engine.flink.util.transformer.aggregate
 
 import cats.data.Validated.Valid
+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -12,11 +13,11 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
 import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown}
 import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection
-import pl.touk.nussknacker.engine.process.typeinformation.TypingResultAwareTypeInformationDetection
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import scala.util.Random
 
+@silent("deprecated")
 class HyperLogLogPlusAggregatorSpec extends AnyFunSuite with Matchers {
 
   // the aim of this test is to be able to test different parameters easily
@@ -73,8 +74,9 @@ class HyperLogLogPlusAggregatorSpec extends AnyFunSuite with Matchers {
     val typeInfo = rawTypeInfo.asInstanceOf[TypeInformation[CardinalityWrapper]]
     val serializer = typeInfo.createSerializer(ex)
 
-    val compatibility =
-      serializer.snapshotConfiguration().resolveSchemaCompatibility(typeInfo.createSerializer(ex))
+    val compatibility = serializer
+      .snapshotConfiguration()
+      .resolveSchemaCompatibility(typeInfo.createSerializer(ex))
     compatibility.isCompatibleAsIs shouldBe true
 
     val data = new ByteArrayOutputStream(1024)
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.flink.util.transformer.aggregate
 
+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.functions.{AggregateFunction, RuntimeContext}
 import org.apache.flink.api.common.state.AggregatingStateDescriptor
 import org.apache.flink.configuration.Configuration
@@ -56,6 +57,7 @@ object OnEventTriggerWindowOperator {

 }
 
+@silent("deprecated")
 class OnEventTriggerWindowOperator[A](
     stream: KeyedStream[Input[A], String],
     fctx: FlinkCustomNodeContext,
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.flink.util.transformer.aggregate
 
+import com.github.ghik.silencer.silent
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStream
@@ -88,6 +89,7 @@ object transformers {
     )
   }
 
+  @silent("deprecated")
   def tumblingTransformer(
       groupBy: LazyParameter[CharSequence],
       aggregateBy: LazyParameter[AnyRef],
@@ -152,6 +154,7 @@
   )
 
   // Experimental component, API may change in the future
+  @silent("deprecated")
   def sessionWindowTransformer(
       groupBy: LazyParameter[CharSequence],
       aggregateBy: LazyParameter[AnyRef],
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.flink.table.utils
 
+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.DataTypes
@@ -8,6 +9,7 @@ import org.scalatest.matchers.should.Matchers
 import pl.touk.nussknacker.engine.api.typed.typing.Unknown
 import pl.touk.nussknacker.engine.flink.table.utils.DataTypesExtensions.LogicalTypeExtension
 
+@silent("deprecated")
 class DataTypesExtensionsSpec extends AnyFunSuiteLike with Matchers {
 
   test("to typing result conversion for raw type") {
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.process.compiler
 
+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.functions.RuntimeContext
 import pl.touk.nussknacker.engine.api.JobData
 import pl.touk.nussknacker.engine.api.process.ComponentUseCase
@@ -14,6 +15,7 @@ case class FlinkEngineRuntimeContextImpl(
     metricsProvider: MetricsProviderForScenario
 ) extends FlinkEngineRuntimeContext {
 
+  @silent("deprecated")
   override def contextIdGenerator(nodeId: String): ContextIdGenerator =
     new IncContextIdGenerator(jobData.metaData.name.value + "-" + nodeId + "-" + runtimeContext.getIndexOfThisSubtask)
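The silenced call here is presumably `RuntimeContext.getIndexOfThisSubtask`, which Flink 1.19 deprecates in favour of the new `TaskInfo` accessor. A one-line sketch of the assumed migration path (not part of this commit):

```scala
// Flink 1.19+ style: subtask metadata lives behind RuntimeContext#getTaskInfo
val subtaskIndex: Int = runtimeContext.getTaskInfo.getIndexOfThisSubtask
```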

@@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.process.compiler
 import cats.data.NonEmptyList
 import com.codahale.metrics
 import com.codahale.metrics.SlidingTimeWindowReservoir
+import com.github.ghik.silencer.silent
 import org.apache.flink
 import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
@@ -52,6 +53,7 @@ class FlinkMetricsProviderForScenario(runtimeContext: RuntimeContext) extends Ba
     ??? // Shouldn't be needed because Flink jobs are recreated "from scratch" and no cleanup of metrics during cancel is needed
   }
 
+  @silent("deprecated")
   private def groupsWithName(nameParts: NonEmptyList[String], tags: Map[String, String]): (MetricGroup, String) = {
     val namespaceTags = extractTags(NkGlobalParameters.readFromContext(runtimeContext.getExecutionConfig))
     tagMode(nameParts, tags ++ namespaceTags)
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.process.compiler
 
+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.connector.source.Boundedness
 import pl.touk.nussknacker.engine.ModelData
@@ -138,6 +139,8 @@ class TestFlinkExceptionHandler(
     listeners: Seq[ProcessListener],
     classLoader: ClassLoader
 ) extends FlinkExceptionHandler(metaData, modelDependencies, listeners, classLoader) {
+
+  @silent("deprecated")
   override def restartStrategy: RestartStrategies.RestartStrategyConfiguration = RestartStrategies.noRestart()
 
   override val consumer: FlinkEspExceptionConsumer = _ => {}
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.process.exception
 
+import com.github.ghik.silencer.silent
 import com.typesafe.config.Config
 import net.ceedubs.ficus.Ficus.{booleanValueReader, optionValueReader, stringValueReader, toFicusConfig}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
@@ -47,6 +48,7 @@ class FlinkExceptionHandler(
     classLoader: ClassLoader
 ) extends ExceptionHandler {
 
+  @silent("deprecated")
   def restartStrategy: RestartStrategies.RestartStrategyConfiguration =
     RestartStrategyFromConfiguration.readFromConfiguration(modelDependencies.config, metaData)
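The programmatic `RestartStrategies` API is likewise deprecated in Flink 1.19 in favour of configuration-driven restart strategies, which is presumably why both `restartStrategy` overrides are silenced rather than rewritten. A hedged sketch of the configuration-based equivalent of `RestartStrategies.noRestart()` (assumed API usage, not part of this commit):

```scala
import org.apache.flink.configuration.{Configuration, RestartStrategyOptions}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val conf = new Configuration()
// config-based replacement for the deprecated programmatic noRestart()
conf.set(RestartStrategyOptions.RESTART_STRATEGY, "none")
val env = StreamExecutionEnvironment.getExecutionEnvironment(conf)
```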
