diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f3eb3d6..1e9fd1f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,7 @@ jobs: strategy: matrix: java: [11] - scala: [2.12.20, 2.13.15, 3.3.4] + scala: [2.13.15, 3.3.4] flink: [1.18.1, 1.19.1] include: - scala: 3.3.4 diff --git a/README.md b/README.md index 88e7904..d5259bd 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ -# Scala 2.12/2.13/3.x API for Apache Flink +# Scala 2.13/3.x API for Apache Flink [![CI Status](https://github.com/flink-extended/flink-scala-api/workflows/CI/badge.svg)](https://github.com/flinkextended/flink-scala-api/actions) -[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.12/badge.svg?style=plastic)](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.12) +[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.13/badge.svg?style=plastic)](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.13) [![License: Apache 2](https://img.shields.io/badge/License-Apache2-green.svg)](https://opensource.org/licenses/Apache-2.0) ![Last commit](https://img.shields.io/github/last-commit/flink-extended/flink-scala-api) ![Last release](https://img.shields.io/github/release/flink-extended/flink-scala-api) -This project is a community-maintained fork of official Apache Flink Scala API, cross-built for scala 2.12, 2.13 and 3.x. +This project is a community-maintained fork of the official Apache Flink Scala API, cross-built for Scala 2.13 and 3.x. ## Migration @@ -26,12 +26,12 @@ import org.apache.flinkx.api.serializers._ ## Usage -`flink-scala-api` is released to Maven-central for 2.12, 2.13 and 3. For SBT, add this snippet to `build.sbt`: +`flink-scala-api` is released to Maven Central for Scala 2.13 and 3. For SBT, add this snippet to `build.sbt`: ```scala libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.1.6" ``` -For Ammonite: +## For Ammonite ```scala import $ivy.`org.flinkextended::flink-scala-api:1.18.1_1.1.6` @@ -39,6 +39,22 @@ import $ivy.`org.apache.flink:flink-clients:1.18.1` ``` +## For Scala 2.12 + +If you first want to migrate to org.flinkextended:flink-scala-api while staying on Scala 2.12, you can use the last builds published for Scala 2.12: + +```scala +libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.2.0" +// or +"org.flinkextended" %% "flink-scala-api" % "1.19.1_1.2.0" +// or +"org.flinkextended" %% "flink-scala-api" % "1.20.0_1.2.0" +``` + +Newer builds for Scala 2.12 are no longer published.
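For readers updating a build, a minimal sketch of a cross-built project definition matching the snippets above (the `Provided` scope for `flink-clients` is an assumption for cluster deployments; adjust versions to your Flink release):

```scala
// build.sbt: hypothetical minimal project cross-built for Scala 2.13 and 3
ThisBuild / scalaVersion       := "3.3.4"
ThisBuild / crossScalaVersions := Seq("2.13.15", "3.3.4")

libraryDependencies ++= Seq(
  // the Scala API wrapper published by this project
  "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.1.6",
  // Flink itself stays a Java dependency; Provided assumes the cluster supplies it
  "org.apache.flink"   % "flink-clients"   % "1.18.1" % Provided
)
```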
+ +## SBT Project Template + If you want to create new project easily check this __Giter8 template__ out: [novakov-alexey/flink-scala-api.g8](https://github.com/novakov-alexey/flink-scala-api.g8) ## Supported Flink versions diff --git a/build.sbt b/build.sbt index b054ea6..2690562 100644 --- a/build.sbt +++ b/build.sbt @@ -9,6 +9,7 @@ lazy val flinkVersion = System.getProperty("flinkVersion", "1.18.1") lazy val root = (project in file(".")) .aggregate(`scala-api`, `examples`) .settings( + scalaVersion := rootScalaVersion, publish / skip := true ) @@ -17,7 +18,7 @@ lazy val `scala-api` = (project in file("modules/scala-api")) .settings( name := "flink-scala-api", scalaVersion := rootScalaVersion, - crossScalaVersions := Seq("2.12.20", "2.13.15", rootScalaVersion), + crossScalaVersions := Seq("2.13.15", rootScalaVersion), libraryDependencies ++= Seq( "org.apache.flink" % "flink-streaming-java" % flinkVersion, "org.apache.flink" % "flink-java" % flinkVersion, diff --git a/modules/scala-api/src/main/scala-3/org/apache/flinkx/api/TaggedDerivation.scala b/modules/scala-api/src/main/scala-3/org/apache/flinkx/api/TaggedDerivation.scala index 8688333..c53decc 100644 --- a/modules/scala-api/src/main/scala-3/org/apache/flinkx/api/TaggedDerivation.scala +++ b/modules/scala-api/src/main/scala-3/org/apache/flinkx/api/TaggedDerivation.scala @@ -30,6 +30,7 @@ trait CommonTaggedDerivation[TypeClass[_]]: )* ) + @annotation.nowarn val caseClass = new CaseClass[Typeclass, A]( typeInfo[A], isObject[A], diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala index 2426eae..8cabf98 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory import scala.collection.mutable import scala.jdk.CollectionConverters._ +import scala.annotation.tailrec /** A cleaner that renders closures serializable if they can be done so safely. */ @@ -63,20 +64,27 @@ object ClosureCleaner { // not a good idea (whereas we can clone closure objects just fine since we // understand how all their fields are used). 
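The `import org.apache.flinkx.api.serializers._` shown in the Migration section is what triggers the derivation code touched above (TaggedDerivation on Scala 3). A minimal usage sketch, assuming the made-up `Order` case class and that its field types are covered by the provided implicits:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

case class Order(id: Long, items: List[String])

// Summons a compile-time-derived TypeInformation for the case class,
// so serialization does not fall back to Kryo.
val orderInfo: TypeInformation[Order] = implicitly[TypeInformation[Order]]
```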
private def getOuterClassesAndObjects(obj: AnyRef): (List[Class[_]], List[AnyRef]) = { - for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") { - f.setAccessible(true) - val outer = f.get(obj) - // The outer pointer may be null if we have cleaned this closure before - if (outer != null) { - if (isClosure(f.getType)) { - val recurRet = getOuterClassesAndObjects(outer) - return (f.getType :: recurRet._1, outer :: recurRet._2) - } else { - return (f.getType :: Nil, outer :: Nil) // Stop at the first $outer that is not a closure - } + + @tailrec + def loop(fields: List[Field]): (List[Class[_]], List[AnyRef]) = + fields match { + case f :: tail => + f.setAccessible(true) + val outer = f.get(obj) + // The outer pointer may be null if we have cleaned this closure before + if (outer != null) { + if (isClosure(f.getType)) { + val recurRet = getOuterClassesAndObjects(outer) + (f.getType :: recurRet._1, outer :: recurRet._2) + } else { + (f.getType :: Nil, outer :: Nil) // Stop at the first $outer that is not a closure + } + } else loop(tail) + case Nil => (Nil, Nil) } - } - (Nil, Nil) + + val fields = obj.getClass.getDeclaredFields.filter(_.getName == "$outer").toList + loop(fields) } /** Return a list of classes that represent closures enclosed in the given closure object. @@ -89,17 +97,20 @@ object ClosureCleaner { if (cr != null) { val set = mutable.Set.empty[Class[_]] cr.accept(new InnerClosureFinder(set), 0) - for (cls <- set -- seen) { + for (cls <- set.toSet -- seen) { seen += cls stack.push(cls) } } } - (seen - obj.getClass).toList + (seen.toSet - obj.getClass).toList } /** Initializes the accessed fields for outer classes and their super classes. */ - private def initAccessedFields(accessedFields: mutable.Map[Class[_], mutable.Set[String]], outerClasses: Seq[Class[_]]): Unit = { + private def initAccessedFields( + accessedFields: mutable.Map[Class[_], mutable.Set[String]], + outerClasses: Seq[Class[_]] + ): Unit = { for (cls <- outerClasses) { var currentClass = cls assert(currentClass != null, "The outer class can't be null.") diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala index 4e49346..561c8c3 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala @@ -189,7 +189,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { * @return * The resulting data stream. 
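For intuition about what `getOuterClassesAndObjects` traverses, here is a standalone sketch (not the library code) that follows an object's `$outer` chain via reflection. Unlike the method above it does not stop at the first non-closure outer, and whether a closure carries an `$outer` field at all depends on how the compiler encodes it:

```scala
import scala.annotation.tailrec

object OuterChain {
  /** Follows `$outer` references, collecting (class, instance) pairs until the chain ends. */
  def apply(start: AnyRef): List[(Class[_], AnyRef)] = {
    @tailrec
    def loop(obj: AnyRef, acc: List[(Class[_], AnyRef)]): List[(Class[_], AnyRef)] =
      obj.getClass.getDeclaredFields.find(_.getName == "$outer") match {
        case Some(field) =>
          field.setAccessible(true)
          val outer = field.get(obj)
          if (outer == null) acc.reverse
          else loop(outer, (field.getType, outer) :: acc)
        case None => acc.reverse
      }
    loop(start, Nil)
  }
}
```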
*/ - def flatMap[R: TypeInformation](fun1: IN1 => TraversableOnce[R], fun2: IN2 => TraversableOnce[R]): DataStream[R] = { + def flatMap[R: TypeInformation](fun1: IN1 => IterableOnce[R], fun2: IN2 => IterableOnce[R]): DataStream[R] = { if (fun1 == null || fun2 == null) { throw new NullPointerException("FlatMap functions must not be null.") @@ -198,8 +198,8 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { val cleanFun2 = clean(fun2) val flatMapper = new CoFlatMapFunction[IN1, IN2, R] { - def flatMap1(value: IN1, out: Collector[R]): Unit = { cleanFun1(value).foreach(out.collect) } - def flatMap2(value: IN2, out: Collector[R]): Unit = { cleanFun2(value).foreach(out.collect) } + def flatMap1(value: IN1, out: Collector[R]): Unit = { cleanFun1(value).iterator.foreach(out.collect) } + def flatMap2(value: IN2, out: Collector[R]): Unit = { cleanFun2(value).iterator.foreach(out.collect) } } flatMap(flatMapper) diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala index 52e3c30..1befbf3 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala @@ -560,13 +560,13 @@ class DataStream[T](stream: JavaStream[T]) { /** Creates a new DataStream by applying the given function to every element and flattening the results. */ - def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = { + def flatMap[R: TypeInformation](fun: T => IterableOnce[R]): DataStream[R] = { if (fun == null) { throw new NullPointerException("FlatMap function must not be null.") } val cleanFun = clean(fun) val flatMapper = new FlatMapFunction[T, R] { - def flatMap(in: T, out: Collector[R]): Unit = { cleanFun(in).foreach(out.collect) } + def flatMap(in: T, out: Collector[R]): Unit = { cleanFun(in).iterator.foreach(out.collect) } } flatMap(flatMapper) } @@ -690,8 +690,10 @@ class DataStream[T](stream: JavaStream[T]) { * For cases where the timestamps are not monotonously increasing, use the more general methods * [[assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)]] and * [[assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)]]. + * + * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead. */ - @PublicEvolving + @Deprecated def assignAscendingTimestamps(extractor: T => Long): DataStream[T] = { val cleanExtractor = clean(extractor) val extractorFunction = new AscendingTimestampExtractor[T] { @@ -754,8 +756,12 @@ class DataStream[T](stream: JavaStream[T]) { def printToErr(sinkIdentifier: String): DataStreamSink[T] = stream.printToErr(sinkIdentifier) /** Writes a DataStream using the given [[OutputFormat]]. - */ - @PublicEvolving + * + * @deprecated Please use the {@link + * org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink} explicitly + * using the {@link #addSink(SinkFunction)} method. + */ + @Deprecated def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T] = { stream.writeUsingOutputFormat(format) } @@ -790,7 +796,10 @@ class DataStream[T](stream: JavaStream[T]) { /** Adds the given sink to this DataStream. Only streams with sinks added will be executed once the * StreamExecutionEnvironment.execute(...) method is called. 
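The `@deprecated` note above points at `WatermarkStrategy`; a hedged sketch of the replacement for `assignAscendingTimestamps` (the `Event` case class and its `ts` field are made up for illustration):

```scala
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

case class Event(id: String, ts: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Instead of events.assignAscendingTimestamps(_.ts):
val withTimestamps = env
  .fromElements(Event("a", 1L), Event("b", 2L))
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forMonotonousTimestamps[Event]()
      .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(e: Event, recordTimestamp: Long): Long = e.ts
      })
  )
```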
+ * + * @deprecated Please use the sinkTo(sink: org.apache.flink.api.connector.sink2.Sink[T]) */ + @Deprecated def sinkTo(sink: org.apache.flink.api.connector.sink.Sink[T, _, _, _]): DataStreamSink[T] = stream.sinkTo(sink) diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/KeyedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/KeyedStream.scala index 2780e66..d5156e2 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/KeyedStream.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/KeyedStream.scala @@ -460,7 +460,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * Note that the user state object needs to be serializable. */ def flatMapWithState[R: TypeInformation, S: TypeInformation]( - fun: (T, Option[S]) => (TraversableOnce[R], Option[S]) + fun: (T, Option[S]) => (IterableOnce[R], Option[S]) ): DataStream[R] = { if (fun == null) { throw new NullPointerException("Flatmap function must not be null.") @@ -470,12 +470,12 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]] val serializer: TypeSerializer[S] = stateTypeInfo.createSerializer(javaStream.getExecutionConfig) - val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T, TraversableOnce[R], S] { + val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T, IterableOnce[R], S] { override val stateSerializer: TypeSerializer[S] = serializer override def flatMap(in: T, out: Collector[R]): Unit = { - applyWithState(in, cleanFun).foreach(out.collect) + applyWithState(in, cleanFun).iterator.foreach(out.collect) } } @@ -488,8 +488,10 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * Name under which to the publish the queryable state instance * @return * Queryable state instance + * @deprecated The Queryable State feature is deprecated since Flink 1.18, and will be removed in a + * future Flink major version. */ - @PublicEvolving + @Deprecated def asQueryableState(queryableStateName: String): QueryableStateStream[K, T] = { val stateDescriptor = new ValueStateDescriptor(queryableStateName, dataType.createSerializer(executionConfig)) @@ -504,8 +506,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * State descriptor to create state instance from * @return * Queryable state instance + * + * @deprecated The Queryable State feature is deprecated since Flink 1.18, and will be removed in a + * future Flink major version. */ - @PublicEvolving + @Deprecated def asQueryableState( queryableStateName: String, stateDescriptor: ValueStateDescriptor[T] @@ -529,8 +534,10 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] * State descriptor to create state instance from * @return * Queryable state instance + * @deprecated The Queryable State feature is deprecated since Flink 1.18, and will be removed in a + * future Flink major version. 
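As a usage sketch for the `IterableOnce`-based `flatMapWithState` signature above, computing a running sum per key (the `Reading` case class is made up; the `Option[Long]` is the keyed state threaded by the API):

```scala
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

case class Reading(sensor: String, value: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val runningSums = env
  .fromElements(Reading("a", 1L), Reading("a", 2L), Reading("b", 10L))
  .keyBy(_.sensor)
  .flatMapWithState[Long, Long] { (in, state) =>
    val sum = state.getOrElse(0L) + in.value
    (List(sum), Some(sum)) // emit the running sum and store it back as state
  }
```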
*/ - @PublicEvolving + @Deprecated def asQueryableState( queryableStateName: String, stateDescriptor: ReducingStateDescriptor[T] diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala index 2f65196..004fdc0 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala @@ -487,19 +487,29 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { /** Creates a DataStream that represents the Strings produced by reading the given file line wise. The file will be * read with the system's default character set. + * + * @deprecated Use {@code + * FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}. */ + @Deprecated def readTextFile(filePath: String): DataStream[String] = asScalaStream(javaEnv.readTextFile(filePath)) /** Creates a data stream that represents the Strings produced by reading the given file line wise. The character set * with the given name will be used to read the files. + * @deprecated Use {@code + * FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}. */ + @Deprecated def readTextFile(filePath: String, charsetName: String): DataStream[String] = asScalaStream(javaEnv.readTextFile(filePath, charsetName)) /** Reads the given file with the given input format. The file path should be passed as a URI (e.g., * "file:///some/local/file" or "hdfs://host:port/file/path"). + * @deprecated Use {@code + * FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}. */ + @Deprecated def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String): DataStream[T] = asScalaStream(javaEnv.readFile(inputFormat, filePath)) @@ -526,8 +536,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans * @return * The data stream that represents the data read from the given file + * + * @deprecated Use {@code + * FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}. */ - @PublicEvolving + @Deprecated def readFile[T: TypeInformation]( inputFormat: FileInputFormat[T], filePath: String, @@ -563,7 +576,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * have a parallelism of 1. To enable parallel execution, the user defined source should implement * ParallelSourceFunction or extend RichParallelSourceFunction. In these cases the resulting source will have the * parallelism of the environment. To change this afterwards call DataStreamSource.setParallelism(int) + * + * @deprecated This method relies on the {@link + * org.apache.flink.streaming.api.functions.source.SourceFunction} API, which is due to be + * removed. Use the {@link #fromSource[TypeInformation](Source, WatermarkStrategy, String)} + * method based on the new {@link org.apache.flink.api.connector.source.Source} API instead. */ + @Deprecated def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = { require(function != null, "Function must not be null.") @@ -573,7 +592,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { } /** Create a DataStream using a user defined source function for arbitrary source functionality. 
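The deprecation notes above point at the new `FileSource`; a hedged sketch of replacing `readTextFile` (assumes the `flink-connector-files` artifact is on the classpath and uses a made-up local path):

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val source = FileSource
  .forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///tmp/input.txt"))
  .build()

// Replaces env.readTextFile("file:///tmp/input.txt")
val lines: DataStream[String] =
  env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "text-lines")
```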
- */ + * + * @deprecated This method relies on the {@link + * org.apache.flink.streaming.api.functions.source.SourceFunction} API, which is due to be + * removed. Use the {@link #fromSource[TypeInformation](Source, WatermarkStrategy, String)} + * method based on the new {@link org.apache.flink.api.connector.source.Source} API instead. + */ + @Deprecated def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = { require(function != null, "Function must not be null.") val sourceFunction = new SourceFunction[T] { diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala index db47206..011d558 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala @@ -63,8 +63,8 @@ class OnConnectedStream[IN1, IN2](stream: ConnectedStreams[IN1, IN2]) { */ @PublicEvolving def flatMapWith[R: TypeInformation]( - flatMap1: IN1 => TraversableOnce[R], - flatMap2: IN2 => TraversableOnce[R] + flatMap1: IN1 => IterableOnce[R], + flatMap2: IN2 => IterableOnce[R] ): DataStream[R] = stream.flatMap(flatMap1, flatMap2) diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnDataStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnDataStream.scala index 9856f06..2f5ccda 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnDataStream.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnDataStream.scala @@ -55,7 +55,7 @@ class OnDataStream[T](stream: DataStream[T]) { * A dataset of R */ @PublicEvolving - def flatMapWith[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = + def flatMapWith[R: TypeInformation](fun: T => IterableOnce[R]): DataStream[R] = stream.flatMap(fun) /** Applies a predicate `fun` to each item of the stream, keeping only those for which the predicate holds diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaAllWindowFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaAllWindowFunction.scala index 1ffe571..43b1500 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaAllWindowFunction.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaAllWindowFunction.scala @@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ /** A wrapper function that exposes a Scala Function3 as a Java AllWindowFunction. 
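A usage sketch of the `IterableOnce`-typed `flatMapWith` extension, assuming the implicit conversions are exposed through `org.apache.flinkx.api.extensions._` as in the upstream Scala API (the `Point` case class is made up):

```scala
import org.apache.flinkx.api._
import org.apache.flinkx.api.extensions._
import org.apache.flinkx.api.serializers._

case class Point(label: String, x: Double, y: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// flatMapWith accepts a pattern-matching function literal returning any IterableOnce
val coords = env
  .fromElements(Point("p1", 1.0, 2.0), Point("p2", 3.0, 4.0))
  .flatMapWith { case Point(_, x, y) => Seq(x, y) }
```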
*/ diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaProcessWindowFunctionWrapper.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaProcessWindowFunctionWrapper.scala index ba3db6e..68f1dfa 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaProcessWindowFunctionWrapper.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaProcessWindowFunctionWrapper.scala @@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.functions.windowing.{ import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ /** A wrapper function that exposes a Scala ProcessWindowFunction as a ProcessWindowFunction function. * diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaWindowFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaWindowFunction.scala index 45ea814..9d68ca0 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaWindowFunction.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaWindowFunction.scala @@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.functions.windowing.{WindowFunction => JWi import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ /** A wrapper function that exposes a Scala Function4 as a Java WindowFunction. */ diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassTypeInfo.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassTypeInfo.scala index bf1b556..637076d 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassTypeInfo.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassTypeInfo.scala @@ -34,7 +34,7 @@ import Keys.ExpressionKeys import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import scala.annotation.{nowarn, tailrec} -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import scala.collection.mutable.ArrayBuffer /** TypeInformation for Case Classes. Creation and access is different from our Java Tuples so we have to treat them @@ -198,27 +198,32 @@ abstract class CaseClassTypeInfo[T <: Product]( field = "_" + (Integer.valueOf(field) + 1) } - for (i <- fieldNames.indices) { - if (fieldNames(i) == field) { - if (tail == null) { - return getTypeAt(i) - } else { - fieldTypes(i) match { - case co: CompositeType[_] => - return co.getTypeAt(tail) - case _ => - throw new InvalidFieldReferenceException( - "Nested field expression \"" + tail + - "\" not possible on atomic type " + fieldTypes(i) + "." - ) - } - } + @tailrec + def loop(indices: List[Int]): TypeInformation[X] = + indices match { + case i :: ii => + if (fieldNames(i) == field) { + if (tail == null) { + getTypeAt(i) + } else { + fieldTypes(i) match { + case co: CompositeType[_] => + co.getTypeAt(tail) + case _ => + throw new InvalidFieldReferenceException( + "Nested field expression \"" + tail + + "\" not possible on atomic type " + fieldTypes(i) + "." + ) + } + } + } else loop(ii) + case Nil => + throw new InvalidFieldReferenceException( + s"Unable to find field \"$field\" in type $this." 
+ ) } - } - throw new InvalidFieldReferenceException( - "Unable to find field \"" + field + - "\" in type " + this + "." - ) + + loop(fieldNames.indices.toList) } @PublicEvolving diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/EitherTypeInfo.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/EitherTypeInfo.scala index 8044310..1ec843f 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/EitherTypeInfo.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/EitherTypeInfo.scala @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer import scala.annotation.nowarn -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ /** TypeInformation [[Either]]. */ diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeInfo.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeInfo.scala index 29a9dec..da37c0d 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeInfo.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeInfo.scala @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} import scala.annotation.nowarn -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ /** TypeInformation for [[Option]]. */
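To see what the rewritten `getTypeAt` resolves, a small sketch with made-up case classes; it assumes the derived `TypeInformation` is a `CompositeType`, which the case-class derivation in this project produces:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flinkx.api.serializers._

case class Address(city: String, zip: String)
case class User(name: String, address: Address)

val userInfo: TypeInformation[User] = implicitly[TypeInformation[User]]

// Dotted field expressions are resolved one field at a time, recursing into nested composite types.
userInfo match {
  case composite: CompositeType[_] => println(composite.getTypeAt[String]("address.city"))
  case other                       => println(s"not a composite type: $other")
}
```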