diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 2facdaee597ed8..e86ee8cc6cf062 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.concurrent.CompletableFuture; @@ -332,17 +333,33 @@ public void processWatermark(Watermark mark) throws Exception { super.processWatermark(mark); return; } - asyncExecutionController.processNonRecord( - () -> { - // todo: make async operator deal with interruptible watermark - if (timeServiceManager != null) { - CompletableFuture future = timeServiceManager.advanceWatermark(mark); - future.thenAccept(v -> output.emitWatermark(mark)); - asyncExecutionController.drainWithTimerIfNeeded(future); - } else { - output.emitWatermark(mark); - } - }); + asyncExecutionController.processNonRecord(() -> doProcessWatermark(mark, null)); + } + + /** + * Handle the watermark and timers, then run a provided {@link Runnable} asynchronously right + * after the watermark is emitted. + * + * @param mark The watermark. + * @param postAction The runnable for post action. + */ + protected void doProcessWatermark(Watermark mark, @Nullable Runnable postAction) + throws Exception { + // todo: make async operator deal with interruptible watermark + if (timeServiceManager != null) { + CompletableFuture future = timeServiceManager.advanceWatermark(mark); + future.thenAccept(v -> emitWatermark(mark, postAction)); + asyncExecutionController.drainWithTimerIfNeeded(future); + } else { + emitWatermark(mark, postAction); + } + } + + private void emitWatermark(Watermark mark, @Nullable Runnable postAction) { + output.emitWatermark(mark); + if (postAction != null) { + postAction.run(); + } } @Override @@ -368,10 +385,12 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index boolean wasIdle = combinedWatermark.isIdle(); // index is 0-based if (combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) { - super.processWatermark( - new Watermark(combinedWatermark.getCombinedWatermark())); - } - if (wasIdle != combinedWatermark.isIdle()) { + doProcessWatermark( + new Watermark(combinedWatermark.getCombinedWatermark()), + wasIdle == combinedWatermark.isIdle() + ? null + : () -> output.emitWatermarkStatus(watermarkStatus)); + } else if (wasIdle != combinedWatermark.isIdle()) { output.emitWatermarkStatus(watermarkStatus); } }); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 34a10110289f2f..04aeae85db695d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.concurrent.CompletableFuture; @@ -151,6 +152,11 @@ public final void setAsyncKeyedContextElement( asyncExecutionController.setCurrentContext(currentProcessingContext); } + @Override + public Object getCurrentKey() { + return currentProcessingContext.getKey(); + } + @Override public final void postProcessElement() { // The processElement will be treated as a callback for dummy request. So reference @@ -259,17 +265,33 @@ public void processWatermark(Watermark mark) throws Exception { super.processWatermark(mark); return; } - asyncExecutionController.processNonRecord( - () -> { - // todo: make async operator deal with interruptible watermark - if (timeServiceManager != null) { - CompletableFuture future = timeServiceManager.advanceWatermark(mark); - future.thenAccept(v -> output.emitWatermark(mark)); - asyncExecutionController.drainWithTimerIfNeeded(future); - } else { - output.emitWatermark(mark); - } - }); + asyncExecutionController.processNonRecord(() -> doProcessWatermark(mark, null)); + } + + /** + * Handle the watermark and timers, then run a provided {@link Runnable} asynchronously right + * after the watermark is emitted. + * + * @param mark The watermark. + * @param postAction The runnable for post action. + */ + protected void doProcessWatermark(Watermark mark, @Nullable Runnable postAction) + throws Exception { + // todo: make async operator deal with interruptible watermark + if (timeServiceManager != null) { + CompletableFuture future = timeServiceManager.advanceWatermark(mark); + future.thenAccept(v -> emitWatermark(mark, postAction)); + asyncExecutionController.drainWithTimerIfNeeded(future); + } else { + emitWatermark(mark, postAction); + } + } + + private void emitWatermark(Watermark mark, @Nullable Runnable postAction) { + output.emitWatermark(mark); + if (postAction != null) { + postAction.run(); + } } @Override @@ -283,10 +305,12 @@ public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId) () -> { boolean wasIdle = combinedWatermark.isIdle(); if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) { - super.processWatermark( - new Watermark(combinedWatermark.getCombinedWatermark())); - } - if (wasIdle != combinedWatermark.isIdle()) { + doProcessWatermark( + new Watermark(combinedWatermark.getCombinedWatermark()), + wasIdle == combinedWatermark.isIdle() + ? null + : () -> output.emitWatermarkStatus(watermarkStatus)); + } else if (wasIdle != combinedWatermark.isIdle()) { output.emitWatermarkStatus(watermarkStatus); } }); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java index 600aecb8a48882..4cda2f323020ef 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java @@ -29,9 +29,11 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; @@ -292,7 +294,9 @@ void testWatermarkStatus() throws Exception { () -> { try { processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); - testOperator.processWatermarkStatus(new WatermarkStatus(0), 1); + testOperator.processWatermark1(new Watermark(205L)); + testOperator.processWatermark2(new Watermark(105L)); + testOperator.processWatermarkStatus(WatermarkStatus.IDLE, 1); } catch (Exception e) { } }); @@ -309,8 +313,12 @@ void testWatermarkStatus() throws Exception { anotherThread.shutdown(); Thread.sleep(1000); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); - assertThat(testOperator.watermarkStatus.isActive()).isTrue(); - assertThat(testOperator.watermarkIndex).isEqualTo(1); + assertThat(testOperator.watermarkStatus.isActive()).isFalse(); + assertThat(testHarness.getOutput()) + .containsExactly( + new StreamRecord<>("EventTimer-5-105"), + new Watermark(105L), + new Watermark(205L)); } } @@ -332,6 +340,8 @@ private static class TestOperator extends AbstractAsyncStateStreamOperator timerService; + TestOperator(ElementOrder elementOrder) { this.elementOrder = elementOrder; } @@ -339,6 +349,9 @@ private static class TestOperator extends AbstractAsyncStateStreamOperator> element) throws synchronized (objectToWait) { objectToWait.wait(); } + timerService.registerEventTimeTimer( + VoidNamespace.INSTANCE, element.getValue().f0 + 100L); } @Override @@ -369,11 +384,18 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index } @Override - public void onEventTime(InternalTimer timer) throws Exception {} + public void onEventTime(InternalTimer timer) throws Exception { + output.collect( + new StreamRecord<>( + "EventTimer-" + timer.getKey() + "-" + timer.getTimestamp())); + } @Override - public void onProcessingTime(InternalTimer timer) - throws Exception {} + public void onProcessingTime(InternalTimer timer) throws Exception { + output.collect( + new StreamRecord<>( + "ProcessingTimer-" + timer.getKey() + "-" + timer.getTimestamp())); + } public int getProcessed() { return processed.get(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java index 1bcb924ce493cf..719987fe72bec8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; @@ -293,7 +294,11 @@ void testWatermarkStatus() throws Exception { () -> { try { processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); - testOperator.processWatermarkStatus(new WatermarkStatus(0), 1); + testOperator.getInputs().get(0).processWatermark(new Watermark(205L)); + testOperator + .getInputs() + .get(0) + .processWatermarkStatus(WatermarkStatus.IDLE); } catch (Exception e) { } }); @@ -308,8 +313,12 @@ void testWatermarkStatus() throws Exception { Thread.sleep(3000); anotherThread.shutdown(); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); - assertThat(testOperator.watermarkStatus.isActive()).isTrue(); - assertThat(testOperator.watermarkIndex).isEqualTo(1); + assertThat(testOperator.watermarkStatus.isActive()).isFalse(); + assertThat(testHarness.getOutput()) + .containsExactly( + new StreamRecord<>("EventTimer-5-105"), + new Watermark(205L), + WatermarkStatus.IDLE); } } @@ -372,6 +381,8 @@ private static class SingleInputTestOperator extends AbstractAsyncStateStreamOpe private WatermarkStatus watermarkStatus = new WatermarkStatus(-1); private int watermarkIndex = -1; + InternalTimerService timerService; + public SingleInputTestOperator( StreamOperatorParameters parameters, ElementOrder elementOrder) { super(parameters, 1); @@ -385,6 +396,8 @@ public void processElement(StreamRecord> element) synchronized (objectToWait) { objectToWait.wait(); } + timerService.registerEventTimeTimer( + VoidNamespace.INSTANCE, element.getValue().f0 + 100L); } }; } @@ -392,6 +405,9 @@ public void processElement(StreamRecord> element) @Override public void open() throws Exception { super.open(); + this.timerService = + getInternalTimerService( + "processing timer", VoidNamespaceSerializer.INSTANCE, this); } @Override @@ -405,11 +421,18 @@ public ElementOrder getElementOrder() { } @Override - public void onEventTime(InternalTimer timer) throws Exception {} + public void onEventTime(InternalTimer timer) throws Exception { + output.collect( + new StreamRecord<>( + "EventTimer-" + timer.getKey() + "-" + timer.getTimestamp())); + } @Override - public void onProcessingTime(InternalTimer timer) - throws Exception {} + public void onProcessingTime(InternalTimer timer) throws Exception { + output.collect( + new StreamRecord<>( + "ProcessingTimer-" + timer.getKey() + "-" + timer.getTimestamp())); + } @Override public void processRecordAttributes(RecordAttributes recordAttributes, int inputId)