Skip to content

Commit

Permalink
[FLINK-36875][runtime] Complete timer before emiting watermark (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
fredia authored Dec 12, 2024
1 parent cb5fc57 commit 43c50b9
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.core.state.InternalStateFuture;
import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode;
Expand All @@ -38,6 +39,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -367,8 +369,9 @@ private void seizeCapacity() {
*
* @param callback the callback to run if it finishes (once the record is not blocked).
*/
public void syncPointRequestWithCallback(ThrowingRunnable<Exception> callback) {
handleRequest(null, StateRequestType.SYNC_POINT, null).thenAccept(v -> callback.run());
public StateFuture<Void> syncPointRequestWithCallback(ThrowingRunnable<Exception> callback) {
return handleRequest(null, StateRequestType.SYNC_POINT, null)
.thenAccept(v -> callback.run());
}

/**
Expand All @@ -395,6 +398,14 @@ public void drainInflightRecords(int targetNum) {
}
}

/** A helper function to drain in-flight requests emitted by timer. */
public void drainWithTimerIfNeeded(CompletableFuture<Void> timerFuture) {
if (epochParallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
drainInflightRecords(0);
Preconditions.checkState(timerFuture.isDone());
}
}

/** Wait for new mails if there is no more mail. */
private void waitForNewMails() throws InterruptedException {
if (!callbackRunner.isHasMail()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@

import javax.annotation.Nonnull;

import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

Expand Down Expand Up @@ -330,7 +332,17 @@ public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
return;
}
asyncExecutionController.processNonRecord(() -> super.processWatermark(mark));
asyncExecutionController.processNonRecord(
() -> {
// todo: make async operator deal with interruptible watermark
if (timeServiceManager != null) {
CompletableFuture<Void> future = timeServiceManager.advanceWatermark(mark);
future.thenAccept(v -> output.emitWatermark(mark));
asyncExecutionController.drainWithTimerIfNeeded(future);
} else {
output.emitWatermark(mark);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@

import javax.annotation.Nonnull;

import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkState;

/**
Expand Down Expand Up @@ -257,7 +259,17 @@ public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
return;
}
asyncExecutionController.processNonRecord(() -> super.processWatermark(mark));
asyncExecutionController.processNonRecord(
() -> {
// todo: make async operator deal with interruptible watermark
if (timeServiceManager != null) {
CompletableFuture<Void> future = timeServiceManager.advanceWatermark(mark);
future.thenAccept(v -> output.emitWatermark(mark));
asyncExecutionController.drainWithTimerIfNeeded(future);
} else {
output.emitWatermark(mark);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;

/**
* An entity keeping all the time-related services.
Expand Down Expand Up @@ -84,7 +85,7 @@ <N> InternalTimerService<N> getAsyncInternalTimerService(
* Advances the Watermark of all managed {@link InternalTimerService timer services},
* potentially firing event time timers.
*/
void advanceWatermark(Watermark watermark) throws Exception;
CompletableFuture<Void> advanceWatermark(Watermark watermark) throws Exception;

/**
* Try to {@link #advanceWatermark(Watermark)}, but if {@link ShouldStopAdvancingFn} returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -238,10 +239,14 @@ KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorit
}

@Override
public void advanceWatermark(Watermark watermark) throws Exception {
public CompletableFuture<Void> advanceWatermark(Watermark watermark) throws Exception {
CompletableFuture<?>[] futures = new CompletableFuture[timerServices.size()];
int index = 0;
for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
service.advanceWatermark(watermark.getTimestamp());
futures[index] = service.advanceWatermark(watermark.getTimestamp());
index++;
}
return CompletableFuture.allOf(futures);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
Expand All @@ -29,6 +30,10 @@
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.ThrowingRunnable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* An implementation of {@link InternalTimerService} that is used by {@link
* org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}.
Expand Down Expand Up @@ -97,20 +102,23 @@ void onProcessingTime(long time) throws Exception {
* @param time the time in watermark.
*/
@Override
public void advanceWatermark(long time) throws Exception {
public CompletableFuture<Void> advanceWatermark(long time) throws Exception {
currentWatermark = time;

List<CompletableFuture<Void>> futures = new ArrayList<>();
InternalTimer<K, N> timer;

while ((timer = eventTimeTimersQueue.peek()) != null
&& timer.getTimestamp() <= time
&& !cancellationContext.isCancelled()) {
eventTimeTimersQueue.poll();
final InternalTimer<K, N> timerToTrigger = timer;
CompletableFuture<Void> future = new CompletableFuture<>();
maintainContextAndProcess(
timerToTrigger, () -> triggerTarget.onEventTime(timerToTrigger));
timerToTrigger, () -> triggerTarget.onEventTime(timerToTrigger))
.thenAccept(v -> future.complete(null));
futures.add(future);
taskIOMetricGroup.getNumFiredTimers().inc();
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

/**
Expand All @@ -125,13 +133,14 @@ protected void foreachTimer(
"Batch operation is not supported when using async state.");
}

private void maintainContextAndProcess(
private StateFuture<Void> maintainContextAndProcess(
InternalTimer<K, N> timer, ThrowingRunnable<Exception> runnable) {
RecordContext<K> recordCtx = asyncExecutionController.buildContext(null, timer.getKey());
RecordContext<K> recordCtx = asyncExecutionController.buildContext(timer, timer.getKey());
recordCtx.retain();
asyncExecutionController.setCurrentContext(recordCtx);
keyContext.setCurrentKey(timer.getKey());
asyncExecutionController.syncPointRequestWithCallback(runnable);
StateFuture<Void> future = asyncExecutionController.syncPointRequestWithCallback(runnable);
recordCtx.release();
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -306,14 +307,15 @@ void onProcessingTime(long time) throws Exception {
}
}

public void advanceWatermark(long time) throws Exception {
public CompletableFuture<Void> advanceWatermark(long time) throws Exception {
Preconditions.checkState(
tryAdvanceWatermark(
time,
() -> {
// Never stop advancing.
return false;
}));
return CompletableFuture.completedFuture(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand Down Expand Up @@ -86,10 +87,11 @@ public <N> InternalTimerService<N> getAsyncInternalTimerService(
}

@Override
public void advanceWatermark(Watermark watermark) {
public CompletableFuture<Void> advanceWatermark(Watermark watermark) {
if (watermark.getTimestamp() == Long.MAX_VALUE) {
keySelected(null);
}
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link InternalTimerServiceAsyncImpl}. */
Expand Down Expand Up @@ -166,6 +169,32 @@ void testEventTimerFireOrder() throws Exception {
assertThat(testTriggerable.eventTriggerCount).isEqualTo(2);
}

@Test
void testSameKeyEventTimerFireOrder() throws Exception {
keyContext.setCurrentKey("key-1");
service.registerEventTimeTimer("event-timer-1", 1L);

SameTimerTriggerable testTriggerable = new SameTimerTriggerable(asyncExecutionController);
service.startTimerService(
IntSerializer.INSTANCE, StringSerializer.INSTANCE, testTriggerable);
assertThat(testTriggerable.eventTriggerCount).isEqualTo(0);
// the event timer should be triggered at watermark 1
service.advanceWatermark(1L);
assertThat(testTriggerable.eventTriggerCount).isEqualTo(1);
assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0);

keyContext.setCurrentKey("key-1");
service.registerEventTimeTimer("event-timer-2", 2L);
service.registerEventTimeTimer("event-timer-3", 3L);
assertThat(testTriggerable.eventTriggerCount).isEqualTo(1);
CompletableFuture<Void> future = service.advanceWatermark(3L);
AtomicBoolean done = new AtomicBoolean(false);
assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0);
future.thenAccept((v) -> done.set(true)).get();
assertThat(done.get()).isTrue();
assertThat(testTriggerable.eventTriggerCount).isEqualTo(3);
}

private static <K, N> InternalTimerServiceAsyncImpl<K, N> createInternalTimerService(
TaskIOMetricGroup taskIOMetricGroup,
KeyGroupRange keyGroupsList,
Expand All @@ -190,6 +219,30 @@ private static <K, N> InternalTimerServiceAsyncImpl<K, N> createInternalTimerSer
asyncExecutionController);
}

private static class SameTimerTriggerable implements Triggerable<Integer, String> {

private AsyncExecutionController aec;

private static int eventTriggerCount = 0;

public SameTimerTriggerable(AsyncExecutionController aec) {
this.aec = aec;
}

@Override
public void onEventTime(InternalTimer<Integer, String> timer) throws Exception {
RecordContext<Integer> recordContext = aec.buildContext("record", "key");
aec.setCurrentContext(recordContext);
aec.handleRequestSync(null, StateRequestType.SYNC_POINT, null);
eventTriggerCount++;
}

@Override
public void onProcessingTime(InternalTimer<Integer, String> timer) throws Exception {
// skip
}
}

private static class TestTriggerable implements Triggerable<Integer, String> {

private static int eventTriggerCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -107,7 +108,7 @@ public <N> InternalTimerService<N> getAsyncInternalTimerService(
}

@Override
public void advanceWatermark(Watermark watermark) throws Exception {
public CompletableFuture<Void> advanceWatermark(Watermark watermark) throws Exception {
throw new UnsupportedOperationException();
}

Expand Down

0 comments on commit 43c50b9

Please sign in to comment.