Skip to content

Commit

Permalink
[FLINK-34511][Runtime/State] Remove deprecated checkpointing related …
Browse files Browse the repository at this point in the history
…APIs
  • Loading branch information
Zakelly committed Sep 24, 2024
1 parent f699af3 commit 7d026ae
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.connector.file.sink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
Expand All @@ -42,7 +43,6 @@
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.flink.connector.file.sink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import java.util.Map;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Tracker for checkpoint statistics.
*
Expand All @@ -47,21 +45,6 @@
*/
public interface CheckpointStatsTracker {

/**
* Callback when a checkpoint is restored.
*
* @param restored The restored checkpoint stats.
*/
@Deprecated
default void reportRestoredCheckpoint(RestoredCheckpointStats restored) {
checkNotNull(restored, "Restored checkpoint");
reportRestoredCheckpoint(
restored.getCheckpointId(),
restored.getProperties(),
restored.getExternalPath(),
restored.getStateSize());
}

void reportRestoredCheckpoint(
long checkpointID,
CheckpointProperties properties,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -559,27 +559,6 @@ public StreamExecutionEnvironment enableCheckpointing(long interval, Checkpointi
return this;
}

/**
* Enables checkpointing for the streaming job. The distributed state of the streaming dataflow
* will be periodically snapshotted. In case of a failure, the streaming dataflow will be
* restarted from the latest completed checkpoint. This method selects {@link
* CheckpointingMode#EXACTLY_ONCE} guarantees.
*
* <p>The job draws checkpoints periodically, in the default interval. The state will be stored
* in the configured state backend.
*
* <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment.
* For that reason, iterative jobs will not be started if used with enabled checkpointing.
*
* @deprecated Use {@link #enableCheckpointing(long)} instead.
*/
@Deprecated
@PublicEvolving
public StreamExecutionEnvironment enableCheckpointing() {
checkpointCfg.setCheckpointInterval(500);
return this;
}

/**
* Returns the checkpointing interval or -1 if checkpointing is disabled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void testCheckpointTracking() throws Exception {
null,
42);
tracker.reportInitializationStarted(Collections.emptySet(), 123L);
tracker.reportRestoredCheckpoint(restored);
reportRestoredCheckpoint(tracker, restored);

CheckpointStatsSnapshot snapshot = tracker.createSnapshot();

Expand Down Expand Up @@ -340,7 +340,8 @@ void testCreateSnapshot() {

// Restore operation => new snapshot
tracker.reportInitializationStarted(Collections.emptySet(), 0);
tracker.reportRestoredCheckpoint(
reportRestoredCheckpoint(
tracker,
new RestoredCheckpointStats(
12,
CheckpointProperties.forCheckpoint(
Expand Down Expand Up @@ -410,7 +411,8 @@ public void addSpan(SpanBuilder spanBuilder) {
final ExecutionAttemptID executionAttemptId2 = ExecutionAttemptID.randomId();
tracker.reportInitializationStarted(
new HashSet<>(Arrays.asList(executionAttemptId3, executionAttemptId2)), 100);
tracker.reportRestoredCheckpoint(
reportRestoredCheckpoint(
tracker,
new RestoredCheckpointStats(
42,
CheckpointProperties.forCheckpoint(
Expand Down Expand Up @@ -454,7 +456,8 @@ public void addSpan(SpanBuilder spanBuilder) {
final ExecutionAttemptID executionAttemptId = ExecutionAttemptID.randomId();
tracker.reportInitializationStarted(
new HashSet<>(Arrays.asList(executionAttemptId1, executionAttemptId)), 100);
tracker.reportRestoredCheckpoint(
reportRestoredCheckpoint(
tracker,
new RestoredCheckpointStats(
44,
CheckpointProperties.forCheckpoint(
Expand Down Expand Up @@ -722,7 +725,7 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
null,
42);
stats.reportInitializationStarted(Collections.emptySet(), restoreTimestamp);
stats.reportRestoredCheckpoint(restored);
reportRestoredCheckpoint(stats, restored);

assertThat(numCheckpoints.getValue()).isEqualTo(2);
assertThat(numInProgressCheckpoints.getValue()).isZero();
Expand Down Expand Up @@ -754,4 +757,13 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
private SubtaskStateStats createSubtaskStats(int index) {
return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, true);
}

private void reportRestoredCheckpoint(
CheckpointStatsTracker tracker, RestoredCheckpointStats restored) {
tracker.reportRestoredCheckpoint(
restored.getCheckpointId(),
restored.getProperties(),
restored.getExternalPath(),
restored.getStateSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,21 +245,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable {
enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE)
}

/**
* Method for enabling fault-tolerance. Activates monitoring and backup of streaming operator
* states. Time interval between state checkpoints is specified in in millis.
*
* Setting this option assumes that the job is used in production and thus if not stated
* explicitly otherwise with calling the [[setRestartStrategy]] method in case of failure the job
* will be resubmitted to the cluster indefinitely.
*/
@deprecated
@PublicEvolving
def enableCheckpointing(): StreamExecutionEnvironment = {
javaEnv.enableCheckpointing()
this
}

/** @deprecated Use [[getCheckpointingConsistencyMode()]] instead. */
@deprecated
def getCheckpointingMode = javaEnv.getCheckpointingMode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,6 @@ public StreamExecutionEnvironment enableCheckpointing(long interval, Checkpointi
"This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
}

@Override
public StreamExecutionEnvironment enableCheckpointing() {
throw new UnsupportedOperationException(
"This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported.");
}

@Override
public long getCheckpointInterval() {
return realExecEnv.getCheckpointInterval();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ public void close() {
}

private void createIteration(StreamExecutionEnvironment env, int timeoutScale) {
env.enableCheckpointing();
env.enableCheckpointing(500L);

DataStream<Boolean> source =
env.fromData(Collections.nCopies(parallelism * 2, false))
Expand Down

0 comments on commit 7d026ae

Please sign in to comment.