diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 19aee130b5197..f3e13ed33d8e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.core.state.InternalStateFuture; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode; @@ -38,6 +39,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; /** @@ -367,8 +369,9 @@ private void seizeCapacity() { * * @param callback the callback to run if it finishes (once the record is not blocked). */ - public void syncPointRequestWithCallback(ThrowingRunnable callback) { - handleRequest(null, StateRequestType.SYNC_POINT, null).thenAccept(v -> callback.run()); + public StateFuture syncPointRequestWithCallback(ThrowingRunnable callback) { + return handleRequest(null, StateRequestType.SYNC_POINT, null) + .thenAccept(v -> callback.run()); } /** @@ -395,6 +398,14 @@ public void drainInflightRecords(int targetNum) { } } + /** A helper function to drain in-flight requests emitted by timer. */ + public void drainWithTimerIfNeeded(CompletableFuture timerFuture) { + if (epochParallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) { + drainInflightRecords(0); + Preconditions.checkState(timerFuture.isDone()); + } + } + /** Wait for new mails if there is no more mail. */ private void waitForNewMails() throws InterruptedException { if (!callbackRunner.isHasMail()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 697aaf5b64bb3..2facdaee597ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -56,6 +56,8 @@ import javax.annotation.Nonnull; +import java.util.concurrent.CompletableFuture; + import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -330,7 +332,17 @@ public void processWatermark(Watermark mark) throws Exception { super.processWatermark(mark); return; } - asyncExecutionController.processNonRecord(() -> super.processWatermark(mark)); + asyncExecutionController.processNonRecord( + () -> { + // todo: make async operator deal with interruptible watermark + if (timeServiceManager != null) { + CompletableFuture future = timeServiceManager.advanceWatermark(mark); + future.thenAccept(v -> output.emitWatermark(mark)); + asyncExecutionController.drainWithTimerIfNeeded(future); + } else { + output.emitWatermark(mark); + } + }); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 92c256066b47a..34a10110289f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -52,6 +52,8 @@ import javax.annotation.Nonnull; +import java.util.concurrent.CompletableFuture; + import static org.apache.flink.util.Preconditions.checkState; /** @@ -257,7 +259,17 @@ public void processWatermark(Watermark mark) throws Exception { super.processWatermark(mark); return; } - asyncExecutionController.processNonRecord(() -> super.processWatermark(mark)); + asyncExecutionController.processNonRecord( + () -> { + // todo: make async operator deal with interruptible watermark + if (timeServiceManager != null) { + CompletableFuture future = timeServiceManager.advanceWatermark(mark); + future.thenAccept(v -> output.emitWatermark(mark)); + asyncExecutionController.drainWithTimerIfNeeded(future); + } else { + output.emitWatermark(mark); + } + }); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java index 782bbe7856b9e..5ae07a858dd9c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java @@ -31,6 +31,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; import java.io.Serializable; +import java.util.concurrent.CompletableFuture; /** * An entity keeping all the time-related services. @@ -84,7 +85,7 @@ InternalTimerService getAsyncInternalTimerService( * Advances the Watermark of all managed {@link InternalTimerService timer services}, * potentially firing event time timers. */ - void advanceWatermark(Watermark watermark) throws Exception; + CompletableFuture advanceWatermark(Watermark watermark) throws Exception; /** * Try to {@link #advanceWatermark(Watermark)}, but if {@link ShouldStopAdvancingFn} returns diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java index 3b5f81697f81e..319d29641a199 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java @@ -44,6 +44,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -238,10 +239,14 @@ KeyGroupedInternalPriorityQueue> createTimerPriorit } @Override - public void advanceWatermark(Watermark watermark) throws Exception { + public CompletableFuture advanceWatermark(Watermark watermark) throws Exception { + CompletableFuture[] futures = new CompletableFuture[timerServices.size()]; + int index = 0; for (InternalTimerServiceImpl service : timerServices.values()) { - service.advanceWatermark(watermark.getTimestamp()); + futures[index] = service.advanceWatermark(watermark.getTimestamp()); + index++; } + return CompletableFuture.allOf(futures); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java index 82083ecff2a2c..4db3658365eb3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; @@ -29,6 +30,10 @@ import org.apache.flink.util.function.BiConsumerWithException; import org.apache.flink.util.function.ThrowingRunnable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + /** * An implementation of {@link InternalTimerService} that is used by {@link * org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}. @@ -97,20 +102,23 @@ void onProcessingTime(long time) throws Exception { * @param time the time in watermark. */ @Override - public void advanceWatermark(long time) throws Exception { + public CompletableFuture advanceWatermark(long time) throws Exception { currentWatermark = time; - + List> futures = new ArrayList<>(); InternalTimer timer; - while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time && !cancellationContext.isCancelled()) { eventTimeTimersQueue.poll(); final InternalTimer timerToTrigger = timer; + CompletableFuture future = new CompletableFuture<>(); maintainContextAndProcess( - timerToTrigger, () -> triggerTarget.onEventTime(timerToTrigger)); + timerToTrigger, () -> triggerTarget.onEventTime(timerToTrigger)) + .thenAccept(v -> future.complete(null)); + futures.add(future); taskIOMetricGroup.getNumFiredTimers().inc(); } + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } /** @@ -125,13 +133,14 @@ protected void foreachTimer( "Batch operation is not supported when using async state."); } - private void maintainContextAndProcess( + private StateFuture maintainContextAndProcess( InternalTimer timer, ThrowingRunnable runnable) { - RecordContext recordCtx = asyncExecutionController.buildContext(null, timer.getKey()); + RecordContext recordCtx = asyncExecutionController.buildContext(timer, timer.getKey()); recordCtx.retain(); asyncExecutionController.setCurrentContext(recordCtx); keyContext.setCurrentKey(timer.getKey()); - asyncExecutionController.syncPointRequestWithCallback(runnable); + StateFuture future = asyncExecutionController.syncPointRequestWithCallback(runnable); recordCtx.release(); + return future; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java index 269ba493b1802..bdbba32f5540e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; import static org.apache.flink.util.Preconditions.checkArgument; @@ -306,7 +307,7 @@ void onProcessingTime(long time) throws Exception { } } - public void advanceWatermark(long time) throws Exception { + public CompletableFuture advanceWatermark(long time) throws Exception { Preconditions.checkState( tryAdvanceWatermark( time, @@ -314,6 +315,7 @@ public void advanceWatermark(long time) throws Exception { // Never stop advancing. return false; })); + return CompletableFuture.completedFuture(null); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java index 57f0b3f6f0552..563fd62409be5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -86,10 +87,11 @@ public InternalTimerService getAsyncInternalTimerService( } @Override - public void advanceWatermark(Watermark watermark) { + public CompletableFuture advanceWatermark(Watermark watermark) { if (watermark.getTimestamp() == Long.MAX_VALUE) { keySelected(null); } + return CompletableFuture.completedFuture(null); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java index 1bf396bad81de..56801a63b2333 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java @@ -39,6 +39,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link InternalTimerServiceAsyncImpl}. */ @@ -166,6 +169,32 @@ void testEventTimerFireOrder() throws Exception { assertThat(testTriggerable.eventTriggerCount).isEqualTo(2); } + @Test + void testSameKeyEventTimerFireOrder() throws Exception { + keyContext.setCurrentKey("key-1"); + service.registerEventTimeTimer("event-timer-1", 1L); + + SameTimerTriggerable testTriggerable = new SameTimerTriggerable(asyncExecutionController); + service.startTimerService( + IntSerializer.INSTANCE, StringSerializer.INSTANCE, testTriggerable); + assertThat(testTriggerable.eventTriggerCount).isEqualTo(0); + // the event timer should be triggered at watermark 1 + service.advanceWatermark(1L); + assertThat(testTriggerable.eventTriggerCount).isEqualTo(1); + assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0); + + keyContext.setCurrentKey("key-1"); + service.registerEventTimeTimer("event-timer-2", 2L); + service.registerEventTimeTimer("event-timer-3", 3L); + assertThat(testTriggerable.eventTriggerCount).isEqualTo(1); + CompletableFuture future = service.advanceWatermark(3L); + AtomicBoolean done = new AtomicBoolean(false); + assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0); + future.thenAccept((v) -> done.set(true)).get(); + assertThat(done.get()).isTrue(); + assertThat(testTriggerable.eventTriggerCount).isEqualTo(3); + } + private static InternalTimerServiceAsyncImpl createInternalTimerService( TaskIOMetricGroup taskIOMetricGroup, KeyGroupRange keyGroupsList, @@ -190,6 +219,30 @@ private static InternalTimerServiceAsyncImpl createInternalTimerSer asyncExecutionController); } + private static class SameTimerTriggerable implements Triggerable { + + private AsyncExecutionController aec; + + private static int eventTriggerCount = 0; + + public SameTimerTriggerable(AsyncExecutionController aec) { + this.aec = aec; + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + RecordContext recordContext = aec.buildContext("record", "key"); + aec.setCurrentContext(recordContext); + aec.handleRequestSync(null, StateRequestType.SYNC_POINT, null); + eventTriggerCount++; + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + // skip + } + } + private static class TestTriggerable implements Triggerable { private static int eventTriggerCount = 0; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java index 87f19f4032d2c..be73282d878fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java @@ -36,6 +36,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; @@ -107,7 +108,7 @@ public InternalTimerService getAsyncInternalTimerService( } @Override - public void advanceWatermark(Watermark watermark) throws Exception { + public CompletableFuture advanceWatermark(Watermark watermark) throws Exception { throw new UnsupportedOperationException(); }