diff --git a/build.sbt b/build.sbt index df6c9d6454e..5f203867f87 100644 --- a/build.sbt +++ b/build.sbt @@ -1181,10 +1181,11 @@ lazy val flinkScalaUtils = (project in flink("scala-utils")) name := "nussknacker-flink-scala-utils", libraryDependencies ++= { Seq( - "org.scala-lang" % "scala-reflect" % scalaVersion.value, - "org.apache.flink" % "flink-streaming-java" % flinkV % Provided, - "org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV, - "org.scalatest" %% "scalatest" % scalaTestV % Test, + "org.scala-lang" % "scala-reflect" % scalaVersion.value, + "org.apache.flink" % "flink-streaming-java" % flinkV % Provided, + "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV % Provided, + "org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV, + "org.scalatest" %% "scalatest" % scalaTestV % Test, ) ++ flinkLibScalaDeps(scalaVersion.value, Some("provided")) } ) diff --git a/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassSerializer.scala b/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassSerializer.scala index 6abc2a5930a..8276906aaae 100644 --- a/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassSerializer.scala +++ b/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/CaseClassSerializer.scala @@ -17,12 +17,16 @@ */ package pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass +import com.typesafe.scalalogging.LazyLogging import org.apache.flink.annotation.Internal import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase import org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.types.NullFieldException +import scala.tools.nsc.interpreter.Results.Success +import scala.util.{Failure, Success, Try} + /** * Serializer for Case Classes. Creation and access is different from our Java Tuples so we have to * treat them differently. @@ -31,6 +35,7 @@ import org.apache.flink.types.NullFieldException @SerialVersionUID(7341356073446263475L) abstract class CaseClassSerializer[T <: Product](clazz: Class[T], scalaFieldSerializers: Array[TypeSerializer[_]]) extends TupleSerializerBase[T](clazz, scalaFieldSerializers) + with LazyLogging with Cloneable { @transient var fields: Array[AnyRef] = _ @@ -122,7 +127,12 @@ abstract class CaseClassSerializer[T <: Product](clazz: Class[T], scalaFieldSeri fields(i) = fieldSerializers(i).deserialize(source) i += 1 } - createInstance(fields) + + Try(createInstance(fields)) match { + case Success(value) => value + case Failure(exc) => + throw new IllegalArgumentException(s"Failed to deserialize in class: ${clazz.getName}.", exc) + } } private def initArray() = { diff --git a/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/ScalaCaseClassSerializer.scala b/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/ScalaCaseClassSerializer.scala index 2d6591a1d88..dd3c056cb64 100644 --- a/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/ScalaCaseClassSerializer.scala +++ b/engine/flink/scala-utils/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinfo/caseclass/ScalaCaseClassSerializer.scala @@ -17,11 +17,13 @@ */ package pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass +import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.typeutils._ -import pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass.ScalaCaseClassSerializer.lookupConstructor +import pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass.ScalaCaseClassSerializer.{logger, lookupConstructor} import java.io.{ObjectInputStream, ObjectStreamClass} import scala.reflect.runtime.universe +import scala.util.{Failure, Success, Try} /** * This is a non macro-generated, concrete Scala case class serializer. @@ -57,7 +59,7 @@ class ScalaCaseClassSerializer[T <: Product]( } -object ScalaCaseClassSerializer { +object ScalaCaseClassSerializer extends LazyLogging { def lookupConstructor[T](cls: Class[T]): Array[AnyRef] => T = { val rootMirror = universe.runtimeMirror(cls.getClassLoader) @@ -87,7 +89,16 @@ object ScalaCaseClassSerializer { val constructorMethodMirror = classMirror.reflectConstructor(primaryConstructorSymbol) arr: Array[AnyRef] => { - constructorMethodMirror.apply(arr.toIndexedSeq: _*).asInstanceOf[T] + Try(constructorMethodMirror.apply(arr.toIndexedSeq: _*).asInstanceOf[T]) match { + case Success(value) => value + case Failure(exc) => + val message = s"Constructor invoke failed, class: ${cls.getName}," + + s"constructorMethodMirror: ${constructorMethodMirror.getClass.getName}" + + s"arr: ${arr.mkString("Array(", ", ", ")")}" + + s"class symbol: ${classSymbol.getClass.getName}" + + throw new IllegalArgumentException(message, exc) + } } }