diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index f69b4d3b72c10f..c2067b874d6754 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -65,6 +65,9 @@ public class AsyncExecutionController implements StateRequestHandler, Closeab */ private final int batchSize; + /** The runner for callbacks. Basically a wrapper of mailbox executor. */ + private final BatchCallbackRunner callbackRunner; + /** * The timeout of {@link StateRequestBuffer#activeQueue} triggering in milliseconds. If the * activeQueue has not reached the {@link #batchSize} within 'buffer-timeout' milliseconds, a @@ -132,6 +135,12 @@ public class AsyncExecutionController implements StateRequestHandler, Closeab */ final ParallelMode epochParallelMode = ParallelMode.SERIAL_BETWEEN_EPOCH; + /** A guard for waiting new mail. */ + private final Object notifyLock = new Object(); + + /** Flag indicating if this AEC is under waiting status. */ + private volatile boolean waitingMail = false; + public AsyncExecutionController( MailboxExecutor mailboxExecutor, AsyncFrameworkExceptionHandler exceptionHandler, @@ -144,7 +153,9 @@ public AsyncExecutionController( this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; this.exceptionHandler = exceptionHandler; - this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor, exceptionHandler); + this.callbackRunner = new BatchCallbackRunner(mailboxExecutor, this::notifyNewMail); + this.stateFutureFactory = new StateFutureFactory<>(this, callbackRunner, exceptionHandler); + this.stateExecutor = stateExecutor; this.batchSize = batchSize; this.bufferTimeout = bufferTimeout; @@ -224,9 +235,7 @@ void disposeContext(RecordContext toDispose) { RecordContext nextRecordCtx = stateRequestsBuffer.tryActivateOneByKey(toDispose.getKey()); if (nextRecordCtx != null) { - Preconditions.checkState( - tryOccupyKey(nextRecordCtx), - String.format("key(%s) is already occupied.", nextRecordCtx.getKey())); + Preconditions.checkState(tryOccupyKey(nextRecordCtx)); } } @@ -369,7 +378,7 @@ public void drainInflightRecords(int targetNum) { if (targetNum == 0 || !stateExecutor.fullyLoaded()) { triggerIfNeeded(true); } - Thread.sleep(1); + waitForNewMails(); } } } catch (InterruptedException ignored) { @@ -378,6 +387,30 @@ public void drainInflightRecords(int targetNum) { } } + /** Wait for new mails if there is no more mail. */ + private void waitForNewMails() throws InterruptedException { + if (!callbackRunner.isHasMail()) { + synchronized (notifyLock) { + if (!callbackRunner.isHasMail()) { + waitingMail = true; + notifyLock.wait(5); + waitingMail = false; + } + } + } + } + + /** Notify this AEC there is a mail, wake up from waiting. */ + private void notifyNewMail() { + if (waitingMail) { + synchronized (notifyLock) { + if (waitingMail) { + notifyLock.notify(); + } + } + } + } + public void processNonRecord(ThrowingRunnable action) { Runnable wrappedAction = () -> { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java index 99c4d0096e2227..0656ae01eb5ca6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java @@ -21,20 +21,19 @@ import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.util.function.ThrowingRunnable; -import javax.annotation.concurrent.GuardedBy; +import net.jcip.annotations.GuardedBy; import java.util.ArrayList; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; /** - * A runner for {@link StateFutureFactory} to build {@link - * org.apache.flink.core.state.InternalStateFuture} that put one mail in {@link MailboxExecutor} - * whenever there are callbacks to run but run multiple callbacks within one mail. + * A {@link org.apache.flink.core.state.StateFutureImpl.CallbackRunner} that put one mail in {@link + * MailboxExecutor} but run multiple callbacks within one mail. */ public class BatchCallbackRunner { - private static final int DEFAULT_BATCH_SIZE = 100; + private static final int DEFAULT_BATCH_SIZE = 3000; private final MailboxExecutor mailboxExecutor; @@ -57,8 +56,12 @@ public class BatchCallbackRunner { /** Whether there is a mail in mailbox. */ private volatile boolean hasMail = false; - BatchCallbackRunner(MailboxExecutor mailboxExecutor) { + /** The logic to notify new mails to AEC. */ + private final Runnable newMailNotify; + + BatchCallbackRunner(MailboxExecutor mailboxExecutor, Runnable newMailNotify) { this.mailboxExecutor = mailboxExecutor; + this.newMailNotify = newMailNotify; this.batchSize = DEFAULT_BATCH_SIZE; this.callbackQueue = new ConcurrentLinkedDeque<>(); this.activeBuffer = new ArrayList<>(); @@ -86,6 +89,7 @@ private void insertMail(boolean force) { if (currentCallbacks.get() > 0) { hasMail = true; mailboxExecutor.execute(this::runBatch, "Batch running callback of state requests"); + notifyNewMail(); } else { hasMail = false; } @@ -114,4 +118,14 @@ public void runBatch() throws Exception { } insertMail(true); } + + private void notifyNewMail() { + if (newMailNotify != null) { + newMailNotify.run(); + } + } + + public boolean isHasMail() { + return hasMail; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java index 104a0a11c4bbd5..7b874a45ec73f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.asyncprocessing; -import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.core.state.InternalStateFuture; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; @@ -34,10 +33,10 @@ public class StateFutureFactory { StateFutureFactory( AsyncExecutionController asyncExecutionController, - MailboxExecutor mailboxExecutor, + BatchCallbackRunner callbackRunner, AsyncFrameworkExceptionHandler exceptionHandler) { this.asyncExecutionController = asyncExecutionController; - this.callbackRunner = new BatchCallbackRunner(mailboxExecutor); + this.callbackRunner = callbackRunner; this.exceptionHandler = exceptionHandler; }