From c92aac5232a4f020e8e6cabeb2990f181711c329 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Sun, 29 Sep 2024 16:00:15 +0800 Subject: [PATCH] [hotfix][runtime] Fix concurrency issue in callback runner of async state processing --- .../asyncprocessing/BatchCallbackRunner.java | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java index ffab4d4f1778fc..2f7393495081f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/BatchCallbackRunner.java @@ -80,18 +80,34 @@ public void submit(ThrowingRunnable task) { activeBuffer = new ArrayList<>(batchSize); } } - currentCallbacks.incrementAndGet(); - insertMail(false); + if (currentCallbacks.getAndIncrement() == 0) { + // Only when the single first callback is inserted, the mail should be inserted if not + // exist. Otherwise, the #runBatch() from task threads will keep the mail exist. + insertMail(false, true); + } } - private void insertMail(boolean force) { - if (force || !hasMail) { - if (currentCallbacks.get() > 0) { - hasMail = true; - mailboxExecutor.execute(this::runBatch, "Batch running callback of state requests"); - notifyNewMail(); - } else { - hasMail = false; + /** + * Insert a mail for a batch of mails. + * + * @param force force check if there should insert a callback regardless #hasMail flag. + * @param notify should notify the AEC (typically on the task thread) if new mail inserted. + */ + private void insertMail(boolean force, boolean notify) { + // This method will be invoked via all ForSt I/O threads(from #submit) or task thread (from + // #runBatch()). The #hasMail flag should be protected by synchronized. + synchronized (this) { + if (force || !hasMail) { + if (currentCallbacks.get() > 0) { + hasMail = true; + mailboxExecutor.execute( + this::runBatch, "Batch running callback of state requests"); + if (notify) { + notifyNewMail(); + } + } else { + hasMail = false; + } } } } @@ -116,7 +132,8 @@ public void runBatch() throws Exception { } currentCallbacks.addAndGet(-batch.size()); } - insertMail(true); + // If we are on the task thread, there is no need to notify. + insertMail(true, false); } private void notifyNewMail() {