diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index fce6f24cee79c..f69b4d3b72c10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -35,6 +35,8 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; @@ -51,10 +53,12 @@ * * @param the type of the key */ -public class AsyncExecutionController implements StateRequestHandler { +public class AsyncExecutionController implements StateRequestHandler, Closeable { private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); + private static final long DEFAULT_BUFFER_TIMEOUT_CHECK_INTERVAL = 100; + /** * The batch size. When the number of state requests in the active buffer exceeds the batch * size, a batched state execution would be triggered. @@ -68,6 +72,14 @@ public class AsyncExecutionController implements StateRequestHandler { */ private final long bufferTimeout; + /** + * There might be huge overhead when inserting a timer for each buffer. A periodic check is a + * good trade-off to save much GC and CPU for this. This var defines the interval for periodic + * check of timeout. As a result, the real trigger time of timeout buffer might be [timeout, + * timeout+interval]. We don't make it configurable for now. + */ + private final long bufferTimeoutCheckInterval = DEFAULT_BUFFER_TIMEOUT_CHECK_INTERVAL; + /** The max allowed number of in-flight records. */ private final int maxInFlightRecordNum; @@ -142,6 +154,7 @@ public AsyncExecutionController( this.stateRequestsBuffer = new StateRequestBuffer<>( bufferTimeout, + bufferTimeoutCheckInterval, (scheduledSeq) -> mailboxExecutor.execute( () -> { @@ -387,6 +400,11 @@ public int getInFlightRecordNum() { return inFlightRecordNum.get(); } + @Override + public void close() throws IOException { + stateRequestsBuffer.close(); + } + /** A listener listens the key context switch. */ public interface SwitchContextListener { void switchContext(RecordContext context); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java index 04379d2d34bc2..1433e83c7bf13 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java @@ -18,11 +18,14 @@ package org.apache.flink.runtime.asyncprocessing; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; +import java.io.Closeable; +import java.io.IOException; import java.util.Deque; import java.util.HashMap; import java.util.LinkedList; @@ -43,7 +46,7 @@ * @param the type of the key */ @NotThreadSafe -public class StateRequestBuffer { +public class StateRequestBuffer implements Closeable { /** All StateRequestBuffer in the same task manager share one ScheduledExecutorService. */ private static final ScheduledThreadPoolExecutor DELAYER = @@ -75,44 +78,68 @@ public class StateRequestBuffer { /** The timeout of {@link #activeQueue} triggering in milliseconds. */ final long bufferTimeout; + /** The interval of periodic buffer timeout check. */ + final long bufferTimeoutCheckInterval; + /** The handler to trigger when timeout. */ final Consumer timeoutHandler; /** The executor service that schedules and calls the triggers of this task. */ - ScheduledExecutorService scheduledExecutor; + final ScheduledExecutorService scheduledExecutor; /** * The current scheduled future, when the next scheduling occurs, the previous one that has not * yet been executed will be canceled. */ - ScheduledFuture currentScheduledFuture; + ScheduledFuture currentScheduledFuture; /** * The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code * scheduledSeq} is less than {@code currentSeq}. */ - AtomicLong scheduledSeq; + volatile Tuple2 seqAndTimeout = null; /** * The current trigger sequence number, used to distinguish different triggers. Every time a * trigger occurs, {@code currentSeq} increases by 1. */ - AtomicLong currentSeq; + final AtomicLong currentSeq; - public StateRequestBuffer(long bufferTimeout, Consumer timeoutHandler) { + public StateRequestBuffer( + long bufferTimeout, long bufferTimeoutCheckInterval, Consumer timeoutHandler) { this.activeQueue = new LinkedList<>(); this.blockingQueue = new HashMap<>(); this.blockingQueueSize = 0; this.bufferTimeout = bufferTimeout; + this.bufferTimeoutCheckInterval = bufferTimeoutCheckInterval; this.timeoutHandler = timeoutHandler; - this.scheduledSeq = new AtomicLong(-1); this.currentSeq = new AtomicLong(0); if (bufferTimeout > 0) { this.scheduledExecutor = DELAYER; + initPeriodicTimeoutCheck(); + } else { + this.scheduledExecutor = null; } } + private void initPeriodicTimeoutCheck() { + currentScheduledFuture = + scheduledExecutor.scheduleAtFixedRate( + () -> { + final Tuple2 theSeqAndTimeout = seqAndTimeout; + if (theSeqAndTimeout != null + && theSeqAndTimeout.f0 == currentSeq.get() + && theSeqAndTimeout.f1 <= System.currentTimeMillis()) { + timeoutHandler.accept(theSeqAndTimeout.f0); + } + }, + bufferTimeout, + bufferTimeoutCheckInterval, + TimeUnit.MILLISECONDS); + } + void advanceSeq() { + seqAndTimeout = null; currentSeq.incrementAndGet(); } @@ -125,24 +152,9 @@ void enqueueToActive(StateRequest request) { request.getFuture().complete(null); } else { activeQueue.add(request); - if (bufferTimeout > 0 && currentSeq.get() > scheduledSeq.get()) { - if (currentScheduledFuture != null - && !currentScheduledFuture.isDone() - && !currentScheduledFuture.isCancelled()) { - currentScheduledFuture.cancel(false); - } - final long thisScheduledSeq = currentSeq.get(); - scheduledSeq.set(thisScheduledSeq); - currentScheduledFuture = - (ScheduledFuture) - scheduledExecutor.schedule( - () -> { - if (thisScheduledSeq == currentSeq.get()) { - timeoutHandler.accept(thisScheduledSeq); - } - }, - bufferTimeout, - TimeUnit.MILLISECONDS); + if (bufferTimeout > 0 && seqAndTimeout == null) { + seqAndTimeout = + Tuple2.of(currentSeq.get(), System.currentTimeMillis() + bufferTimeout); } } } @@ -213,4 +225,12 @@ Optional popActive( } return Optional.of(stateRequestContainer); } + + @Override + public synchronized void close() throws IOException { + if (currentScheduledFuture != null) { + currentScheduledFuture.cancel(true); + currentScheduledFuture = null; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index c74befba770df..afe380158a23e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -316,4 +316,10 @@ AsyncExecutionController getAsyncExecutionController() { RecordContext getCurrentProcessingContext() { return currentProcessingContext; } + + @Override + public void close() throws Exception { + super.close(); + asyncExecutionController.close(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index e686efab5ae72..f3322a463015c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -253,4 +253,10 @@ public AsyncExecutionController getAsyncExecutionController() { public RecordContext getCurrentProcessingContext() { return currentProcessingContext; } + + @Override + public void close() throws Exception { + super.close(); + asyncExecutionController.close(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index 0ee676dee483e..8d7853eb7efd6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -28,7 +28,6 @@ import org.apache.flink.core.state.StateFutureUtils; import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch; import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode; -import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.StateBackend; @@ -46,7 +45,6 @@ import java.io.IOException; import java.util.HashMap; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; @@ -527,7 +525,7 @@ public void testSyncPoint() throws IOException { } @Test - void testBufferTimeout() throws IOException { + void testBufferTimeout() throws Exception { int batchSize = 5; int timeout = 1000; CloseableRegistry resourceRegistry = new CloseableRegistry(); @@ -538,11 +536,11 @@ void testBufferTimeout() throws IOException { new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), resourceRegistry); - ManuallyTriggeredScheduledExecutorService scheduledExecutor = - new ManuallyTriggeredScheduledExecutorService(); - aec.stateRequestsBuffer.scheduledExecutor = scheduledExecutor; + Runnable userCode = () -> valueState.asyncValue(); + assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(0L); + assertThat(aec.stateRequestsBuffer.seqAndTimeout).isNull(); // ------------ basic timeout ------------------- for (int i = 0; i < batchSize - 1; i++) { String record = String.format("key%d-r%d", i, i); @@ -551,127 +549,47 @@ void testBufferTimeout() throws IOException { aec.setCurrentContext(recordContext); userCode.run(); } - assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(0); - assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(0); + assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(0L); + assertThat(aec.stateRequestsBuffer.seqAndTimeout.f0).isEqualTo(0L); assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isFalse(); assertThat(aec.inFlightRecordNum.get()).isEqualTo(batchSize - 1); assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(batchSize - 1); assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0); // buffer timeout, trigger - scheduledExecutor.triggerNonPeriodicScheduledTasks(); + Thread.sleep(2000); assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0); - assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isTrue(); - assertThat(aec.inFlightRecordNum.get()).isEqualTo(0); - assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1); - assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(0); - - // ----------------- oldest state request timeout ------------------ - // r5 and r6 should be triggered due to r5 exceeding timeout - String record5 = "key5-r5"; - String key5 = "key5"; - RecordContext recordContext5 = aec.buildContext(record5, key5); - aec.setCurrentContext(recordContext5); - // execute user code - userCode.run(); - assertThat(aec.inFlightRecordNum.get()).isEqualTo(1); - assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isFalse(); - ScheduledFuture scheduledFuture = aec.stateRequestsBuffer.currentScheduledFuture; - String record6 = "key6-r6"; - String key6 = "key6"; - RecordContext recordContext6 = aec.buildContext(record6, key6); - aec.setCurrentContext(recordContext6); - // execute user code - userCode.run(); - assertThat(aec.inFlightRecordNum.get()).isEqualTo(2); - assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(2); - - assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask().size()).isEqualTo(1); - assertThat(scheduledExecutor.getAllNonPeriodicScheduledTask().size()).isEqualTo(1); - assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1); - assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1); - scheduledExecutor.triggerNonPeriodicScheduledTasks(); - assertThat(aec.inFlightRecordNum.get()).isEqualTo(0); - assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0); - assertThat(scheduledFuture).isEqualTo(aec.stateRequestsBuffer.currentScheduledFuture); - assertThat(scheduledFuture.isDone()).isTrue(); - assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(2); - assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1); - - resourceRegistry.close(); - } + assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1L); + assertThat(aec.stateRequestsBuffer.seqAndTimeout).isNull(); - @Test - void testBufferTimeoutSkip() throws IOException { - int batchSize = 3; - int timeout = 1000; - CloseableRegistry resourceRegistry = new CloseableRegistry(); - setup( - batchSize, - timeout, - 1000, - new SyncMailboxExecutor(), - new TestAsyncFrameworkExceptionHandler(), - resourceRegistry); - ManuallyTriggeredScheduledExecutorService scheduledExecutor = - new ManuallyTriggeredScheduledExecutorService(); - aec.stateRequestsBuffer.scheduledExecutor = scheduledExecutor; - Runnable userCode = () -> valueState.asyncValue(); - - assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(0); - assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(-1); - // register r1 timeout - RecordContext recordContext = aec.buildContext("record1", "key1"); - aec.setCurrentContext(recordContext); - userCode.run(); - assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); - assertThat(aec.inFlightRecordNum.get()).isEqualTo(1); - assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(0); - assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(0); - - // before r1 timeout execute, the active buffer size reach batch size. - RecordContext recordContext2 = aec.buildContext("record2", "key2"); - aec.setCurrentContext(recordContext2); - userCode.run(); - assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(0); - assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(0); - RecordContext recordContext3 = aec.buildContext("record3", "key3"); - aec.setCurrentContext(recordContext3); - userCode.run(); - assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0); - assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1); - assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(0); - - // r1 timeout executes, but r1 is already triggered in [r1,r2,r3], so r1 timeout should skip - assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask().size()).isEqualTo(1); - assertThat(scheduledExecutor.getAllNonPeriodicScheduledTask().size()).isEqualTo(1); - scheduledExecutor.triggerNonPeriodicScheduledTask(); - assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isTrue(); - assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1); - assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(0); - - RecordContext recordContext4 = aec.buildContext("record4", "key4"); - aec.setCurrentContext(recordContext4); - userCode.run(); - - // register r4 timeout, set new currentScheduledFuture - assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask().size()).isEqualTo(1); - assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); - assertThat(aec.inFlightRecordNum.get()).isEqualTo(1); + // ---------- buffer full before timeout ------------------ + for (int i = 0; i < batchSize - 1; i++) { + String record = String.format("key%d-r%d", i, i); + String key = String.format("key%d", batchSize + i); + RecordContext recordContext = aec.buildContext(record, key); + aec.setCurrentContext(recordContext); + userCode.run(); + } + assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1L); + assertThat(aec.stateRequestsBuffer.seqAndTimeout.f0).isEqualTo(1L); assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isFalse(); - assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isCancelled()).isFalse(); - assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1); - assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1); - - // r4 timeout - scheduledExecutor.triggerNonPeriodicScheduledTask(); + assertThat(aec.inFlightRecordNum.get()).isEqualTo(batchSize - 1); + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(batchSize - 1); + assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0); + for (int i = batchSize - 1; i < batchSize; i++) { + String record = String.format("key%d-r%d", i, i); + String key = String.format("key%d", batchSize + i); + RecordContext recordContext = aec.buildContext(record, key); + aec.setCurrentContext(recordContext); + userCode.run(); + } assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0); + assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isFalse(); assertThat(aec.inFlightRecordNum.get()).isEqualTo(0); - assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(2); - assertThat(aec.stateRequestsBuffer.scheduledSeq.get()).isEqualTo(1); - assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isTrue(); + assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(2L); + assertThat(aec.stateRequestsBuffer.seqAndTimeout).isNull(); resourceRegistry.close(); }