Skip to content

Commit

Permalink
[FLINK-35411] Use wait/notify instead of frequent sleep in AsyncExecu…
Browse files Browse the repository at this point in the history
…tionController
  • Loading branch information
Zakelly committed Sep 20, 2024
1 parent 6b37bd9 commit 0784bd4
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab
*/
private final int batchSize;

/** The runner for callbacks. Basically a wrapper of mailbox executor. */
private final BatchCallbackRunner callbackRunner;

/**
* The timeout of {@link StateRequestBuffer#activeQueue} triggering in milliseconds. If the
* activeQueue has not reached the {@link #batchSize} within 'buffer-timeout' milliseconds, a
Expand Down Expand Up @@ -129,6 +132,12 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab
*/
final ParallelMode epochParallelMode = ParallelMode.SERIAL_BETWEEN_EPOCH;

/** A guard for waiting new mail. */
private final Object notifyLock = new Object();

/** Flag indicating if this AEC is under waiting status. */
private volatile boolean waitingMail = false;

public AsyncExecutionController(
MailboxExecutor mailboxExecutor,
AsyncFrameworkExceptionHandler exceptionHandler,
Expand All @@ -140,7 +149,9 @@ public AsyncExecutionController(
this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
this.mailboxExecutor = mailboxExecutor;
this.exceptionHandler = exceptionHandler;
this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor, exceptionHandler);
this.callbackRunner = new BatchCallbackRunner(mailboxExecutor, this::notifyNewMail);
this.stateFutureFactory = new StateFutureFactory<>(this, callbackRunner, exceptionHandler);

this.stateExecutor = stateExecutor;
this.batchSize = batchSize;
this.bufferTimeout = bufferTimeout;
Expand Down Expand Up @@ -216,9 +227,7 @@ void disposeContext(RecordContext<K> toDispose) {
RecordContext<K> nextRecordCtx =
stateRequestsBuffer.tryActivateOneByKey(toDispose.getKey());
if (nextRecordCtx != null) {
Preconditions.checkState(
tryOccupyKey(nextRecordCtx),
String.format("key(%s) is already occupied.", nextRecordCtx.getKey()));
Preconditions.checkState(tryOccupyKey(nextRecordCtx));
}
}

Expand Down Expand Up @@ -361,7 +370,7 @@ public void drainInflightRecords(int targetNum) {
if (targetNum == 0 || stateExecutor.ongoingRequests() == 0L) {
triggerIfNeeded(true);
}
Thread.sleep(1);
waitForNewMails();
}
}
} catch (InterruptedException ignored) {
Expand All @@ -370,6 +379,30 @@ public void drainInflightRecords(int targetNum) {
}
}

/** Wait for new mails if there is no more mail. */
private void waitForNewMails() throws InterruptedException {
if (!callbackRunner.isHasMail()) {
synchronized (notifyLock) {
if (!callbackRunner.isHasMail()) {
waitingMail = true;
notifyLock.wait(5);
waitingMail = false;
}
}
}
}

/** Notify this AEC there is a mail, wake up from waiting. */
private void notifyNewMail() {
if (waitingMail) {
synchronized (notifyLock) {
if (waitingMail) {
notifyLock.notify();
}
}
}
}

public void processNonRecord(ThrowingRunnable<? extends Exception> action) {
Runnable wrappedAction =
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,19 @@
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.util.function.ThrowingRunnable;

import javax.annotation.concurrent.GuardedBy;
import net.jcip.annotations.GuardedBy;

import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A runner for {@link StateFutureFactory} to build {@link
* org.apache.flink.core.state.InternalStateFuture} that put one mail in {@link MailboxExecutor}
* whenever there are callbacks to run but run multiple callbacks within one mail.
* A {@link org.apache.flink.core.state.StateFutureImpl.CallbackRunner} that put one mail in {@link
* MailboxExecutor} but run multiple callbacks within one mail.
*/
public class BatchCallbackRunner {

private static final int DEFAULT_BATCH_SIZE = 100;
private static final int DEFAULT_BATCH_SIZE = 3000;

private final MailboxExecutor mailboxExecutor;

Expand All @@ -57,8 +56,12 @@ public class BatchCallbackRunner {
/** Whether there is a mail in mailbox. */
private volatile boolean hasMail = false;

BatchCallbackRunner(MailboxExecutor mailboxExecutor) {
/** The logic to notify new mails to AEC. */
private final Runnable newMailNotify;

BatchCallbackRunner(MailboxExecutor mailboxExecutor, Runnable newMailNotify) {
this.mailboxExecutor = mailboxExecutor;
this.newMailNotify = newMailNotify;
this.batchSize = DEFAULT_BATCH_SIZE;
this.callbackQueue = new ConcurrentLinkedDeque<>();
this.activeBuffer = new ArrayList<>();
Expand Down Expand Up @@ -86,6 +89,7 @@ private void insertMail(boolean force) {
if (currentCallbacks.get() > 0) {
hasMail = true;
mailboxExecutor.execute(this::runBatch, "Batch running callback of state requests");
notifyNewMail();
} else {
hasMail = false;
}
Expand Down Expand Up @@ -114,4 +118,14 @@ public void runBatch() throws Exception {
}
insertMail(true);
}

private void notifyNewMail() {
if (newMailNotify != null) {
newMailNotify.run();
}
}

public boolean isHasMail() {
return hasMail;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.state.InternalStateFuture;
import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;

Expand All @@ -34,10 +33,10 @@ public class StateFutureFactory<K> {

StateFutureFactory(
AsyncExecutionController<K> asyncExecutionController,
MailboxExecutor mailboxExecutor,
BatchCallbackRunner callbackRunner,
AsyncFrameworkExceptionHandler exceptionHandler) {
this.asyncExecutionController = asyncExecutionController;
this.callbackRunner = new BatchCallbackRunner(mailboxExecutor);
this.callbackRunner = callbackRunner;
this.exceptionHandler = exceptionHandler;
}

Expand Down

0 comments on commit 0784bd4

Please sign in to comment.