From 9e38f32889e097f5a9b81f949643a2b037c1035c Mon Sep 17 00:00:00 2001
From: Zakelly <zakelly.lan@gmail.com>
Date: Thu, 19 Dec 2024 17:51:44 +0800
Subject: [PATCH] [FLINK-36928] Split the action of Epoch into two parts
 (trigger & final) (#25816)

---
 .../AsyncExecutionController.java             |  57 +++++--
 .../runtime/asyncprocessing/EpochManager.java | 147 ++++++++++++++----
 .../AbstractAsyncStateStreamOperator.java     |  72 ++++-----
 .../AbstractAsyncStateStreamOperatorV2.java   |  62 +++-----
 .../operators/InternalTimeServiceManager.java |   3 +-
 .../InternalTimeServiceManagerImpl.java       |   9 +-
 .../InternalTimerServiceAsyncImpl.java        |  13 +-
 .../operators/InternalTimerServiceImpl.java   |   4 +-
 ...chExecutionInternalTimeServiceManager.java |   4 +-
 .../AsyncExecutionControllerTest.java         |   8 +-
 .../asyncprocessing/EpochManagerTest.java     |  26 +++-
 .../AbstractAsyncStateStreamOperatorTest.java |  69 +++++++-
 .../InternalTimerServiceAsyncImplTest.java    |   8 +-
 .../MailboxWatermarkProcessorTest.java        |   3 +-
 ...bstractAsyncStateStreamOperatorV2Test.java |  84 ++++++++++
 15 files changed, 399 insertions(+), 170 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index f3e13ed33d8e1..27cec546bb27a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -196,20 +196,38 @@ public AsyncExecutionController(
      * @return the built record context.
      */
     public RecordContext<K> buildContext(Object record, K key) {
+        return buildContext(record, key, false);
+    }
+
+    /**
+     * Build a new context based on record and key. Also wired with internal {@link
+     * KeyAccountingUnit}.
+     *
+     * @param record the given record.
+     * @param key the given key.
+     * @param inheritEpoch whether to inherit epoch from the current context. Or otherwise create a
+     *     new one.
+     * @return the built record context.
+     */
+    public RecordContext<K> buildContext(Object record, K key, boolean inheritEpoch) {
         if (record == null) {
             return new RecordContext<>(
                     RecordContext.EMPTY_RECORD,
                     key,
                     this::disposeContext,
                     KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
-                    epochManager.onRecord());
+                    inheritEpoch
+                            ? epochManager.onEpoch(currentContext.getEpoch())
+                            : epochManager.onRecord());
         }
         return new RecordContext<>(
                 record,
                 key,
                 this::disposeContext,
                 KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
-                epochManager.onRecord());
+                inheritEpoch
+                        ? epochManager.onEpoch(currentContext.getEpoch())
+                        : epochManager.onRecord());
     }
 
     /**
@@ -430,16 +448,31 @@ private void notifyNewMail() {
         }
     }
 
-    public void processNonRecord(ThrowingRunnable<? extends Exception> action) {
-        Runnable wrappedAction =
-                () -> {
-                    try {
-                        action.run();
-                    } catch (Exception e) {
-                        exceptionHandler.handleException("Failed to process non-record.", e);
-                    }
-                };
-        epochManager.onNonRecord(wrappedAction, epochParallelMode);
+    public void processNonRecord(
+            @Nullable ThrowingRunnable<? extends Exception> triggerAction,
+            @Nullable ThrowingRunnable<? extends Exception> finalAction) {
+        epochManager.onNonRecord(
+                triggerAction == null
+                        ? null
+                        : () -> {
+                            try {
+                                triggerAction.run();
+                            } catch (Exception e) {
+                                exceptionHandler.handleException(
+                                        "Failed to process non-record.", e);
+                            }
+                        },
+                finalAction == null
+                        ? null
+                        : () -> {
+                            try {
+                                finalAction.run();
+                            } catch (Exception e) {
+                                exceptionHandler.handleException(
+                                        "Failed to process non-record.", e);
+                            }
+                        },
+                epochParallelMode);
     }
 
     @VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java
index 2d88ebe999256..5154bea8cb122 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java
@@ -68,12 +68,23 @@ public enum ParallelMode {
     /** Current active epoch, only one active epoch at the same time. */
     Epoch activeEpoch;
 
+    /**
+     * The epoch that is possibly in finishing status; any new records arriving now should share
+     * this epoch.
+     */
+    @Nullable Epoch finishingEpoch;
+
+    /** The flag that prevents recursive calls of {@link #tryFinishInQueue()}. */
+    boolean recursiveFlag;
+
     public EpochManager(AsyncExecutionController<?> aec) {
         this.epochNum = 0;
         this.outputQueue = new LinkedList<>();
         this.asyncExecutionController = aec;
         // init an empty epoch, the epoch action will be updated when non-record is received.
         this.activeEpoch = new Epoch(epochNum++);
+        this.finishingEpoch = null;
+        this.recursiveFlag = false;
     }
 
     /**
@@ -83,23 +94,42 @@ public EpochManager(AsyncExecutionController<?> aec) {
      * @return the current open epoch.
      */
     public Epoch onRecord() {
-        activeEpoch.ongoingRecordCount++;
-        return activeEpoch;
+        if (finishingEpoch != null) {
+            finishingEpoch.ongoingRecordCount++;
+            return finishingEpoch;
+        } else {
+            activeEpoch.ongoingRecordCount++;
+            return activeEpoch;
+        }
+    }
+
+    /**
+     * Add a record to a specified epoch.
+     *
+     * @param epoch the specified epoch.
+     * @return the specified epoch itself.
+     */
+    public Epoch onEpoch(Epoch epoch) {
+        epoch.ongoingRecordCount++;
+        return epoch;
     }
 
     /**
      * Add a non-record to the current epoch, close current epoch and open a new epoch. Must be
      * invoked within task thread.
      *
-     * @param action the action associated with this non-record.
+     * @param triggerAction the action associated with this non-record.
      * @param parallelMode the parallel mode for this epoch.
      */
-    public void onNonRecord(Runnable action, ParallelMode parallelMode) {
+    public void onNonRecord(
+            @Nullable Runnable triggerAction,
+            @Nullable Runnable finalAction,
+            ParallelMode parallelMode) {
         LOG.trace(
                 "on NonRecord, old epoch: {}, outputQueue size: {}",
                 activeEpoch,
                 outputQueue.size());
-        switchActiveEpoch(action);
+        switchActiveEpoch(triggerAction, finalAction);
         if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
             asyncExecutionController.drainInflightRecords(0);
         }
@@ -118,19 +148,34 @@ public void completeOneRecord(Epoch epoch) {
     }
 
     private void tryFinishInQueue() {
+        // We don't permit recursive calls of this method.
+        if (recursiveFlag) {
+            return;
+        }
+        recursiveFlag = true;
         // If one epoch has been closed before and all records in
         // this epoch have finished, the epoch will be removed from the output queue.
-        while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
-            LOG.trace(
-                    "Finish epoch: {}, outputQueue size: {}",
-                    outputQueue.peek(),
-                    outputQueue.size());
-            outputQueue.pop();
+        while (!outputQueue.isEmpty()) {
+            Epoch epoch = outputQueue.peek();
+            // The epoch is set for inheritance during possible trigger action.
+            finishingEpoch = epoch;
+            try {
+                if (epoch.tryFinish()) {
+                    outputQueue.pop();
+                } else {
+                    break;
+                }
+            } finally {
+                // Clear the override
+                finishingEpoch = null;
+            }
         }
+        recursiveFlag = false;
     }
 
-    private void switchActiveEpoch(Runnable action) {
-        activeEpoch.close(action);
+    private void switchActiveEpoch(
+            @Nullable Runnable triggerAction, @Nullable Runnable finalAction) {
+        activeEpoch.close(triggerAction, finalAction);
         outputQueue.offer(activeEpoch);
         this.activeEpoch = new Epoch(epochNum++);
         tryFinishInQueue();
@@ -151,7 +196,15 @@ enum EpochStatus {
         /**
          * One epoch can only be finished when it meets the following three conditions. 1. The
          * records of this epoch have finished execution. 2. The epoch is closed. 3. The epoch is in
-         * the front of outputQueue.
+         * the front of outputQueue. When the status transitions from {@link #CLOSED} to {@link
+         * #FINISHING}, the trigger action will run.
+         */
+        FINISHING,
+
+        /**
+         * After the action is triggered, there might be more async processes bound to this epoch.
+         * After all these processes finish, the final action will run and the epoch will fall into
+         * {@link #FINISHED} status.
          */
         FINISHED
     }
@@ -167,8 +220,14 @@ public static class Epoch {
         /** The number of records that are still ongoing in this epoch. */
         int ongoingRecordCount;
 
-        /** The action associated with non-record of this epoch(e.g. advance watermark). */
-        @Nullable Runnable action;
+        /** The action associated with non-record of this epoch (e.g. triggering timer). */
+        @Nullable Runnable triggerAction;
+
+        /**
+         * The action to run when this epoch finishes, after the triggerAction and any async
+         * processing have completed.
+         */
+        @Nullable Runnable finalAction;
 
         EpochStatus status;
 
@@ -176,34 +235,58 @@ public Epoch(long id) {
             this.id = id;
             this.ongoingRecordCount = 0;
             this.status = EpochStatus.OPEN;
-            this.action = null;
+            this.triggerAction = null;
+            this.finalAction = null;
         }
 
         /**
-         * Try to finish this epoch.
+         * Try to finish this epoch. This is the core logic of triggering actions. The state machine
+         * and timeline are as follows:
+         *
+         * <pre>
+         * Action:     close()       triggerAction       wait             finalAction
+         * Status:  OPEN ----- CLOSED ----------FINISHING -------- FINISHED -----------
+         * </pre>
          *
          * @return whether this epoch has been normally finished.
          */
         boolean tryFinish() {
-            if (this.status == EpochStatus.FINISHED) {
-                // This epoch has been finished for some reason, but it is not finished here.
-                // Preventing recursive call of #tryFinishInQueue().
-                return false;
-            }
-            if (ongoingRecordCount == 0 && this.status == EpochStatus.CLOSED) {
-                this.status = EpochStatus.FINISHED;
-                if (action != null) {
-                    action.run();
+            if (ongoingRecordCount == 0) {
+                if (status == EpochStatus.CLOSED) {
+                    // CLOSED -> FINISHING
+                    transition(EpochStatus.FINISHING);
+                    if (triggerAction != null) {
+                        // trigger action will use {@link #finishingEpoch}.
+                        triggerAction.run();
+                    }
+                }
+                // After the triggerAction run, if there is no new async process, the
+                // ongoingRecordCount remains 0, then the status should transit to FINISHED.
+                // Otherwise, we will reach here when ongoingRecordCount reaches 0 again.
+                if (ongoingRecordCount == 0 && status == EpochStatus.FINISHING) {
+                    // FINISHING -> FINISHED
+                    transition(EpochStatus.FINISHED);
+                    if (finalAction != null) {
+                        finalAction.run();
+                    }
                 }
-                return true;
+                return status == EpochStatus.FINISHED;
             }
             return false;
         }
 
-        /** Close this epoch. */
-        void close(Runnable action) {
-            this.action = action;
-            this.status = EpochStatus.CLOSED;
+        void transition(EpochStatus newStatus) {
+            if (status != newStatus) {
+                LOG.trace("Epoch {} transit from {} to {}", this, status, newStatus);
+                status = newStatus;
+            }
+        }
+
+        /** Close this epoch with defined triggerAction and finalAction. */
+        void close(@Nullable Runnable triggerAction, @Nullable Runnable finalAction) {
+            this.triggerAction = triggerAction;
+            this.finalAction = finalAction;
+            transition(EpochStatus.CLOSED);
         }
 
         public String toString() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
index e86ee8cc6cf06..c8b70819e7aa9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
@@ -55,9 +55,9 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -178,7 +178,7 @@ public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processin
         RecordContext<K> previousContext = currentProcessingContext;
 
         // build a context and switch to the new context
-        currentProcessingContext = asyncExecutionController.buildContext(null, key);
+        currentProcessingContext = asyncExecutionController.buildContext(null, key, true);
         currentProcessingContext.retain();
         asyncExecutionController.setCurrentContext(currentProcessingContext);
         // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call's key
@@ -319,7 +319,8 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
             super.reportOrForwardLatencyMarker(marker);
             return;
         }
-        asyncExecutionController.processNonRecord(() -> super.reportOrForwardLatencyMarker(marker));
+        asyncExecutionController.processNonRecord(
+                null, () -> super.reportOrForwardLatencyMarker(marker));
     }
 
     // ------------------------------------------------------------------------
@@ -333,33 +334,9 @@ public void processWatermark(Watermark mark) throws Exception {
             super.processWatermark(mark);
             return;
         }
-        asyncExecutionController.processNonRecord(() -> doProcessWatermark(mark, null));
-    }
-
-    /**
-     * Handle the watermark and timers, then run a provided {@link Runnable} asynchronously right
-     * after the watermark is emitted.
-     *
-     * @param mark The watermark.
-     * @param postAction The runnable for post action.
-     */
-    protected void doProcessWatermark(Watermark mark, @Nullable Runnable postAction)
-            throws Exception {
-        // todo: make async operator deal with interruptible watermark
-        if (timeServiceManager != null) {
-            CompletableFuture<Void> future = timeServiceManager.advanceWatermark(mark);
-            future.thenAccept(v -> emitWatermark(mark, postAction));
-            asyncExecutionController.drainWithTimerIfNeeded(future);
-        } else {
-            emitWatermark(mark, postAction);
-        }
-    }
-
-    private void emitWatermark(Watermark mark, @Nullable Runnable postAction) {
-        output.emitWatermark(mark);
-        if (postAction != null) {
-            postAction.run();
-        }
+        asyncExecutionController.processNonRecord(
+                timeServiceManager == null ? null : () -> timeServiceManager.advanceWatermark(mark),
+                () -> output.emitWatermark(mark));
     }
 
     @Override
@@ -370,7 +347,7 @@ public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Excep
             return;
         }
         asyncExecutionController.processNonRecord(
-                () -> super.processWatermarkStatus(watermarkStatus));
+                null, () -> super.processWatermarkStatus(watermarkStatus));
     }
 
     @Override
@@ -380,17 +357,24 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index
             super.processWatermarkStatus(watermarkStatus, index);
             return;
         }
+        final AtomicBoolean wasIdle = new AtomicBoolean(false);
+        final AtomicReference<Watermark> watermarkRef = new AtomicReference<>(null);
         asyncExecutionController.processNonRecord(
                 () -> {
-                    boolean wasIdle = combinedWatermark.isIdle();
+                    wasIdle.set(combinedWatermark.isIdle());
                     // index is 0-based
                     if (combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) {
-                        doProcessWatermark(
-                                new Watermark(combinedWatermark.getCombinedWatermark()),
-                                wasIdle == combinedWatermark.isIdle()
-                                        ? null
-                                        : () -> output.emitWatermarkStatus(watermarkStatus));
-                    } else if (wasIdle != combinedWatermark.isIdle()) {
+                        watermarkRef.set(new Watermark(combinedWatermark.getCombinedWatermark()));
+                        if (timeServiceManager != null) {
+                            timeServiceManager.advanceWatermark(watermarkRef.get());
+                        }
+                    }
+                },
+                () -> {
+                    if (watermarkRef.get() != null) {
+                        output.emitWatermark(watermarkRef.get());
+                    }
+                    if (wasIdle.get() != combinedWatermark.isIdle()) {
                         output.emitWatermarkStatus(watermarkStatus);
                     }
                 });
@@ -404,7 +388,7 @@ public void processRecordAttributes(RecordAttributes recordAttributes) throws Ex
             return;
         }
         asyncExecutionController.processNonRecord(
-                () -> super.processRecordAttributes(recordAttributes));
+                null, () -> super.processRecordAttributes(recordAttributes));
     }
 
     @Experimental
@@ -415,7 +399,7 @@ public void processRecordAttributes1(RecordAttributes recordAttributes) {
             return;
         }
         asyncExecutionController.processNonRecord(
-                () -> super.processRecordAttributes1(recordAttributes));
+                null, () -> super.processRecordAttributes1(recordAttributes));
     }
 
     @Experimental
@@ -426,7 +410,7 @@ public void processRecordAttributes2(RecordAttributes recordAttributes) {
             return;
         }
         asyncExecutionController.processNonRecord(
-                () -> super.processRecordAttributes2(recordAttributes));
+                null, () -> super.processRecordAttributes2(recordAttributes));
     }
 
     @VisibleForTesting
@@ -439,6 +423,10 @@ RecordContext getCurrentProcessingContext() {
         return currentProcessingContext;
     }
 
+    public <K> AsyncKeyedStateBackend<K> getAsyncKeyedStateBackend() {
+        return stateHandler.getAsyncKeyedStateBackend();
+    }
+
     public void drainStateRequests() {
         if (isAsyncStateProcessingEnabled()) {
             asyncExecutionController.drainInflightRecords(0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
index 0b0752f122af8..6c90e4f997f9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
@@ -51,9 +51,9 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -179,7 +179,7 @@ public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processin
         RecordContext<K> previousContext = currentProcessingContext;
 
         // build a context and switch to the new context
-        currentProcessingContext = asyncExecutionController.buildContext(null, key);
+        currentProcessingContext = asyncExecutionController.buildContext(null, key, true);
         currentProcessingContext.retain();
         asyncExecutionController.setCurrentContext(currentProcessingContext);
         // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call's key
@@ -253,7 +253,8 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
             super.reportOrForwardLatencyMarker(marker);
             return;
         }
-        asyncExecutionController.processNonRecord(() -> super.reportOrForwardLatencyMarker(marker));
+        asyncExecutionController.processNonRecord(
+                null, () -> super.reportOrForwardLatencyMarker(marker));
     }
 
     // ------------------------------------------------------------------------
@@ -265,33 +266,9 @@ public void processWatermark(Watermark mark) throws Exception {
             super.processWatermark(mark);
             return;
         }
-        asyncExecutionController.processNonRecord(() -> doProcessWatermark(mark, null));
-    }
-
-    /**
-     * Handle the watermark and timers, then run a provided {@link Runnable} asynchronously right
-     * after the watermark is emitted.
-     *
-     * @param mark The watermark.
-     * @param postAction The runnable for post action.
-     */
-    protected void doProcessWatermark(Watermark mark, @Nullable Runnable postAction)
-            throws Exception {
-        // todo: make async operator deal with interruptible watermark
-        if (timeServiceManager != null) {
-            CompletableFuture<Void> future = timeServiceManager.advanceWatermark(mark);
-            future.thenAccept(v -> emitWatermark(mark, postAction));
-            asyncExecutionController.drainWithTimerIfNeeded(future);
-        } else {
-            emitWatermark(mark, postAction);
-        }
-    }
-
-    private void emitWatermark(Watermark mark, @Nullable Runnable postAction) {
-        output.emitWatermark(mark);
-        if (postAction != null) {
-            postAction.run();
-        }
+        asyncExecutionController.processNonRecord(
+                timeServiceManager == null ? null : () -> timeServiceManager.advanceWatermark(mark),
+                () -> output.emitWatermark(mark));
     }
 
     @Override
@@ -301,16 +278,23 @@ public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId)
             super.processWatermarkStatus(watermarkStatus, inputId);
             return;
         }
+        final AtomicBoolean wasIdle = new AtomicBoolean(false);
+        final AtomicReference<Watermark> watermarkRef = new AtomicReference<>(null);
         asyncExecutionController.processNonRecord(
                 () -> {
-                    boolean wasIdle = combinedWatermark.isIdle();
+                    wasIdle.set(combinedWatermark.isIdle());
                     if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) {
-                        doProcessWatermark(
-                                new Watermark(combinedWatermark.getCombinedWatermark()),
-                                wasIdle == combinedWatermark.isIdle()
-                                        ? null
-                                        : () -> output.emitWatermarkStatus(watermarkStatus));
-                    } else if (wasIdle != combinedWatermark.isIdle()) {
+                        watermarkRef.set(new Watermark(combinedWatermark.getCombinedWatermark()));
+                        if (timeServiceManager != null) {
+                            timeServiceManager.advanceWatermark(watermarkRef.get());
+                        }
+                    }
+                },
+                () -> {
+                    if (watermarkRef.get() != null) {
+                        output.emitWatermark(watermarkRef.get());
+                    }
+                    if (wasIdle.get() != combinedWatermark.isIdle()) {
                         output.emitWatermarkStatus(watermarkStatus);
                     }
                 });
@@ -324,7 +308,7 @@ public void processRecordAttributes(RecordAttributes recordAttributes, int input
             return;
         }
         asyncExecutionController.processNonRecord(
-                () -> super.processRecordAttributes(recordAttributes, inputId));
+                null, () -> super.processRecordAttributes(recordAttributes, inputId));
     }
 
     @VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index 5ae07a858dd9c..782bbe7856b9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -31,7 +31,6 @@
 import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
 
 import java.io.Serializable;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * An entity keeping all the time-related services.
@@ -85,7 +84,7 @@ <N> InternalTimerService<N> getAsyncInternalTimerService(
      * Advances the Watermark of all managed {@link InternalTimerService timer services},
      * potentially firing event time timers.
      */
-    CompletableFuture<Void> advanceWatermark(Watermark watermark) throws Exception;
+    void advanceWatermark(Watermark watermark) throws Exception;
 
     /**
      * Try to {@link #advanceWatermark(Watermark)}, but if {@link ShouldStopAdvancingFn} returns
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
index 319d29641a199..3b5f81697f81e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
@@ -44,7 +44,6 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -239,14 +238,10 @@ KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorit
     }
 
     @Override
-    public CompletableFuture<Void> advanceWatermark(Watermark watermark) throws Exception {
-        CompletableFuture<?>[] futures = new CompletableFuture[timerServices.size()];
-        int index = 0;
+    public void advanceWatermark(Watermark watermark) throws Exception {
         for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
-            futures[index] = service.advanceWatermark(watermark.getTimestamp());
-            index++;
+            service.advanceWatermark(watermark.getTimestamp());
         }
-        return CompletableFuture.allOf(futures);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java
index 4db3658365eb3..3ccfa8f2eda8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java
@@ -30,10 +30,6 @@
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.ThrowingRunnable;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
 /**
  * An implementation of {@link InternalTimerService} that is used by {@link
  * org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}.
@@ -102,23 +98,18 @@ void onProcessingTime(long time) throws Exception {
      * @param time the time in watermark.
      */
     @Override
-    public CompletableFuture<Void> advanceWatermark(long time) throws Exception {
+    public void advanceWatermark(long time) throws Exception {
         currentWatermark = time;
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
         InternalTimer<K, N> timer;
         while ((timer = eventTimeTimersQueue.peek()) != null
                 && timer.getTimestamp() <= time
                 && !cancellationContext.isCancelled()) {
             eventTimeTimersQueue.poll();
             final InternalTimer<K, N> timerToTrigger = timer;
-            CompletableFuture<Void> future = new CompletableFuture<>();
             maintainContextAndProcess(
-                            timerToTrigger, () -> triggerTarget.onEventTime(timerToTrigger))
-                    .thenAccept(v -> future.complete(null));
-            futures.add(future);
+                    timerToTrigger, () -> triggerTarget.onEventTime(timerToTrigger));
             taskIOMetricGroup.getNumFiredTimers().inc();
         }
-        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index bdbba32f5540e..269ba493b1802 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -36,7 +36,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -307,7 +306,7 @@ void onProcessingTime(long time) throws Exception {
         }
     }
 
-    public CompletableFuture<Void> advanceWatermark(long time) throws Exception {
+    public void advanceWatermark(long time) throws Exception {
         Preconditions.checkState(
                 tryAdvanceWatermark(
                         time,
@@ -315,7 +314,6 @@ public CompletableFuture<Void> advanceWatermark(long time) throws Exception {
                             // Never stop advancing.
                             return false;
                         }));
-        return CompletableFuture.completedFuture(null);
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
index 563fd62409be5..57f0b3f6f0552 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
@@ -37,7 +37,6 @@
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -87,11 +86,10 @@ public <N> InternalTimerService<N> getAsyncInternalTimerService(
     }
 
     @Override
-    public CompletableFuture<Void> advanceWatermark(Watermark watermark) {
+    public void advanceWatermark(Watermark watermark) {
         if (watermark.getTimestamp() == Long.MAX_VALUE) {
             keySelected(null);
         }
-        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index ae0c96cb3d95a..c52432e53b6fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -691,7 +691,7 @@ void testEpochManager() throws Exception {
 
         assertThat(epoch1).isEqualTo(epoch2);
         assertThat(epoch1.ongoingRecordCount).isEqualTo(2);
-        aec.processNonRecord(() -> output.incrementAndGet());
+        aec.processNonRecord(null, () -> output.incrementAndGet());
 
         assertThat(output.get()).isEqualTo(3);
         // SERIAL_BETWEEN_EPOCH mode would drain in-flight records on non-record arriving.
@@ -724,7 +724,7 @@ void testMixEpochMode() throws Exception {
         userCode.run();
 
         aec.epochManager.onNonRecord(
-                () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH);
+                null, () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH);
         assertThat(epoch1.ongoingRecordCount).isEqualTo(1);
 
         String record2 = "key2-r2";
@@ -736,7 +736,7 @@ void testMixEpochMode() throws Exception {
         assertThat(epoch1.ongoingRecordCount).isEqualTo(1);
         assertThat(epoch2.ongoingRecordCount).isEqualTo(1);
         aec.epochManager.onNonRecord(
-                () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH);
+                null, () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH);
         assertThat(epoch1.ongoingRecordCount).isEqualTo(1);
         assertThat(epoch2.ongoingRecordCount).isEqualTo(1);
         assertThat(output.get()).isEqualTo(0);
@@ -751,7 +751,7 @@ void testMixEpochMode() throws Exception {
         assertThat(epoch2.ongoingRecordCount).isEqualTo(1);
         assertThat(epoch3.ongoingRecordCount).isEqualTo(1);
         aec.epochManager.onNonRecord(
-                () -> output.incrementAndGet(), ParallelMode.SERIAL_BETWEEN_EPOCH);
+                null, () -> output.incrementAndGet(), ParallelMode.SERIAL_BETWEEN_EPOCH);
         assertThat(epoch1.ongoingRecordCount).isEqualTo(0);
         assertThat(epoch2.ongoingRecordCount).isEqualTo(0);
         assertThat(epoch3.ongoingRecordCount).isEqualTo(0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java
index df69d780d4181..b6e5f957f0dfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java
@@ -39,7 +39,7 @@ void testBasic() {
         assertThat(epoch1.ongoingRecordCount).isEqualTo(2);
         AtomicInteger output = new AtomicInteger(0);
         epochManager.onNonRecord(
-                () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH);
+                null, () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH);
         // record3 is in a new epoch
         Epoch epoch3 = epochManager.onRecord();
         assertThat(epoch3).isNotEqualTo(epoch1);
@@ -60,6 +60,7 @@ void testBasic() {
 
         // Test if in the action there is a record processing. Should not be any error.
         epochManager.onNonRecord(
+                null,
                 () -> {
                     output.incrementAndGet();
                     Epoch epoch4 = epochManager.onRecord();
@@ -68,4 +69,27 @@ void testBasic() {
                 ParallelMode.PARALLEL_BETWEEN_EPOCH);
         assertThat(output.get()).isEqualTo(2);
     }
+
+    @Test
+    void testTwoAction() {
+        EpochManager epochManager = new EpochManager(null);
+        Epoch epoch1 = epochManager.onRecord();
+        Epoch epoch2 = epochManager.onRecord();
+        assertThat(epoch1).isEqualTo(epoch2);
+        assertThat(epoch1.ongoingRecordCount).isEqualTo(2);
+        AtomicInteger output = new AtomicInteger(0);
+        epochManager.onNonRecord(
+                () -> epochManager.onEpoch(epoch1),
+                () -> output.incrementAndGet(),
+                ParallelMode.PARALLEL_BETWEEN_EPOCH);
+        assertThat(epoch1.status).isEqualTo(EpochStatus.CLOSED);
+        assertThat(output.get()).isEqualTo(0);
+        epochManager.completeOneRecord(epoch1);
+        epochManager.completeOneRecord(epoch2);
+        assertThat(epoch1.status).isEqualTo(EpochStatus.FINISHING);
+        assertThat(output.get()).isEqualTo(0);
+        epochManager.completeOneRecord(epoch1);
+        assertThat(epoch1.status).isEqualTo(EpochStatus.FINISHED);
+        assertThat(output.get()).isEqualTo(1);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
index 0dfd19ea0fe89..496d46e3f3499 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
@@ -56,6 +56,23 @@
 /** Basic tests for {@link AbstractAsyncStateStreamOperator}. */
 public class AbstractAsyncStateStreamOperatorTest {
 
+    protected AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
+            createTestHarness(
+                    int maxParalelism, int numSubtasks, int subtaskIndex, TestOperator testOperator)
+                    throws Exception {
+        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
+                testHarness =
+                        AsyncKeyedOneInputStreamOperatorTestHarness.create(
+                                testOperator,
+                                new TestKeySelector(),
+                                BasicTypeInfo.INT_TYPE_INFO,
+                                maxParalelism,
+                                numSubtasks,
+                                subtaskIndex);
+        testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
+        return testHarness;
+    }
+
     protected AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
             createTestHarness(
                     int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder)
@@ -137,7 +154,6 @@ void testRecordProcessorWithRecordOrder() throws Exception {
 
     @Test
     void testAsyncProcessWithKey() throws Exception {
-
         TestOperatorWithDirectAsyncProcess testOperator =
                 new TestOperatorWithDirectAsyncProcess(ElementOrder.RECORD_ORDER);
         AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
@@ -243,6 +259,29 @@ void testNonRecordProcess() throws Exception {
         }
     }
 
+    @Test
+    void testWatermark() throws Exception {
+        TestOperatorWithAsyncProcessTimer testOperator =
+                new TestOperatorWithAsyncProcessTimer(ElementOrder.RECORD_ORDER);
+        try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
+                testHarness = createTestHarness(128, 1, 0, testOperator)) {
+            testHarness.open();
+            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "1")));
+            expectedOutput.add(new StreamRecord<>("EventTimer-1-1"));
+            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "3")));
+            expectedOutput.add(new StreamRecord<>("EventTimer-1-3"));
+            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "6")));
+            expectedOutput.add(new StreamRecord<>("EventTimer-1-6"));
+            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "9")));
+            expectedOutput.add(new StreamRecord<>("EventTimer-1-9"));
+            testHarness.processWatermark(10L);
+            expectedOutput.add(new Watermark(10L));
+            TestHarnessUtil.assertOutputEquals(
+                    "Output was not correct", expectedOutput, testHarness.getOutput());
+        }
+    }
+
     @Test
     void testWatermarkStatus() throws Exception {
         try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
@@ -417,22 +456,42 @@ private static class TestOperatorWithDirectAsyncProcess extends TestOperator {
 
         @Override
         public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
-            System.out.println("processElement " + Thread.currentThread().getName());
             asyncProcessWithKey(
                     element.getValue().f0,
                     () -> {
-                        System.out.println(
-                                "asyncProcessWithKey " + Thread.currentThread().getName());
                         processed.incrementAndGet();
                     });
             synchronized (objectToWait) {
                 objectToWait.wait();
             }
-            System.out.println("processElement " + Thread.currentThread().getName());
             processed.incrementAndGet();
         }
     }
 
+    private static class TestOperatorWithAsyncProcessTimer extends TestOperator {
+
+        TestOperatorWithAsyncProcessTimer(ElementOrder elementOrder) {
+            super(elementOrder);
+        }
+
+        @Override
+        public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
+            processed.incrementAndGet();
+            timerService.registerEventTimeTimer(
+                    VoidNamespace.INSTANCE, Long.parseLong(element.getValue().f1));
+        }
+
+        @Override
+        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
+            asyncProcessWithKey(timer.getKey(), () -> super.onEventTime(timer));
+        }
+
+        @Override
+        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
+            asyncProcessWithKey(timer.getKey(), () -> super.onProcessingTime(timer));
+        }
+    }
+
     private static class WatermarkTestingOperator extends AbstractAsyncStateStreamOperator<Long>
             implements TwoInputStreamOperator<Long, Long, Long>,
                     Triggerable<Integer, VoidNamespace> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
index 56801a63b2333..a876b8cfacf31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
@@ -39,9 +39,6 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link InternalTimerServiceAsyncImpl}. */
@@ -187,11 +184,8 @@ void testSameKeyEventTimerFireOrder() throws Exception {
         service.registerEventTimeTimer("event-timer-2", 2L);
         service.registerEventTimeTimer("event-timer-3", 3L);
         assertThat(testTriggerable.eventTriggerCount).isEqualTo(1);
-        CompletableFuture<Void> future = service.advanceWatermark(3L);
-        AtomicBoolean done = new AtomicBoolean(false);
+        service.advanceWatermark(3L);
         assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0);
-        future.thenAccept((v) -> done.set(true)).get();
-        assertThat(done.get()).isTrue();
         assertThat(testTriggerable.eventTriggerCount).isEqualTo(3);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
index be73282d878fb..87f19f4032d2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
@@ -36,7 +36,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -108,7 +107,7 @@ public <N> InternalTimerService<N> getAsyncInternalTimerService(
         }
 
         @Override
-        public CompletableFuture<Void> advanceWatermark(Watermark watermark) throws Exception {
+        public void advanceWatermark(Watermark watermark) throws Exception {
             throw new UnsupportedOperationException();
         }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java
index 11cb749d50405..0a9521dcb5af5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java
@@ -240,6 +240,40 @@ void testNonRecordProcess() throws Exception {
         }
     }
 
+    @Test
+    void testWatermark() throws Exception {
+        KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String>
+                testHarness =
+                        KeyedOneInputStreamOperatorV2TestHarness.create(
+                                new TestWithAsyncProcessTimerOperatorFactory(
+                                        ElementOrder.RECORD_ORDER),
+                                new TestKeySelector(),
+                                BasicTypeInfo.INT_TYPE_INFO,
+                                128,
+                                1,
+                                0);
+        testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
+
+        try {
+            testHarness.open();
+            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "1")));
+            expectedOutput.add(new StreamRecord<>("EventTimer-1-1"));
+            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "3")));
+            expectedOutput.add(new StreamRecord<>("EventTimer-1-3"));
+            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "6")));
+            expectedOutput.add(new StreamRecord<>("EventTimer-1-6"));
+            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "9")));
+            expectedOutput.add(new StreamRecord<>("EventTimer-1-9"));
+            testHarness.processWatermark(10L);
+            expectedOutput.add(new Watermark(10L));
+            TestHarnessUtil.assertOutputEquals(
+                    "Output was not correct", expectedOutput, testHarness.getOutput());
+        } finally {
+            testHarness.close();
+        }
+    }
+
     @Test
     void testWatermarkStatus() throws Exception {
         try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String>
@@ -611,6 +645,56 @@ public void processElement(StreamRecord<Tuple2<Integer, String>> element)
         }
     }
 
+    private static class TestWithAsyncProcessTimerOperatorFactory
+            extends AbstractStreamOperatorFactory<String> {
+
+        private final ElementOrder elementOrder;
+
+        TestWithAsyncProcessTimerOperatorFactory(ElementOrder elementOrder) {
+            this.elementOrder = elementOrder;
+        }
+
+        @Override
+        public <T extends StreamOperator<String>> T createStreamOperator(
+                StreamOperatorParameters<String> parameters) {
+            return (T) new SingleInputTestOperatorWithAsyncProcessTimer(parameters, elementOrder);
+        }
+
+        @Override
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return SingleInputTestOperatorWithAsyncProcessTimer.class;
+        }
+    }
+
+    private static class SingleInputTestOperatorWithAsyncProcessTimer
+            extends SingleInputTestOperator {
+
+        SingleInputTestOperatorWithAsyncProcessTimer(
+                StreamOperatorParameters<String> parameters, ElementOrder elementOrder) {
+            super(parameters, elementOrder);
+            input =
+                    new AbstractInput<Tuple2<Integer, String>, String>(this, 1) {
+                        @Override
+                        public void processElement(StreamRecord<Tuple2<Integer, String>> element)
+                                throws Exception {
+                            processed.incrementAndGet();
+                            timerService.registerEventTimeTimer(
+                                    VoidNamespace.INSTANCE, Long.parseLong(element.getValue().f1));
+                        }
+                    };
+        }
+
+        @Override
+        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
+            asyncProcessWithKey(timer.getKey(), () -> super.onEventTime(timer));
+        }
+
+        @Override
+        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
+            asyncProcessWithKey(timer.getKey(), () -> super.onProcessingTime(timer));
+        }
+    }
+
     private static class WatermarkTestingOperatorFactory
             extends AbstractStreamOperatorFactory<Long> {
         @Override