add proper SPI instead of having to patch
mslabek committed Sep 13, 2024
1 parent ed5a6e7 commit b5079ad
Showing 5 changed files with 40 additions and 13 deletions.
1 change: 1 addition & 0 deletions build.sbt

@@ -979,6 +979,7 @@ lazy val flinkSchemedKafkaComponentsUtils = (project in flink("schemed-kafka-com
   .dependsOn(
     schemedKafkaComponentsUtils % "compile;test->test",
     flinkKafkaComponentsUtils,
+    utilsInternal,
     flinkExtensionsApi % Provided,
     flinkComponentsUtils % Provided,
     componentsUtils % Provided,

@@ -3,11 +3,20 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult
 import org.apache.flink.api.common.typeutils.{CompositeTypeSerializerUtil, TypeSerializer, TypeSerializerSnapshot}

-private[typedobject] object CompositeTypeSerializerUtilCompatibilityLayer {
+trait CompositeTypeSerializerUtil {
+
+  def constructIntermediateCompatibilityResult[T](
+      newNestedSerializerSnapshots: Array[TypeSerializer[_]],
+      oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
+  ): IntermediateCompatibilityResult[T]
+
+}
+
+object DefaultCompositeTypeSerializerUtil extends CompositeTypeSerializerUtil {

-  def constructIntermediateCompatibilityResult[T](
+  override def constructIntermediateCompatibilityResult[T](
       newNestedSerializerSnapshots: Array[TypeSerializer[_]],
       oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
   ): IntermediateCompatibilityResult[T] = {
     CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(
       newNestedSerializerSnapshots.map(_.snapshotConfiguration()),
@@ -16,3 +25,5 @@ private[typedobject] object CompositeTypeSerializerUtilCompatibilityLayer {
   }

 }
+
+trait CompositeTypeSerializerUtilCompatibilityProvider extends CompositeTypeSerializerUtil
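
With this SPI in place, a runtime module can swap the serializer-compatibility logic by shipping its own provider on the classpath instead of patching Nussknacker classes. The sketch below is not part of this commit: the class name CustomCompositeTypeSerializerUtil and its delegating body are hypothetical, and it assumes ScalaServiceLoader follows the standard java.util.ServiceLoader conventions (a concrete class with a public no-arg constructor, registered in a META-INF/services resource named after the SPI trait's fully qualified name).

import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.{
  CompositeTypeSerializerUtilCompatibilityProvider,
  DefaultCompositeTypeSerializerUtil
}

// Hypothetical custom provider. A class (not an object) is used here, assuming
// the service loader instantiates providers reflectively via a no-arg constructor.
class CustomCompositeTypeSerializerUtil extends CompositeTypeSerializerUtilCompatibilityProvider {

  override def constructIntermediateCompatibilityResult[T](
      newNestedSerializerSnapshots: Array[TypeSerializer[_]],
      oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
  ): IntermediateCompatibilityResult[T] = {
    // A real provider would plug in custom compatibility logic here;
    // delegating to the default keeps Flink's behaviour unchanged.
    DefaultCompositeTypeSerializerUtil.constructIntermediateCompatibilityResult[T](
      newNestedSerializerSnapshots,
      oldNestedSerializerSnapshots
    )
  }

}

Under those ServiceLoader conventions, the provider would be registered by listing its fully qualified class name in a resource file named META-INF/services/pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.CompositeTypeSerializerUtilCompatibilityProvider (the trait's package is the one shown in this diff's hunk header).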

@@ -4,13 +4,9 @@ import com.github.ghik.silencer.silent
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.{
-  CompositeTypeSerializerUtil,
-  TypeSerializer,
-  TypeSerializerSchemaCompatibility,
-  TypeSerializerSnapshot
-}
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader

 import scala.reflect.ClassTag

@@ -173,10 +169,14 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps
     val newSerializersToUse = newSerializers.filter(k => commons.contains(k._1))
     val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1))

-    val fieldsCompatibility = CompositeTypeSerializerUtilCompatibilityLayer.constructIntermediateCompatibilityResult(
-      newSerializersToUse.map(_._2),
-      snapshotsToUse.map(_._2)
-    )
+    val fieldsCompatibility = ScalaServiceLoader
+      .load[CompositeTypeSerializerUtilCompatibilityProvider](getClass.getClassLoader)
+      .headOption
+      .getOrElse(DefaultCompositeTypeSerializerUtil)
+      .constructIntermediateCompatibilityResult(
+        newSerializersToUse.map(_._2),
+        snapshotsToUse.map(_._2)
+      )

     // We construct detailed message to show when there are compatibility issues
     def fieldsCompatibilityMessage: String = newSerializersToUse
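
The lookup in that hunk is a first-provider-wins pattern: ScalaServiceLoader collects every CompositeTypeSerializerUtilCompatibilityProvider visible to the given classloader, headOption takes the first match, and getOrElse falls back to DefaultCompositeTypeSerializerUtil, the Flink-backed implementation, when nothing is registered. If several providers end up on the classpath, which one wins is effectively unspecified, so a deployment should ship at most one.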

@@ -11,17 +11,26 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClie
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.kryo.SchemaIdBasedAvroGenericRecordSerializer
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.GenericRecordSchemaIdSerializationSupport
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaRegistryClientFactory
+import pl.touk.nussknacker.engine.util.loader.ScalaServiceLoader

 // We need it because we use avro records inside our Context class
 class AvroSerializersRegistrar extends SerializersRegistrar with LazyLogging {

   override def register(modelConfig: Config, executionConfig: ExecutionConfig): Unit = {
     logger.debug("Registering default avro serializers")
-    AvroUtilsCompatibilityLayer.addAvroSerializersIfRequired(executionConfig)
+    registerAvroSerializers(executionConfig)
     val resolvedKafkaConfig = resolveConfig(modelConfig)
     registerGenericRecordSchemaIdSerializationForGlobalKafkaConfigIfNeed(resolvedKafkaConfig, executionConfig)
   }

+  private def registerAvroSerializers(executionConfig: ExecutionConfig): Unit = {
+    val avroUtilsProvider = ScalaServiceLoader
+      .load[AvroUtilsCompatibilityProvider](getClass.getClassLoader)
+      .headOption
+      .getOrElse(DefaultAvroUtils)
+    avroUtilsProvider.addAvroSerializersIfRequired(executionConfig)
+  }
+
   private def resolveConfig(modelConfig: Config): Option[KafkaConfig] = {
     val componentsConfig = modelConfig.getAs[Map[String, ComponentProviderConfig]]("components").getOrElse(Map.empty)
     val componentsKafkaConfigs = componentsConfig.toList

@@ -4,7 +4,13 @@ import org.apache.avro.generic.GenericData
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.java.typeutils.AvroUtils

-private[kryo] object AvroUtilsCompatibilityLayer {
+trait AvroUtils {
+  def addAvroSerializersIfRequired(executionConfig: ExecutionConfig): Unit
+}
+
+trait AvroUtilsCompatibilityProvider extends AvroUtils
+
+object DefaultAvroUtils extends AvroUtils {

   def addAvroSerializersIfRequired(executionConfig: ExecutionConfig): Unit = {
     AvroUtils.getAvroUtils.addAvroSerializersIfRequired(
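
The Avro extension point works the same way as the serializer-compatibility one above. Again a sketch that is not part of this commit: the class name CustomAvroUtils and its delegating body are hypothetical, it assumes the same ServiceLoader-style registration under the AvroUtilsCompatibilityProvider trait's fully qualified name in META-INF/services, and the imports of the Nussknacker traits are omitted because their exact package is not shown in this diff.

import org.apache.flink.api.common.ExecutionConfig

// Hypothetical custom provider, picked up by ScalaServiceLoader in
// AvroSerializersRegistrar.registerAvroSerializers.
class CustomAvroUtils extends AvroUtilsCompatibilityProvider {

  override def addAvroSerializersIfRequired(executionConfig: ExecutionConfig): Unit = {
    // A real provider could register alternative Avro/Kryo serializers here;
    // delegating preserves Flink's default Avro serializer setup.
    DefaultAvroUtils.addAvroSerializersIfRequired(executionConfig)
  }

}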
