diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 2e785121a7eb6..6e65b95416fc6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -243,6 +243,10 @@ public void setCurrentContext(RecordContext switchingContext) { } } + public RecordContext getCurrentContext() { + return currentContext; + } + /** * Dispose a context. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 91af56155daa1..56360deaa1a8f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -305,7 +305,7 @@ public void setKeyContextElement2(StreamRecord record) throws Exception { @Override public Object getCurrentKey() { - return currentProcessingContext.getKey(); + return asyncExecutionController.getCurrentContext().getKey(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 29e3e3d5d904e..0800880dfff71 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -154,7 +154,7 @@ public final void setAsyncKeyedContextElement( @Override public Object getCurrentKey() { - return currentProcessingContext.getKey(); + return asyncExecutionController.getCurrentContext().getKey(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java index 0892b401bcded..6ee1924201149 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java @@ -293,6 +293,7 @@ void testWatermarkHooks() throws Exception { testOperator.asyncProcessWithKey( 1L, () -> { + assertThat(testOperator.getCurrentKey()).isEqualTo(1L); testOperator.output(watermark.getTimestamp() + 1000L); }); if (counter.incrementAndGet() % 2 == 0) { @@ -484,6 +485,7 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index @Override public void onEventTime(InternalTimer timer) throws Exception { + assertThat(getCurrentKey()).isEqualTo(timer.getKey()); output.collect( new StreamRecord<>( "EventTimer-" + timer.getKey() + "-" + timer.getTimestamp())); @@ -491,6 +493,7 @@ public void onEventTime(InternalTimer timer) throws Exce @Override public void onProcessingTime(InternalTimer timer) throws Exception { + assertThat(getCurrentKey()).isEqualTo(timer.getKey()); output.collect( new StreamRecord<>( "ProcessingTimer-" + timer.getKey() + "-" + timer.getTimestamp())); @@ -601,12 +604,14 @@ public void postProcessWatermark(Watermark watermark) throws Exception { @Override public void onEventTime(InternalTimer timer) throws Exception { + assertThat(getCurrentKey()).isEqualTo(timer.getKey()); output.collect(new StreamRecord<>(timer.getTimestamp())); } @Override - public void onProcessingTime(InternalTimer timer) - throws Exception {} + public void onProcessingTime(InternalTimer timer) throws Exception { + assertThat(getCurrentKey()).isEqualTo(timer.getKey()); + } @Override public void processElement1(StreamRecord element) throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java index 671df72376185..67d4723bfe101 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java @@ -290,6 +290,7 @@ void testWatermarkHooks() throws Exception { operator.asyncProcessWithKey( 1L, () -> { + assertThat(operator.getCurrentKey()).isEqualTo(1L); operator.output(watermark.getTimestamp() + 1000L); }); if (counter.incrementAndGet() % 2 == 0) { @@ -620,6 +621,7 @@ public ElementOrder getElementOrder() { @Override public void onEventTime(InternalTimer timer) throws Exception { + assertThat(getCurrentKey()).isEqualTo(timer.getKey()); output.collect( new StreamRecord<>( "EventTimer-" + timer.getKey() + "-" + timer.getTimestamp())); @@ -627,6 +629,7 @@ public void onEventTime(InternalTimer timer) throws Exce @Override public void onProcessingTime(InternalTimer timer) throws Exception { + assertThat(getCurrentKey()).isEqualTo(timer.getKey()); output.collect( new StreamRecord<>( "ProcessingTimer-" + timer.getKey() + "-" + timer.getTimestamp())); @@ -842,12 +845,14 @@ public void postProcessWatermark(Watermark watermark) throws Exception { @Override public void onEventTime(InternalTimer timer) throws Exception { + assertThat(getCurrentKey()).isEqualTo(timer.getKey()); output.collect(new StreamRecord<>(timer.getTimestamp())); } @Override - public void onProcessingTime(InternalTimer timer) - throws Exception {} + public void onProcessingTime(InternalTimer timer) throws Exception { + assertThat(getCurrentKey()).isEqualTo(timer.getKey()); + } private Input createInput(int idx) { return new AbstractInput(this, idx) {