diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java index ffab4d4f1778f..d106f62bfbb35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java @@ -33,7 +33,7 @@ */ public class BatchCallbackRunner { - private static final int DEFAULT_BATCH_SIZE = 3000; + static final int DEFAULT_BATCH_SIZE = 3000; private final MailboxExecutor mailboxExecutor; @@ -80,18 +80,34 @@ public void submit(ThrowingRunnable task) { activeBuffer = new ArrayList<>(batchSize); } } - currentCallbacks.incrementAndGet(); - insertMail(false); + if (currentCallbacks.getAndIncrement() == 0) { + // Only when the single first callback is inserted, the mail should be inserted if not + // exist. Otherwise, the #runBatch() from task threads will keep the mail exist. + insertMail(false, true); + } } - private void insertMail(boolean force) { - if (force || !hasMail) { - if (currentCallbacks.get() > 0) { - hasMail = true; - mailboxExecutor.execute(this::runBatch, "Batch running callback of state requests"); - notifyNewMail(); - } else { - hasMail = false; + /** + * Insert a mail for a batch of mails. + * + * @param force force check if there should insert a callback regardless of the #hasMail flag. + * @param notify should notify the AEC (typically on the task thread) if new mail inserted. + */ + private void insertMail(boolean force, boolean notify) { + // This method will be invoked via all ForSt I/O threads(from #submit) or task thread (from + // #runBatch()). The #hasMail flag should be protected by synchronized. + synchronized (this) { + if (force || !hasMail) { + if (currentCallbacks.get() > 0) { + hasMail = true; + mailboxExecutor.execute( + this::runBatch, "Batch running callback of state requests"); + if (notify) { + notifyNewMail(); + } + } else { + hasMail = false; + } } } } @@ -116,7 +132,8 @@ public void runBatch() throws Exception { } currentCallbacks.addAndGet(-batch.size()); } - insertMail(true); + // If we are on the task thread, there is no need to notify. + insertMail(true, false); } private void notifyNewMail() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunnerTest.java new file mode 100644 index 0000000000000..28ef9cc7f9bd7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunnerTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link BatchCallbackRunner}. */ +public class BatchCallbackRunnerTest { + + private static final ThrowingRunnable DUMMY = () -> {}; + + @Test + void testSingleSubmit() { + ManualMailboxExecutor executor = new ManualMailboxExecutor(); + AtomicBoolean notified = new AtomicBoolean(false); + BatchCallbackRunner runner = new BatchCallbackRunner(executor, () -> notified.set(true)); + runner.submit(DUMMY); + assertThat(runner.isHasMail()).isTrue(); + assertThat(notified.get()).isTrue(); + executor.runOne(); + assertThat(runner.isHasMail()).isFalse(); + } + + @Test + void testHugeBatch() { + ManualMailboxExecutor executor = new ManualMailboxExecutor(); + AtomicBoolean notified = new AtomicBoolean(false); + BatchCallbackRunner runner = new BatchCallbackRunner(executor, () -> notified.set(true)); + for (int i = 0; i < BatchCallbackRunner.DEFAULT_BATCH_SIZE + 1; i++) { + runner.submit(DUMMY); + } + assertThat(runner.isHasMail()).isTrue(); + assertThat(notified.get()).isTrue(); + executor.runOne(); + assertThat(runner.isHasMail()).isTrue(); + notified.set(false); + runner.submit(DUMMY); + assertThat(notified.get()).isFalse(); + executor.runOne(); + assertThat(runner.isHasMail()).isFalse(); + runner.submit(DUMMY); + assertThat(runner.isHasMail()).isTrue(); + assertThat(notified.get()).isTrue(); + } + + /** A mailbox executor that immediately executes code in the current thread. */ + public static class ManualMailboxExecutor implements MailboxExecutor { + + Deque> commands = new ArrayDeque<>(); + + @Override + public void execute( + MailOptions mailOptions, + ThrowingRunnable command, + String descriptionFormat, + Object... descriptionArgs) { + commands.push(command); + } + + public void runOne() { + ThrowingRunnable command = commands.pop(); + if (command != null) { + try { + command.run(); + } catch (Exception e) { + throw new FlinkRuntimeException("Cannot execute mail", e); + } + } + } + + @Override + public void yield() throws FlinkRuntimeException {} + + @Override + public boolean tryYield() throws FlinkRuntimeException { + return false; + } + + @Override + public boolean shouldInterrupt() { + return false; + } + } +}