From 2ad98ca410358dbfeb668a77f19916aac49415b9 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Sun, 29 Sep 2024 18:23:48 +0800 Subject: [PATCH] [FLINK-36395][runtime] Drain async state processing when operator finish --- .../operators/AbstractAsyncStateStreamOperator.java | 6 ++++++ .../operators/AbstractAsyncStateStreamOperatorV2.java | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 1e4b39326e5af..5a99033288f66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -331,6 +331,12 @@ RecordContext getCurrentProcessingContext() { return currentProcessingContext; } + @Override + public void finish() throws Exception { + super.finish(); + asyncExecutionController.drainInflightRecords(0); + } + @Override public void close() throws Exception { super.close(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 01e86a378a01e..0c74736bd9e33 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -268,6 +268,12 @@ public RecordContext getCurrentProcessingContext() { return currentProcessingContext; } + @Override + public void finish() throws Exception { + super.finish(); + asyncExecutionController.drainInflightRecords(0); + } + @Override public void close() throws Exception { super.close();