From eb25ad96d1350a1847ee1dfba07c6b31337036ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Solarski?= Date: Tue, 3 Dec 2024 12:35:51 +0100 Subject: [PATCH] Savepoint deserialization fixup - The class is an inner class, but not statically accessible. (#7270) * Savepoint deserialization fixup - The class 'pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedScalaMapSerializer$$anon$1' is not instantiable: The class is an inner class, but not statically accessible. * Changelog entry added --- docs/Changelog.md | 1 + ...gResultAwareTypeInformationDetection.scala | 15 +----- .../TypedJavaMapBasedTypeInformation.scala | 20 +++---- .../TypedObjectBasedTypeInformation.scala | 53 ++++++++++++------- .../TypedScalaMapBasedTypeInformation.scala | 20 +++---- ...ultAwareTypeInformationDetectionSpec.scala | 17 +++--- 6 files changed, 57 insertions(+), 69 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index 8f928107a9c..2e945a86e9e 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -24,6 +24,7 @@ * [#7237](https://github.com/TouK/nussknacker/pull/7237) Fix: ToJsonEncoder keeps order fields during encoding map * [#7240](https://github.com/TouK/nussknacker/pull/7240) Fixed race condition problem during SpEL expression evaluation * [#7269](https://github.com/TouK/nussknacker/pull/7269) Fixed focus scrolling in expression editor +* [#7270](https://github.com/TouK/nussknacker/pull/7270) Savepoint deserialization fixup - some taken savepoints (e.g. for scenarios with async enrichers) were not deserializable which led to errors during redeployments on Flink ### 1.18.0 (22 November 2024) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala index 131137fd1bd..7d89fb427cd 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala @@ -1,7 +1,6 @@ package pl.touk.nussknacker.engine.process.typeinformation import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} -import org.apache.flink.api.common.typeutils.{CompositeTypeSerializerUtil, TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.api.java.typeutils.{ListTypeInfo, MapTypeInfo, MultisetTypeInfo, RowTypeInfo} import org.apache.flink.types.Row import pl.touk.nussknacker.engine.api.context.ValidationContext @@ -99,20 +98,10 @@ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection } private def createScalaMapTypeInformation(typingResult: TypedObjectTypingResult) = - TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult) + TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType)) private def createJavaMapTypeInformation(typingResult: TypedObjectTypingResult) = - TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult) - - protected def constructIntermediateCompatibilityResult( - newNestedSerializers: Array[TypeSerializer[_]], - oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]] - ): CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing] = { - CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult( - newNestedSerializers.map(_.snapshotConfiguration()), - oldNestedSerializerSnapshots - ) - } + TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType)) def forValueWithContext[T]( validationContext: ValidationContext, diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala index c2f7fd96633..df03b2c5c15 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala @@ -3,42 +3,36 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject import java.{util => jutil} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} -import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult case class TypedJavaMapTypeInformation( - informations: Map[String, TypeInformation[_]], - buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult + informations: Map[String, TypeInformation[_]] ) extends TypedObjectBasedTypeInformation[jutil.Map[String, AnyRef]](informations) { override def createSerializer( serializers: Array[(String, TypeSerializer[_])] ): TypeSerializer[jutil.Map[String, AnyRef]] = - TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction) + TypedJavaMapSerializer(serializers) } @SerialVersionUID(1L) case class TypedJavaMapSerializer( - override val serializers: Array[(String, TypeSerializer[_])], - override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult + override val serializers: Array[(String, TypeSerializer[_])] ) extends TypedObjectBasedTypeSerializer[jutil.Map[String, AnyRef]](serializers) with BaseJavaMapBasedSerializer[AnyRef, jutil.Map[String, AnyRef]] { override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[jutil.Map[String, AnyRef]] = - TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction) + TypedJavaMapSerializer(serializers) override def createInstance(): jutil.Map[String, AnyRef] = new jutil.HashMap() override def snapshotConfiguration( snapshots: Array[(String, TypeSerializerSnapshot[_])] - ): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots) { - override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult = - buildIntermediateSchemaCompatibilityResultFunction - } + ): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots) } -abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] { +final class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] { def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = { this() @@ -48,6 +42,6 @@ abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializer override protected def restoreSerializer( restored: Array[(String, TypeSerializer[_])] ): TypeSerializer[jutil.Map[String, AnyRef]] = - TypedJavaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult) + TypedJavaMapSerializer(restored) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala index 530cba98149..e7ca4c1e5a8 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala @@ -4,6 +4,7 @@ import com.github.ghik.silencer.silent import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult import org.apache.flink.api.common.typeutils.{ CompositeTypeSerializerUtil, TypeSerializer, @@ -11,7 +12,6 @@ import org.apache.flink.api.common.typeutils.{ TypeSerializerSnapshot } import org.apache.flink.core.memory.{DataInputView, DataOutputView} -import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult import scala.reflect.ClassTag @@ -57,15 +57,6 @@ abstract class TypedObjectBasedTypeInformation[T: ClassTag](informations: Array[ def createSerializer(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T] } -object TypedObjectBasedTypeInformation { - - type BuildIntermediateSchemaCompatibilityResult = ( - Array[TypeSerializer[_]], - Array[TypeSerializerSnapshot[_]] - ) => CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing] - -} - //We use Array instead of List here, as we need access by index, which is faster for array abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String, TypeSerializer[_])]) extends TypeSerializer[T] @@ -132,17 +123,13 @@ abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String, def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T] - def buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult } abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnapshot[T] with LazyLogging { - protected var serializersSnapshots: Array[(String, TypeSerializerSnapshot[_])] = _ + private val constructIntermediateCompatibilityResultMethodName = "constructIntermediateCompatibilityResult" - def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = { - this() - this.serializersSnapshots = serializers - } + protected var serializersSnapshots: Array[(String, TypeSerializerSnapshot[_])] = _ override def getCurrentVersion: Int = 1 @@ -182,10 +169,10 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps val newKeys = newSerializers.map(_._1) val commons = currentKeys.intersect(newKeys) - val newSerializersToUse = newSerializers.filter(k => commons.contains(k._1)) - val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1)) + val newSerializersToUse: Array[(String, TypeSerializer[_])] = newSerializers.filter(k => commons.contains(k._1)) + val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1)) - val fieldsCompatibility = buildIntermediateSchemaCompatibilityResult( + val fieldsCompatibility = constructIntermediateCompatibilityResultProxied( newSerializersToUse.map(_._2), snapshotsToUse.map(_._2) ) @@ -237,7 +224,33 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps } } - val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult + private def constructIntermediateCompatibilityResultProxied( + newNestedSerializers: Array[TypeSerializer[_]], + nestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]] + ): IntermediateCompatibilityResult[_] = { + // signature of CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult has been changed between flink 1.18/1.19 + // Because of contract of serialization/deserialization of TypeSerializerSnapshot in can't be easily provided by TypeInformationDetection SPI mechanism + try { + val newMethod = classOf[CompositeTypeSerializerUtil].getMethod( + constructIntermediateCompatibilityResultMethodName, + classOf[Array[TypeSerializerSnapshot[_]]], + classOf[Array[TypeSerializerSnapshot[_]]] + ) + newMethod + .invoke(null, newNestedSerializers.map(_.snapshotConfiguration()), nestedSerializerSnapshots) + .asInstanceOf[IntermediateCompatibilityResult[_]] + } catch { + case _: NoSuchMethodException => + val oldMethod = classOf[CompositeTypeSerializerUtil].getMethod( + constructIntermediateCompatibilityResultMethodName, + classOf[Array[TypeSerializer[_]]], + classOf[Array[TypeSerializerSnapshot[_]]] + ) + oldMethod + .invoke(null, newNestedSerializers, nestedSerializerSnapshots) + .asInstanceOf[IntermediateCompatibilityResult[_]] + } + } override def restoreSerializer(): TypeSerializer[T] = restoreSerializer(serializersSnapshots.map { case (k, snapshot) => (k, snapshot.restoreSerializer()) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala index b987369b534..4b204c0418a 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala @@ -3,24 +3,21 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} -import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult case class TypedScalaMapTypeInformation( - informations: Map[String, TypeInformation[_]], - buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult + informations: Map[String, TypeInformation[_]] ) extends TypedObjectBasedTypeInformation[Map[String, _ <: AnyRef]](informations) { override def createSerializer( serializers: Array[(String, TypeSerializer[_])] ): TypeSerializer[Map[String, _ <: AnyRef]] = - TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction) + TypedScalaMapSerializer(serializers) } @SerialVersionUID(1L) case class TypedScalaMapSerializer( - override val serializers: Array[(String, TypeSerializer[_])], - override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult + override val serializers: Array[(String, TypeSerializer[_])] ) extends TypedObjectBasedTypeSerializer[Map[String, _ <: AnyRef]](serializers) with LazyLogging { @@ -36,20 +33,17 @@ case class TypedScalaMapSerializer( override def get(value: Map[String, _ <: AnyRef], k: String): AnyRef = value.getOrElse(k, null) override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[Map[String, _ <: AnyRef]] = - TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction) + TypedScalaMapSerializer(serializers) override def createInstance(): Map[String, _ <: AnyRef] = Map.empty override def snapshotConfiguration( snapshots: Array[(String, TypeSerializerSnapshot[_])] - ): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots) { - override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult = - buildIntermediateSchemaCompatibilityResultFunction - } + ): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots) } -abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] { +final class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] { def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = { this() @@ -59,6 +53,6 @@ abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerialize override protected def restoreSerializer( restored: Array[(String, TypeSerializer[_])] ): TypeSerializer[Map[String, _ <: AnyRef]] = - TypedScalaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult) + TypedScalaMapSerializer(restored) } diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala index 8ed2795c1d6..f0b9b72f8c7 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala @@ -264,20 +264,17 @@ class TypingResultAwareTypeInformationDetectionSpec } private def assertNested(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_] => Assertion)*): Unit = { - inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { - case TypedScalaMapSerializer(array, _) => - array.zipAll(nested.toList, null, null).foreach { - case ((name, serializer), (expectedName, expectedSerializer)) => - name shouldBe expectedName - expectedSerializer(serializer) - } + inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) => + array.zipAll(nested.toList, null, null).foreach { case ((name, serializer), (expectedName, expectedSerializer)) => + name shouldBe expectedName + expectedSerializer(serializer) + } } } private def assertMapSerializers(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_])*) = { - inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { - case TypedScalaMapSerializer(array, _) => - array.toList shouldBe nested.toList + inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) => + array.toList shouldBe nested.toList } }