diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 1e4b39326e5af..5a99033288f66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -331,6 +331,12 @@ RecordContext getCurrentProcessingContext() { return currentProcessingContext; } + @Override + public void finish() throws Exception { + super.finish(); + asyncExecutionController.drainInflightRecords(0); + } + @Override public void close() throws Exception { super.close(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 01e86a378a01e..0c74736bd9e33 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -268,6 +268,12 @@ public RecordContext getCurrentProcessingContext() { return currentProcessingContext; } + @Override + public void finish() throws Exception { + super.finish(); + asyncExecutionController.drainInflightRecords(0); + } + @Override public void close() throws Exception { super.close();