Skip to content

Commit

Permalink
Savepoint deserialization fixup - The class is an inner class, but no…
Browse files Browse the repository at this point in the history
…t statically accessible. (#7270) (#7272)
  • Loading branch information
raphaelsolarski authored Dec 3, 2024
1 parent 0801385 commit 7642e4d
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 69 deletions.
1 change: 1 addition & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* [#7237](https://github.com/TouK/nussknacker/pull/7237) Fix: ToJsonEncoder keeps order fields during encoding map
* [#7240](https://github.com/TouK/nussknacker/pull/7240) Fixed race condition problem during SpEL expression evaluation
* [#7269](https://github.com/TouK/nussknacker/pull/7269) Fixed focus scrolling in expression editor
* [#7270](https://github.com/TouK/nussknacker/pull/7270) Savepoint deserialization fixup - some taken savepoints (e.g. for scenarios with async enrichers) were not deserializable which led to errors during redeployments on Flink

### 1.18.0 (22 November 2024)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.process.typeinformation

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.common.typeutils.{CompositeTypeSerializerUtil, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.api.java.typeutils.{ListTypeInfo, MapTypeInfo, MultisetTypeInfo, RowTypeInfo}
import org.apache.flink.types.Row
import pl.touk.nussknacker.engine.api.context.ValidationContext
Expand Down Expand Up @@ -99,20 +98,10 @@ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection
}

private def createScalaMapTypeInformation(typingResult: TypedObjectTypingResult) =
TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult)
TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType))

private def createJavaMapTypeInformation(typingResult: TypedObjectTypingResult) =
TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult)

protected def constructIntermediateCompatibilityResult(
newNestedSerializers: Array[TypeSerializer[_]],
oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
): CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing] = {
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(
newNestedSerializers.map(_.snapshotConfiguration()),
oldNestedSerializerSnapshots
)
}
TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType))

def forValueWithContext[T](
validationContext: ValidationContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,36 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject
import java.{util => jutil}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult

case class TypedJavaMapTypeInformation(
informations: Map[String, TypeInformation[_]],
buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
informations: Map[String, TypeInformation[_]]
) extends TypedObjectBasedTypeInformation[jutil.Map[String, AnyRef]](informations) {

override def createSerializer(
serializers: Array[(String, TypeSerializer[_])]
): TypeSerializer[jutil.Map[String, AnyRef]] =
TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
TypedJavaMapSerializer(serializers)

}

@SerialVersionUID(1L)
case class TypedJavaMapSerializer(
override val serializers: Array[(String, TypeSerializer[_])],
override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
override val serializers: Array[(String, TypeSerializer[_])]
) extends TypedObjectBasedTypeSerializer[jutil.Map[String, AnyRef]](serializers)
with BaseJavaMapBasedSerializer[AnyRef, jutil.Map[String, AnyRef]] {

override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[jutil.Map[String, AnyRef]] =
TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
TypedJavaMapSerializer(serializers)

override def createInstance(): jutil.Map[String, AnyRef] = new jutil.HashMap()

override def snapshotConfiguration(
snapshots: Array[(String, TypeSerializerSnapshot[_])]
): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots) {
override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult =
buildIntermediateSchemaCompatibilityResultFunction
}
): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots)

}

abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] {
final class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] {

def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = {
this()
Expand All @@ -48,6 +42,6 @@ abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializer
override protected def restoreSerializer(
restored: Array[(String, TypeSerializer[_])]
): TypeSerializer[jutil.Map[String, AnyRef]] =
TypedJavaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult)
TypedJavaMapSerializer(restored)

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import com.github.ghik.silencer.silent
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult
import org.apache.flink.api.common.typeutils.{
CompositeTypeSerializerUtil,
TypeSerializer,
TypeSerializerSchemaCompatibility,
TypeSerializerSnapshot
}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult

import scala.reflect.ClassTag

Expand Down Expand Up @@ -57,15 +57,6 @@ abstract class TypedObjectBasedTypeInformation[T: ClassTag](informations: Array[
def createSerializer(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T]
}

object TypedObjectBasedTypeInformation {

type BuildIntermediateSchemaCompatibilityResult = (
Array[TypeSerializer[_]],
Array[TypeSerializerSnapshot[_]]
) => CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing]

}

//We use Array instead of List here, as we need access by index, which is faster for array
abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String, TypeSerializer[_])])
extends TypeSerializer[T]
Expand Down Expand Up @@ -132,17 +123,13 @@ abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String,

def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T]

def buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
}

abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnapshot[T] with LazyLogging {

protected var serializersSnapshots: Array[(String, TypeSerializerSnapshot[_])] = _
private val constructIntermediateCompatibilityResultMethodName = "constructIntermediateCompatibilityResult"

def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = {
this()
this.serializersSnapshots = serializers
}
protected var serializersSnapshots: Array[(String, TypeSerializerSnapshot[_])] = _

override def getCurrentVersion: Int = 1

Expand Down Expand Up @@ -182,10 +169,10 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps
val newKeys = newSerializers.map(_._1)
val commons = currentKeys.intersect(newKeys)

val newSerializersToUse = newSerializers.filter(k => commons.contains(k._1))
val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1))
val newSerializersToUse: Array[(String, TypeSerializer[_])] = newSerializers.filter(k => commons.contains(k._1))
val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1))

val fieldsCompatibility = buildIntermediateSchemaCompatibilityResult(
val fieldsCompatibility = constructIntermediateCompatibilityResultProxied(
newSerializersToUse.map(_._2),
snapshotsToUse.map(_._2)
)
Expand Down Expand Up @@ -237,7 +224,33 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps
}
}

val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult
private def constructIntermediateCompatibilityResultProxied(
newNestedSerializers: Array[TypeSerializer[_]],
nestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
): IntermediateCompatibilityResult[_] = {
// signature of CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult has been changed between flink 1.18/1.19
// Because of contract of serialization/deserialization of TypeSerializerSnapshot in can't be easily provided by TypeInformationDetection SPI mechanism
try {
val newMethod = classOf[CompositeTypeSerializerUtil].getMethod(
constructIntermediateCompatibilityResultMethodName,
classOf[Array[TypeSerializerSnapshot[_]]],
classOf[Array[TypeSerializerSnapshot[_]]]
)
newMethod
.invoke(null, newNestedSerializers.map(_.snapshotConfiguration()), nestedSerializerSnapshots)
.asInstanceOf[IntermediateCompatibilityResult[_]]
} catch {
case _: NoSuchMethodException =>
val oldMethod = classOf[CompositeTypeSerializerUtil].getMethod(
constructIntermediateCompatibilityResultMethodName,
classOf[Array[TypeSerializer[_]]],
classOf[Array[TypeSerializerSnapshot[_]]]
)
oldMethod
.invoke(null, newNestedSerializers, nestedSerializerSnapshots)
.asInstanceOf[IntermediateCompatibilityResult[_]]
}
}

override def restoreSerializer(): TypeSerializer[T] = restoreSerializer(serializersSnapshots.map {
case (k, snapshot) => (k, snapshot.restoreSerializer())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,21 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult

case class TypedScalaMapTypeInformation(
informations: Map[String, TypeInformation[_]],
buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
informations: Map[String, TypeInformation[_]]
) extends TypedObjectBasedTypeInformation[Map[String, _ <: AnyRef]](informations) {

override def createSerializer(
serializers: Array[(String, TypeSerializer[_])]
): TypeSerializer[Map[String, _ <: AnyRef]] =
TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
TypedScalaMapSerializer(serializers)

}

@SerialVersionUID(1L)
case class TypedScalaMapSerializer(
override val serializers: Array[(String, TypeSerializer[_])],
override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
override val serializers: Array[(String, TypeSerializer[_])]
) extends TypedObjectBasedTypeSerializer[Map[String, _ <: AnyRef]](serializers)
with LazyLogging {

Expand All @@ -36,20 +33,17 @@ case class TypedScalaMapSerializer(
override def get(value: Map[String, _ <: AnyRef], k: String): AnyRef = value.getOrElse(k, null)

override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[Map[String, _ <: AnyRef]] =
TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
TypedScalaMapSerializer(serializers)

override def createInstance(): Map[String, _ <: AnyRef] = Map.empty

override def snapshotConfiguration(
snapshots: Array[(String, TypeSerializerSnapshot[_])]
): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots) {
override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult =
buildIntermediateSchemaCompatibilityResultFunction
}
): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots)

}

abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] {
final class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] {

def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = {
this()
Expand All @@ -59,6 +53,6 @@ abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerialize
override protected def restoreSerializer(
restored: Array[(String, TypeSerializer[_])]
): TypeSerializer[Map[String, _ <: AnyRef]] =
TypedScalaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult)
TypedScalaMapSerializer(restored)

}
Original file line number Diff line number Diff line change
Expand Up @@ -264,20 +264,17 @@ class TypingResultAwareTypeInformationDetectionSpec
}

private def assertNested(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_] => Assertion)*): Unit = {
inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) {
case TypedScalaMapSerializer(array, _) =>
array.zipAll(nested.toList, null, null).foreach {
case ((name, serializer), (expectedName, expectedSerializer)) =>
name shouldBe expectedName
expectedSerializer(serializer)
}
inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) =>
array.zipAll(nested.toList, null, null).foreach { case ((name, serializer), (expectedName, expectedSerializer)) =>
name shouldBe expectedName
expectedSerializer(serializer)
}
}
}

private def assertMapSerializers(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_])*) = {
inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) {
case TypedScalaMapSerializer(array, _) =>
array.toList shouldBe nested.toList
inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) =>
array.toList shouldBe nested.toList
}
}

Expand Down

0 comments on commit 7642e4d

Please sign in to comment.