[NU-1790] Bump Flink to 1.19 #6805

Merged: 48 commits, Sep 20, 2024

Commits (48, all authored by mslabek)
1e92478  attempt to compile with flink 1.19 (Aug 30, 2024)
2d1516f  restart fix (Aug 30, 2024)
705da6c  more silents (Aug 30, 2024)
c3148fd  more silents (Aug 30, 2024)
9d5aa8d  bump flink to 1.19.1 (Sep 2, 2024)
e19a73f  silent (Sep 2, 2024)
cef838c  set to test only 2.12 (Sep 3, 2024)
aa89606  update integration tests (Sep 3, 2024)
9bcb193  Revert "set to test only 2.12" (Sep 3, 2024)
af091bf  test state interval check hypothesis (Sep 3, 2024)
ef0ef72  test hypothesis 2 (Sep 3, 2024)
1c28979  encode/decode NkGlobalParameters (Sep 3, 2024)
269bfcd  bump flink in example (Sep 3, 2024)
57b5979  fix build (Sep 3, 2024)
c63dade  fixes in flink config for example installation (Sep 4, 2024)
4a71e95  limit places incompatible with Flink 1.18 (Sep 4, 2024)
590915b  cleanup encoding/decoding of NkGlobalParameters (Sep 4, 2024)
1322109  cleanup test utils check (Sep 4, 2024)
053cbeb  make cypress pass (Sep 5, 2024)
b20f8f4  debug ci (Sep 5, 2024)
a63b343  Revert "debug ci" (Sep 6, 2024)
022010b  Revert "make cypress pass" (Sep 6, 2024)
1bb07cf  remove comment (Sep 6, 2024)
eba0cef  Revert "Revert "make cypress pass"" (Sep 8, 2024)
d350ab3  empty commit (Sep 8, 2024)
ab49dfa  add separate object for compatibility patch (Sep 6, 2024)
f448a80  add compatibility layers (Sep 9, 2024)
d0e8bca  Revert "Revert "Revert "make cypress pass""" (Sep 9, 2024)
530fa83  update changelog (Sep 9, 2024)
37e7c94  add proper SPI instead of having to patch (Sep 13, 2024)
1a44a43  fix build (Sep 13, 2024)
e6b909a  change serializer compatibility SPI (Sep 16, 2024)
ec428d2  skip cypress (Sep 18, 2024)
5954129  remove Avro serializer SPI (Sep 18, 2024)
b7d840e  add SPI for test (Sep 18, 2024)
3d47e28  skip cypress (Sep 18, 2024)
d85ea72  try to solve SPI problem (Sep 18, 2024)
71e0f97  try to make SPI pluginable (Sep 19, 2024)
1cadb86  fix (Sep 19, 2024)
eabb968  review fixes (Sep 19, 2024)
72dec83  cleanup (Sep 19, 2024)
84d499e  remove process size (Sep 19, 2024)
2162899  change process size (Sep 19, 2024)
775f743  change process size (Sep 19, 2024)
87277ea  use ordering instead of trait hierarchy (Sep 20, 2024)
526bcb7  comment out cypress (Sep 20, 2024)
506394b  restore cypress (Sep 20, 2024)
404849e  Merge branch 'staging' into preview/NU-1790-bump-flink-to-1.19 (Sep 20, 2024)
@@ -1,7 +1,8 @@
 package pl.touk.nussknacker.engine.benchmarks.serialization

-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import com.github.ghik.silencer.silent

+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -21,6 +22,7 @@ class SerializationBenchmarkSetup[T](

   private val data = new ByteArrayOutputStream(10 * 1024)

+  @silent("deprecated")
   private val serializer = typeInfo.createSerializer(config)

   {
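Most of the @silent("deprecated") annotations this PR adds, like the one above, cover Flink 1.19's deprecation of TypeInformation.createSerializer(ExecutionConfig). A minimal sketch of the non-deprecated route, assuming the SerializerConfig overload introduced in Flink 1.19 (FLIP-398); the PR itself keeps the deprecated call and silences the warning:

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer

object SerializerCreationSketch {

  def serializerFor[T](typeInfo: TypeInformation[T], config: ExecutionConfig): TypeSerializer[T] = {
    // Deprecated since Flink 1.19 (what @silent("deprecated") suppresses above):
    //   typeInfo.createSerializer(config)
    // Replacement, assuming 1.19's SerializerConfig accessor on ExecutionConfig:
    typeInfo.createSerializer(config.getSerializerConfig)
  }

}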
@@ -2,7 +2,6 @@ package pl.touk.nussknacker.engine.benchmarks.serialization.avro

 import com.typesafe.config.ConfigFactory
 import org.apache.avro.generic.GenericData
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.openjdk.jmh.annotations._
 import pl.touk.nussknacker.engine.benchmarks.serialization.SerializationBenchmarkSetup
4 changes: 2 additions & 2 deletions build.sbt
@@ -280,8 +280,8 @@ lazy val commonSettings =
 // Note: when updating check versions in 'flink*V' below, because some libraries must be fixed at versions provided
 // by Flink, or jobs may fail in runtime when Flink is run with 'classloader.resolve-order: parent-first'.
 // You can find versions provided by Flink in it's lib/flink-dist-*.jar/META-INF/DEPENDENCIES file.
-val flinkV = "1.18.1"
-val flinkConnectorKafkaV = "3.1.0-1.18"
+val flinkV = "1.19.1"
+val flinkConnectorKafkaV = "3.2.0-1.19"
 val flinkCommonsLang3V = "3.12.0"
 val flinkCommonsTextV = "1.10.0"
 val flinkCommonsIOV = "2.11.0"
1 change: 1 addition & 0 deletions docs/Changelog.md
@@ -53,6 +53,7 @@
 * [#6176](https://github.com/TouK/nussknacker/pull/6176) Update most dependencies to latest versions, most important ones:
   * Jackson 2.15.4 -> 2.17.2
   * cats 2.10 -> 2.12
+* [#6805](https://github.com/TouK/nussknacker/pull/6805) Support for Flink 1.19.1

 ## 1.17

@@ -7,6 +7,9 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters
 import pl.touk.nussknacker.engine.api.ProcessVersion
 import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
+import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}
+import pl.touk.nussknacker.engine.flink.api.NkGlobalParameters.NkGlobalParametersToMapEncoder
+

 import _root_.java.util
 import scala.jdk.CollectionConverters._
@@ -23,19 +26,9 @@ case class NkGlobalParameters(

   // here we decide which configuration properties should be shown in REST API etc.
   // NOTE: some of the information is used in FlinkRestManager - any changes here should be reflected there
-  override def toMap: util.Map[String, String] = {
-
-    val baseProperties = Map[String, String](
-      "buildInfo" -> buildInfo,
-      "versionId" -> processVersion.versionId.value.toString,
-      "processId" -> processVersion.processId.value.toString,
-      "modelVersion" -> processVersion.modelVersion.map(_.toString).orNull,
-      "user" -> processVersion.user
-    )
-    val configMap = baseProperties ++ additionalInformation
+  override def toMap: util.Map[String, String] =
     // we wrap in HashMap because .asJava creates not-serializable map in 2.11
-    new util.HashMap(configMap.filterNot(_._2 == null).asJava)
-  }
+    new util.HashMap(NkGlobalParametersToMapEncoder.encode(this).filterNot(_._2 == null).asJava)

 }

@@ -84,8 +77,106 @@ object NkGlobalParameters {
     ec.setGlobalJobParameters(globalParameters)
   }

-  def readFromContext(ec: ExecutionConfig): Option[NkGlobalParameters] = Option(ec.getGlobalJobParameters).collect {
-    case a: NkGlobalParameters => a
-  }
+  def readFromContext(ec: ExecutionConfig): Option[NkGlobalParameters] =
+    NkGlobalParametersToMapEncoder.decode(ec.getGlobalJobParameters.toMap.asScala.toMap)
+
+  private object NkGlobalParametersToMapEncoder {
+
+    def encode(parameters: NkGlobalParameters): Map[String, String] = {
+      def encodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = {
+        map.map { case (key, value) => s"$prefix$key" -> value }
+      }
+
+      val baseProperties = Map[String, String](
+        "buildInfo" -> parameters.buildInfo,
+        "versionId" -> parameters.processVersion.versionId.value.toString,
+        "processId" -> parameters.processVersion.processId.value.toString,
+        "modelVersion" -> parameters.processVersion.modelVersion.map(_.toString).orNull,
+        "user" -> parameters.processVersion.user,
+        "processName" -> parameters.processVersion.processName.value
+      )
+
+      val configMap = parameters.configParameters
+        .map(ConfigGlobalParametersToMapEncoder.encode)
+        .getOrElse(Map.empty)
+      val namespaceTagsMap = parameters.namespaceParameters
+        .map(p => encodeWithKeyPrefix(p.tags, namespaceTagsMapPrefix))
+        .getOrElse(Map.empty)
+      val additionalInformationMap =
+        encodeWithKeyPrefix(parameters.additionalInformation, additionalInformationMapPrefix)

+      baseProperties ++ additionalInformationMap ++ configMap ++ namespaceTagsMap
+    }
+
+    def decode(map: Map[String, String]): Option[NkGlobalParameters] = {
+      def decodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = {
+        map.view
+          .filter { case (key, _) => key.startsWith(prefix) }
+          .map { case (key, value) => key.stripPrefix(prefix) -> value }
+          .toMap
+      }
+
+      val processVersionOpt = for {
+        versionId <- map.get("versionId").map(v => VersionId(v.toLong))
+        processId <- map.get("processId").map(pid => ProcessId(pid.toLong))
+        processName <- map.get("processName").map(ProcessName(_))
+        user <- map.get("user")
+      } yield {
+        val modelVersion = map.get("modelVersion").map(_.toInt)
+        ProcessVersion(versionId, processName, processId, user, modelVersion)
+      }
+      val buildInfoOpt = map.get("buildInfo")
+
+      val configParameters = ConfigGlobalParametersToMapEncoder.decode(map)
+      val namespaceTags = {
+        val namespaceTagsMap = decodeWithKeyPrefix(map, namespaceTagsMapPrefix)
+        if (namespaceTagsMap.isEmpty) None else Some(NamespaceMetricsTags(namespaceTagsMap))
+      }
+      val additionalInformation = decodeWithKeyPrefix(map, additionalInformationMapPrefix)
+
+      for {
+        processVersion <- processVersionOpt
+        buildInfo <- buildInfoOpt
+      } yield NkGlobalParameters(buildInfo, processVersion, configParameters, namespaceTags, additionalInformation)
+    }
+
+    private object ConfigGlobalParametersToMapEncoder {
+
+      def encode(params: ConfigGlobalParameters): Map[String, String] = {
+        Map(
+          s"$prefix.explicitUidInStatefulOperators" -> params.explicitUidInStatefulOperators
+            .map(_.toString)
+            .orNull,
+          s"$prefix.useIOMonadInInterpreter" -> params.useIOMonadInInterpreter
+            .map(_.toString)
+            .orNull,
+          s"$prefix.forceSyncInterpretationForSyncScenarioPart" -> params.forceSyncInterpretationForSyncScenarioPart
+            .map(_.toString)
+            .orNull
+        )
+      }
+
+      def decode(map: Map[String, String]): Option[ConfigGlobalParameters] = {
+        val mapContainsConfigGlobalParams = map.view.exists { case (key, _) => key.startsWith(prefix) }
+        if (mapContainsConfigGlobalParams) {
+          Some(
+            ConfigGlobalParameters(
+              explicitUidInStatefulOperators = map.get(s"$prefix.explicitUidInStatefulOperators").map(_.toBoolean),
+              useIOMonadInInterpreter = map.get(s"$prefix.useIOMonadInInterpreter").map(_.toBoolean),
+              forceSyncInterpretationForSyncScenarioPart =
+                map.get(s"$prefix.forceSyncInterpretationForSyncScenarioPart").map(_.toBoolean)
+            )
+          )
+        } else {
+          None
+        }
+      }
+
+      private val prefix = "configParameters"
+    }
+
+    private val namespaceTagsMapPrefix = "namespaceTags"
+    private val additionalInformationMapPrefix = "additionalInformation"
+  }
+
 }
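The practical effect of the encoder above is that global parameters now survive being flattened to a plain String map and read back, instead of requiring the NkGlobalParameters class itself to come back from Flink's GlobalJobParameters. A minimal round-trip sketch under that assumption; setInContext is a hypothetical name for the setter wrapping ec.setGlobalJobParameters above, and the 5-argument constructor mirrors the one used in decode:

import org.apache.flink.api.common.ExecutionConfig
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}
import pl.touk.nussknacker.engine.flink.api.NkGlobalParameters

object GlobalParametersRoundTripSketch extends App {
  val processVersion = ProcessVersion(VersionId(1L), ProcessName("scenario"), ProcessId(42L), "writer", Some(3))
  // buildInfo, processVersion, configParameters, namespaceParameters, additionalInformation
  val params = NkGlobalParameters("buildInfo", processVersion, None, None, Map("extra" -> "info"))

  val ec = new ExecutionConfig
  NkGlobalParameters.setInContext(ec, params) // hypothetical setter name wrapping ec.setGlobalJobParameters
  // decode(encode(params)) should yield the original parameters
  assert(NkGlobalParameters.readFromContext(ec).contains(params))
}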
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.flink.api.datastream

+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -10,6 +11,7 @@ object DataStreamImplicits {

   implicit class DataStreamExtension[T](stream: DataStream[T]) {

+    @silent("deprecated")
     def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = {
       val cleanFun = stream.getExecutionEnvironment.clean(fun)
       val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
@@ -31,6 +31,8 @@ trait TypeInformationDetection extends Serializable {

   def forType[T](typingResult: TypingResult): TypeInformation[T]

+  def priority: Int
+
 }

 object TypeInformationDetection {
@@ -50,11 +52,7 @@
         s"Classloader: ${printClassloaderDebugDetails(classloader)}. " +
           s"Ensure that your classpath is correctly configured, flinkExecutor.jar is probably missing"
       )
-    case moreThanOne =>
-      throw new IllegalStateException(
-        s"More than one ${classOf[TypeInformationDetection].getSimpleName} implementations on the classpath: $moreThanOne. " +
-          s"Classloader: ${printClassloaderDebugDetails(classloader)}"
-      )
+    case moreThanOne => moreThanOne.maxBy(_.priority)
  }
}

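Instead of failing when several TypeInformationDetection implementations are discovered via ServiceLoader, the resolution now picks the one with the highest priority (this corresponds to the "use ordering instead of trait hierarchy" commit). A toy illustration of the selection rule, with invented implementation names:

object PriorityResolutionSketch extends App {
  // Stand-ins for SPI implementations found on the classpath.
  final case class DetectionImpl(name: String, priority: Int)

  val discovered = List(DetectionImpl("base", 10), DetectionImpl("override", 20))

  // Mirrors `case moreThanOne => moreThanOne.maxBy(_.priority)` above:
  // the highest-priority implementation wins instead of raising IllegalStateException.
  val chosen = discovered.maxBy(_.priority)
  assert(chosen.name == "override")
}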
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.flink.typeinformation

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
Expand All @@ -10,6 +11,7 @@ import scala.reflect.{ClassTag, classTag}
class ConcreteCaseClassTypeInfo[T <: Product](cls: Class[T], fields: List[(String, TypeInformation[_])])
extends CaseClassTypeInfo[T](cls, Array.empty, fields.map(_._2), fields.map(_._1)) {

@silent("deprecated")
override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = {
new ScalaCaseClassSerializer[T](cls, fields.map(_._2.createSerializer(config)).toArray)
}
Expand Down
@@ -1,6 +1,7 @@
 package pl.touk.nussknacker.engine.flink.util.transformer.aggregate

 import cats.data.Validated.Valid
+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -12,11 +13,11 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
 import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown}
 import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection
-import pl.touk.nussknacker.engine.process.typeinformation.TypingResultAwareTypeInformationDetection

 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import scala.util.Random

+@silent("deprecated")
 class HyperLogLogPlusAggregatorSpec extends AnyFunSuite with Matchers {

   // the aim of this test is to be able to test different parameters easily
@@ -73,8 +74,9 @@ class HyperLogLogPlusAggregatorSpec extends AnyFunSuite with Matchers {
     val typeInfo = rawTypeInfo.asInstanceOf[TypeInformation[CardinalityWrapper]]
     val serializer = typeInfo.createSerializer(ex)

-    val compatibility =
-      serializer.snapshotConfiguration().resolveSchemaCompatibility(typeInfo.createSerializer(ex))
+    val compatibility = serializer
+      .snapshotConfiguration()
+      .resolveSchemaCompatibility(typeInfo.createSerializer(ex))
     compatibility.isCompatibleAsIs shouldBe true

     val data = new ByteArrayOutputStream(1024)
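The compatibility check above still calls TypeSerializerSnapshot.resolveSchemaCompatibility(TypeSerializer), a direction that Flink 1.19 deprecates: FLIP-263 moves the check onto the new serializer's snapshot (compare the "change serializer compatibility SPI" commit). A hedged sketch of both directions, assuming the 1.19 snapshot-to-snapshot overload:

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility}

object SchemaCompatibilitySketch {

  def check[T](
      oldSerializer: TypeSerializer[T],
      newSerializer: TypeSerializer[T]
  ): TypeSerializerSchemaCompatibility[T] = {
    // Pre-1.19 direction (deprecated), as exercised by the test above:
    //   oldSerializer.snapshotConfiguration().resolveSchemaCompatibility(newSerializer)
    // 1.19 direction per FLIP-263 (assumption): ask the new snapshot about the old one.
    newSerializer.snapshotConfiguration().resolveSchemaCompatibility(oldSerializer.snapshotConfiguration())
  }

}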
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.flink.util.transformer.aggregate

+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.functions.{AggregateFunction, RuntimeContext}
 import org.apache.flink.api.common.state.AggregatingStateDescriptor
 import org.apache.flink.configuration.Configuration
@@ -56,6 +57,7 @@ object OnEventTriggerWindowOperator {

 }

+@silent("deprecated")
 class OnEventTriggerWindowOperator[A](
     stream: KeyedStream[Input[A], String],
     fctx: FlinkCustomNodeContext,
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.flink.util.transformer.aggregate

+import com.github.ghik.silencer.silent
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStream
@@ -88,6 +89,7 @@ object transformers {
     )
   }

+  @silent("deprecated")
   def tumblingTransformer(
       groupBy: LazyParameter[CharSequence],
       aggregateBy: LazyParameter[AnyRef],
@@ -152,6 +154,7 @@
   )

   // Experimental component, API may change in the future
+  @silent("deprecated")
   def sessionWindowTransformer(
       groupBy: LazyParameter[CharSequence],
       aggregateBy: LazyParameter[AnyRef],
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.flink.table.utils

+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.DataTypes
@@ -8,6 +9,7 @@ import org.scalatest.matchers.should.Matchers
 import pl.touk.nussknacker.engine.api.typed.typing.Unknown
 import pl.touk.nussknacker.engine.flink.table.utils.DataTypesExtensions.LogicalTypeExtension

+@silent("deprecated")
 class DataTypesExtensionsSpec extends AnyFunSuiteLike with Matchers {

   test("to typing result conversion for raw type") {
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.process.compiler

+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.functions.RuntimeContext
 import pl.touk.nussknacker.engine.api.JobData
 import pl.touk.nussknacker.engine.api.process.ComponentUseCase
@@ -14,6 +15,7 @@ case class FlinkEngineRuntimeContextImpl(
     metricsProvider: MetricsProviderForScenario
 ) extends FlinkEngineRuntimeContext {

+  @silent("deprecated")
   override def contextIdGenerator(nodeId: String): ContextIdGenerator =
     new IncContextIdGenerator(jobData.metaData.name.value + "-" + nodeId + "-" + runtimeContext.getIndexOfThisSubtask)

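The deprecation silenced here is most likely RuntimeContext.getIndexOfThisSubtask, which Flink 1.19 deprecates in favour of the TaskInfo accessor (FLIP-382). A minimal sketch under that assumption:

import org.apache.flink.api.common.functions.RuntimeContext

object SubtaskIndexSketch {

  def subtaskIndex(ctx: RuntimeContext): Int = {
    // Deprecated since Flink 1.19:
    //   ctx.getIndexOfThisSubtask
    // Replacement, assuming 1.19's TaskInfo accessor:
    ctx.getTaskInfo.getIndexOfThisSubtask
  }

}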
@@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.process.compiler
 import cats.data.NonEmptyList
 import com.codahale.metrics
 import com.codahale.metrics.SlidingTimeWindowReservoir
+import com.github.ghik.silencer.silent
 import org.apache.flink
 import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
@@ -52,6 +53,7 @@ class FlinkMetricsProviderForScenario(runtimeContext: RuntimeContext) extends Ba
     ??? // Shouldn't be needed because Flink jobs are recreated "from scratch" and no cleanup of metrics during cancel is needed
   }

+  @silent("deprecated")
   private def groupsWithName(nameParts: NonEmptyList[String], tags: Map[String, String]): (MetricGroup, String) = {
     val namespaceTags = extractTags(NkGlobalParameters.readFromContext(runtimeContext.getExecutionConfig))
     tagMode(nameParts, tags ++ namespaceTags)
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.process.compiler

+import com.github.ghik.silencer.silent
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.connector.source.Boundedness
 import pl.touk.nussknacker.engine.ModelData
@@ -138,6 +139,8 @@ class TestFlinkExceptionHandler(
     listeners: Seq[ProcessListener],
     classLoader: ClassLoader
 ) extends FlinkExceptionHandler(metaData, modelDependencies, listeners, classLoader) {
+
+  @silent("deprecated")
   override def restartStrategy: RestartStrategies.RestartStrategyConfiguration = RestartStrategies.noRestart()

   override val consumer: FlinkEspExceptionConsumer = _ => {}
@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.process.exception

+import com.github.ghik.silencer.silent
 import com.typesafe.config.Config
 import net.ceedubs.ficus.Ficus.{booleanValueReader, optionValueReader, stringValueReader, toFicusConfig}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
@@ -47,6 +48,7 @@ class FlinkExceptionHandler(
     classLoader: ClassLoader
 ) extends ExceptionHandler {

+  @silent("deprecated")
   def restartStrategy: RestartStrategies.RestartStrategyConfiguration =
     RestartStrategyFromConfiguration.readFromConfiguration(modelDependencies.config, metaData)

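Both restartStrategy overrides silence Flink 1.19's deprecation of the RestartStrategies class. The non-deprecated route, which this PR does not adopt, configures restarts through Configuration options; a sketch assuming Flink's RestartStrategyOptions keys:

import java.time.Duration

import org.apache.flink.configuration.{Configuration, RestartStrategyOptions}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object RestartStrategySketch {

  // Rough configuration-based equivalent of RestartStrategies.fixedDelayRestart(3, 10000):
  def envWithFixedDelayRestarts(): StreamExecutionEnvironment = {
    val conf = new Configuration()
    conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay")
    conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Int.box(3))
    conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10))
    StreamExecutionEnvironment.getExecutionEnvironment(conf)
  }

}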