diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index f3e13ed33d8e1..27cec546bb27a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -196,20 +196,38 @@ public AsyncExecutionController( * @return the built record context. */ public RecordContext buildContext(Object record, K key) { + return buildContext(record, key, false); + } + + /** + * Build a new context based on record and key. Also wired with internal {@link + * KeyAccountingUnit}. + * + * @param record the given record. + * @param key the given key. + * @param inheritEpoch whether to inherit epoch from the current context. Or otherwise create a + * new one. + * @return the built record context. + */ + public RecordContext buildContext(Object record, K key, boolean inheritEpoch) { if (record == null) { return new RecordContext<>( RecordContext.EMPTY_RECORD, key, this::disposeContext, KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism), - epochManager.onRecord()); + inheritEpoch + ? epochManager.onEpoch(currentContext.getEpoch()) + : epochManager.onRecord()); } return new RecordContext<>( record, key, this::disposeContext, KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism), - epochManager.onRecord()); + inheritEpoch + ? epochManager.onEpoch(currentContext.getEpoch()) + : epochManager.onRecord()); } /** @@ -430,16 +448,31 @@ private void notifyNewMail() { } } - public void processNonRecord(ThrowingRunnable action) { - Runnable wrappedAction = - () -> { - try { - action.run(); - } catch (Exception e) { - exceptionHandler.handleException("Failed to process non-record.", e); - } - }; - epochManager.onNonRecord(wrappedAction, epochParallelMode); + public void processNonRecord( + @Nullable ThrowingRunnable triggerAction, + @Nullable ThrowingRunnable finalAction) { + epochManager.onNonRecord( + triggerAction == null + ? null + : () -> { + try { + triggerAction.run(); + } catch (Exception e) { + exceptionHandler.handleException( + "Failed to process non-record.", e); + } + }, + finalAction == null + ? null + : () -> { + try { + finalAction.run(); + } catch (Exception e) { + exceptionHandler.handleException( + "Failed to process non-record.", e); + } + }, + epochParallelMode); } @VisibleForTesting diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java index 2d88ebe999256..5154bea8cb122 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java @@ -68,12 +68,23 @@ public enum ParallelMode { /** Current active epoch, only one active epoch at the same time. */ Epoch activeEpoch; + /** + * The epoch that is possibly in finishing status, the following new records should share this + * epoch. + */ + @Nullable Epoch finishingEpoch; + + /** The flag that prevent the recursive call of {@link #tryFinishInQueue()}. */ + boolean recursiveFlag; + public EpochManager(AsyncExecutionController aec) { this.epochNum = 0; this.outputQueue = new LinkedList<>(); this.asyncExecutionController = aec; // init an empty epoch, the epoch action will be updated when non-record is received. this.activeEpoch = new Epoch(epochNum++); + this.finishingEpoch = null; + this.recursiveFlag = false; } /** @@ -83,23 +94,42 @@ public EpochManager(AsyncExecutionController aec) { * @return the current open epoch. */ public Epoch onRecord() { - activeEpoch.ongoingRecordCount++; - return activeEpoch; + if (finishingEpoch != null) { + finishingEpoch.ongoingRecordCount++; + return finishingEpoch; + } else { + activeEpoch.ongoingRecordCount++; + return activeEpoch; + } + } + + /** + * Add a record to a specified epoch. + * + * @param epoch the specified epoch. + * @return the specified epoch itself. + */ + public Epoch onEpoch(Epoch epoch) { + epoch.ongoingRecordCount++; + return epoch; } /** * Add a non-record to the current epoch, close current epoch and open a new epoch. Must be * invoked within task thread. * - * @param action the action associated with this non-record. + * @param triggerAction the action associated with this non-record. * @param parallelMode the parallel mode for this epoch. */ - public void onNonRecord(Runnable action, ParallelMode parallelMode) { + public void onNonRecord( + @Nullable Runnable triggerAction, + @Nullable Runnable finalAction, + ParallelMode parallelMode) { LOG.trace( "on NonRecord, old epoch: {}, outputQueue size: {}", activeEpoch, outputQueue.size()); - switchActiveEpoch(action); + switchActiveEpoch(triggerAction, finalAction); if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) { asyncExecutionController.drainInflightRecords(0); } @@ -118,19 +148,34 @@ public void completeOneRecord(Epoch epoch) { } private void tryFinishInQueue() { + // We don't permit recursive call of this method. + if (recursiveFlag) { + return; + } + recursiveFlag = true; // If one epoch has been closed before and all records in // this epoch have finished, the epoch will be removed from the output queue. - while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) { - LOG.trace( - "Finish epoch: {}, outputQueue size: {}", - outputQueue.peek(), - outputQueue.size()); - outputQueue.pop(); + while (!outputQueue.isEmpty()) { + Epoch epoch = outputQueue.peek(); + // The epoch is set for inheritance during possible trigger action. + finishingEpoch = epoch; + try { + if (epoch.tryFinish()) { + outputQueue.pop(); + } else { + break; + } + } finally { + // Clear the override + finishingEpoch = null; + } } + recursiveFlag = false; } - private void switchActiveEpoch(Runnable action) { - activeEpoch.close(action); + private void switchActiveEpoch( + @Nullable Runnable triggerAction, @Nullable Runnable finalAction) { + activeEpoch.close(triggerAction, finalAction); outputQueue.offer(activeEpoch); this.activeEpoch = new Epoch(epochNum++); tryFinishInQueue(); @@ -151,7 +196,15 @@ enum EpochStatus { /** * One epoch can only be finished when it meets the following three conditions. 1. The * records of this epoch have finished execution. 2. The epoch is closed. 3. The epoch is in - * the front of outputQueue. + * the front of outputQueue. When the status transit from {@link #CLOSED} to {@link + * #FINISHING}, a trigger action will go. + */ + FINISHING, + + /** + * After the action is triggered, there might be more async process bind to this epoch. + * After all these process finished, a final action will go and the epoch will fall into + * {@link #FINISHED} status. */ FINISHED } @@ -167,8 +220,14 @@ public static class Epoch { /** The number of records that are still ongoing in this epoch. */ int ongoingRecordCount; - /** The action associated with non-record of this epoch(e.g. advance watermark). */ - @Nullable Runnable action; + /** The action associated with non-record of this epoch(e.g. triggering timer). */ + @Nullable Runnable triggerAction; + + /** + * The action when we finish this epoch and the triggerAction as well as any async + * processing. + */ + @Nullable Runnable finalAction; EpochStatus status; @@ -176,34 +235,58 @@ public Epoch(long id) { this.id = id; this.ongoingRecordCount = 0; this.status = EpochStatus.OPEN; - this.action = null; + this.triggerAction = null; + this.finalAction = null; } /** - * Try to finish this epoch. + * Try to finish this epoch. This is the core logic of triggering actions. The state machine + * and timeline are as follows: + * + *
+         * Action:     close()       triggerAction       wait             finalAction
+         * Statue:  OPEN ----- CLOSED ----------FINISHING -------- FINISHED -----------
+         * 
* * @return whether this epoch has been normally finished. */ boolean tryFinish() { - if (this.status == EpochStatus.FINISHED) { - // This epoch has been finished for some reason, but it is not finished here. - // Preventing recursive call of #tryFinishInQueue(). - return false; - } - if (ongoingRecordCount == 0 && this.status == EpochStatus.CLOSED) { - this.status = EpochStatus.FINISHED; - if (action != null) { - action.run(); + if (ongoingRecordCount == 0) { + if (status == EpochStatus.CLOSED) { + // CLOSED -> FINISHING + transition(EpochStatus.FINISHING); + if (triggerAction != null) { + // trigger action will use {@link overrideEpoch}. + triggerAction.run(); + } + } + // After the triggerAction run, if there is no new async process, the + // ongoingRecordCount remains 0, then the status should transit to FINISHED. + // Otherwise, we will reach here when ongoingRecordCount reaches 0 again. + if (ongoingRecordCount == 0 && status == EpochStatus.FINISHING) { + // FINISHING -> FINISHED + transition(EpochStatus.FINISHED); + if (finalAction != null) { + finalAction.run(); + } } - return true; + return status == EpochStatus.FINISHED; } return false; } - /** Close this epoch. */ - void close(Runnable action) { - this.action = action; - this.status = EpochStatus.CLOSED; + void transition(EpochStatus newStatus) { + if (status != newStatus) { + LOG.trace("Epoch {} transit from {} to {}", this, status, newStatus); + status = newStatus; + } + } + + /** Close this epoch with defined triggerAction and finalAction. */ + void close(@Nullable Runnable triggerAction, @Nullable Runnable finalAction) { + this.triggerAction = triggerAction; + this.finalAction = finalAction; + transition(EpochStatus.CLOSED); } public String toString() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index e86ee8cc6cf06..c8b70819e7aa9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -55,9 +55,9 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -178,7 +178,7 @@ public void asyncProcessWithKey(K key, ThrowingRunnable processin RecordContext previousContext = currentProcessingContext; // build a context and switch to the new context - currentProcessingContext = asyncExecutionController.buildContext(null, key); + currentProcessingContext = asyncExecutionController.buildContext(null, key, true); currentProcessingContext.retain(); asyncExecutionController.setCurrentContext(currentProcessingContext); // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call's key @@ -319,7 +319,8 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) { super.reportOrForwardLatencyMarker(marker); return; } - asyncExecutionController.processNonRecord(() -> super.reportOrForwardLatencyMarker(marker)); + asyncExecutionController.processNonRecord( + null, () -> super.reportOrForwardLatencyMarker(marker)); } // ------------------------------------------------------------------------ @@ -333,33 +334,9 @@ public void processWatermark(Watermark mark) throws Exception { super.processWatermark(mark); return; } - asyncExecutionController.processNonRecord(() -> doProcessWatermark(mark, null)); - } - - /** - * Handle the watermark and timers, then run a provided {@link Runnable} asynchronously right - * after the watermark is emitted. - * - * @param mark The watermark. - * @param postAction The runnable for post action. - */ - protected void doProcessWatermark(Watermark mark, @Nullable Runnable postAction) - throws Exception { - // todo: make async operator deal with interruptible watermark - if (timeServiceManager != null) { - CompletableFuture future = timeServiceManager.advanceWatermark(mark); - future.thenAccept(v -> emitWatermark(mark, postAction)); - asyncExecutionController.drainWithTimerIfNeeded(future); - } else { - emitWatermark(mark, postAction); - } - } - - private void emitWatermark(Watermark mark, @Nullable Runnable postAction) { - output.emitWatermark(mark); - if (postAction != null) { - postAction.run(); - } + asyncExecutionController.processNonRecord( + timeServiceManager == null ? null : () -> timeServiceManager.advanceWatermark(mark), + () -> output.emitWatermark(mark)); } @Override @@ -370,7 +347,7 @@ public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Excep return; } asyncExecutionController.processNonRecord( - () -> super.processWatermarkStatus(watermarkStatus)); + null, () -> super.processWatermarkStatus(watermarkStatus)); } @Override @@ -380,17 +357,24 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index super.processWatermarkStatus(watermarkStatus, index); return; } + final AtomicBoolean wasIdle = new AtomicBoolean(false); + final AtomicReference watermarkRef = new AtomicReference<>(null); asyncExecutionController.processNonRecord( () -> { - boolean wasIdle = combinedWatermark.isIdle(); + wasIdle.set(combinedWatermark.isIdle()); // index is 0-based if (combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) { - doProcessWatermark( - new Watermark(combinedWatermark.getCombinedWatermark()), - wasIdle == combinedWatermark.isIdle() - ? null - : () -> output.emitWatermarkStatus(watermarkStatus)); - } else if (wasIdle != combinedWatermark.isIdle()) { + watermarkRef.set(new Watermark(combinedWatermark.getCombinedWatermark())); + if (timeServiceManager != null) { + timeServiceManager.advanceWatermark(watermarkRef.get()); + } + } + }, + () -> { + if (watermarkRef.get() != null) { + output.emitWatermark(watermarkRef.get()); + } + if (wasIdle.get() != combinedWatermark.isIdle()) { output.emitWatermarkStatus(watermarkStatus); } }); @@ -404,7 +388,7 @@ public void processRecordAttributes(RecordAttributes recordAttributes) throws Ex return; } asyncExecutionController.processNonRecord( - () -> super.processRecordAttributes(recordAttributes)); + null, () -> super.processRecordAttributes(recordAttributes)); } @Experimental @@ -415,7 +399,7 @@ public void processRecordAttributes1(RecordAttributes recordAttributes) { return; } asyncExecutionController.processNonRecord( - () -> super.processRecordAttributes1(recordAttributes)); + null, () -> super.processRecordAttributes1(recordAttributes)); } @Experimental @@ -426,7 +410,7 @@ public void processRecordAttributes2(RecordAttributes recordAttributes) { return; } asyncExecutionController.processNonRecord( - () -> super.processRecordAttributes2(recordAttributes)); + null, () -> super.processRecordAttributes2(recordAttributes)); } @VisibleForTesting @@ -439,6 +423,10 @@ RecordContext getCurrentProcessingContext() { return currentProcessingContext; } + public AsyncKeyedStateBackend getAsyncKeyedStateBackend() { + return stateHandler.getAsyncKeyedStateBackend(); + } + public void drainStateRequests() { if (isAsyncStateProcessingEnabled()) { asyncExecutionController.drainInflightRecords(0); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 0b0752f122af8..6c90e4f997f9e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -51,9 +51,9 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkState; @@ -179,7 +179,7 @@ public void asyncProcessWithKey(K key, ThrowingRunnable processin RecordContext previousContext = currentProcessingContext; // build a context and switch to the new context - currentProcessingContext = asyncExecutionController.buildContext(null, key); + currentProcessingContext = asyncExecutionController.buildContext(null, key, true); currentProcessingContext.retain(); asyncExecutionController.setCurrentContext(currentProcessingContext); // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call's key @@ -253,7 +253,8 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) { super.reportOrForwardLatencyMarker(marker); return; } - asyncExecutionController.processNonRecord(() -> super.reportOrForwardLatencyMarker(marker)); + asyncExecutionController.processNonRecord( + null, () -> super.reportOrForwardLatencyMarker(marker)); } // ------------------------------------------------------------------------ @@ -265,33 +266,9 @@ public void processWatermark(Watermark mark) throws Exception { super.processWatermark(mark); return; } - asyncExecutionController.processNonRecord(() -> doProcessWatermark(mark, null)); - } - - /** - * Handle the watermark and timers, then run a provided {@link Runnable} asynchronously right - * after the watermark is emitted. - * - * @param mark The watermark. - * @param postAction The runnable for post action. - */ - protected void doProcessWatermark(Watermark mark, @Nullable Runnable postAction) - throws Exception { - // todo: make async operator deal with interruptible watermark - if (timeServiceManager != null) { - CompletableFuture future = timeServiceManager.advanceWatermark(mark); - future.thenAccept(v -> emitWatermark(mark, postAction)); - asyncExecutionController.drainWithTimerIfNeeded(future); - } else { - emitWatermark(mark, postAction); - } - } - - private void emitWatermark(Watermark mark, @Nullable Runnable postAction) { - output.emitWatermark(mark); - if (postAction != null) { - postAction.run(); - } + asyncExecutionController.processNonRecord( + timeServiceManager == null ? null : () -> timeServiceManager.advanceWatermark(mark), + () -> output.emitWatermark(mark)); } @Override @@ -301,16 +278,23 @@ public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId) super.processWatermarkStatus(watermarkStatus, inputId); return; } + final AtomicBoolean wasIdle = new AtomicBoolean(false); + final AtomicReference watermarkRef = new AtomicReference<>(null); asyncExecutionController.processNonRecord( () -> { - boolean wasIdle = combinedWatermark.isIdle(); + wasIdle.set(combinedWatermark.isIdle()); if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) { - doProcessWatermark( - new Watermark(combinedWatermark.getCombinedWatermark()), - wasIdle == combinedWatermark.isIdle() - ? null - : () -> output.emitWatermarkStatus(watermarkStatus)); - } else if (wasIdle != combinedWatermark.isIdle()) { + watermarkRef.set(new Watermark(combinedWatermark.getCombinedWatermark())); + if (timeServiceManager != null) { + timeServiceManager.advanceWatermark(watermarkRef.get()); + } + } + }, + () -> { + if (watermarkRef.get() != null) { + output.emitWatermark(watermarkRef.get()); + } + if (wasIdle.get() != combinedWatermark.isIdle()) { output.emitWatermarkStatus(watermarkStatus); } }); @@ -324,7 +308,7 @@ public void processRecordAttributes(RecordAttributes recordAttributes, int input return; } asyncExecutionController.processNonRecord( - () -> super.processRecordAttributes(recordAttributes, inputId)); + null, () -> super.processRecordAttributes(recordAttributes, inputId)); } @VisibleForTesting diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java index 5ae07a858dd9c..782bbe7856b9e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java @@ -31,7 +31,6 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; import java.io.Serializable; -import java.util.concurrent.CompletableFuture; /** * An entity keeping all the time-related services. @@ -85,7 +84,7 @@ InternalTimerService getAsyncInternalTimerService( * Advances the Watermark of all managed {@link InternalTimerService timer services}, * potentially firing event time timers. */ - CompletableFuture advanceWatermark(Watermark watermark) throws Exception; + void advanceWatermark(Watermark watermark) throws Exception; /** * Try to {@link #advanceWatermark(Watermark)}, but if {@link ShouldStopAdvancingFn} returns diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java index 319d29641a199..3b5f81697f81e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java @@ -44,7 +44,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -239,14 +238,10 @@ KeyGroupedInternalPriorityQueue> createTimerPriorit } @Override - public CompletableFuture advanceWatermark(Watermark watermark) throws Exception { - CompletableFuture[] futures = new CompletableFuture[timerServices.size()]; - int index = 0; + public void advanceWatermark(Watermark watermark) throws Exception { for (InternalTimerServiceImpl service : timerServices.values()) { - futures[index] = service.advanceWatermark(watermark.getTimestamp()); - index++; + service.advanceWatermark(watermark.getTimestamp()); } - return CompletableFuture.allOf(futures); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java index 4db3658365eb3..3ccfa8f2eda8a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java @@ -30,10 +30,6 @@ import org.apache.flink.util.function.BiConsumerWithException; import org.apache.flink.util.function.ThrowingRunnable; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; - /** * An implementation of {@link InternalTimerService} that is used by {@link * org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}. @@ -102,23 +98,18 @@ void onProcessingTime(long time) throws Exception { * @param time the time in watermark. */ @Override - public CompletableFuture advanceWatermark(long time) throws Exception { + public void advanceWatermark(long time) throws Exception { currentWatermark = time; - List> futures = new ArrayList<>(); InternalTimer timer; while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time && !cancellationContext.isCancelled()) { eventTimeTimersQueue.poll(); final InternalTimer timerToTrigger = timer; - CompletableFuture future = new CompletableFuture<>(); maintainContextAndProcess( - timerToTrigger, () -> triggerTarget.onEventTime(timerToTrigger)) - .thenAccept(v -> future.complete(null)); - futures.add(future); + timerToTrigger, () -> triggerTarget.onEventTime(timerToTrigger)); taskIOMetricGroup.getNumFiredTimers().inc(); } - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java index bdbba32f5540e..269ba493b1802 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java @@ -36,7 +36,6 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; import static org.apache.flink.util.Preconditions.checkArgument; @@ -307,7 +306,7 @@ void onProcessingTime(long time) throws Exception { } } - public CompletableFuture advanceWatermark(long time) throws Exception { + public void advanceWatermark(long time) throws Exception { Preconditions.checkState( tryAdvanceWatermark( time, @@ -315,7 +314,6 @@ public CompletableFuture advanceWatermark(long time) throws Exception { // Never stop advancing. return false; })); - return CompletableFuture.completedFuture(null); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java index 563fd62409be5..57f0b3f6f0552 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java @@ -37,7 +37,6 @@ import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -87,11 +86,10 @@ public InternalTimerService getAsyncInternalTimerService( } @Override - public CompletableFuture advanceWatermark(Watermark watermark) { + public void advanceWatermark(Watermark watermark) { if (watermark.getTimestamp() == Long.MAX_VALUE) { keySelected(null); } - return CompletableFuture.completedFuture(null); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index ae0c96cb3d95a..c52432e53b6fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -691,7 +691,7 @@ void testEpochManager() throws Exception { assertThat(epoch1).isEqualTo(epoch2); assertThat(epoch1.ongoingRecordCount).isEqualTo(2); - aec.processNonRecord(() -> output.incrementAndGet()); + aec.processNonRecord(null, () -> output.incrementAndGet()); assertThat(output.get()).isEqualTo(3); // SERIAL_BETWEEN_EPOCH mode would drain in-flight records on non-record arriving. @@ -724,7 +724,7 @@ void testMixEpochMode() throws Exception { userCode.run(); aec.epochManager.onNonRecord( - () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH); + null, () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH); assertThat(epoch1.ongoingRecordCount).isEqualTo(1); String record2 = "key2-r2"; @@ -736,7 +736,7 @@ void testMixEpochMode() throws Exception { assertThat(epoch1.ongoingRecordCount).isEqualTo(1); assertThat(epoch2.ongoingRecordCount).isEqualTo(1); aec.epochManager.onNonRecord( - () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH); + null, () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH); assertThat(epoch1.ongoingRecordCount).isEqualTo(1); assertThat(epoch2.ongoingRecordCount).isEqualTo(1); assertThat(output.get()).isEqualTo(0); @@ -751,7 +751,7 @@ void testMixEpochMode() throws Exception { assertThat(epoch2.ongoingRecordCount).isEqualTo(1); assertThat(epoch3.ongoingRecordCount).isEqualTo(1); aec.epochManager.onNonRecord( - () -> output.incrementAndGet(), ParallelMode.SERIAL_BETWEEN_EPOCH); + null, () -> output.incrementAndGet(), ParallelMode.SERIAL_BETWEEN_EPOCH); assertThat(epoch1.ongoingRecordCount).isEqualTo(0); assertThat(epoch2.ongoingRecordCount).isEqualTo(0); assertThat(epoch3.ongoingRecordCount).isEqualTo(0); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java index df69d780d4181..b6e5f957f0dfd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java @@ -39,7 +39,7 @@ void testBasic() { assertThat(epoch1.ongoingRecordCount).isEqualTo(2); AtomicInteger output = new AtomicInteger(0); epochManager.onNonRecord( - () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH); + null, () -> output.incrementAndGet(), ParallelMode.PARALLEL_BETWEEN_EPOCH); // record3 is in a new epoch Epoch epoch3 = epochManager.onRecord(); assertThat(epoch3).isNotEqualTo(epoch1); @@ -60,6 +60,7 @@ void testBasic() { // Test if in the action there is a record processing. Should not be any error. epochManager.onNonRecord( + null, () -> { output.incrementAndGet(); Epoch epoch4 = epochManager.onRecord(); @@ -68,4 +69,27 @@ void testBasic() { ParallelMode.PARALLEL_BETWEEN_EPOCH); assertThat(output.get()).isEqualTo(2); } + + @Test + void testTwoAction() { + EpochManager epochManager = new EpochManager(null); + Epoch epoch1 = epochManager.onRecord(); + Epoch epoch2 = epochManager.onRecord(); + assertThat(epoch1).isEqualTo(epoch2); + assertThat(epoch1.ongoingRecordCount).isEqualTo(2); + AtomicInteger output = new AtomicInteger(0); + epochManager.onNonRecord( + () -> epochManager.onEpoch(epoch1), + () -> output.incrementAndGet(), + ParallelMode.PARALLEL_BETWEEN_EPOCH); + assertThat(epoch1.status).isEqualTo(EpochStatus.CLOSED); + assertThat(output.get()).isEqualTo(0); + epochManager.completeOneRecord(epoch1); + epochManager.completeOneRecord(epoch2); + assertThat(epoch1.status).isEqualTo(EpochStatus.FINISHING); + assertThat(output.get()).isEqualTo(0); + epochManager.completeOneRecord(epoch1); + assertThat(epoch1.status).isEqualTo(EpochStatus.FINISHED); + assertThat(output.get()).isEqualTo(1); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java index 0dfd19ea0fe89..496d46e3f3499 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java @@ -56,6 +56,23 @@ /** Basic tests for {@link AbstractAsyncStateStreamOperator}. */ public class AbstractAsyncStateStreamOperatorTest { + protected AsyncKeyedOneInputStreamOperatorTestHarness, String> + createTestHarness( + int maxParalelism, int numSubtasks, int subtaskIndex, TestOperator testOperator) + throws Exception { + AsyncKeyedOneInputStreamOperatorTestHarness, String> + testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( + testOperator, + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + maxParalelism, + numSubtasks, + subtaskIndex); + testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend())); + return testHarness; + } + protected AsyncKeyedOneInputStreamOperatorTestHarness, String> createTestHarness( int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) @@ -137,7 +154,6 @@ void testRecordProcessorWithRecordOrder() throws Exception { @Test void testAsyncProcessWithKey() throws Exception { - TestOperatorWithDirectAsyncProcess testOperator = new TestOperatorWithDirectAsyncProcess(ElementOrder.RECORD_ORDER); AsyncKeyedOneInputStreamOperatorTestHarness, String> @@ -243,6 +259,29 @@ void testNonRecordProcess() throws Exception { } } + @Test + void testWatermark() throws Exception { + TestOperatorWithAsyncProcessTimer testOperator = + new TestOperatorWithAsyncProcessTimer(ElementOrder.RECORD_ORDER); + try (AsyncKeyedOneInputStreamOperatorTestHarness, String> + testHarness = createTestHarness(128, 1, 0, testOperator)) { + testHarness.open(); + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "1"))); + expectedOutput.add(new StreamRecord<>("EventTimer-1-1")); + testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "3"))); + expectedOutput.add(new StreamRecord<>("EventTimer-1-3")); + testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "6"))); + expectedOutput.add(new StreamRecord<>("EventTimer-1-6")); + testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "9"))); + expectedOutput.add(new StreamRecord<>("EventTimer-1-9")); + testHarness.processWatermark(10L); + expectedOutput.add(new Watermark(10L)); + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + } + } + @Test void testWatermarkStatus() throws Exception { try (AsyncKeyedOneInputStreamOperatorTestHarness, String> @@ -417,22 +456,42 @@ private static class TestOperatorWithDirectAsyncProcess extends TestOperator { @Override public void processElement(StreamRecord> element) throws Exception { - System.out.println("processElement " + Thread.currentThread().getName()); asyncProcessWithKey( element.getValue().f0, () -> { - System.out.println( - "asyncProcessWithKey " + Thread.currentThread().getName()); processed.incrementAndGet(); }); synchronized (objectToWait) { objectToWait.wait(); } - System.out.println("processElement " + Thread.currentThread().getName()); processed.incrementAndGet(); } } + private static class TestOperatorWithAsyncProcessTimer extends TestOperator { + + TestOperatorWithAsyncProcessTimer(ElementOrder elementOrder) { + super(elementOrder); + } + + @Override + public void processElement(StreamRecord> element) throws Exception { + processed.incrementAndGet(); + timerService.registerEventTimeTimer( + VoidNamespace.INSTANCE, Long.parseLong(element.getValue().f1)); + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + asyncProcessWithKey(timer.getKey(), () -> super.onEventTime(timer)); + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + asyncProcessWithKey(timer.getKey(), () -> super.onProcessingTime(timer)); + } + } + private static class WatermarkTestingOperator extends AbstractAsyncStateStreamOperator implements TwoInputStreamOperator, Triggerable { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java index 56801a63b2333..a876b8cfacf31 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java @@ -39,9 +39,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; - import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link InternalTimerServiceAsyncImpl}. */ @@ -187,11 +184,8 @@ void testSameKeyEventTimerFireOrder() throws Exception { service.registerEventTimeTimer("event-timer-2", 2L); service.registerEventTimeTimer("event-timer-3", 3L); assertThat(testTriggerable.eventTriggerCount).isEqualTo(1); - CompletableFuture future = service.advanceWatermark(3L); - AtomicBoolean done = new AtomicBoolean(false); + service.advanceWatermark(3L); assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0); - future.thenAccept((v) -> done.set(true)).get(); - assertThat(done.get()).isTrue(); assertThat(testTriggerable.eventTriggerCount).isEqualTo(3); } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java index be73282d878fb..87f19f4032d2c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java @@ -36,7 +36,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; @@ -108,7 +107,7 @@ public InternalTimerService getAsyncInternalTimerService( } @Override - public CompletableFuture advanceWatermark(Watermark watermark) throws Exception { + public void advanceWatermark(Watermark watermark) throws Exception { throw new UnsupportedOperationException(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java index 11cb749d50405..0a9521dcb5af5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java @@ -240,6 +240,40 @@ void testNonRecordProcess() throws Exception { } } + @Test + void testWatermark() throws Exception { + KeyedOneInputStreamOperatorV2TestHarness, String> + testHarness = + KeyedOneInputStreamOperatorV2TestHarness.create( + new TestWithAsyncProcessTimerOperatorFactory( + ElementOrder.RECORD_ORDER), + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + 128, + 1, + 0); + testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend())); + + try { + testHarness.open(); + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "1"))); + expectedOutput.add(new StreamRecord<>("EventTimer-1-1")); + testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "3"))); + expectedOutput.add(new StreamRecord<>("EventTimer-1-3")); + testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "6"))); + expectedOutput.add(new StreamRecord<>("EventTimer-1-6")); + testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "9"))); + expectedOutput.add(new StreamRecord<>("EventTimer-1-9")); + testHarness.processWatermark(10L); + expectedOutput.add(new Watermark(10L)); + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + } finally { + testHarness.close(); + } + } + @Test void testWatermarkStatus() throws Exception { try (KeyedOneInputStreamOperatorV2TestHarness, String> @@ -611,6 +645,56 @@ public void processElement(StreamRecord> element) } } + private static class TestWithAsyncProcessTimerOperatorFactory + extends AbstractStreamOperatorFactory { + + private final ElementOrder elementOrder; + + TestWithAsyncProcessTimerOperatorFactory(ElementOrder elementOrder) { + this.elementOrder = elementOrder; + } + + @Override + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) new SingleInputTestOperatorWithAsyncProcessTimer(parameters, elementOrder); + } + + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return SingleInputTestOperatorWithAsyncProcessTimer.class; + } + } + + private static class SingleInputTestOperatorWithAsyncProcessTimer + extends SingleInputTestOperator { + + SingleInputTestOperatorWithAsyncProcessTimer( + StreamOperatorParameters parameters, ElementOrder elementOrder) { + super(parameters, elementOrder); + input = + new AbstractInput, String>(this, 1) { + @Override + public void processElement(StreamRecord> element) + throws Exception { + processed.incrementAndGet(); + timerService.registerEventTimeTimer( + VoidNamespace.INSTANCE, Long.parseLong(element.getValue().f1)); + } + }; + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + asyncProcessWithKey(timer.getKey(), () -> super.onEventTime(timer)); + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + asyncProcessWithKey(timer.getKey(), () -> super.onProcessingTime(timer)); + } + } + private static class WatermarkTestingOperatorFactory extends AbstractStreamOperatorFactory { @Override