Skip to content

Commit

Permalink
[hotfix][runtime] Fix concurrency issue in callback runner of async s…
Browse files Browse the repository at this point in the history
…tate processing
  • Loading branch information
Zakelly committed Sep 29, 2024
1 parent ca11cb9 commit c92aac5
Showing 1 changed file with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,34 @@ public void submit(ThrowingRunnable<? extends Exception> task) {
activeBuffer = new ArrayList<>(batchSize);
}
}
currentCallbacks.incrementAndGet();
insertMail(false);
if (currentCallbacks.getAndIncrement() == 0) {
// Only when the single first callback is inserted, the mail should be inserted if not
// exist. Otherwise, the #runBatch() from task threads will keep the mail exist.
insertMail(false, true);
}
}

private void insertMail(boolean force) {
if (force || !hasMail) {
if (currentCallbacks.get() > 0) {
hasMail = true;
mailboxExecutor.execute(this::runBatch, "Batch running callback of state requests");
notifyNewMail();
} else {
hasMail = false;
/**
* Insert a mail for a batch of mails.
*
* @param force force check if there should insert a callback regardless #hasMail flag.
* @param notify should notify the AEC (typically on the task thread) if new mail inserted.
*/
private void insertMail(boolean force, boolean notify) {
// This method will be invoked via all ForSt I/O threads(from #submit) or task thread (from
// #runBatch()). The #hasMail flag should be protected by synchronized.
synchronized (this) {
if (force || !hasMail) {
if (currentCallbacks.get() > 0) {
hasMail = true;
mailboxExecutor.execute(
this::runBatch, "Batch running callback of state requests");
if (notify) {
notifyNewMail();
}
} else {
hasMail = false;
}
}
}
}
Expand All @@ -116,7 +132,8 @@ public void runBatch() throws Exception {
}
currentCallbacks.addAndGet(-batch.size());
}
insertMail(true);
// If we are on the task thread, there is no need to notify.
insertMail(true, false);
}

private void notifyNewMail() {
Expand Down

0 comments on commit c92aac5

Please sign in to comment.