From 627c288d02e0057b4e5a37b96c292a6253cc3c8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Fri, 20 Sep 2024 15:54:59 +0200 Subject: [PATCH] [NU-1790] Bump Flink to 1.19.1 (#6805) bump flink to 1.19.1 --- .../SerializationBenchmarkSetup.scala | 4 +- .../serialization/avro/AvroBenchmark.scala | 1 - build.sbt | 4 +- docs/Changelog.md | 1 + .../engine/flink/api/NkGlobalParameters.scala | 119 +++++++++++++++--- .../api/datastream/DataStreamImplicits.scala | 2 + .../TypeInformationDetection.scala | 8 +- .../ConcreteCaseClassTypeInfo.scala | 2 + .../HyperLogLogPlusAggregatorSpec.scala | 8 +- .../OnEventTriggerWindowOperator.scala | 2 + .../transformer/aggregate/transformers.scala | 3 + ...ink.api.serialization.SerializersRegistrar | 0 .../table/utils/DataTypesExtensionsSpec.scala | 2 + .../FlinkEngineRuntimeContextImpl.scala | 2 + .../FlinkMetricsProviderForScenario.scala | 2 + .../TestFlinkProcessCompilerDataFactory.scala | 3 + .../exception/FlinkExceptionHandler.scala | 2 + .../RestartStrategyFromConfiguration.scala | 4 +- .../registrar/FlinkProcessRegistrar.scala | 3 +- .../InterpretationResultTypeInformation.scala | 1 - .../StreamExecutionEnvPreparer.scala | 3 + .../process/runner/FlinkStubbedRunner.scala | 6 +- ...gResultAwareTypeInformationDetection.scala | 28 ++++- .../InterpretationResultMapTypeInfo.scala | 2 + .../TypedJavaMapBasedTypeInformation.scala | 28 +++-- .../TypedObjectBasedTypeInformation.scala | 19 ++- .../TypedScalaMapBasedTypeInformation.scala | 28 +++-- .../process/util/StateConfiguration.scala | 2 +- .../exception/FlinkExceptionHandlerSpec.scala | 2 + ...RestartStrategyFromConfigurationSpec.scala | 2 + ...ultAwareTypeInformationDetectionSpec.scala | 19 +-- .../serialization/SerializerRegistrar.scala | 3 + .../SerializerWithSpecifiedClass.scala | 0 .../generic/DelayedFlinkKafkaConsumer.scala | 2 + .../caseclass/CaseClassTypeInfo.scala | 2 + .../caseclass/CaseClassTypeInfoFactory.scala | 8 +- .../api/typeinfo/option/OptionTypeInfo.scala | 2 + .../CaseClassSerializationTest.scala | 6 +- .../kryo/AvroSerializersRegistrar.scala | 10 +- ...ink.api.serialization.SerializersRegistrar | 1 + .../src/main/resources/docker/Dockerfile | 2 +- .../src/main/resources/docker/conf.yml | 2 + ...inkTypeInformationSerializationMixin.scala | 4 + .../flink/test/FlinkMiniClusterHolder.scala | 8 +- .../MiniClusterExecutionEnvironment.scala | 22 +++- .../process/functional/TestReporter.scala | 2 +- .../engine/process/helpers/SampleNodes.scala | 6 +- .../NkGlobalParametersEncoderTest.scala | 58 +++++++++ examples/installation/docker-compose.yml | 4 +- .../installation/flink/flink-properties.yml | 7 +- 50 files changed, 379 insertions(+), 82 deletions(-) rename engine/flink/{schemed-kafka-components-utils => components/kafka}/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.serialization.SerializersRegistrar (100%) create mode 100644 engine/flink/extensions-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/serialization/SerializerWithSpecifiedClass.scala create mode 100644 engine/flink/schemed-kafka-components-utils/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.serialization.SerializersRegistrar create mode 100644 engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/NkGlobalParametersEncoderTest.scala diff --git a/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/serialization/SerializationBenchmarkSetup.scala 
b/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/serialization/SerializationBenchmarkSetup.scala index b5894db3e05..93d22fe6b03 100644 --- a/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/serialization/SerializationBenchmarkSetup.scala +++ b/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/serialization/SerializationBenchmarkSetup.scala @@ -1,7 +1,8 @@ package pl.touk.nussknacker.engine.benchmarks.serialization -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import com.github.ghik.silencer.silent +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation @@ -21,6 +22,7 @@ class SerializationBenchmarkSetup[T]( private val data = new ByteArrayOutputStream(10 * 1024) + @silent("deprecated") private val serializer = typeInfo.createSerializer(config) { diff --git a/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/serialization/avro/AvroBenchmark.scala b/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/serialization/avro/AvroBenchmark.scala index a10dcb45e81..40b17722191 100644 --- a/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/serialization/avro/AvroBenchmark.scala +++ b/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/serialization/avro/AvroBenchmark.scala @@ -2,7 +2,6 @@ package pl.touk.nussknacker.engine.benchmarks.serialization.avro import com.typesafe.config.ConfigFactory import org.apache.avro.generic.GenericData -import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.openjdk.jmh.annotations._ import pl.touk.nussknacker.engine.benchmarks.serialization.SerializationBenchmarkSetup diff --git a/build.sbt b/build.sbt index e4b6e576ac0..aab1090311c 100644 --- a/build.sbt +++ b/build.sbt @@ -280,8 +280,8 @@ lazy val commonSettings = // Note: when updating check versions in 'flink*V' below, because some libraries must be fixed at versions provided // by Flink, or jobs may fail in runtime when Flink is run with 'classloader.resolve-order: parent-first'. // You can find versions provided by Flink in it's lib/flink-dist-*.jar/META-INF/DEPENDENCIES file. 
-val flinkV = "1.18.1" -val flinkConnectorKafkaV = "3.1.0-1.18" +val flinkV = "1.19.1" +val flinkConnectorKafkaV = "3.2.0-1.19" val flinkCommonsLang3V = "3.12.0" val flinkCommonsTextV = "1.10.0" val flinkCommonsIOV = "2.11.0" diff --git a/docs/Changelog.md b/docs/Changelog.md index 8c0603fa7d0..8fac976e740 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -53,6 +53,7 @@ * [#6176](https://github.com/TouK/nussknacker/pull/6176) Update most dependencies to latest versions, most important ones: * Jackson 2.15.4 -> 2.17.2 * cats 2.10 -> 2.12 +* [#6805](https://github.com/TouK/nussknacker/pull/6805) Support for Flink 1.19.1 ## 1.17 diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala index cf2be0446cf..aa0629b746b 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/NkGlobalParameters.scala @@ -7,6 +7,9 @@ import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy +import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} +import pl.touk.nussknacker.engine.flink.api.NkGlobalParameters.NkGlobalParametersToMapEncoder + import _root_.java.util import scala.jdk.CollectionConverters._ @@ -23,19 +26,9 @@ case class NkGlobalParameters( // here we decide which configuration properties should be shown in REST API etc. // NOTE: some of the information is used in FlinkRestManager - any changes here should be reflected there - override def toMap: util.Map[String, String] = { - - val baseProperties = Map[String, String]( - "buildInfo" -> buildInfo, - "versionId" -> processVersion.versionId.value.toString, - "processId" -> processVersion.processId.value.toString, - "modelVersion" -> processVersion.modelVersion.map(_.toString).orNull, - "user" -> processVersion.user - ) - val configMap = baseProperties ++ additionalInformation + override def toMap: util.Map[String, String] = // we wrap in HashMap because .asJava creates not-serializable map in 2.11 - new util.HashMap(configMap.filterNot(_._2 == null).asJava) - } + new util.HashMap(NkGlobalParametersToMapEncoder.encode(this).filterNot(_._2 == null).asJava) } @@ -84,8 +77,106 @@ object NkGlobalParameters { ec.setGlobalJobParameters(globalParameters) } - def readFromContext(ec: ExecutionConfig): Option[NkGlobalParameters] = Option(ec.getGlobalJobParameters).collect { - case a: NkGlobalParameters => a + def readFromContext(ec: ExecutionConfig): Option[NkGlobalParameters] = + NkGlobalParametersToMapEncoder.decode(ec.getGlobalJobParameters.toMap.asScala.toMap) + + private object NkGlobalParametersToMapEncoder { + + def encode(parameters: NkGlobalParameters): Map[String, String] = { + def encodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = { + map.map { case (key, value) => s"$prefix$key" -> value } + } + + val baseProperties = Map[String, String]( + "buildInfo" -> parameters.buildInfo, + "versionId" -> parameters.processVersion.versionId.value.toString, + "processId" -> parameters.processVersion.processId.value.toString, + "modelVersion" -> parameters.processVersion.modelVersion.map(_.toString).orNull, + 
"user" -> parameters.processVersion.user, + "processName" -> parameters.processVersion.processName.value + ) + + val configMap = parameters.configParameters + .map(ConfigGlobalParametersToMapEncoder.encode) + .getOrElse(Map.empty) + val namespaceTagsMap = parameters.namespaceParameters + .map(p => encodeWithKeyPrefix(p.tags, namespaceTagsMapPrefix)) + .getOrElse(Map.empty) + val additionalInformationMap = + encodeWithKeyPrefix(parameters.additionalInformation, additionalInformationMapPrefix) + + baseProperties ++ additionalInformationMap ++ configMap ++ namespaceTagsMap + } + + def decode(map: Map[String, String]): Option[NkGlobalParameters] = { + def decodeWithKeyPrefix(map: Map[String, String], prefix: String): Map[String, String] = { + map.view + .filter { case (key, _) => key.startsWith(prefix) } + .map { case (key, value) => key.stripPrefix(prefix) -> value } + .toMap + } + + val processVersionOpt = for { + versionId <- map.get("versionId").map(v => VersionId(v.toLong)) + processId <- map.get("processId").map(pid => ProcessId(pid.toLong)) + processName <- map.get("processName").map(ProcessName(_)) + user <- map.get("user") + } yield { + val modelVersion = map.get("modelVersion").map(_.toInt) + ProcessVersion(versionId, processName, processId, user, modelVersion) + } + val buildInfoOpt = map.get("buildInfo") + + val configParameters = ConfigGlobalParametersToMapEncoder.decode(map) + val namespaceTags = { + val namespaceTagsMap = decodeWithKeyPrefix(map, namespaceTagsMapPrefix) + if (namespaceTagsMap.isEmpty) None else Some(NamespaceMetricsTags(namespaceTagsMap)) + } + val additionalInformation = decodeWithKeyPrefix(map, additionalInformationMapPrefix) + + for { + processVersion <- processVersionOpt + buildInfo <- buildInfoOpt + } yield NkGlobalParameters(buildInfo, processVersion, configParameters, namespaceTags, additionalInformation) + } + + private object ConfigGlobalParametersToMapEncoder { + + def encode(params: ConfigGlobalParameters): Map[String, String] = { + Map( + s"$prefix.explicitUidInStatefulOperators" -> params.explicitUidInStatefulOperators + .map(_.toString) + .orNull, + s"$prefix.useIOMonadInInterpreter" -> params.useIOMonadInInterpreter + .map(_.toString) + .orNull, + s"$prefix.forceSyncInterpretationForSyncScenarioPart" -> params.forceSyncInterpretationForSyncScenarioPart + .map(_.toString) + .orNull + ) + } + + def decode(map: Map[String, String]): Option[ConfigGlobalParameters] = { + val mapContainsConfigGlobalParams = map.view.exists { case (key, _) => key.startsWith(prefix) } + if (mapContainsConfigGlobalParams) { + Some( + ConfigGlobalParameters( + explicitUidInStatefulOperators = map.get(s"$prefix.explicitUidInStatefulOperators").map(_.toBoolean), + useIOMonadInInterpreter = map.get(s"$prefix.useIOMonadInInterpreter").map(_.toBoolean), + forceSyncInterpretationForSyncScenarioPart = + map.get(s"$prefix.forceSyncInterpretationForSyncScenarioPart").map(_.toBoolean) + ) + ) + } else { + None + } + } + + private val prefix = "configParameters" + } + + private val namespaceTagsMapPrefix = "namespaceTags" + private val additionalInformationMapPrefix = "additionalInformation" } } diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/datastream/DataStreamImplicits.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/datastream/DataStreamImplicits.scala index e0aa1f66351..9a7d7483e3f 100644 --- 
a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/datastream/DataStreamImplicits.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/datastream/DataStreamImplicits.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.api.datastream +import com.github.ghik.silencer.silent import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer @@ -10,6 +11,7 @@ object DataStreamImplicits { implicit class DataStreamExtension[T](stream: DataStream[T]) { + @silent("deprecated") def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = { val cleanFun = stream.getExecutionEnvironment.clean(fun) val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]] diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala index 03305d7060d..b0a77d68f2c 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala @@ -31,6 +31,8 @@ trait TypeInformationDetection extends Serializable { def forType[T](typingResult: TypingResult): TypeInformation[T] + def priority: Int + } object TypeInformationDetection { @@ -50,11 +52,7 @@ object TypeInformationDetection { s"Classloader: ${printClassloaderDebugDetails(classloader)}. " + s"Ensure that your classpath is correctly configured, flinkExecutor.jar is probably missing" ) - case moreThanOne => - throw new IllegalStateException( - s"More than one ${classOf[TypeInformationDetection].getSimpleName} implementations on the classpath: $moreThanOne. 
" + - s"Classloader: ${printClassloaderDebugDetails(classloader)}" - ) + case moreThanOne => moreThanOne.maxBy(_.priority) } } diff --git a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/typeinformation/ConcreteCaseClassTypeInfo.scala b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/typeinformation/ConcreteCaseClassTypeInfo.scala index 907c002c82e..dc654c6a84f 100644 --- a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/typeinformation/ConcreteCaseClassTypeInfo.scala +++ b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/typeinformation/ConcreteCaseClassTypeInfo.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.typeinformation +import com.github.ghik.silencer.silent import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer @@ -10,6 +11,7 @@ import scala.reflect.{ClassTag, classTag} class ConcreteCaseClassTypeInfo[T <: Product](cls: Class[T], fields: List[(String, TypeInformation[_])]) extends CaseClassTypeInfo[T](cls, Array.empty, fields.map(_._2), fields.map(_._1)) { + @silent("deprecated") override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = { new ScalaCaseClassSerializer[T](cls, fields.map(_._2.createSerializer(config)).toArray) } diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HyperLogLogPlusAggregatorSpec.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HyperLogLogPlusAggregatorSpec.scala index eb98cdd14f7..9a0ff5299dd 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HyperLogLogPlusAggregatorSpec.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/HyperLogLogPlusAggregatorSpec.scala @@ -1,6 +1,7 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate import cats.data.Validated.Valid +import com.github.ghik.silencer.silent import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer @@ -12,11 +13,11 @@ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown} import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection -import pl.touk.nussknacker.engine.process.typeinformation.TypingResultAwareTypeInformationDetection import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import scala.util.Random +@silent("deprecated") class HyperLogLogPlusAggregatorSpec extends AnyFunSuite with Matchers { // the aim of this test is to be able to test different parameters easily @@ -73,8 +74,9 @@ class HyperLogLogPlusAggregatorSpec extends AnyFunSuite with Matchers { val typeInfo = rawTypeInfo.asInstanceOf[TypeInformation[CardinalityWrapper]] val serializer = typeInfo.createSerializer(ex) - val compatibility = - serializer.snapshotConfiguration().resolveSchemaCompatibility(typeInfo.createSerializer(ex)) + val compatibility = serializer + .snapshotConfiguration() + .resolveSchemaCompatibility(typeInfo.createSerializer(ex)) compatibility.isCompatibleAsIs shouldBe true val data = new 
ByteArrayOutputStream(1024) diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala index 205f8369d76..d9026604ef2 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/OnEventTriggerWindowOperator.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate +import com.github.ghik.silencer.silent import org.apache.flink.api.common.functions.{AggregateFunction, RuntimeContext} import org.apache.flink.api.common.state.AggregatingStateDescriptor import org.apache.flink.configuration.Configuration @@ -56,6 +57,7 @@ object OnEventTriggerWindowOperator { } +@silent("deprecated") class OnEventTriggerWindowOperator[A]( stream: KeyedStream[Input[A], String], fctx: FlinkCustomNodeContext, diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala index d9ede9b049d..84ea9e8b90e 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/aggregate/transformers.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.util.transformer.aggregate +import com.github.ghik.silencer.silent import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.DataStream @@ -88,6 +89,7 @@ object transformers { ) } + @silent("deprecated") def tumblingTransformer( groupBy: LazyParameter[CharSequence], aggregateBy: LazyParameter[AnyRef], @@ -152,6 +154,7 @@ object transformers { ) // Experimental component, API may change in the future + @silent("deprecated") def sessionWindowTransformer( groupBy: LazyParameter[CharSequence], aggregateBy: LazyParameter[AnyRef], diff --git a/engine/flink/schemed-kafka-components-utils/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.serialization.SerializersRegistrar b/engine/flink/components/kafka/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.serialization.SerializersRegistrar similarity index 100% rename from engine/flink/schemed-kafka-components-utils/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.serialization.SerializersRegistrar rename to engine/flink/components/kafka/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.serialization.SerializersRegistrar diff --git a/engine/flink/components/table/src/test/scala/pl/touk/nussknacker/engine/flink/table/utils/DataTypesExtensionsSpec.scala b/engine/flink/components/table/src/test/scala/pl/touk/nussknacker/engine/flink/table/utils/DataTypesExtensionsSpec.scala index 9308d2f0daf..53d1399deb7 100644 --- a/engine/flink/components/table/src/test/scala/pl/touk/nussknacker/engine/flink/table/utils/DataTypesExtensionsSpec.scala +++ 
b/engine/flink/components/table/src/test/scala/pl/touk/nussknacker/engine/flink/table/utils/DataTypesExtensionsSpec.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.table.utils +import com.github.ghik.silencer.silent import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.DataTypes @@ -8,6 +9,7 @@ import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.api.typed.typing.Unknown import pl.touk.nussknacker.engine.flink.table.utils.DataTypesExtensions.LogicalTypeExtension +@silent("deprecated") class DataTypesExtensionsSpec extends AnyFunSuiteLike with Matchers { test("to typing result conversion for raw type") { diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala index 53fc8dc2ea8..ee33a6bca26 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.compiler +import com.github.ghik.silencer.silent import org.apache.flink.api.common.functions.RuntimeContext import pl.touk.nussknacker.engine.api.JobData import pl.touk.nussknacker.engine.api.process.ComponentUseCase @@ -14,6 +15,7 @@ case class FlinkEngineRuntimeContextImpl( metricsProvider: MetricsProviderForScenario ) extends FlinkEngineRuntimeContext { + @silent("deprecated") override def contextIdGenerator(nodeId: String): ContextIdGenerator = new IncContextIdGenerator(jobData.metaData.name.value + "-" + nodeId + "-" + runtimeContext.getIndexOfThisSubtask) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkMetricsProviderForScenario.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkMetricsProviderForScenario.scala index dd90945962b..7fbd65f16f5 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkMetricsProviderForScenario.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkMetricsProviderForScenario.scala @@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.process.compiler import cats.data.NonEmptyList import com.codahale.metrics import com.codahale.metrics.SlidingTimeWindowReservoir +import com.github.ghik.silencer.silent import org.apache.flink import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper @@ -52,6 +53,7 @@ class FlinkMetricsProviderForScenario(runtimeContext: RuntimeContext) extends Ba ??? 
// Shouldn't be needed because Flink jobs are recreated "from scratch" and no cleanup of metrics during cancel is needed } + @silent("deprecated") private def groupsWithName(nameParts: NonEmptyList[String], tags: Map[String, String]): (MetricGroup, String) = { val namespaceTags = extractTags(NkGlobalParameters.readFromContext(runtimeContext.getExecutionConfig)) tagMode(nameParts, tags ++ namespaceTags) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala index c80fdf2277b..f631e919eb4 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/TestFlinkProcessCompilerDataFactory.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.compiler +import com.github.ghik.silencer.silent import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.connector.source.Boundedness import pl.touk.nussknacker.engine.ModelData @@ -138,6 +139,8 @@ class TestFlinkExceptionHandler( listeners: Seq[ProcessListener], classLoader: ClassLoader ) extends FlinkExceptionHandler(metaData, modelDependencies, listeners, classLoader) { + + @silent("deprecated") override def restartStrategy: RestartStrategies.RestartStrategyConfiguration = RestartStrategies.noRestart() override val consumer: FlinkEspExceptionConsumer = _ => {} diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/exception/FlinkExceptionHandler.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/exception/FlinkExceptionHandler.scala index f2fcc75498c..c57e9b2a757 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/exception/FlinkExceptionHandler.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/exception/FlinkExceptionHandler.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.exception +import com.github.ghik.silencer.silent import com.typesafe.config.Config import net.ceedubs.ficus.Ficus.{booleanValueReader, optionValueReader, stringValueReader, toFicusConfig} import org.apache.flink.api.common.restartstrategy.RestartStrategies @@ -47,6 +48,7 @@ class FlinkExceptionHandler( classLoader: ClassLoader ) extends ExceptionHandler { + @silent("deprecated") def restartStrategy: RestartStrategies.RestartStrategyConfiguration = RestartStrategyFromConfiguration.readFromConfiguration(modelDependencies.config, metaData) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/exception/RestartStrategyFromConfiguration.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/exception/RestartStrategyFromConfiguration.scala index f5bf96a8789..85947407aa2 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/exception/RestartStrategyFromConfiguration.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/exception/RestartStrategyFromConfiguration.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.exception +import com.github.ghik.silencer.silent import com.typesafe.config.{Config, ConfigValue, ConfigValueType} import org.apache.commons.lang3.StringUtils import 
org.apache.flink.api.common.restartstrategy.RestartStrategies @@ -7,8 +8,8 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStra import org.apache.flink.configuration.Configuration import pl.touk.nussknacker.engine.api.MetaData import pl.touk.nussknacker.engine.util.MetaDataExtractor - import pl.touk.nussknacker.engine.util.config.CustomFicusInstances._ + import scala.compat.java8.OptionConverters.RichOptionalGeneric import scala.jdk.CollectionConverters._ @@ -55,6 +56,7 @@ object RestartStrategyFromConfiguration { // We convert HOCON to Flink configuration, so that we can use Flink parsing mechanisms // https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/ + @silent("deprecated") private def readFromConfig(config: Config): RestartStrategyConfiguration = { val flinkConfig = new Configuration // restart-strategy.fixed-delay.attempts diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala index 351c4e95729..b5e57e38127 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.registrar +import com.github.ghik.silencer.silent import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.api.common.typeinfo.TypeInformation @@ -27,7 +28,6 @@ import pl.touk.nussknacker.engine.process.compiler.{ FlinkProcessCompilerDataFactory, UsedNodes } -import pl.touk.nussknacker.engine.process.typeinformation.TypingResultAwareTypeInformationDetection import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkCompatibilityProvider, FlinkJobConfig} import pl.touk.nussknacker.engine.resultcollector.{ProductionServiceInvocationCollector, ResultCollector} import pl.touk.nussknacker.engine.splittedgraph.end.BranchEnd @@ -57,6 +57,7 @@ class FlinkProcessRegistrar( import FlinkProcessRegistrar._ + @silent("deprecated") implicit def millisToTime(duration: Long): Time = Time.of(duration, TimeUnit.MILLISECONDS) def register( diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/InterpretationResultTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/InterpretationResultTypeInformation.scala index 58968906459..6fe30c6ce8f 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/InterpretationResultTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/InterpretationResultTypeInformation.scala @@ -6,7 +6,6 @@ import pl.touk.nussknacker.engine.api.PartReference import pl.touk.nussknacker.engine.api.context.ValidationContext import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection import pl.touk.nussknacker.engine.flink.typeinformation.ConcreteCaseClassTypeInfo -import pl.touk.nussknacker.engine.process.typeinformation.TypingResultAwareTypeInformationDetection import pl.touk.nussknacker.engine.process.typeinformation.internal.InterpretationResultMapTypeInfo import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap diff --git 
a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/StreamExecutionEnvPreparer.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/StreamExecutionEnvPreparer.scala index c07ed748d53..dd8ef11b45d 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/StreamExecutionEnvPreparer.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/StreamExecutionEnvPreparer.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.registrar +import com.github.ghik.silencer.silent import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.runtime.state.StateBackend @@ -50,6 +51,7 @@ class DefaultStreamExecutionEnvPreparer( ) extends StreamExecutionEnvPreparer with LazyLogging { + @silent("deprecated") override def preRegistration( env: StreamExecutionEnvironment, compilerData: FlinkProcessCompilerData, @@ -88,6 +90,7 @@ class DefaultStreamExecutionEnvPreparer( } } + @silent("deprecated") protected def configureRocksDBBackend(env: StreamExecutionEnvironment, config: RocksDBStateBackendConfig): Unit = { env.setStateBackend(StateConfiguration.prepareRocksDBStateBackend(config).asInstanceOf[StateBackend]) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala index 5ba3446cec1..a51db93cf39 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/runner/FlinkStubbedRunner.scala @@ -55,11 +55,11 @@ trait FlinkStubbedRunner { val configuration: Configuration = new Configuration configuration.addAll(jobGraph.getJobConfiguration) - configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, env.getParallelism) - configuration.setInteger(RestOptions.PORT, 0) + configuration.set[Integer](TaskManagerOptions.NUM_TASK_SLOTS, env.getParallelism) + configuration.set[Integer](RestOptions.PORT, 0) // FIXME: reversing flink default order - configuration.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first") + configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first") // it is required for proper working of HadoopFileSystem FileSystem.initialize(configuration, null) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala index 56090ef5004..8e17e1f802e 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala @@ -1,6 +1,7 @@ package pl.touk.nussknacker.engine.process.typeinformation import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} +import org.apache.flink.api.common.typeutils.{CompositeTypeSerializerUtil, TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.api.java.typeutils.{ListTypeInfo, MapTypeInfo, MultisetTypeInfo, RowTypeInfo} import org.apache.flink.types.Row import 
pl.touk.nussknacker.engine.api.context.ValidationContext @@ -10,12 +11,10 @@ import pl.touk.nussknacker.engine.flink.api.TypedMultiset import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection import pl.touk.nussknacker.engine.flink.typeinformation.ConcreteCaseClassTypeInfo import pl.touk.nussknacker.engine.process.typeinformation.internal.ContextTypeHelpers -import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.{ - TypedJavaMapTypeInformation, - TypedScalaMapTypeInformation -} +import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.{TypedJavaMapTypeInformation, TypedScalaMapTypeInformation} import pl.touk.nussknacker.engine.util.Implicits._ + // TODO: handle avro types - see FlinkConfluentUtils /* This class generates TypeInformation based on ValidationContext and TypingResult. @@ -77,10 +76,10 @@ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection // TODO: better handle specific map implementations - other than HashMap? case a: TypedObjectTypingResult if classOf[java.util.Map[String @unchecked, _]].isAssignableFrom(a.runtimeObjType.klass) => - TypedJavaMapTypeInformation(a.fields.mapValuesNow(forType)) + createJavaMapTypeInformation(a) // We generally don't use scala Maps in our runtime, but it is useful for some internal type infos: TODO move it somewhere else case a: TypedObjectTypingResult if a.runtimeObjType.klass == classOf[Map[String, _]] => - TypedScalaMapTypeInformation(a.fields.mapValuesNow(forType)) + createScalaMapTypeInformation(a) case a: SingleTypingResult if registeredTypeInfos.contains(a.runtimeObjType) => registeredTypeInfos(a.runtimeObjType) // TODO: scala case classes are not handled nicely here... CaseClassTypeInfo is created only via macro, here Kryo is used @@ -94,6 +93,22 @@ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection }).asInstanceOf[TypeInformation[T]] } + private def createScalaMapTypeInformation(typingResult: TypedObjectTypingResult) = + TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult) + + private def createJavaMapTypeInformation(typingResult: TypedObjectTypingResult) = + TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult) + + protected def constructIntermediateCompatibilityResult( + newNestedSerializers: Array[TypeSerializer[_]], + oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]] + ): CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing] = { + CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult( + newNestedSerializers.map(_.snapshotConfiguration()), + oldNestedSerializerSnapshots + ) + } + def forValueWithContext[T]( validationContext: ValidationContext, value: TypeInformation[T] @@ -105,4 +120,5 @@ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection ) } + override def priority: Int = Integer.MIN_VALUE } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/InterpretationResultMapTypeInfo.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/InterpretationResultMapTypeInfo.scala index f62935d3c94..ce27b7ea1c4 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/InterpretationResultMapTypeInfo.scala +++ 
b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/InterpretationResultMapTypeInfo.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal +import com.github.ghik.silencer.silent import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} @@ -22,6 +23,7 @@ case class InterpretationResultMapTypeInfo(ctx: Map[String, TypeInformation[Inte override def isKeyType: Boolean = false + @silent("deprecated") override def createSerializer(config: ExecutionConfig): TypeSerializer[InterpretationResult] = InterpretationResultMapTypeSerializer(ctx.mapValuesNow(_.createSerializer(config))) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala index 229b5b60b2f..c2f7fd96633 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala @@ -3,33 +3,42 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject import java.{util => jutil} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} +import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult -case class TypedJavaMapTypeInformation(informations: Map[String, TypeInformation[_]]) - extends TypedObjectBasedTypeInformation[jutil.Map[String, AnyRef]](informations) { +case class TypedJavaMapTypeInformation( + informations: Map[String, TypeInformation[_]], + buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult +) extends TypedObjectBasedTypeInformation[jutil.Map[String, AnyRef]](informations) { override def createSerializer( serializers: Array[(String, TypeSerializer[_])] - ): TypeSerializer[jutil.Map[String, AnyRef]] = TypedJavaMapSerializer(serializers) + ): TypeSerializer[jutil.Map[String, AnyRef]] = + TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction) } @SerialVersionUID(1L) -case class TypedJavaMapSerializer(override val serializers: Array[(String, TypeSerializer[_])]) - extends TypedObjectBasedTypeSerializer[jutil.Map[String, AnyRef]](serializers) +case class TypedJavaMapSerializer( + override val serializers: Array[(String, TypeSerializer[_])], + override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult +) extends TypedObjectBasedTypeSerializer[jutil.Map[String, AnyRef]](serializers) with BaseJavaMapBasedSerializer[AnyRef, jutil.Map[String, AnyRef]] { override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[jutil.Map[String, AnyRef]] = - TypedJavaMapSerializer(serializers) + TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction) override def createInstance(): jutil.Map[String, AnyRef] = new jutil.HashMap() override def snapshotConfiguration( 
snapshots: Array[(String, TypeSerializerSnapshot[_])] - ): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots) + ): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots) { + override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult = + buildIntermediateSchemaCompatibilityResultFunction + } } -class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] { +abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] { def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = { this() @@ -38,6 +47,7 @@ class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[ override protected def restoreSerializer( restored: Array[(String, TypeSerializer[_])] - ): TypeSerializer[jutil.Map[String, AnyRef]] = TypedJavaMapSerializer(restored) + ): TypeSerializer[jutil.Map[String, AnyRef]] = + TypedJavaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala index 254c9559cc6..530cba98149 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject +import com.github.ghik.silencer.silent import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation @@ -10,6 +11,7 @@ import org.apache.flink.api.common.typeutils.{ TypeSerializerSnapshot } import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult import scala.reflect.ClassTag @@ -44,6 +46,7 @@ abstract class TypedObjectBasedTypeInformation[T: ClassTag](informations: Array[ override def isKeyType: Boolean = false + @silent("deprecated") override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = createSerializer(serializers = informations.map { case (k, v) => (k, v.createSerializer(config)) @@ -54,6 +57,15 @@ abstract class TypedObjectBasedTypeInformation[T: ClassTag](informations: Array[ def createSerializer(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T] } +object TypedObjectBasedTypeInformation { + + type BuildIntermediateSchemaCompatibilityResult = ( + Array[TypeSerializer[_]], + Array[TypeSerializerSnapshot[_]] + ) => CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing] + +} + //We use Array instead of List here, as we need access by index, which is faster for array abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String, TypeSerializer[_])]) extends TypeSerializer[T] @@ -119,6 +131,8 @@ abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String, def get(value: T, name: String): AnyRef def 
duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T] + + def buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult } abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnapshot[T] with LazyLogging { @@ -157,6 +171,7 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps if nonEqualKeysCompatible == false we require keys in new and old serializer are the same */ + @silent("deprecated") override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] = { if (newSerializer.snapshotConfiguration().getClass != getClass) { TypeSerializerSchemaCompatibility.incompatible() @@ -170,7 +185,7 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps val newSerializersToUse = newSerializers.filter(k => commons.contains(k._1)) val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1)) - val fieldsCompatibility = CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult( + val fieldsCompatibility = buildIntermediateSchemaCompatibilityResult( newSerializersToUse.map(_._2), snapshotsToUse.map(_._2) ) @@ -222,6 +237,8 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps } } + val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult + override def restoreSerializer(): TypeSerializer[T] = restoreSerializer(serializersSnapshots.map { case (k, snapshot) => (k, snapshot.restoreSerializer()) }) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala index c45ff98c768..b987369b534 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala @@ -3,19 +3,25 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} +import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult -case class TypedScalaMapTypeInformation(informations: Map[String, TypeInformation[_]]) - extends TypedObjectBasedTypeInformation[Map[String, _ <: AnyRef]](informations) { +case class TypedScalaMapTypeInformation( + informations: Map[String, TypeInformation[_]], + buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult +) extends TypedObjectBasedTypeInformation[Map[String, _ <: AnyRef]](informations) { override def createSerializer( serializers: Array[(String, TypeSerializer[_])] - ): TypeSerializer[Map[String, _ <: AnyRef]] = TypedScalaMapSerializer(serializers) + ): TypeSerializer[Map[String, _ <: AnyRef]] = + TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction) } @SerialVersionUID(1L) -case class TypedScalaMapSerializer(override val serializers: Array[(String, TypeSerializer[_])]) 
- extends TypedObjectBasedTypeSerializer[Map[String, _ <: AnyRef]](serializers) +case class TypedScalaMapSerializer( + override val serializers: Array[(String, TypeSerializer[_])], + override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult +) extends TypedObjectBasedTypeSerializer[Map[String, _ <: AnyRef]](serializers) with LazyLogging { override def deserialize(values: Array[AnyRef]): Map[String, _ <: AnyRef] = { @@ -30,17 +36,20 @@ case class TypedScalaMapSerializer(override val serializers: Array[(String, Type override def get(value: Map[String, _ <: AnyRef], k: String): AnyRef = value.getOrElse(k, null) override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[Map[String, _ <: AnyRef]] = - TypedScalaMapSerializer(serializers) + TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction) override def createInstance(): Map[String, _ <: AnyRef] = Map.empty override def snapshotConfiguration( snapshots: Array[(String, TypeSerializerSnapshot[_])] - ): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots) + ): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots) { + override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult = + buildIntermediateSchemaCompatibilityResultFunction + } } -class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] { +abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] { def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = { this() @@ -49,6 +58,7 @@ class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot override protected def restoreSerializer( restored: Array[(String, TypeSerializer[_])] - ): TypeSerializer[Map[String, _ <: AnyRef]] = TypedScalaMapSerializer(restored) + ): TypeSerializer[Map[String, _ <: AnyRef]] = + TypedScalaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/util/StateConfiguration.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/util/StateConfiguration.scala index c919e3b81a2..6d0258e8589 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/util/StateConfiguration.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/util/StateConfiguration.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.engine.process.util -import org.apache.flink.contrib.streaming.state.{EmbeddedRocksDBStateBackend, PredefinedOptions, RocksDBStateBackend} +import org.apache.flink.contrib.streaming.state.{EmbeddedRocksDBStateBackend, PredefinedOptions} import org.apache.flink.runtime.state.AbstractStateBackend object StateConfiguration { diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/exception/FlinkExceptionHandlerSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/exception/FlinkExceptionHandlerSpec.scala index 022ece2d132..74e985c651b 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/exception/FlinkExceptionHandlerSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/exception/FlinkExceptionHandlerSpec.scala @@ -1,5 +1,6 @@ package 
pl.touk.nussknacker.engine.process.exception +import com.github.ghik.silencer.silent import com.typesafe.config.{Config, ConfigFactory} import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.scalatest.funsuite.AnyFunSuite @@ -12,6 +13,7 @@ import pl.touk.nussknacker.test.ClassLoaderWithServices import scala.jdk.CollectionConverters._ +@silent("deprecated") class FlinkExceptionHandlerSpec extends AnyFunSuite with Matchers { private val config = ConfigFactory.parseMap( diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/exception/RestartStrategyFromConfigurationSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/exception/RestartStrategyFromConfigurationSpec.scala index 4cd7a5fb5eb..7364696923e 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/exception/RestartStrategyFromConfigurationSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/exception/RestartStrategyFromConfigurationSpec.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.exception +import com.github.ghik.silencer.silent import com.typesafe.config.ConfigFactory import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration @@ -10,6 +11,7 @@ import pl.touk.nussknacker.engine.api.{MetaData, ProcessAdditionalFields, Stream import scala.jdk.CollectionConverters._ +@silent("deprecated") class RestartStrategyFromConfigurationSpec extends AnyFunSuite with Matchers { private val metaData = MetaData.combineTypeSpecificProperties( diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala index bf23ff2b158..af15bbcf529 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala @@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.process.typeinformation import com.esotericsoftware.kryo.io.{Input, Output} import com.esotericsoftware.kryo.{Kryo, Serializer} +import com.github.ghik.silencer.silent import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer @@ -28,6 +29,7 @@ import pl.touk.nussknacker.engine.process.typeinformation.testTypedObject.Custom import scala.jdk.CollectionConverters._ +@silent("deprecated") class TypingResultAwareTypeInformationDetectionSpec extends AnyFunSuite with Matchers @@ -240,17 +242,20 @@ class TypingResultAwareTypeInformationDetectionSpec } private def assertNested(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_] => Assertion)*): Unit = { - inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) => - array.zipAll(nested.toList, null, null).foreach { case ((name, serializer), (expectedName, expectedSerializer)) => - name shouldBe expectedName - expectedSerializer(serializer) - } + inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { + case TypedScalaMapSerializer(array, 
_) => + array.zipAll(nested.toList, null, null).foreach { + case ((name, serializer), (expectedName, expectedSerializer)) => + name shouldBe expectedName + expectedSerializer(serializer) + } } } private def assertMapSerializers(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_])*) = { - inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) => - array.toList shouldBe nested.toList + inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { + case TypedScalaMapSerializer(array, _) => + array.toList shouldBe nested.toList } } diff --git a/engine/flink/extensions-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/serialization/SerializerRegistrar.scala b/engine/flink/extensions-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/serialization/SerializerRegistrar.scala index de61b3dbb56..fb74f7560a3 100644 --- a/engine/flink/extensions-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/serialization/SerializerRegistrar.scala +++ b/engine/flink/extensions-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/serialization/SerializerRegistrar.scala @@ -1,6 +1,7 @@ package pl.touk.nussknacker.engine.flink.api.serialization import com.esotericsoftware.kryo.Serializer +import com.github.ghik.silencer.silent import org.apache.flink.api.common.ExecutionConfig trait SerializerRegistrar[S <: Serializer[_]] { @@ -21,6 +22,7 @@ class InstanceBasedKryoSerializerRegistrar[T, S <: Serializer[T] with Serializab clazz: Class[T] ) extends SerializerRegistrar[S] { + @silent("deprecated") override def registerIn(config: ExecutionConfig): Unit = { val serializableSerializer = new ExecutionConfig.SerializableSerializer(serializerInstance) config.getRegisteredTypesWithKryoSerializers.put(clazz, serializableSerializer) @@ -29,6 +31,7 @@ class InstanceBasedKryoSerializerRegistrar[T, S <: Serializer[T] with Serializab } +@silent("deprecated") class ClassBasedKryoSerializerRegistrar[T, S <: Serializer[T]](serializerClass: Class[S], clazz: Class[T]) extends SerializerRegistrar[S] { diff --git a/engine/flink/extensions-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/serialization/SerializerWithSpecifiedClass.scala b/engine/flink/extensions-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/serialization/SerializerWithSpecifiedClass.scala new file mode 100644 index 00000000000..e69de29bb2d diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/DelayedFlinkKafkaConsumer.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/DelayedFlinkKafkaConsumer.scala index c58f71fb979..a178f94f2c6 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/DelayedFlinkKafkaConsumer.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/DelayedFlinkKafkaConsumer.scala @@ -1,6 +1,7 @@ package pl.touk.nussknacker.engine.kafka.generic import cats.data.NonEmptyList +import com.github.ghik.silencer.silent import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.functions.RuntimeContext @@ -108,6 +109,7 @@ class DelayedFlinkKafkaConsumer[T]( nodeId ) { + @silent("deprecated") override def createFetcher( sourceContext: SourceFunction.SourceContext[T], assignedPartitionsWithInitialOffsets: util.Map[KafkaTopicPartition, lang.Long], diff --git 
a/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassTypeInfo.scala b/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassTypeInfo.scala index 379304ce008..84203e6c9b2 100644 --- a/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassTypeInfo.scala +++ b/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassTypeInfo.scala @@ -17,6 +17,7 @@ */ package pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass +import com.github.ghik.silencer.silent import org.apache.flink.annotation.{Public, PublicEvolving} import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.operators.Keys.ExpressionKeys @@ -244,6 +245,7 @@ abstract class CaseClassTypeInfo[T <: Product]( logicalKeyFields += fieldId } + @silent("deprecated") override def createTypeComparator(config: ExecutionConfig): TypeComparator[T] = { val maxIndex = logicalKeyFields.max diff --git a/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassTypeInfoFactory.scala b/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassTypeInfoFactory.scala index 77d46f068c0..1a920e87752 100644 --- a/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassTypeInfoFactory.scala +++ b/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassTypeInfoFactory.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass +import com.github.ghik.silencer.silent import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation} import org.apache.flink.api.common.typeutils.TypeSerializer @@ -13,6 +14,7 @@ import scala.reflect.runtime.universe._ // Generic class factory for creating CaseClassTypeInfo abstract class CaseClassTypeInfoFactory[T <: Product: ClassTag] extends TypeInfoFactory[T] with Serializable { + @silent("deprecated") override def createTypeInfo( t: Type, genericParameters: java.util.Map[String, TypeInformation[_]] @@ -23,8 +25,10 @@ abstract class CaseClassTypeInfoFactory[T <: Product: ClassTag] extends TypeInfo new CaseClassTypeInfo[T](classType, Array.empty, fieldTypes.toIndexedSeq, fieldNames) { override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = { new ScalaCaseClassSerializer[T]( - classType, - fieldTypes.map(typeInfo => NullableSerializer.wrap(typeInfo.createSerializer(config), true)).toArray + clazz = classType, + scalaFieldSerializers = fieldTypes + .map(typeInfo => NullableSerializer.wrap(typeInfo.createSerializer(config), true)) + .toArray ) } } diff --git a/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/option/OptionTypeInfo.scala b/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/option/OptionTypeInfo.scala index 9266453faa1..e729d5295b7 100644 --- a/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/option/OptionTypeInfo.scala +++ b/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/option/OptionTypeInfo.scala @@ -17,6 +17,7 @@ */ package pl.touk.nussknacker.engine.flink.api.typeinfo.option +import com.github.ghik.silencer.silent import 
org.apache.flink.annotation.{Public, PublicEvolving, VisibleForTesting} import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} @@ -59,6 +60,7 @@ class OptionTypeInfo[A, T <: Option[A]](private val elemTypeInfo: TypeInformatio } @PublicEvolving + @silent("deprecated") def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = { if (elemTypeInfo == null) { // this happens when the type of a DataSet is None, i.e. DataSet[None] diff --git a/engine/flink/scala-utils/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassSerializationTest.scala b/engine/flink/scala-utils/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassSerializationTest.scala index 710d7422438..2b3b21b0dff 100644 --- a/engine/flink/scala-utils/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassSerializationTest.scala +++ b/engine/flink/scala-utils/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassSerializationTest.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass +import com.github.ghik.silencer.silent import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInfo import org.apache.flink.api.common.typeutils.TypeSerializer @@ -48,8 +49,11 @@ class CaseClassSerializationTest extends AnyFunSuite with Matchers { deserialized shouldEqual input } + @silent("deprecated") private def getSerializer[T: ClassTag] = - TypeExtractor.getForClass(classTag[T].runtimeClass.asInstanceOf[Class[T]]).createSerializer(executionConfig) + TypeExtractor + .getForClass(classTag[T].runtimeClass.asInstanceOf[Class[T]]) + .createSerializer(executionConfig) private def serializeAndDeserialize[T](serializer: TypeSerializer[T], in: T): T = { val outStream = new ByteArrayOutputStream(bufferSize) diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroSerializersRegistrar.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroSerializersRegistrar.scala index f4f620feac6..ed56603b44c 100644 --- a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroSerializersRegistrar.scala +++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroSerializersRegistrar.scala @@ -19,11 +19,19 @@ class AvroSerializersRegistrar extends SerializersRegistrar with LazyLogging { override def register(modelConfig: Config, executionConfig: ExecutionConfig): Unit = { logger.debug("Registering default avro serializers") - AvroUtils.getAvroUtils.addAvroSerializersIfRequired(executionConfig, classOf[GenericData.Record]) + registerAvroSerializers(executionConfig) val resolvedKafkaConfig = resolveConfig(modelConfig) registerGenericRecordSchemaIdSerializationForGlobalKafkaConfigIfNeed(resolvedKafkaConfig, executionConfig) } + // protected to allow overriding, for compatibility with Flink < 1.19 + protected def registerAvroSerializers(executionConfig: ExecutionConfig): Unit = { + AvroUtils.getAvroUtils.addAvroSerializersIfRequired( + executionConfig.getSerializerConfig, + classOf[GenericData.Record] + ) + } + private def resolveConfig(modelConfig: Config): Option[KafkaConfig] = { val componentsConfig = modelConfig.getAs[Map[String, ComponentProviderConfig]]("components").getOrElse(Map.empty)
val componentsKafkaConfigs = componentsConfig.toList diff --git a/engine/flink/schemed-kafka-components-utils/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.serialization.SerializersRegistrar b/engine/flink/schemed-kafka-components-utils/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.serialization.SerializersRegistrar new file mode 100644 index 00000000000..801436509d7 --- /dev/null +++ b/engine/flink/schemed-kafka-components-utils/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.serialization.SerializersRegistrar @@ -0,0 +1 @@ +pl.touk.nussknacker.engine.schemedkafka.kryo.AvroSerializersRegistrar diff --git a/engine/flink/test-utils/src/main/resources/docker/Dockerfile b/engine/flink/test-utils/src/main/resources/docker/Dockerfile index 54d78f9838e..a94b1cab8dd 100644 --- a/engine/flink/test-utils/src/main/resources/docker/Dockerfile +++ b/engine/flink/test-utils/src/main/resources/docker/Dockerfile @@ -1,4 +1,4 @@ -FROM flink:1.18.1-scala_2.12-java11 +FROM flink:1.19.1-scala_2.12-java11 COPY entrypointWithIP.sh / COPY conf.yml / diff --git a/engine/flink/test-utils/src/main/resources/docker/conf.yml b/engine/flink/test-utils/src/main/resources/docker/conf.yml index 459d2ff9610..f8c1eabd201 100644 --- a/engine/flink/test-utils/src/main/resources/docker/conf.yml +++ b/engine/flink/test-utils/src/main/resources/docker/conf.yml @@ -2,3 +2,5 @@ high-availability: NONE parallelism.default: 1 taskmanager.memory.jvm-metaspace.size: 512m taskmanager.numberOfTaskSlots: +jobmanager.memory.flink.size: 800m +taskmanager.memory.flink.size: 800m diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/serialization/FlinkTypeInformationSerializationMixin.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/serialization/FlinkTypeInformationSerializationMixin.scala index 8b76fc22e6c..4b62f893468 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/serialization/FlinkTypeInformationSerializationMixin.scala +++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/serialization/FlinkTypeInformationSerializationMixin.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.serialization +import com.github.ghik.silencer.silent import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer @@ -11,14 +12,17 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream} trait FlinkTypeInformationSerializationMixin extends Matchers { + @silent("deprecated") protected val executionConfigWithoutKryo: ExecutionConfig = new ExecutionConfig { disableGenericTypes() } + @silent("deprecated") protected val executionConfigWithKryo: ExecutionConfig = new ExecutionConfig { enableGenericTypes() } + @silent("deprecated") protected def getSerializeRoundTrip[T]( record: T, typeInfo: TypeInformation[T], diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkMiniClusterHolder.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkMiniClusterHolder.scala index db2aeca9492..d9aa3b7bd0f 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkMiniClusterHolder.scala +++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/FlinkMiniClusterHolder.scala @@ -1,7 +1,8 @@ package 
pl.touk.nussknacker.engine.flink.test -import java.util.concurrent.CompletableFuture +import com.github.ghik.silencer.silent +import java.util.concurrent.CompletableFuture import org.apache.flink.api.common.{JobID, JobStatus} import org.apache.flink.client.program.ClusterClient import org.apache.flink.configuration._ @@ -87,11 +88,12 @@ object FlinkMiniClusterHolder { userFlinkClusterConfig: Configuration, envConfig: AdditionalEnvironmentConfig = AdditionalEnvironmentConfig() ): FlinkMiniClusterHolder = { - userFlinkClusterConfig.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true) + userFlinkClusterConfig.set[java.lang.Boolean](CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true) val resource = prepareMiniClusterResource(userFlinkClusterConfig) new FlinkMiniClusterHolderImpl(resource, userFlinkClusterConfig, envConfig) } + @silent("deprecated") def prepareMiniClusterResource(userFlinkClusterConfig: Configuration): MiniClusterWithClientResource = { val taskManagerNumber = ConfigOptions .key(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER) @@ -112,7 +114,7 @@ object FlinkMiniClusterHolder { detachedClient: Boolean = true, // On the CI, 10 seconds is sometimes too low defaultWaitForStatePatience: PatienceConfig = - PatienceConfig(timeout = scaled(Span(20, Seconds)), interval = scaled(Span(100, Millis))) + PatienceConfig(timeout = scaled(Span(20, Seconds)), interval = scaled(Span(10, Millis))) ) } diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/MiniClusterExecutionEnvironment.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/MiniClusterExecutionEnvironment.scala index 1fb370583b3..4eb932f4426 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/MiniClusterExecutionEnvironment.scala +++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/flink/test/MiniClusterExecutionEnvironment.scala @@ -65,7 +65,9 @@ class MiniClusterExecutionEnvironment( jobName: String )(patience: Eventually.PatienceConfig = envConfig.defaultWaitForStatePatience): JobExecutionResult = { val res = execute(jobName) - waitForJobStateWithNotFailingCheck(res.getJobID, jobName, ExecutionState.FINISHED)(patience) + waitForJobStatusWithAdditionalCheck(res.getJobID, jobName, assertJobNotFailing(res.getJobID), JobStatus.FINISHED)( + patience + ) res } @@ -87,6 +89,24 @@ class MiniClusterExecutionEnvironment( waitForJobStateWithAdditionalCheck(jobID, name, {}, expectedState: _*)(patience) } + def waitForJobStatusWithAdditionalCheck( + jobID: JobID, + name: String, + additionalChecks: => Unit, + expectedJobStatus: JobStatus + )( + patience: Eventually.PatienceConfig = envConfig.defaultWaitForStatePatience + ): Unit = { + Eventually.eventually { + val executionGraph = flinkMiniClusterHolder.getExecutionGraph(jobID).get() + additionalChecks + assert( + executionGraph.getState.equals(expectedJobStatus), + s"Job $name does not have expected status: $expectedJobStatus" + ) + }(patience, implicitly[Retrying[Assertion]], implicitly[Position]) + } + def waitForJobStateWithAdditionalCheck( jobID: JobID, name: String, diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/functional/TestReporter.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/functional/TestReporter.scala index 4960158d363..96335c8156e 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/functional/TestReporter.scala +++ 
b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/functional/TestReporter.scala @@ -15,7 +15,7 @@ import java.util.Properties object TestReporterUtil { def configWithTestMetrics(name: String, c: Configuration = new Configuration()): Configuration = { - c.setString(MetricOptions.REPORTERS_LIST, "test") + c.set(MetricOptions.REPORTERS_LIST, "test") c.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.factory.class", classOf[TestReporterFactory].getName) c.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.name", name) c diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala index 2e3ab2fd555..8a564ffe275 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala +++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala @@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.process.helpers import cats.data.Validated.Valid import cats.data.ValidatedNel +import com.github.ghik.silencer.silent import io.circe.Json import io.circe.generic.JsonCodec import org.apache.flink.api.common.eventtime.WatermarkStrategy @@ -13,7 +14,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, OneInputStreamOperator} import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.runtime.streamrecord.{RecordAttributes, StreamRecord} import org.apache.flink.util.Collector import pl.touk.nussknacker.engine.api._ import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent @@ -406,6 +407,8 @@ object SampleNodes { val outputResult = new StreamRecord[ValueWithContext[AnyRef]](valueWithContext, timestampToSet) output.collect(outputResult) } + override def processRecordAttributes(recordAttributes: RecordAttributes): Unit = + super.processRecordAttributes(recordAttributes) } str.transform("collectTimestammp", ctx.valueWithContextInfo.forUnknown, streamOperator) } @@ -482,6 +485,7 @@ object SampleNodes { object TransformerWithTime extends CustomStreamTransformer with Serializable { + @silent("deprecated") @MethodToInvoke def execute(@OutputVariableName outputVarName: String, @ParamName("seconds") seconds: Int)( implicit nodeId: NodeId diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/NkGlobalParametersEncoderTest.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/NkGlobalParametersEncoderTest.scala new file mode 100644 index 00000000000..ba95fb8d229 --- /dev/null +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/NkGlobalParametersEncoderTest.scala @@ -0,0 +1,58 @@ +package pl.touk.nussknacker.defaultmodel + +import org.apache.flink.api.common.ExecutionConfig +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} +import pl.touk.nussknacker.engine.flink.api.{ConfigGlobalParameters, NamespaceMetricsTags, NkGlobalParameters} + +class NkGlobalParametersEncoderTest extends AnyFunSuite with Matchers { + + 
test("global parameters set and read from context are equal") { + val globalParamsWithAllOptionalValues = NkGlobalParameters( + buildInfo = "aBuildInfo", + processVersion = ProcessVersion( + VersionId.initialVersionId, + ProcessName("aProcessName"), + ProcessId("1"), + "aUser", + Some(1) + ), + configParameters = Some(ConfigGlobalParameters(Some(true), Some(true), Some(true))), + namespaceParameters = Some(NamespaceMetricsTags(Map("metricTag" -> "metricVal"))), + additionalInformation = Map("additionalInfoKey" -> "additionalInfoVal") + ) + + val globalParamsWithNoOptionalValues = NkGlobalParameters( + buildInfo = "aBuildInfo", + processVersion = ProcessVersion( + VersionId.initialVersionId, + ProcessName("aProcessName"), + ProcessId("1"), + "aUser", + None + ), + configParameters = None, + namespaceParameters = None, + additionalInformation = Map.empty + ) + + List(globalParamsWithAllOptionalValues, globalParamsWithNoOptionalValues).foreach { params => + val ec = new ExecutionConfig() + ec.setGlobalJobParameters(params) + val globalParamsFromEc = NkGlobalParameters.readFromContext(ec).get + + params.buildInfo shouldBe globalParamsFromEc.buildInfo + params.processVersion shouldBe globalParamsFromEc.processVersion + params.configParameters shouldBe globalParamsFromEc.configParameters + params.namespaceParameters shouldBe globalParamsFromEc.namespaceParameters + params.additionalInformation shouldBe globalParamsFromEc.additionalInformation + } + } + + test("returns None when context doesnt have required parameters") { + NkGlobalParameters.readFromContext(new ExecutionConfig()) shouldBe None + } + +} diff --git a/examples/installation/docker-compose.yml b/examples/installation/docker-compose.yml index a730ace492e..a8e977f117a 100644 --- a/examples/installation/docker-compose.yml +++ b/examples/installation/docker-compose.yml @@ -210,7 +210,7 @@ services: build: context: ./flink/ args: - FLINK_VERSION: "1.18.1-scala_2.12-java11" + FLINK_VERSION: "1.19.1-scala_2.12-java11" restart: unless-stopped command: jobmanager environment: @@ -226,7 +226,7 @@ services: build: context: ./flink/ args: - FLINK_VERSION: "1.18.1-scala_2.12-java11" + FLINK_VERSION: "1.19.1-scala_2.12-java11" restart: unless-stopped command: taskmanager environment: diff --git a/examples/installation/flink/flink-properties.yml b/examples/installation/flink/flink-properties.yml index 12f45da9518..dd0b7fed688 100644 --- a/examples/installation/flink/flink-properties.yml +++ b/examples/installation/flink/flink-properties.yml @@ -1,6 +1,5 @@ taskmanager.numberOfTaskSlots: 8 - -state.backend: filesystem +state.backend.type: filesystem state.checkpoints.dir: file:///opt/flink/data/checkpoints state.savepoints.dir: file:///opt/flink/data/savepoints @@ -21,8 +20,8 @@ metrics.reporter.influxdb_reporter.port: 8087 metrics.reporter.influxdb_reporter.db: nussknacker_metrics metrics.reporter.influxdb_reporter.scope.variables.excludes: tm_id;job_id;task_id;task_attempt_id;operator_id;task_attempt_num;task_name metrics.scope.jm: local..jobmanagerGlobal -metrics.scope.jm.job: local..jobmanagerJob. +metrics.scope.jm-job: local..jobmanagerJob. metrics.scope.tm: local..taskmanagerGlobal. -metrics.scope.tm.job: local..taskmanagerJob.. +metrics.scope.tm-job: local..taskmanagerJob.. metrics.scope.task: local..taskmanagerTask.... metrics.scope.operator: local..taskmanagerTask....