Skip to content

Commit

Permalink
[hotfix][runtime] Fix concurrency issue in callback runner of async state processing (apache#25426)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly authored Sep 30, 2024
1 parent 7a60c19 commit 8391cfe
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*/
public class BatchCallbackRunner {

private static final int DEFAULT_BATCH_SIZE = 3000;
static final int DEFAULT_BATCH_SIZE = 3000;

private final MailboxExecutor mailboxExecutor;

Expand Down Expand Up @@ -80,18 +80,34 @@ public void submit(ThrowingRunnable<? extends Exception> task) {
activeBuffer = new ArrayList<>(batchSize);
}
}
currentCallbacks.incrementAndGet();
insertMail(false);
if (currentCallbacks.getAndIncrement() == 0) {
// Only when the single first callback is inserted, the mail should be inserted if not
// exist. Otherwise, the #runBatch() from task threads will keep the mail exist.
insertMail(false, true);
}
}

private void insertMail(boolean force) {
if (force || !hasMail) {
if (currentCallbacks.get() > 0) {
hasMail = true;
mailboxExecutor.execute(this::runBatch, "Batch running callback of state requests");
notifyNewMail();
} else {
hasMail = false;
/**
* Inserts a mail that runs a batch of callbacks, if one is needed.
*
* <p>Nothing happens when {@code force} is false and the #hasMail flag is already set. Otherwise,
* a mail executing {@link #runBatch()} is enqueued when at least one callback is pending
* ({@code currentCallbacks > 0}); when none is pending, the #hasMail flag is cleared.
*
* @param force whether to re-check for pending callbacks regardless of the #hasMail flag.
* @param notify whether to notify the AEC (typically on the task thread) when a new mail is
* inserted.
*/
private void insertMail(boolean force, boolean notify) {
// This method may be invoked from any ForSt I/O thread (via #submit) or from the task thread
// (via #runBatch()), so the #hasMail flag is guarded by synchronizing on this instance.
synchronized (this) {
if (force || !hasMail) {
if (currentCallbacks.get() > 0) {
// Callbacks are pending: mark the mail as present before enqueueing it.
hasMail = true;
mailboxExecutor.execute(
this::runBatch, "Batch running callback of state requests");
if (notify) {
notifyNewMail();
}
} else {
// Nothing left to run, so no mail is outstanding any more.
hasMail = false;
}
}
}
}
Expand All @@ -116,7 +132,8 @@ public void runBatch() throws Exception {
}
currentCallbacks.addAndGet(-batch.size());
}
insertMail(true);
// If we are on the task thread, there is no need to notify.
insertMail(true, false);
}

private void notifyNewMail() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.ThrowingRunnable;

import org.junit.jupiter.api.Test;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Tests for {@link BatchCallbackRunner}.
 *
 * <p>The runner is driven from a single thread through a {@link ManualMailboxExecutor}, which only
 * runs a queued mail when {@link ManualMailboxExecutor#runOne()} is called. This lets each test
 * observe the runner's {@code hasMail} flag and the new-mail notification between steps.
 */
public class BatchCallbackRunnerTest {

    /** A callback that does nothing; only the bookkeeping around it is under test. */
    private static final ThrowingRunnable<? extends Exception> DUMMY = () -> {};

    /** A single submitted callback inserts one mail, notifies once, and is drained by one run. */
    @Test
    void testSingleSubmit() {
        ManualMailboxExecutor executor = new ManualMailboxExecutor();
        AtomicBoolean notified = new AtomicBoolean(false);
        BatchCallbackRunner runner = new BatchCallbackRunner(executor, () -> notified.set(true));

        runner.submit(DUMMY);
        assertThat(runner.isHasMail()).isTrue();
        assertThat(notified.get()).isTrue();

        // Running the only mail is expected to drain the callback and clear the flag.
        executor.runOne();
        assertThat(runner.isHasMail()).isFalse();
    }

    /**
     * Submitting more callbacks than one batch can hold is expected to keep a mail alive across
     * runs, and to notify only when a mail is inserted while none is outstanding.
     */
    @Test
    void testHugeBatch() {
        ManualMailboxExecutor executor = new ManualMailboxExecutor();
        AtomicBoolean notified = new AtomicBoolean(false);
        BatchCallbackRunner runner = new BatchCallbackRunner(executor, () -> notified.set(true));

        // One callback more than the default batch size.
        for (int i = 0; i < BatchCallbackRunner.DEFAULT_BATCH_SIZE + 1; i++) {
            runner.submit(DUMMY);
        }
        assertThat(runner.isHasMail()).isTrue();
        assertThat(notified.get()).isTrue();

        // After the first run there is still work left, so a mail must remain.
        executor.runOne();
        assertThat(runner.isHasMail()).isTrue();

        // A mail is already outstanding, so a further submit must not notify again.
        notified.set(false);
        runner.submit(DUMMY);
        assertThat(notified.get()).isFalse();

        // The second run is expected to drain everything that is left.
        executor.runOne();
        assertThat(runner.isHasMail()).isFalse();

        // With no mail outstanding, the next submit inserts a mail and notifies.
        runner.submit(DUMMY);
        assertThat(runner.isHasMail()).isTrue();
        assertThat(notified.get()).isTrue();
    }

    /**
     * A mailbox executor that only queues submitted mails; a queued mail is executed on the calling
     * thread when {@link #runOne()} is invoked. Mails run in submission (FIFO) order.
     *
     * <p>The backing {@link ArrayDeque} is not thread-safe, so this executor is meant for
     * single-threaded tests only.
     */
    public static class ManualMailboxExecutor implements MailboxExecutor {

        /** Pending mails; the oldest mail is at the head. */
        final Deque<ThrowingRunnable<? extends Exception>> commands = new ArrayDeque<>();

        /**
         * Queues the command without running it. The mail options and description are ignored.
         */
        @Override
        public void execute(
                MailOptions mailOptions,
                ThrowingRunnable<? extends Exception> command,
                String descriptionFormat,
                Object... descriptionArgs) {
            // Append at the tail so that runOne() processes mails in submission order.
            commands.addLast(command);
        }

        /**
         * Runs the oldest queued mail on the calling thread; does nothing if no mail is queued.
         *
         * @throws FlinkRuntimeException wrapping any exception thrown by the mail.
         */
        public void runOne() {
            // pollFirst() returns null on an empty deque, whereas pop() would throw
            // NoSuchElementException and make a null check unreachable.
            ThrowingRunnable<? extends Exception> command = commands.pollFirst();
            if (command == null) {
                return;
            }
            try {
                command.run();
            } catch (Exception e) {
                throw new FlinkRuntimeException("Cannot execute mail", e);
            }
        }

        /** No-op: mails are only ever run through {@link #runOne()}. */
        @Override
        public void yield() throws FlinkRuntimeException {}

        /** Never runs a mail; always returns {@code false}. */
        @Override
        public boolean tryYield() throws FlinkRuntimeException {
            return false;
        }

        /** Always returns {@code false}. */
        @Override
        public boolean shouldInterrupt() {
            return false;
        }
    }
}

0 comments on commit 8391cfe

Please sign in to comment.