diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 27cec546bb27a..94015ec1c11c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -243,6 +243,10 @@ public void setCurrentContext(RecordContext switchingContext) { } } + public RecordContext getCurrentContext() { + return currentContext; + } + /** * Dispose a context. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index c8b70819e7aa9..db056474b0fa4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -305,7 +305,7 @@ public void setKeyContextElement2(StreamRecord record) throws Exception { @Override public Object getCurrentKey() { - return currentProcessingContext.getKey(); + return asyncExecutionController.getCurrentContext().getKey(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 6c90e4f997f9e..2a0c1a6ebef89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -154,7 +154,7 @@ public final void setAsyncKeyedContextElement( @Override public Object getCurrentKey() { - return currentProcessingContext.getKey(); + return asyncExecutionController.getCurrentContext().getKey(); } @Override