Skip to content

Commit

Permalink
CaseClassSerializer additional logging
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki committed Oct 24, 2024
1 parent 2b3f1c1 commit 2b02c2c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
9 changes: 5 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1181,10 +1181,11 @@ lazy val flinkScalaUtils = (project in flink("scala-utils"))
name := "nussknacker-flink-scala-utils",
libraryDependencies ++= {
Seq(
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV,
"org.scalatest" %% "scalatest" % scalaTestV % Test,
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV % Provided,
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV,
"org.scalatest" %% "scalatest" % scalaTestV % Test,
) ++ flinkLibScalaDeps(scalaVersion.value, Some("provided"))
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
*/
package pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.annotation.Internal
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.types.NullFieldException

import scala.tools.nsc.interpreter.Results.Success
import scala.util.{Failure, Success, Try}

/**
* Serializer for Case Classes. Creation and access is different from our Java Tuples so we have to
* treat them differently.
Expand All @@ -31,6 +35,7 @@ import org.apache.flink.types.NullFieldException
@SerialVersionUID(7341356073446263475L)
abstract class CaseClassSerializer[T <: Product](clazz: Class[T], scalaFieldSerializers: Array[TypeSerializer[_]])
extends TupleSerializerBase[T](clazz, scalaFieldSerializers)
with LazyLogging
with Cloneable {

@transient var fields: Array[AnyRef] = _
Expand Down Expand Up @@ -122,7 +127,12 @@ abstract class CaseClassSerializer[T <: Product](clazz: Class[T], scalaFieldSeri
fields(i) = fieldSerializers(i).deserialize(source)
i += 1
}
createInstance(fields)

Try(createInstance(fields)) match {
case Success(value) => value
case Failure(exc) =>
throw new IllegalArgumentException(s"Failed to deserialize in class: ${clazz.getName}.", exc)
}
}

private def initArray() = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
*/
package pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.typeutils._
import pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass.ScalaCaseClassSerializer.lookupConstructor
import pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass.ScalaCaseClassSerializer.{logger, lookupConstructor}

import java.io.{ObjectInputStream, ObjectStreamClass}
import scala.reflect.runtime.universe
import scala.util.{Failure, Success, Try}

/**
* This is a non macro-generated, concrete Scala case class serializer.
Expand Down Expand Up @@ -57,7 +59,7 @@ class ScalaCaseClassSerializer[T <: Product](

}

object ScalaCaseClassSerializer {
object ScalaCaseClassSerializer extends LazyLogging {

def lookupConstructor[T](cls: Class[T]): Array[AnyRef] => T = {
val rootMirror = universe.runtimeMirror(cls.getClassLoader)
Expand Down Expand Up @@ -87,7 +89,16 @@ object ScalaCaseClassSerializer {
val constructorMethodMirror = classMirror.reflectConstructor(primaryConstructorSymbol)

arr: Array[AnyRef] => {
constructorMethodMirror.apply(arr.toIndexedSeq: _*).asInstanceOf[T]
Try(constructorMethodMirror.apply(arr.toIndexedSeq: _*).asInstanceOf[T]) match {
case Success(value) => value
case Failure(exc) =>
val message = s"Constructor invoke failed, class: ${cls.getName}," +
s"constructorMethodMirror: ${constructorMethodMirror.getClass.getName}" +
s"arr: ${arr.mkString("Array(", ", ", ")")}" +
s"class symbol: ${classSymbol.getClass.getName}"

throw new IllegalArgumentException(message, exc)
}
}
}

Expand Down

0 comments on commit 2b02c2c

Please sign in to comment.