diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index eafe6920557663..67e74fa6b69be1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -405,6 +405,10 @@ RecordContext getCurrentProcessingContext() { return currentProcessingContext; } + public void drainStateRequests() { + asyncExecutionController.drainInflightRecords(0); + } + @Override public void finish() throws Exception { super.finish(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java index f8dd9ad235b536..5debc11cbd8254 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java @@ -93,6 +93,8 @@ public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); StreamingFunctionUtils.snapshotFunctionState( context, getOperatorStateBackend(), userFunction); + // Drain state requests in case the user function modifies the state. + drainStateRequests(); } @Override