Commit e67233e

[FLINK-34516] Use new CheckpointingMode in flink-core in scala

Zakelly committed Mar 13, 2024
1 parent f7ff832 · commit e67233e
Showing 8 changed files with 45 additions and 28 deletions.
Changed file 1 of 8:

@@ -30,10 +30,10 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.configuration.{Configuration, ReadableConfig}
-import org.apache.flink.core.execution.{JobClient, JobListener}
+import org.apache.flink.core.execution.{CheckpointingMode, JobClient, JobListener}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.state.StateBackend
-import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source._
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
@@ -45,7 +45,6 @@ import _root_.scala.language.implicitConversions
 import com.esotericsoftware.kryo.Serializer
 
 import java.net.URI
-
 import scala.collection.JavaConverters._
 
 /**
@@ -202,7 +201,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable {
   @PublicEvolving
   def enableCheckpointing(
       interval: Long,
-      mode: CheckpointingMode,
+      mode: org.apache.flink.streaming.api.CheckpointingMode,
       force: Boolean): StreamExecutionEnvironment = {
     javaEnv.enableCheckpointing(interval, mode, force)
     this
@@ -214,17 +213,41 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable {
    * restarted from the latest completed checkpoint.
    *
    * The job draws checkpoints periodically, in the given interval. The system uses the given
-   * [[CheckpointingMode]] for the checkpointing ("exactly once" vs "at least once"). The state will
-   * be stored in the configured state backend.
+   * [[org.apache.flink.streaming.api.CheckpointingMode]] for the checkpointing ("exactly once" vs
+   * "at least once"). The state will be stored in the configured state backend.
    *
    * NOTE: Checkpointing iterative streaming dataflows in not properly supported at the moment. For
-   * that reason, iterative jobs will not be started if used with enabled checkpointing. To override
-   * this mechanism, use the [[enableCheckpointing(long, CheckpointingMode, boolean)]] method.
+   * that reason, iterative jobs will not be started if used with enabled checkpointing.
    *
    * @param interval
    *   Time interval between state checkpoints in milliseconds.
    * @param mode
    *   The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
+   * @deprecated
+   *   Use [[enableCheckpointing(Long, CheckpointingMode)]] instead.
+   */
+  @deprecated
+  def enableCheckpointing(interval: Long, mode: org.apache.flink.streaming.api.CheckpointingMode): StreamExecutionEnvironment = {
+    javaEnv.enableCheckpointing(interval, mode)
+    this
+  }
+
+  /**
+   * Enables checkpointing for the streaming job. The distributed state of the streaming dataflow
+   * will be periodically snapshotted. In case of a failure, the streaming dataflow will be
+   * restarted from the latest completed checkpoint.
+   *
+   * The job draws checkpoints periodically, in the given interval. The system uses the given
+   * [[CheckpointingMode]] for the checkpointing ("exactly once" vs "at least once"). The state will
+   * be stored in the configured state backend.
+   *
+   * NOTE: Checkpointing iterative streaming dataflows in not properly supported at the moment. For
+   * that reason, iterative jobs will not be started if used with enabled checkpointing.
+   *
+   * @param interval
+   *   Time interval between state checkpoints in milliseconds.
+   * @param mode
+   *   The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
    */
  def enableCheckpointing(interval: Long, mode: CheckpointingMode): StreamExecutionEnvironment = {
    javaEnv.enableCheckpointing(interval, mode)
@@ -241,8 +264,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable {
    * backend.
    *
    * NOTE: Checkpointing iterative streaming dataflows in not properly supported at the moment. For
-   * that reason, iterative jobs will not be started if used with enabled checkpointing. To override
-   * this mechanism, use the [[enableCheckpointing(long, CheckpointingMode, boolean)]] method.
+   * that reason, iterative jobs will not be started if used with enabled checkpointing.
    *
    * @param interval
    *   Time interval between state checkpoints in milliseconds.
@@ -266,8 +288,14 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable {
     this
   }
 
+  /**
+   * @deprecated Use [[getCheckpointingConsistencyMode()]] instead.
+   */
+  @deprecated
   def getCheckpointingMode = javaEnv.getCheckpointingMode()
 
+  def getCheckpointingConsistencyMode = javaEnv.getCheckpointingConsistencyMode()
+
   /**
    * Sets the state backend that describes how to store operator. It defines the data structures
    * that hold state during execution (for example hash tables, RocksDB, or other data stores).
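That is the whole API change: CheckpointingMode now lives in org.apache.flink.core.execution, and the old streaming-api enum survives only behind @deprecated overloads. Below is a minimal sketch of migrated user code; the object name, interval, and pipeline are illustrative only, while the import location, the enableCheckpointing overload, and getCheckpointingConsistencyMode are taken from the diff above.

// Sketch of user code after this commit (assumes a Flink version containing it).
import org.apache.flink.core.execution.CheckpointingMode // was: org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

object CheckpointedJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Resolves to the new overload taking the flink-core enum; the old overload is @deprecated.
    env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
    // Replacement for the deprecated getCheckpointingMode getter.
    val mode: CheckpointingMode = env.getCheckpointingConsistencyMode
    env.fromElements(1, 2, 3).map(_ * 2).print()
    env.execute("checkpointed-job")
  }
}

Since EXACTLY_ONCE is Flink's default consistency guarantee, enableCheckpointing(10000L) without the mode argument behaves the same.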
Changed file 2 of 8:

@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.core.execution.CheckpointingMode
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.config.OptimizerConfigOptions
@@ -33,14 +33,12 @@ import org.apache.flink.table.planner.utils.AggregatePhaseStrategy
 import org.apache.flink.table.planner.utils.AggregatePhaseStrategy._
 import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}
 import org.apache.flink.types.Row
-
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{BeforeEach, TestTemplate}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import java.time.ZoneId
 import java.util
-
 import scala.collection.JavaConversions._
 
 @ExtendWith(Array(classOf[ParameterizedTestExtension]))
Changed file 3 of 8:

@@ -19,15 +19,14 @@ package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.core.execution.CheckpointingMode
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction
 import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension
 import org.apache.flink.types.Row
-
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{BeforeEach, TestTemplate}
 import org.junit.jupiter.api.extension.ExtendWith
Changed file 4 of 8:

@@ -19,21 +19,19 @@ package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.core.execution.CheckpointingMode
 import org.apache.flink.table.api.bridge.scala.tableConversions
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
 import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}
 import org.apache.flink.types.Row
-
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{BeforeEach, TestTemplate}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import java.util
-
 import scala.collection.JavaConversions._
 
 /** IT cases for window aggregates with distinct aggregates. */
Changed file 5 of 8:

@@ -19,21 +19,19 @@ package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.core.execution.CheckpointingMode
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
 import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}
 import org.apache.flink.types.Row
-
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{BeforeEach, TestTemplate}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import java.time.ZoneId
 import java.util
-
 import scala.collection.JavaConversions._
 
 @ExtendWith(Array(classOf[ParameterizedTestExtension]))
Changed file 6 of 8:

@@ -19,15 +19,14 @@ package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.core.execution.CheckpointingMode
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction
 import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension
 import org.apache.flink.types.Row
-
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{BeforeEach, TestTemplate}
 import org.junit.jupiter.api.extension.ExtendWith
Changed file 7 of 8:

@@ -18,13 +18,12 @@
 package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.core.execution.CheckpointingMode
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension
-
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{BeforeEach, TestTemplate}
 import org.junit.jupiter.api.extension.ExtendWith
Changed file 8 of 8:

@@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.configuration.{CheckpointingOptions, Configuration}
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.core.execution.CheckpointingMode
 import org.apache.flink.runtime.state.memory.MemoryStateBackend
-import org.apache.flink.streaming.api.CheckpointingMode
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
@@ -36,13 +36,11 @@ import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.types.logical.RowType
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters
-
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 
 import java.nio.file.Files
 import java.util
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
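Each of the seven test files above makes the same one-line swap to the flink-core import. Callers that cannot migrate yet keep compiling through the @deprecated overload added in the first file, since the old enum still exists. A hypothetical transitional snippet follows; the job setup is illustrative, and only the fully qualified old enum and the deprecated overload come from this commit.

// Transitional code: still uses the old streaming-api enum.
import org.apache.flink.streaming.api.scala._

object LegacyCheckpointedJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Compiles against the @deprecated overload, with a deprecation warning.
    env.enableCheckpointing(5000L, org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE)
    env.fromElements("a", "b", "c").print()
    env.execute("legacy-checkpointed-job")
  }
}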
