diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java index dc5c50f43ac63..5766ffdce398a 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.file.sink; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.configuration.Configuration; @@ -42,7 +43,6 @@ import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.runtime.minicluster.RpcServiceSharing; -import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java index 6a21fea3ec2a3..ac279a97abbae 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java @@ -19,13 +19,13 @@ package org.apache.flink.connector.file.sink; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index b620c0ce4c58a..71a970ddd1959 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -27,8 +27,6 @@ import java.util.Map; import java.util.Set; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * Tracker for checkpoint statistics. * @@ -47,21 +45,6 @@ */ public interface CheckpointStatsTracker { - /** - * Callback when a checkpoint is restored. - * - * @param restored The restored checkpoint stats. - */ - @Deprecated - default void reportRestoredCheckpoint(RestoredCheckpointStats restored) { - checkNotNull(restored, "Restored checkpoint"); - reportRestoredCheckpoint( - restored.getCheckpointId(), - restored.getProperties(), - restored.getExternalPath(), - restored.getStateSize()); - } - void reportRestoredCheckpoint( long checkpointID, CheckpointProperties properties, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java deleted file mode 100644 index 5cfd3b504562d..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.annotation.Public; - -/** - * This interface must be implemented by functions/operations that want to receive a commit - * notification once a checkpoint has been completely acknowledged by all participants. - * - * @deprecated This interface has been moved to {@link - * org.apache.flink.api.common.state.CheckpointListener}. This class is kept to maintain - * backwards compatibility and will be removed in future releases. - */ -@Public -@Deprecated -public interface CheckpointListener extends org.apache.flink.api.common.state.CheckpointListener {} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 8f4097ccb603a..b740a62e0c580 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -559,27 +559,6 @@ public StreamExecutionEnvironment enableCheckpointing(long interval, Checkpointi return this; } - /** - * Enables checkpointing for the streaming job. The distributed state of the streaming dataflow - * will be periodically snapshotted. In case of a failure, the streaming dataflow will be - * restarted from the latest completed checkpoint. This method selects {@link - * CheckpointingMode#EXACTLY_ONCE} guarantees. - * - *

The job draws checkpoints periodically, in the default interval. The state will be stored - * in the configured state backend. - * - *

NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. - * For that reason, iterative jobs will not be started if used with enabled checkpointing. - * - * @deprecated Use {@link #enableCheckpointing(long)} instead. - */ - @Deprecated - @PublicEvolving - public StreamExecutionEnvironment enableCheckpointing() { - checkpointCfg.setCheckpointInterval(500); - return this; - } - /** * Returns the checkpointing interval or -1 if checkpointing is disabled. * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java index a4c7ed2228207..5c7b80d34ce43 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java @@ -180,7 +180,7 @@ void testCheckpointTracking() throws Exception { null, 42); tracker.reportInitializationStarted(Collections.emptySet(), 123L); - tracker.reportRestoredCheckpoint(restored); + reportRestoredCheckpoint(tracker, restored); CheckpointStatsSnapshot snapshot = tracker.createSnapshot(); @@ -340,7 +340,8 @@ void testCreateSnapshot() { // Restore operation => new snapshot tracker.reportInitializationStarted(Collections.emptySet(), 0); - tracker.reportRestoredCheckpoint( + reportRestoredCheckpoint( + tracker, new RestoredCheckpointStats( 12, CheckpointProperties.forCheckpoint( @@ -410,7 +411,8 @@ public void addSpan(SpanBuilder spanBuilder) { final ExecutionAttemptID executionAttemptId2 = ExecutionAttemptID.randomId(); tracker.reportInitializationStarted( new HashSet<>(Arrays.asList(executionAttemptId3, executionAttemptId2)), 100); - tracker.reportRestoredCheckpoint( + reportRestoredCheckpoint( + tracker, new RestoredCheckpointStats( 42, CheckpointProperties.forCheckpoint( @@ -454,7 +456,8 @@ public void addSpan(SpanBuilder spanBuilder) { final ExecutionAttemptID executionAttemptId = ExecutionAttemptID.randomId(); tracker.reportInitializationStarted( new HashSet<>(Arrays.asList(executionAttemptId1, executionAttemptId)), 100); - tracker.reportRestoredCheckpoint( + reportRestoredCheckpoint( + tracker, new RestoredCheckpointStats( 44, CheckpointProperties.forCheckpoint( @@ -722,7 +725,7 @@ public > G gauge(String name, G gauge) { null, 42); stats.reportInitializationStarted(Collections.emptySet(), restoreTimestamp); - stats.reportRestoredCheckpoint(restored); + reportRestoredCheckpoint(stats, restored); assertThat(numCheckpoints.getValue()).isEqualTo(2); assertThat(numInProgressCheckpoints.getValue()).isZero(); @@ -754,4 +757,13 @@ public > G gauge(String name, G gauge) { private SubtaskStateStats createSubtaskStats(int index) { return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, false, true); } + + private void reportRestoredCheckpoint( + CheckpointStatsTracker tracker, RestoredCheckpointStats restored) { + tracker.reportRestoredCheckpoint( + restored.getCheckpointId(), + restored.getProperties(), + restored.getExternalPath(), + restored.getStateSize()); + } } diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 3508289b8d65c..01583b17d6be0 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -245,21 +245,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable { enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE) } - /** - * Method for enabling fault-tolerance. Activates monitoring and backup of streaming operator - * states. Time interval between state checkpoints is specified in in millis. - * - * Setting this option assumes that the job is used in production and thus if not stated - * explicitly otherwise with calling the [[setRestartStrategy]] method in case of failure the job - * will be resubmitted to the cluster indefinitely. - */ - @deprecated - @PublicEvolving - def enableCheckpointing(): StreamExecutionEnvironment = { - javaEnv.enableCheckpointing() - this - } - /** @deprecated Use [[getCheckpointingConsistencyMode()]] instead. */ @deprecated def getCheckpointingMode = javaEnv.getCheckpointingMode() diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java index f701aff8d36be..a04cf1e3d2031 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java @@ -162,12 +162,6 @@ public StreamExecutionEnvironment enableCheckpointing(long interval, Checkpointi "This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported."); } - @Override - public StreamExecutionEnvironment enableCheckpointing() { - throw new UnsupportedOperationException( - "This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported."); - } - @Override public long getCheckpointInterval() { return realExecEnv.getCheckpointInterval(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java index 481199c9eb423..f2307e5515714 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java @@ -631,7 +631,7 @@ public void close() { } private void createIteration(StreamExecutionEnvironment env, int timeoutScale) { - env.enableCheckpointing(); + env.enableCheckpointing(500L); DataStream source = env.fromData(Collections.nCopies(parallelism * 2, false))