From b5079ad40a99809b83f0e363efa19db863b6fef4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Fri, 13 Sep 2024 14:02:40 +0200 Subject: [PATCH] add proper SPI instead of having to patch --- build.sbt | 1 + ...SerializerUtilCompatibilityProvider.scala} | 13 +++++++++++- .../TypedObjectBasedTypeInformation.scala | 20 +++++++++---------- .../kryo/AvroSerializersRegistrar.scala | 11 +++++++++- ...a => AvroUtilsCompatibilityProvider.scala} | 8 +++++++- 5 files changed, 40 insertions(+), 13 deletions(-) rename engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/{CompositeTypeSerializerUtilCompatibilityLayer.scala => CompositeTypeSerializerUtilCompatibilityProvider.scala} (61%) rename engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/{AvroUtilsCompatibilityLayer.scala => AvroUtilsCompatibilityProvider.scala} (68%) diff --git a/build.sbt b/build.sbt index 64638f18eb7..e391bd485fd 100644 --- a/build.sbt +++ b/build.sbt @@ -979,6 +979,7 @@ lazy val flinkSchemedKafkaComponentsUtils = (project in flink("schemed-kafka-com .dependsOn( schemedKafkaComponentsUtils % "compile;test->test", flinkKafkaComponentsUtils, + utilsInternal, flinkExtensionsApi % Provided, flinkComponentsUtils % Provided, componentsUtils % Provided, diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/CompositeTypeSerializerUtilCompatibilityLayer.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/CompositeTypeSerializerUtilCompatibilityProvider.scala similarity index 61% rename from engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/CompositeTypeSerializerUtilCompatibilityLayer.scala rename to engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/CompositeTypeSerializerUtilCompatibilityProvider.scala index f9d151bb281..52bf781a401 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/CompositeTypeSerializerUtilCompatibilityLayer.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/CompositeTypeSerializerUtilCompatibilityProvider.scala @@ -3,11 +3,20 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult import org.apache.flink.api.common.typeutils.{CompositeTypeSerializerUtil, TypeSerializer, TypeSerializerSnapshot} -private[typedobject] object CompositeTypeSerializerUtilCompatibilityLayer { +trait CompositeTypeSerializerUtil { def constructIntermediateCompatibilityResult[T]( newNestedSerializerSnapshots: Array[TypeSerializer[_]], oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]] + ): IntermediateCompatibilityResult[T] + +} + +object DefaultCompositeTypeSerializerUtil extends CompositeTypeSerializerUtil { + + override def constructIntermediateCompatibilityResult[T]( + newNestedSerializerSnapshots: Array[TypeSerializer[_]], + oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]] ): IntermediateCompatibilityResult[T] = { CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult( newNestedSerializerSnapshots.map(_.snapshotConfiguration()), @@ -16,3 +25,5 @@ private[typedobject] object CompositeTypeSerializerUtilCompatibilityLayer { } } + +trait CompositeTypeSerializerUtilCompatibilityProvider extends CompositeTypeSerializerUtil diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala index d7282675462..879647a81c7 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala @@ -4,13 +4,9 @@ import com.github.ghik.silencer.silent import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.{ - CompositeTypeSerializerUtil, - TypeSerializer, - TypeSerializerSchemaCompatibility, - TypeSerializerSnapshot -} +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader import scala.reflect.ClassTag @@ -173,10 +169,14 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps val newSerializersToUse = newSerializers.filter(k => commons.contains(k._1)) val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1)) - val fieldsCompatibility = CompositeTypeSerializerUtilCompatibilityLayer.constructIntermediateCompatibilityResult( - newSerializersToUse.map(_._2), - snapshotsToUse.map(_._2) - ) + val fieldsCompatibility = ScalaServiceLoader + .load[CompositeTypeSerializerUtilCompatibilityProvider](getClass.getClassLoader) + .headOption + .getOrElse(DefaultCompositeTypeSerializerUtil) + .constructIntermediateCompatibilityResult( + newSerializersToUse.map(_._2), + snapshotsToUse.map(_._2) + ) // We construct detailed message to show when there are compatibility issues def fieldsCompatibilityMessage: String = newSerializersToUse diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroSerializersRegistrar.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroSerializersRegistrar.scala index 233b6e6e601..751efea5b35 100644 --- a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroSerializersRegistrar.scala +++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroSerializersRegistrar.scala @@ -11,17 +11,26 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClie import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.kryo.SchemaIdBasedAvroGenericRecordSerializer import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.GenericRecordSchemaIdSerializationSupport import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaRegistryClientFactory +import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader // We need it because we use avro records inside our Context class class AvroSerializersRegistrar extends SerializersRegistrar with LazyLogging { override def register(modelConfig: Config, executionConfig: ExecutionConfig): Unit = { logger.debug("Registering default avro serializers") - AvroUtilsCompatibilityLayer.addAvroSerializersIfRequired(executionConfig) + registerAvroSerializers(executionConfig) val resolvedKafkaConfig = resolveConfig(modelConfig) registerGenericRecordSchemaIdSerializationForGlobalKafkaConfigIfNeed(resolvedKafkaConfig, executionConfig) } + private def registerAvroSerializers(executionConfig: ExecutionConfig): Unit = { + val avroUtilsProvider = ScalaServiceLoader + .load[AvroUtilsCompatibilityProvider](getClass.getClassLoader) + .headOption + .getOrElse(DefaultAvroUtils) + avroUtilsProvider.addAvroSerializersIfRequired(executionConfig) + } + private def resolveConfig(modelConfig: Config): Option[KafkaConfig] = { val componentsConfig = modelConfig.getAs[Map[String, ComponentProviderConfig]]("components").getOrElse(Map.empty) val componentsKafkaConfigs = componentsConfig.toList diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroUtilsCompatibilityLayer.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroUtilsCompatibilityProvider.scala similarity index 68% rename from engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroUtilsCompatibilityLayer.scala rename to engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroUtilsCompatibilityProvider.scala index b96abfe39c4..4efde744765 100644 --- a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroUtilsCompatibilityLayer.scala +++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/kryo/AvroUtilsCompatibilityProvider.scala @@ -4,7 +4,13 @@ import org.apache.avro.generic.GenericData import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.java.typeutils.AvroUtils -private[kryo] object AvroUtilsCompatibilityLayer { +trait AvroUtils { + def addAvroSerializersIfRequired(executionConfig: ExecutionConfig): Unit +} + +trait AvroUtilsCompatibilityProvider extends AvroUtils + +object DefaultAvroUtils extends AvroUtils { def addAvroSerializersIfRequired(executionConfig: ExecutionConfig): Unit = { AvroUtils.getAvroUtils.addAvroSerializersIfRequired(