drop scala 2.12 support, add deprecations, remove returns usage (#176)
* drop scala 2.12 support, add deprecations, remove returns usage

* add a note on the last Scala 2.12 build
novakov-alexey authored Nov 26, 2024
1 parent 07e2f29 commit a20b7fe
Showing 17 changed files with 143 additions and 68 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -15,7 +15,7 @@ jobs:
strategy:
matrix:
java: [11]
scala: [2.12.20, 2.13.15, 3.3.4]
scala: [2.13.15, 3.3.4]
flink: [1.18.1, 1.19.1]
include:
- scala: 3.3.4
26 changes: 21 additions & 5 deletions README.md
@@ -1,12 +1,12 @@
# Scala 2.12/2.13/3.x API for Apache Flink
# Scala 2.13/3.x API for Apache Flink

[![CI Status](https://github.com/flink-extended/flink-scala-api/workflows/CI/badge.svg)](https://github.com/flinkextended/flink-scala-api/actions)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.12/badge.svg?style=plastic)](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.12)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.13/badge.svg?style=plastic)](https://maven-badges.herokuapp.com/maven-central/org.flinkextended/flink-scala-api_2.13)
[![License: Apache 2](https://img.shields.io/badge/License-Apache2-green.svg)](https://opensource.org/licenses/Apache-2.0)
![Last commit](https://img.shields.io/github/last-commit/flink-extended/flink-scala-api)
![Last release](https://img.shields.io/github/release/flink-extended/flink-scala-api)

This project is a community-maintained fork of official Apache Flink Scala API, cross-built for scala 2.12, 2.13 and 3.x.
This project is a community-maintained fork of official Apache Flink Scala API, cross-built for scala 2.13 and 3.x.

## Migration

Expand All @@ -26,19 +26,35 @@ import org.apache.flinkx.api.serializers._

## Usage

`flink-scala-api` is released to Maven-central for 2.12, 2.13 and 3. For SBT, add this snippet to `build.sbt`:
`flink-scala-api` is released to Maven-central for 2.13 and 3. For SBT, add this snippet to `build.sbt`:
```scala
libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.1.6"
```

For Ammonite:
## For Ammonite

```scala
import $ivy.`org.flinkextended::flink-scala-api:1.18.1_1.1.6`
// you might need flink-client too in order to run in the REPL
import $ivy.`org.apache.flink:flink-clients:1.18.1`
```

## For Scala 2.12

If you want first to migrate to org.flinkextended:flink-scala-api staying on Scala 2.12, you can use the last build for Scala 2.12:

```scala
libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.2.0"
// or
"org.flinkextended" %% "flink-scala-api" % "1.19.1_1.2.0"
// or
"org.flinkextended" %% "flink-scala-api" % "1.20.0_1.2.0"
```

Build for Scala 2.12 is no longer published.

## SBT Project Template

If you want to create new project easily check this __Giter8 template__ out: [novakov-alexey/flink-scala-api.g8](https://github.com/novakov-alexey/flink-scala-api.g8)

## Supported Flink versions
3 changes: 2 additions & 1 deletion build.sbt
@@ -9,6 +9,7 @@ lazy val flinkVersion = System.getProperty("flinkVersion", "1.18.1")
lazy val root = (project in file("."))
.aggregate(`scala-api`, `examples`)
.settings(
scalaVersion := rootScalaVersion,
publish / skip := true
)

@@ -17,7 +18,7 @@ lazy val `scala-api` = (project in file("modules/scala-api"))
.settings(
name := "flink-scala-api",
scalaVersion := rootScalaVersion,
crossScalaVersions := Seq("2.12.20", "2.13.15", rootScalaVersion),
crossScalaVersions := Seq("2.13.15", rootScalaVersion),
libraryDependencies ++= Seq(
"org.apache.flink" % "flink-streaming-java" % flinkVersion,
"org.apache.flink" % "flink-java" % flinkVersion,
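With the 2.12 axis gone, `crossScalaVersions` only spans 2.13 and 3, so any remaining version-specific settings in downstream builds reduce to a two-way `CrossVersion` switch. A minimal sketch of such a fragment; the dependency shown is purely illustrative and not part of this build:

```scala
// Hypothetical build.sbt fragment: branch on the remaining Scala axes.
libraryDependencies ++= {
  CrossVersion.partialVersion(scalaVersion.value) match {
    case Some((2, 13)) =>
      // an extra 2.13-only dependency, as an example
      Seq("org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0")
    case _ =>
      Seq.empty // nothing extra for Scala 3
  }
}
```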
@@ -30,6 +30,7 @@ trait CommonTaggedDerivation[TypeClass[_]]:
)*
)

@annotation.nowarn
val caseClass = new CaseClass[Typeclass, A](
typeInfo[A],
isObject[A],
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.annotation.tailrec

/** A cleaner that renders closures serializable if they can be done so safely.
*/
@@ -63,20 +64,27 @@ object ClosureCleaner {
// not a good idea (whereas we can clone closure objects just fine since we
// understand how all their fields are used).
private def getOuterClassesAndObjects(obj: AnyRef): (List[Class[_]], List[AnyRef]) = {
for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
f.setAccessible(true)
val outer = f.get(obj)
// The outer pointer may be null if we have cleaned this closure before
if (outer != null) {
if (isClosure(f.getType)) {
val recurRet = getOuterClassesAndObjects(outer)
return (f.getType :: recurRet._1, outer :: recurRet._2)
} else {
return (f.getType :: Nil, outer :: Nil) // Stop at the first $outer that is not a closure
}

@tailrec
def loop(fields: List[Field]): (List[Class[_]], List[AnyRef]) =
fields match {
case f :: tail =>
f.setAccessible(true)
val outer = f.get(obj)
// The outer pointer may be null if we have cleaned this closure before
if (outer != null) {
if (isClosure(f.getType)) {
val recurRet = getOuterClassesAndObjects(outer)
(f.getType :: recurRet._1, outer :: recurRet._2)
} else {
(f.getType :: Nil, outer :: Nil) // Stop at the first $outer that is not a closure
}
} else loop(tail)
case Nil => (Nil, Nil)
}
}
(Nil, Nil)

val fields = obj.getClass.getDeclaredFields.filter(_.getName == "$outer").toList
loop(fields)
}

/** Return a list of classes that represent closures enclosed in the given closure object.
@@ -89,17 +97,20 @@ object ClosureCleaner {
if (cr != null) {
val set = mutable.Set.empty[Class[_]]
cr.accept(new InnerClosureFinder(set), 0)
for (cls <- set -- seen) {
for (cls <- set.toSet -- seen) {
seen += cls
stack.push(cls)
}
}
}
(seen - obj.getClass).toList
(seen.toSet - obj.getClass).toList
}

/** Initializes the accessed fields for outer classes and their super classes. */
private def initAccessedFields(accessedFields: mutable.Map[Class[_], mutable.Set[String]], outerClasses: Seq[Class[_]]): Unit = {
private def initAccessedFields(
accessedFields: mutable.Map[Class[_], mutable.Set[String]],
outerClasses: Seq[Class[_]]
): Unit = {
for (cls <- outerClasses) {
var currentClass = cls
assert(currentClass != null, "The outer class can't be null.")
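The rewrite above replaces the early `return`s in `getOuterClassesAndObjects` with a local tail-recursive `loop`. The same pattern, reduced to a toy standalone example (names here are illustrative, not from the codebase):

```scala
import scala.annotation.tailrec

// Find the first negative number without using `return`: the short-circuit
// that `return` used to provide is expressed by simply not recursing further.
def firstNegative(xs: List[Int]): Option[Int] = {
  @tailrec
  def loop(rest: List[Int]): Option[Int] = rest match {
    case x :: _ if x < 0 => Some(x)   // stop at the first match
    case _ :: tail       => loop(tail)
    case Nil             => None
  }
  loop(xs)
}
```

In the original method the `return`s sit inside a `for` comprehension, that is, inside a lambda, so they compile to non-local returns; the tail-recursive form avoids that mechanism entirely and lets `@tailrec` verify the helper compiles down to a loop.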
@@ -189,7 +189,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* @return
* The resulting data stream.
*/
def flatMap[R: TypeInformation](fun1: IN1 => TraversableOnce[R], fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
def flatMap[R: TypeInformation](fun1: IN1 => IterableOnce[R], fun2: IN2 => IterableOnce[R]): DataStream[R] = {

if (fun1 == null || fun2 == null) {
throw new NullPointerException("FlatMap functions must not be null.")
@@ -198,8 +198,8 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
val cleanFun2 = clean(fun2)

val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
def flatMap1(value: IN1, out: Collector[R]): Unit = { cleanFun1(value).foreach(out.collect) }
def flatMap2(value: IN2, out: Collector[R]): Unit = { cleanFun2(value).foreach(out.collect) }
def flatMap1(value: IN1, out: Collector[R]): Unit = { cleanFun1(value).iterator.foreach(out.collect) }
def flatMap2(value: IN2, out: Collector[R]): Unit = { cleanFun2(value).iterator.foreach(out.collect) }
}

flatMap(flatMapper)
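For callers, the move from `TraversableOnce` to `IterableOnce` is mostly invisible: ordinary Scala 2.13/3 collections and iterators still qualify as results. A minimal sketch, assuming the fork's usual imports and a local execution environment:

```scala
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

val env   = StreamExecutionEnvironment.getExecutionEnvironment
val lines = env.fromElements("a b", "c d")
val nums  = env.fromElements(1, 2, 3)

// Both functions return an IterableOnce[String]: a Seq and an Iterator.
val out: DataStream[String] = lines
  .connect(nums)
  .flatMap(
    (line: String) => line.split(" ").toSeq,
    (n: Int)       => Iterator(n.toString)
  )
```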
@@ -560,13 +560,13 @@ class DataStream[T](stream: JavaStream[T]) {

/** Creates a new DataStream by applying the given function to every element and flattening the results.
*/
def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = {
def flatMap[R: TypeInformation](fun: T => IterableOnce[R]): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
val cleanFun = clean(fun)
val flatMapper = new FlatMapFunction[T, R] {
def flatMap(in: T, out: Collector[R]): Unit = { cleanFun(in).foreach(out.collect) }
def flatMap(in: T, out: Collector[R]): Unit = { cleanFun(in).iterator.foreach(out.collect) }
}
flatMap(flatMapper)
}
@@ -690,8 +690,10 @@ class DataStream[T](stream: JavaStream[T]) {
* For cases where the timestamps are not monotonously increasing, use the more general methods
* [[assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)]] and
* [[assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)]].
*
* @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
*/
@PublicEvolving
@Deprecated
def assignAscendingTimestamps(extractor: T => Long): DataStream[T] = {
val cleanExtractor = clean(extractor)
val extractorFunction = new AscendingTimestampExtractor[T] {
@@ -754,8 +756,12 @@ class DataStream[T](stream: JavaStream[T]) {
def printToErr(sinkIdentifier: String): DataStreamSink[T] = stream.printToErr(sinkIdentifier)

/** Writes a DataStream using the given [[OutputFormat]].
*/
@PublicEvolving
*
* @deprecated Please use the {@link
* org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink} explicitly
* using the {@link #addSink(SinkFunction)} method.
*/
@Deprecated
def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T] = {
stream.writeUsingOutputFormat(format)
}
@@ -790,7 +796,10 @@ class DataStream[T](stream: JavaStream[T]) {

/** Adds the given sink to this DataStream. Only streams with sinks added will be executed once the
* StreamExecutionEnvironment.execute(...) method is called.
*
* @deprecated Please use the sinkTo(sink: org.apache.flink.api.connector.sink2.Sink[T])
*/
@Deprecated
def sinkTo(sink: org.apache.flink.api.connector.sink.Sink[T, _, _, _]): DataStreamSink[T] =
stream.sinkTo(sink)

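The deprecation note on `assignAscendingTimestamps` points at the `WatermarkStrategy`-based API. A minimal sketch of that route, assuming the fork's usual imports and an illustrative `Event` type:

```scala
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

case class Event(id: String, ts: Long)

val env    = StreamExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements(Event("a", 1L), Event("b", 2L))

// forMonotonousTimestamps covers the ascending-timestamps case;
// the assigner tells Flink where the event time lives.
val withTimestamps = events.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forMonotonousTimestamps[Event]()
    .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
      override def extractTimestamp(e: Event, recordTimestamp: Long): Long = e.ts
    })
)
```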
@@ -460,7 +460,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* Note that the user state object needs to be serializable.
*/
def flatMapWithState[R: TypeInformation, S: TypeInformation](
fun: (T, Option[S]) => (TraversableOnce[R], Option[S])
fun: (T, Option[S]) => (IterableOnce[R], Option[S])
): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("Flatmap function must not be null.")
@@ -470,12 +470,12 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
val serializer: TypeSerializer[S] = stateTypeInfo.createSerializer(javaStream.getExecutionConfig)

val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T, TraversableOnce[R], S] {
val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T, IterableOnce[R], S] {

override val stateSerializer: TypeSerializer[S] = serializer

override def flatMap(in: T, out: Collector[R]): Unit = {
applyWithState(in, cleanFun).foreach(out.collect)
applyWithState(in, cleanFun).iterator.foreach(out.collect)
}
}

@@ -488,8 +488,10 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* Name under which to the publish the queryable state instance
* @return
* Queryable state instance
* @deprecated The Queryable State feature is deprecated since Flink 1.18, and will be removed in a
* future Flink major version.
*/
@PublicEvolving
@Deprecated
def asQueryableState(queryableStateName: String): QueryableStateStream[K, T] = {
val stateDescriptor = new ValueStateDescriptor(queryableStateName, dataType.createSerializer(executionConfig))

@@ -504,8 +506,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* State descriptor to create state instance from
* @return
* Queryable state instance
*
* @deprecated The Queryable State feature is deprecated since Flink 1.18, and will be removed in a
* future Flink major version.
*/
@PublicEvolving
@Deprecated
def asQueryableState(
queryableStateName: String,
stateDescriptor: ValueStateDescriptor[T]
@@ -529,8 +534,10 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* State descriptor to create state instance from
* @return
* Queryable state instance
* @deprecated The Queryable State feature is deprecated since Flink 1.18, and will be removed in a
* future Flink major version.
*/
@PublicEvolving
@Deprecated
def asQueryableState(
queryableStateName: String,
stateDescriptor: ReducingStateDescriptor[T]
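With the new signature, `flatMapWithState` expects a `(T, Option[S]) => (IterableOnce[R], Option[S])`, so any collection works for the output half of the tuple. A small per-key de-duplication sketch, assuming the fork's serializer implicits cover `Set[String]` (names are illustrative):

```scala
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._

val env   = StreamExecutionEnvironment.getExecutionEnvironment
val keyed = env.fromElements("a", "b", "a").keyBy(identity)

// State is the set of values already seen for the key.
val deduped: DataStream[String] =
  keyed.flatMapWithState[String, Set[String]] {
    case (value, None)                      => (Seq(value), Some(Set(value)))   // first value for this key
    case (value, Some(seen)) if seen(value) => (Seq.empty, Some(seen))           // duplicate: emit nothing
    case (value, Some(seen))                => (Seq(value), Some(seen + value))  // new value: emit and remember
  }
```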
@@ -487,19 +487,29 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {

/** Creates a DataStream that represents the Strings produced by reading the given file line wise. The file will be
* read with the system's default character set.
*
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}.
*/
@Deprecated
def readTextFile(filePath: String): DataStream[String] =
asScalaStream(javaEnv.readTextFile(filePath))

/** Creates a data stream that represents the Strings produced by reading the given file line wise. The character set
* with the given name will be used to read the files.
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}.
*/
@Deprecated
def readTextFile(filePath: String, charsetName: String): DataStream[String] =
asScalaStream(javaEnv.readTextFile(filePath, charsetName))

/** Reads the given file with the given input format. The file path should be passed as a URI (e.g.,
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}.
*/
@Deprecated
def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String): DataStream[T] =
asScalaStream(javaEnv.readFile(inputFormat, filePath))

@@ -526,8 +536,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
* @return
* The data stream that represents the data read from the given file
*
* @deprecated Use {@code
* FileSource#forRecordStreamFormat()/forBulkFileFormat()/forRecordFileFormat() instead}.
*/
@PublicEvolving
@Deprecated
def readFile[T: TypeInformation](
inputFormat: FileInputFormat[T],
filePath: String,
@@ -563,7 +576,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* have a parallelism of 1. To enable parallel execution, the user defined source should implement
* ParallelSourceFunction or extend RichParallelSourceFunction. In these cases the resulting source will have the
* parallelism of the environment. To change this afterwards call DataStreamSource.setParallelism(int)
*
* @deprecated This method relies on the {@link
* org.apache.flink.streaming.api.functions.source.SourceFunction} API, which is due to be
* removed. Use the {@link #fromSource[TypeInformation](Source, WatermarkStrategy, String)}
* method based on the new {@link org.apache.flink.api.connector.source.Source} API instead.
*/
@Deprecated
def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
require(function != null, "Function must not be null.")

@@ -573,7 +592,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
}

/** Create a DataStream using a user defined source function for arbitrary source functionality.
*/
*
* @deprecated This method relies on the {@link
* org.apache.flink.streaming.api.functions.source.SourceFunction} API, which is due to be
* removed. Use the {@link #fromSource[TypeInformation](Source, WatermarkStrategy, String)}
* method based on the new {@link org.apache.flink.api.connector.source.Source} API instead.
*/
@Deprecated
def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
require(function != null, "Function must not be null.")
val sourceFunction = new SourceFunction[T] {
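The deprecation notes on `readTextFile`, `readFile` and `addSource` all point to the newer `Source` API. A minimal sketch of reading text lines through `FileSource` and `fromSource`; it assumes the `flink-connector-files` artifact is on the classpath and the path is illustrative:

```scala
import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.core.fs.Path

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Bounded source over plain text lines.
val source = FileSource
  .forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///tmp/input.txt"))
  .build()

val lines: DataStream[String] =
  env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "text-file")
```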
@@ -63,8 +63,8 @@ class OnConnectedStream[IN1, IN2](stream: ConnectedStreams[IN1, IN2]) {
*/
@PublicEvolving
def flatMapWith[R: TypeInformation](
flatMap1: IN1 => TraversableOnce[R],
flatMap2: IN2 => TraversableOnce[R]
flatMap1: IN1 => IterableOnce[R],
flatMap2: IN2 => IterableOnce[R]
): DataStream[R] =
stream.flatMap(flatMap1, flatMap2)

@@ -55,7 +55,7 @@ class OnDataStream[T](stream: DataStream[T]) {
* A dataset of R
*/
@PublicEvolving
def flatMapWith[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] =
def flatMapWith[R: TypeInformation](fun: T => IterableOnce[R]): DataStream[R] =
stream.flatMap(fun)

/** Applies a predicate `fun` to each item of the stream, keeping only those for which the predicate holds
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction =>
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

/** A wrapper function that exposes a Scala Function3 as a Java AllWindowFunction.
*/
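`scala.jdk.CollectionConverters` is the 2.13+ home of the old `JavaConverters` syntax, so only the import changes; the `asScala`/`asJava` calls stay the same. For illustration:

```scala
import scala.jdk.CollectionConverters._

val javaList: java.util.List[String] = java.util.List.of("a", "b", "c")
val scalaSeq: Seq[String]            = javaList.asScala.toSeq  // same syntax as before
```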