From 97f4ac4091f27b6156a4404aef8c8b0819eb3822 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Mon, 2 Dec 2024 14:54:56 +0800 Subject: [PATCH] [FLINK-36823][Checkpointing] Drain state requests after the user function perform snapshotState --- .../operators/AbstractAsyncStateStreamOperator.java | 4 ++++ .../operators/AbstractAsyncStateUdfStreamOperator.java | 2 ++ 2 files changed, 6 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index eafe6920557663..67e74fa6b69be1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -405,6 +405,10 @@ RecordContext getCurrentProcessingContext() { return currentProcessingContext; } + public void drainStateRequests() { + asyncExecutionController.drainInflightRecords(0); + } + @Override public void finish() throws Exception { super.finish(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java index f8dd9ad235b536..5debc11cbd8254 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java @@ -93,6 +93,8 @@ public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); StreamingFunctionUtils.snapshotFunctionState( context, getOperatorStateBackend(), userFunction); + // Drain state requests in case the user function modifies the state. + drainStateRequests(); } @Override