Skip to content

Commit

Permalink
[FLINK-35411][Runtime] Introduce periodic timeout check for state req…
Browse files Browse the repository at this point in the history
…uests buffer
  • Loading branch information
Zakelly committed Sep 23, 2024
1 parent 9292c64 commit 3dbebac
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -51,10 +53,12 @@
*
* @param <K> the type of the key
*/
public class AsyncExecutionController<K> implements StateRequestHandler {
public class AsyncExecutionController<K> implements StateRequestHandler, Closeable {

private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);

private static final long DEFAULT_BUFFER_TIMEOUT_CHECK_INTERVAL = 100;

/**
* The batch size. When the number of state requests in the active buffer exceeds the batch
* size, a batched state execution would be triggered.
Expand All @@ -68,6 +72,14 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
*/
private final long bufferTimeout;

/**
* There might be huge overhead when inserting a timer for each buffer. A periodic check is a
* good trade-off to save much GC and CPU for this. This var defines the interval for periodic
* check of timeout. As a result, the real trigger time of timeout buffer might be [timeout,
* timeout+interval]. We don't make it configurable for now.
*/
private final long bufferTimeoutCheckInterval = DEFAULT_BUFFER_TIMEOUT_CHECK_INTERVAL;

/** The max allowed number of in-flight records. */
private final int maxInFlightRecordNum;

Expand Down Expand Up @@ -142,6 +154,7 @@ public AsyncExecutionController(
this.stateRequestsBuffer =
new StateRequestBuffer<>(
bufferTimeout,
bufferTimeoutCheckInterval,
(scheduledSeq) ->
mailboxExecutor.execute(
() -> {
Expand Down Expand Up @@ -387,6 +400,11 @@ public int getInFlightRecordNum() {
return inFlightRecordNum.get();
}

@Override
public void close() throws IOException {
stateRequestsBuffer.close();
}

/** A listener listens the key context switch. */
public interface SwitchContextListener<K> {
void switchContext(RecordContext<K> context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.io.Closeable;
import java.io.IOException;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
Expand All @@ -43,7 +46,7 @@
* @param <K> the type of the key
*/
@NotThreadSafe
public class StateRequestBuffer<K> {
public class StateRequestBuffer<K> implements Closeable {

/** All StateRequestBuffer in the same task manager share one ScheduledExecutorService. */
private static final ScheduledThreadPoolExecutor DELAYER =
Expand Down Expand Up @@ -75,44 +78,68 @@ public class StateRequestBuffer<K> {
/** The timeout of {@link #activeQueue} triggering in milliseconds. */
final long bufferTimeout;

/** The interval of periodic buffer timeout check. */
final long bufferTimeoutCheckInterval;

/** The handler to trigger when timeout. */
final Consumer<Long> timeoutHandler;

/** The executor service that schedules and calls the triggers of this task. */
ScheduledExecutorService scheduledExecutor;
final ScheduledExecutorService scheduledExecutor;

/**
* The current scheduled future, when the next scheduling occurs, the previous one that has not
* yet been executed will be canceled.
*/
ScheduledFuture<Void> currentScheduledFuture;
ScheduledFuture<?> currentScheduledFuture;

/**
* The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code
* scheduledSeq} is less than {@code currentSeq}.
*/
AtomicLong scheduledSeq;
volatile Tuple2<Long, Long> seqAndTimeout = null;

/**
* The current trigger sequence number, used to distinguish different triggers. Every time a
* trigger occurs, {@code currentSeq} increases by 1.
*/
AtomicLong currentSeq;
final AtomicLong currentSeq;

public StateRequestBuffer(long bufferTimeout, Consumer<Long> timeoutHandler) {
public StateRequestBuffer(
long bufferTimeout, long bufferTimeoutCheckInterval, Consumer<Long> timeoutHandler) {
this.activeQueue = new LinkedList<>();
this.blockingQueue = new HashMap<>();
this.blockingQueueSize = 0;
this.bufferTimeout = bufferTimeout;
this.bufferTimeoutCheckInterval = bufferTimeoutCheckInterval;
this.timeoutHandler = timeoutHandler;
this.scheduledSeq = new AtomicLong(-1);
this.currentSeq = new AtomicLong(0);
if (bufferTimeout > 0) {
this.scheduledExecutor = DELAYER;
initPeriodicTimeoutCheck();
} else {
this.scheduledExecutor = null;
}
}

private void initPeriodicTimeoutCheck() {
currentScheduledFuture =
scheduledExecutor.scheduleAtFixedRate(
() -> {
final Tuple2<Long, Long> theSeqAndTimeout = seqAndTimeout;
if (theSeqAndTimeout != null
&& theSeqAndTimeout.f0 == currentSeq.get()
&& theSeqAndTimeout.f1 <= System.currentTimeMillis()) {
timeoutHandler.accept(theSeqAndTimeout.f0);
}
},
bufferTimeout,
bufferTimeoutCheckInterval,
TimeUnit.MILLISECONDS);
}

void advanceSeq() {
seqAndTimeout = null;
currentSeq.incrementAndGet();
}

Expand All @@ -125,24 +152,9 @@ void enqueueToActive(StateRequest<K, ?, ?, ?> request) {
request.getFuture().complete(null);
} else {
activeQueue.add(request);
if (bufferTimeout > 0 && currentSeq.get() > scheduledSeq.get()) {
if (currentScheduledFuture != null
&& !currentScheduledFuture.isDone()
&& !currentScheduledFuture.isCancelled()) {
currentScheduledFuture.cancel(false);
}
final long thisScheduledSeq = currentSeq.get();
scheduledSeq.set(thisScheduledSeq);
currentScheduledFuture =
(ScheduledFuture<Void>)
scheduledExecutor.schedule(
() -> {
if (thisScheduledSeq == currentSeq.get()) {
timeoutHandler.accept(thisScheduledSeq);
}
},
bufferTimeout,
TimeUnit.MILLISECONDS);
if (bufferTimeout > 0 && seqAndTimeout == null) {
seqAndTimeout =
Tuple2.of(currentSeq.get(), System.currentTimeMillis() + bufferTimeout);
}
}
}
Expand Down Expand Up @@ -213,4 +225,12 @@ Optional<StateRequestContainer> popActive(
}
return Optional.of(stateRequestContainer);
}

@Override
public synchronized void close() throws IOException {
if (currentScheduledFuture != null) {
currentScheduledFuture.cancel(true);
currentScheduledFuture = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,4 +316,10 @@ AsyncExecutionController<?> getAsyncExecutionController() {
RecordContext getCurrentProcessingContext() {
return currentProcessingContext;
}

@Override
public void close() throws Exception {
super.close();
asyncExecutionController.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,10 @@ public AsyncExecutionController<?> getAsyncExecutionController() {
public RecordContext getCurrentProcessingContext() {
return currentProcessingContext;
}

@Override
public void close() throws Exception {
super.close();
asyncExecutionController.close();
}
}
Loading

0 comments on commit 3dbebac

Please sign in to comment.