diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 1c9086fc7ad22..f44717f37bb1b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -89,6 +89,7 @@ public abstract class AbstractAsyncStateStreamOperator extends AbstractStre public void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { super.initializeState(streamTaskStateManager); + this.timeServiceManager = stateHandler.getAsyncInternalTimerServiceManager(); getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null)); final StreamTask containingTask = checkNotNull(getContainingTask()); environment = containingTask.getEnvironment(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 1ecf28a1c12ab..874d8709151bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -89,6 +89,7 @@ public AbstractAsyncStateStreamOperatorV2( public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { super.initializeState(streamTaskStateManager); + this.timeServiceManager = stateHandler.getAsyncInternalTimerServiceManager(); getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null)); final int inFlightRecordsLimit = getExecutionConfig().getAsyncInflightRecordsLimit(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java index a819703cab49b..aebd8af24dc9a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.asyncprocessing.StateExecutor; import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; +import org.apache.flink.runtime.checkpoint.SnapshotType; import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.util.Disposable; @@ -41,6 +42,7 @@ public interface AsyncKeyedStateBackend extends Snapshotable>, InternalCheckpointListener, + PriorityQueueSetFactory, Disposable, Closeable, AsyncExecutionController.SwitchContextListener { @@ -83,10 +85,24 @@ S createState( @Nonnull StateExecutor createStateExecutor(); + /** Returns the key groups which this state backend is responsible for. */ + KeyGroupRange getKeyGroupRange(); + /** By default, a state backend does nothing when a key is switched in async processing. */ @Override default void switchContext(RecordContext context) {} + // TODO remove this once heap-based timers are working with ForSt incremental snapshots! + /** + * Whether the keyed state backend requires legacy synchronous timer snapshots. + * + * @param checkpointType + * @return true as default in case of AsyncKeyedStateBackend + */ + default boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointType) { + return true; + } + @Override void dispose(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java index 6d4d645f7dc48..bf48415801f37 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java @@ -26,12 +26,19 @@ import org.apache.flink.runtime.asyncprocessing.StateExecutor; import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.SnapshotType; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.Keyed; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.PriorityComparable; import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; @@ -96,6 +103,11 @@ public StateExecutor createStateExecutor() { return null; } + @Override + public KeyGroupRange getKeyGroupRange() { + return keyedStateBackend.getKeyGroupRange(); + } + @Override public void switchContext(RecordContext context) { keyedStateBackend.setCurrentKeyAndKeyGroup(context.getKey(), context.getKeyGroup()); @@ -139,4 +151,32 @@ public RunnableFuture> snapshot( return keyedStateBackend.snapshot( checkpointId, timestamp, streamFactory, checkpointOptions); } + + @Nonnull + @Override + public & Keyed> + KeyGroupedInternalPriorityQueue create( + @Nonnull String stateName, + @Nonnull TypeSerializer byteOrderedElementSerializer) { + return keyedStateBackend.create(stateName, byteOrderedElementSerializer); + } + + @Override + public & Keyed> + KeyGroupedInternalPriorityQueue create( + @Nonnull String stateName, + @Nonnull TypeSerializer byteOrderedElementSerializer, + boolean allowFutureMetadataUpdates) { + return keyedStateBackend.create( + stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates); + } + + @Override + public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointType) { + if (keyedStateBackend instanceof AbstractKeyedStateBackend) { + return ((AbstractKeyedStateBackend) keyedStateBackend) + .requiresLegacySynchronousTimerSnapshots(checkpointType); + } + return false; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java index a1b89031f5e4a..84e110289f873 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java @@ -22,9 +22,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; @@ -110,7 +111,8 @@ void snapshotToRawKeyedState( interface Provider extends Serializable { InternalTimeServiceManager create( TaskIOMetricGroup taskIOMetricGroup, - CheckpointableKeyedStateBackend keyedStatedBackend, + PriorityQueueSetFactory factory, + KeyGroupRange keyGroupRange, ClassLoader userClassloader, KeyContext keyContext, ProcessingTimeService processingTimeService, diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java index 6bdfe05f4b913..3b5f81697f81e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java @@ -25,7 +25,6 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; @@ -101,21 +100,21 @@ private InternalTimeServiceManagerImpl( */ public static InternalTimeServiceManagerImpl create( TaskIOMetricGroup taskIOMetricGroup, - CheckpointableKeyedStateBackend keyedStateBackend, + PriorityQueueSetFactory factory, + KeyGroupRange keyGroupRange, ClassLoader userClassloader, KeyContext keyContext, ProcessingTimeService processingTimeService, Iterable rawKeyedStates, StreamTaskCancellationContext cancellationContext) throws Exception { - final KeyGroupRange keyGroupRange = keyedStateBackend.getKeyGroupRange(); final InternalTimeServiceManagerImpl timeServiceManager = new InternalTimeServiceManagerImpl<>( taskIOMetricGroup, keyGroupRange, keyContext, - keyedStateBackend, + factory, processingTimeService, cancellationContext); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java index 485f431f3d569..d6b1adc0132d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java @@ -67,6 +67,12 @@ default boolean isRestored() { */ InternalTimeServiceManager internalTimerServiceManager(); + /** + * Returns the internal timer service manager create by async state backend for the stream + * operator. This method returns null for non-keyed operators. + */ + InternalTimeServiceManager asyncInternalTimerServiceManager(); + /** * Returns an iterable to obtain input streams for previously stored operator state partitions * that are assigned to this stream operator. diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java index 226572030d1d9..a8f3af027e150 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java @@ -244,6 +244,11 @@ void snapshotState( && ((AbstractKeyedStateBackend) keyedStateBackend) .requiresLegacySynchronousTimerSnapshots( checkpointOptions.getCheckpointType()); + requiresLegacyRawKeyedStateSnapshots |= + keyedStateBackend instanceof AsyncKeyedStateBackend + && ((AsyncKeyedStateBackend) keyedStateBackend) + .requiresLegacySynchronousTimerSnapshots( + checkpointOptions.getCheckpointType()); if (requiresLegacyRawKeyedStateSnapshots) { checkState( @@ -459,6 +464,10 @@ public Object getCurrentKey() { } } + public InternalTimeServiceManager getAsyncInternalTimerServiceManager() { + return context.asyncInternalTimerServiceManager(); + } + public Optional getKeyedStateStore() { return Optional.ofNullable(keyedStateStore); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 8ec1a86fe0758..4195bacee219c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -179,7 +179,8 @@ public StreamOperatorStateContext streamOperatorStateContext( OperatorStateBackend operatorStateBackend = null; CloseableIterable rawKeyedStateInputs = null; CloseableIterable rawOperatorStateInputs = null; - InternalTimeServiceManager timeServiceManager; + InternalTimeServiceManager timeServiceManager = null; + InternalTimeServiceManager asyncTimeServiceManager = null; final StateObject.StateObjectSizeStatsCollector statsCollector = StateObject.StateObjectSizeStatsCollector.create(); @@ -241,34 +242,42 @@ public StreamOperatorStateContext streamOperatorStateContext( streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs); // -------------- Internal Timer Service Manager -------------- + // if the operator indicates that it is using custom raw keyed state, + // then whatever was written in the raw keyed state snapshot was NOT written + // by the internal timer services (because there is only ever one user of raw keyed + // state); + // in this case, timers should not attempt to restore timers from the raw keyed + // state. + final Iterable restoredRawKeyedStateTimers = + (prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState) + ? rawKeyedStateInputs + : Collections.emptyList(); if (keyedStatedBackend != null) { - - // if the operator indicates that it is using custom raw keyed state, - // then whatever was written in the raw keyed state snapshot was NOT written - // by the internal timer services (because there is only ever one user of raw keyed - // state); - // in this case, timers should not attempt to restore timers from the raw keyed - // state. - final Iterable restoredRawKeyedStateTimers = - (prioritizedOperatorSubtaskStates.isRestored() - && !isUsingCustomRawKeyedState) - ? rawKeyedStateInputs - : Collections.emptyList(); - timeServiceManager = timeServiceManagerProvider.create( environment.getMetricGroup().getIOMetricGroup(), keyedStatedBackend, + keyedStatedBackend.getKeyGroupRange(), + environment.getUserCodeClassLoader().asClassLoader(), + keyContext, + processingTimeService, + restoredRawKeyedStateTimers, + cancellationContext); + } + if (stateBackend.supportsAsyncKeyedStateBackend()) { + asyncTimeServiceManager = + timeServiceManagerProvider.create( + environment.getMetricGroup().getIOMetricGroup(), + asyncKeyedStateBackend, + asyncKeyedStateBackend.getKeyGroupRange(), environment.getUserCodeClassLoader().asClassLoader(), keyContext, processingTimeService, restoredRawKeyedStateTimers, cancellationContext); } else { - timeServiceManager = null; + asyncTimeServiceManager = timeServiceManager; } - // TODO: Support Timer for AsyncKeyedStateBackend - // Add stats for input channel and result partition state Stream.concat( prioritizedOperatorSubtaskStates.getPrioritizedInputChannelState() @@ -295,6 +304,7 @@ public StreamOperatorStateContext streamOperatorStateContext( keyedStatedBackend, asyncKeyedStateBackend, timeServiceManager, + asyncTimeServiceManager, rawOperatorStateInputs, rawKeyedStateInputs); } catch (Exception ex) { @@ -778,6 +788,7 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta private final CheckpointableKeyedStateBackend keyedStateBackend; private final AsyncKeyedStateBackend asyncKeyedStateBackend; private final InternalTimeServiceManager internalTimeServiceManager; + private final InternalTimeServiceManager asyncInternalTimeServiceManager; private final CloseableIterable rawOperatorStateInputs; private final CloseableIterable rawKeyedStateInputs; @@ -788,6 +799,7 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta CheckpointableKeyedStateBackend keyedStateBackend, AsyncKeyedStateBackend asyncKeyedStateBackend, InternalTimeServiceManager internalTimeServiceManager, + InternalTimeServiceManager asyncInternalTimeServiceManager, CloseableIterable rawOperatorStateInputs, CloseableIterable rawKeyedStateInputs) { @@ -796,6 +808,7 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta this.keyedStateBackend = keyedStateBackend; this.asyncKeyedStateBackend = asyncKeyedStateBackend; this.internalTimeServiceManager = internalTimeServiceManager; + this.asyncInternalTimeServiceManager = asyncInternalTimeServiceManager; this.rawOperatorStateInputs = rawOperatorStateInputs; this.rawKeyedStateInputs = rawKeyedStateInputs; } @@ -827,6 +840,11 @@ public InternalTimeServiceManager internalTimerServiceManager() { return internalTimeServiceManager; } + @Override + public InternalTimeServiceManager asyncInternalTimerServiceManager() { + return asyncInternalTimeServiceManager; + } + @Override public CloseableIterable rawOperatorStateInputs() { return rawOperatorStateInputs; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java index bc08bf635bf57..57f0b3f6f0552 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java @@ -21,10 +21,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.KeyContext; @@ -106,19 +107,21 @@ public void snapshotToRawKeyedState( public static InternalTimeServiceManager create( TaskIOMetricGroup taskIOMetricGroup, - CheckpointableKeyedStateBackend keyedStatedBackend, + PriorityQueueSetFactory factory, + KeyGroupRange keyGroupRange, ClassLoader userClassloader, KeyContext keyContext, // the operator ProcessingTimeService processingTimeService, Iterable rawKeyedStates, StreamTaskCancellationContext cancellationContext) { checkState( - keyedStatedBackend instanceof BatchExecutionKeyedStateBackend, + factory instanceof BatchExecutionKeyedStateBackend, "Batch execution specific time service can work only with BatchExecutionKeyedStateBackend"); BatchExecutionInternalTimeServiceManager timeServiceManager = new BatchExecutionInternalTimeServiceManager<>(processingTimeService); - keyedStatedBackend.registerKeySelectionListener(timeServiceManager); + ((BatchExecutionKeyedStateBackend) factory) + .registerKeySelectionListener(timeServiceManager); return timeServiceManager; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java index 735f11b727e56..600aecb8a4888 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java @@ -40,7 +40,6 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.function.ThrowingConsumer; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.concurrent.ExecutorService; @@ -222,7 +221,6 @@ void testCheckpointDrain() throws Exception { } } - @Disabled("Support Timer for AsyncKeyedStateBackend") @Test void testTimerServiceIsAsync() throws Exception { try (KeyedOneInputStreamOperatorTestHarness, String> diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java index 2b7732433a291..ff01605809d72 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.util.function.FunctionWithException; import javax.annotation.Nonnull; @@ -114,12 +115,14 @@ private static class TestAsyncKeyedStateBackend implements AsyncKeyedStateBac private final Supplier innerStateSupplier; private final StateExecutor stateExecutor; + private final PriorityQueueSetFactory factory; public TestAsyncKeyedStateBackend( Supplier innerStateSupplier, StateExecutor stateExecutor) { this.innerStateSupplier = innerStateSupplier; this.stateExecutor = stateExecutor; + this.factory = new HeapPriorityQueueSetFactory(new KeyGroupRange(0, 127), 128, 128); } @Override @@ -143,6 +146,11 @@ public StateExecutor createStateExecutor() { return stateExecutor; } + @Override + public KeyGroupRange getKeyGroupRange() { + return new KeyGroupRange(0, 127); + } + @Override public void dispose() { // do nothing @@ -173,6 +181,15 @@ public RunnableFuture> snapshot( // do nothing return null; } + + @Nonnull + @Override + public & Keyed> + KeyGroupedInternalPriorityQueue create( + @Nonnull String stateName, + @Nonnull TypeSerializer byteOrderedElementSerializer) { + return factory.create(stateName, byteOrderedElementSerializer); + } } /** Wrapper of state backend which supports apply the snapshot result. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java index 4e4db4a73b3f6..bb09983de6f0b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java @@ -31,10 +31,15 @@ import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.Keyed; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.PriorityComparable; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -128,6 +133,18 @@ public boolean supportsAsyncKeyedStateBackend() { public AsyncKeyedStateBackend createAsyncKeyedStateBackend( KeyedStateBackendParameters parameters) { return new AsyncKeyedStateBackend() { + @Nonnull + @Override + public < + T extends + HeapPriorityQueueElement & PriorityComparable + & Keyed> + KeyGroupedInternalPriorityQueue create( + @Nonnull String stateName, + @Nonnull TypeSerializer byteOrderedElementSerializer) { + throw new UnsupportedOperationException("Not support for test yet."); + } + @Nonnull @Override public RunnableFuture> snapshot( @@ -170,6 +187,11 @@ public StateExecutor createStateExecutor() { return new TestStateExecutor(); } + @Override + public KeyGroupRange getKeyGroupRange() { + return new KeyGroupRange(0, 127); + } + @Override public void dispose() { // do nothing diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java index ea0c58bc4c5b2..b28ca4d7fa2cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SnapshotResult; @@ -49,6 +50,8 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.testutils.statemigration.TestType; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.IOUtils; import org.junit.jupiter.api.AfterEach; @@ -61,6 +64,7 @@ import java.util.List; import java.util.concurrent.RunnableFuture; +import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; /** @@ -337,6 +341,70 @@ void testAsyncKeyedStateBackendSnapshot() throws Exception { assertThat(testExceptionHandler.exception).isNull(); } + @TestTemplate + void testKeyGroupedInternalPriorityQueue() throws Exception { + testKeyGroupedInternalPriorityQueue(false); + } + + @TestTemplate + void testKeyGroupedInternalPriorityQueueAddAll() throws Exception { + testKeyGroupedInternalPriorityQueue(true); + } + + void testKeyGroupedInternalPriorityQueue(boolean addAll) throws Exception { + String fieldName = "key-grouped-priority-queue"; + AsyncKeyedStateBackend backend = + createAsyncKeyedBackend(IntSerializer.INSTANCE, 128, env); + try { + KeyGroupedInternalPriorityQueue priorityQueue = + backend.create(fieldName, new TestType.V1TestTypeSerializer()); + + TestType elementA42 = new TestType("a", 42); + TestType elementA44 = new TestType("a", 44); + TestType elementB1 = new TestType("b", 1); + TestType elementB3 = new TestType("b", 3); + + TestType[] elements = { + elementA44, elementB1, elementB1, elementB3, elementA42, + }; + + if (addAll) { + priorityQueue.addAll(asList(elements)); + } else { + assertThat(priorityQueue.add(elements[0])).isTrue(); + assertThat(priorityQueue.add(elements[1])).isTrue(); + assertThat(priorityQueue.add(elements[2])).isFalse(); + assertThat(priorityQueue.add(elements[3])).isFalse(); + assertThat(priorityQueue.add(elements[4])).isFalse(); + } + assertThat(priorityQueue.isEmpty()).isFalse(); + assertThat(priorityQueue.getSubsetForKeyGroup(81)) + .containsExactlyInAnyOrder(elementA42, elementA44); + assertThat(priorityQueue.getSubsetForKeyGroup(22)) + .containsExactlyInAnyOrder(elementB1, elementB3); + + assertThat(priorityQueue.peek()).isEqualTo(elementB1); + assertThat(priorityQueue.poll()).isEqualTo(elementB1); + assertThat(priorityQueue.peek()).isEqualTo(elementB3); + + List actualList = new ArrayList<>(); + try (CloseableIterator iterator = priorityQueue.iterator()) { + iterator.forEachRemaining(actualList::add); + } + + assertThat(actualList).containsExactlyInAnyOrder(elementB3, elementA42, elementA44); + + assertThat(priorityQueue.size()).isEqualTo(3); + + assertThat(priorityQueue.remove(elementB1)).isFalse(); + assertThat(priorityQueue.remove(elementB3)).isTrue(); + assertThat(priorityQueue.peek()).isEqualTo(elementA42); + } finally { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + static class TestAsyncFrameworkExceptionHandler implements StateFutureImpl.AsyncFrameworkExceptionHandler { String message = null; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java index edac09ab3a324..faedc0e00ab60 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; @@ -46,6 +45,7 @@ import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateInitializationContextImpl; import org.apache.flink.runtime.state.StatePartitionStreamProvider; @@ -196,7 +196,8 @@ void setUp() throws Exception { @Override public InternalTimeServiceManager create( TaskIOMetricGroup taskIOMetricGroup, - CheckpointableKeyedStateBackend keyedStatedBackend, + PriorityQueueSetFactory factory, + KeyGroupRange keyGroupRange, ClassLoader userClassloader, KeyContext keyContext, ProcessingTimeService processingTimeService, diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java index ab4569d4e5fa6..9c3be957b9f7a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java @@ -36,10 +36,12 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StatePartitionStreamProvider; @@ -345,7 +347,8 @@ private StreamTaskStateInitializer streamTaskStateManager( @Override public InternalTimeServiceManager create( TaskIOMetricGroup taskIOMetricGroup, - CheckpointableKeyedStateBackend keyedStatedBackend, + PriorityQueueSetFactory factory, + KeyGroupRange keyGroupRange, ClassLoader userClassloader, KeyContext keyContext, ProcessingTimeService processingTimeService, diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceTest.java index 7688ed2e480e2..f53eb41c7e9d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceTest.java @@ -91,6 +91,7 @@ void testBatchExecutionManagerCanBeInstantiatedWithBatchStateBackend() throws Ex UnregisteredMetricGroups.createUnregisteredTaskMetricGroup() .getIOMetricGroup(), stateBackend, + null, this.getClass().getClassLoader(), new DummyKeyContext(), new TestProcessingTimeService(), @@ -147,6 +148,7 @@ void testFiringEventTimeTimers() throws Exception { UnregisteredMetricGroups.createUnregisteredTaskMetricGroup() .getIOMetricGroup(), keyedStatedBackend, + null, this.getClass().getClassLoader(), new DummyKeyContext(), new TestProcessingTimeService(), @@ -185,6 +187,7 @@ void testSettingSameKeyDoesNotFireTimers() { UnregisteredMetricGroups.createUnregisteredTaskMetricGroup() .getIOMetricGroup(), keyedStatedBackend, + null, this.getClass().getClassLoader(), new DummyKeyContext(), new TestProcessingTimeService(), @@ -216,6 +219,7 @@ void testCurrentWatermark() throws Exception { UnregisteredMetricGroups.createUnregisteredTaskMetricGroup() .getIOMetricGroup(), keyedStatedBackend, + null, this.getClass().getClassLoader(), new DummyKeyContext(), new TestProcessingTimeService(), @@ -265,6 +269,7 @@ void testProcessingTimeTimers() { UnregisteredMetricGroups.createUnregisteredTaskMetricGroup() .getIOMetricGroup(), keyedStatedBackend, + null, this.getClass().getClassLoader(), new DummyKeyContext(), processingTimeService, @@ -302,6 +307,7 @@ void testIgnoringEventTimeTimersFromWithinCallback() { UnregisteredMetricGroups.createUnregisteredTaskMetricGroup() .getIOMetricGroup(), keyedStatedBackend, + null, this.getClass().getClassLoader(), new DummyKeyContext(), processingTimeService, @@ -344,6 +350,7 @@ void testIgnoringProcessingTimeTimersFromWithinCallback() { UnregisteredMetricGroups.createUnregisteredTaskMetricGroup() .getIOMetricGroup(), keyedStatedBackend, + null, this.getClass().getClassLoader(), new DummyKeyContext(), processingTimeService, diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 2d93e3375f388..edfc16b823772 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -44,11 +44,11 @@ import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; -import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; @@ -152,7 +152,8 @@ public class AbstractStreamOperatorTestHarness implements AutoCloseable { @Override public InternalTimeServiceManager create( TaskIOMetricGroup taskIOMetricGroup, - CheckpointableKeyedStateBackend keyedStatedBackend, + PriorityQueueSetFactory factory, + KeyGroupRange keyGroupRange, ClassLoader userClassloader, KeyContext keyContext, ProcessingTimeService processingTimeService, @@ -162,13 +163,16 @@ public InternalTimeServiceManager create( InternalTimeServiceManagerImpl typedTimeServiceManager = InternalTimeServiceManagerImpl.create( taskIOMetricGroup, - keyedStatedBackend, + factory, + keyGroupRange, userClassloader, keyContext, processingTimeService, rawKeyedStates, cancellationContext); - timeServiceManager = typedTimeServiceManager; + if (timeServiceManager == null) { + timeServiceManager = typedTimeServiceManager; + } return typedTimeServiceManager; } }; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index 3010033583246..4d3c82acc4a25 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -31,12 +31,22 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.HeapPriorityQueuesManager; +import org.apache.flink.runtime.state.InternalKeyContext; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.Keyed; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.PriorityComparable; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.SnapshotStrategyRunner; import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor; import org.apache.flink.runtime.state.v2.ListStateDescriptor; import org.apache.flink.runtime.state.v2.ReducingStateDescriptor; @@ -62,6 +72,7 @@ import java.io.IOException; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.RunnableFuture; @@ -81,6 +92,9 @@ public class ForStKeyedStateBackend implements AsyncKeyedStateBackend { /** Number of bytes required to prefix the key groups. */ private final int keyGroupPrefixBytes; + /** The key groups which this state backend is responsible for. */ + private final KeyGroupRange keyGroupRange; + /** The key serializer. */ protected final TypeSerializer keySerializer; @@ -111,6 +125,11 @@ public class ForStKeyedStateBackend implements AsyncKeyedStateBackend { private final ForStSnapshotStrategyBase snapshotStrategy; + /** Factory for priority queue state. */ + private final PriorityQueueSetFactory priorityQueueFactory; + + private final HeapPriorityQueuesManager heapPriorityQueuesManager; + /** * Registry for all opened streams, so they can be closed if the task using this backend is * closed. @@ -157,14 +176,18 @@ public ForStKeyedStateBackend( Supplier valueDeserializerView, RocksDB db, LinkedHashMap kvStateInformation, + Map> registeredPQStates, Function columnFamilyOptionsFactory, ColumnFamilyHandle defaultColumnFamilyHandle, ForStSnapshotStrategyBase snapshotStrategy, + PriorityQueueSetFactory priorityQueueFactory, CloseableRegistry cancelStreamRegistry, - ForStNativeMetricMonitor nativeMetricMonitor) { + ForStNativeMetricMonitor nativeMetricMonitor, + InternalKeyContext keyContext) { this.backendUID = backendUID; this.optionsContainer = Preconditions.checkNotNull(optionsContainer); this.keyGroupPrefixBytes = keyGroupPrefixBytes; + this.keyGroupRange = keyContext.getKeyGroupRange(); this.keySerializer = keySerializer; this.serializedKeyBuilder = serializedKeyBuilder; this.valueSerializerView = valueSerializerView; @@ -177,6 +200,17 @@ public ForStKeyedStateBackend( this.cancelStreamRegistry = cancelStreamRegistry; this.nativeMetricMonitor = nativeMetricMonitor; this.managedStateExecutors = new HashSet<>(1); + this.priorityQueueFactory = priorityQueueFactory; + if (priorityQueueFactory instanceof HeapPriorityQueueSetFactory) { + this.heapPriorityQueuesManager = + new HeapPriorityQueuesManager( + registeredPQStates, + (HeapPriorityQueueSetFactory) priorityQueueFactory, + keyContext.getKeyGroupRange(), + keyContext.getNumberOfKeyGroups()); + } else { + this.heapPriorityQueuesManager = null; + } } @Override @@ -370,6 +404,11 @@ public StateExecutor createStateExecutor() { } } + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + @Nonnull @Override public RunnableFuture> snapshot( @@ -469,6 +508,30 @@ public void close() throws IOException { dispose(); } + @Nonnull + @Override + public & Keyed> + KeyGroupedInternalPriorityQueue create( + @Nonnull String stateName, + @Nonnull TypeSerializer byteOrderedElementSerializer) { + return create(stateName, byteOrderedElementSerializer, false); + } + + @Override + public & Keyed> + KeyGroupedInternalPriorityQueue create( + @Nonnull String stateName, + @Nonnull TypeSerializer byteOrderedElementSerializer, + boolean allowFutureMetadataUpdates) { + if (this.heapPriorityQueuesManager != null) { + return this.heapPriorityQueuesManager.createOrUpdate( + stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates); + } else { + return priorityQueueFactory.create( + stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates); + } + } + /** ForSt specific information about the k/v states. */ public static class ForStKvStateInfo implements AutoCloseable { public final ColumnFamilyHandle columnFamilyHandle; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index ec4601683e7ec..db57bfc20d05e 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -27,19 +27,26 @@ import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; +import org.apache.flink.runtime.state.InternalKeyContext; +import org.apache.flink.runtime.state.InternalKeyContextImpl; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackendBuilder; import org.apache.flink.runtime.state.StateSerializerProvider; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; import org.apache.flink.state.forst.fs.ForStFlinkFileSystem; +import org.apache.flink.state.forst.restore.ForStHeapTimersFullRestoreOperation; import org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation; import org.apache.flink.state.forst.restore.ForStNoneRestoreOperation; import org.apache.flink.state.forst.restore.ForStRestoreOperation; import org.apache.flink.state.forst.restore.ForStRestoreResult; import org.apache.flink.state.forst.snapshot.ForStIncrementalSnapshotStrategy; import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase; +import org.apache.flink.state.forst.sync.ForStPriorityQueueConfig; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -83,6 +90,9 @@ public class ForStKeyedStateBackendBuilder /** String that identifies the operator that owns this backend. */ private final String operatorIdentifier; + /** The configuration of rocksDB priorityQueue state. */ + private final ForStPriorityQueueConfig priorityQueueConfig; + protected final ClassLoader userCodeClassLoader; protected final CloseableRegistry cancelStreamRegistry; @@ -117,6 +127,7 @@ public ForStKeyedStateBackendBuilder( TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, + ForStPriorityQueueConfig priorityQueueConfig, MetricGroup metricGroup, StateBackend.CustomInitializationMetrics customInitializationMetrics, @Nonnull Collection stateHandles, @@ -129,6 +140,7 @@ public ForStKeyedStateBackendBuilder( StateSerializerProvider.fromNewRegisteredSerializer(keySerializer); this.numberOfKeyGroups = numberOfKeyGroups; this.keyGroupRange = keyGroupRange; + this.priorityQueueConfig = priorityQueueConfig; this.metricGroup = metricGroup; this.customInitializationMetrics = customInitializationMetrics; this.restoreStateHandles = stateHandles; @@ -157,6 +169,8 @@ public ForStKeyedStateBackend build() throws BackendBuildingException { LinkedHashMap kvStateInformation = new LinkedHashMap<>(); + LinkedHashMap> registeredPQStates = + new LinkedHashMap<>(); RocksDB db = null; ForStRestoreOperation restoreOperation = null; @@ -183,10 +197,11 @@ public ForStKeyedStateBackend build() throws BackendBuildingException { Supplier valueDeserializerView = DataInputDeserializer::new; UUID backendUID = UUID.randomUUID(); + PriorityQueueSetFactory priorityQueueFactory; try { optionsContainer.prepareDirectories(); - restoreOperation = getForStRestoreOperation(kvStateInformation); + restoreOperation = getForStRestoreOperation(kvStateInformation, registeredPQStates); ForStRestoreResult restoreResult = restoreOperation.restore(); db = restoreResult.getDb(); defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle(); @@ -209,6 +224,8 @@ public ForStKeyedStateBackend build() throws BackendBuildingException { materializedSstFiles, lastCompletedCheckpointId); + priorityQueueFactory = createHeapQueueFactory(); + } catch (Throwable e) { // Do clean up IOUtils.closeQuietly(cancelStreamRegistryForBackend); @@ -237,6 +254,8 @@ public ForStKeyedStateBackend build() throws BackendBuildingException { throw new BackendBuildingException(errMsg, e); } } + InternalKeyContext keyContext = + new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups); logger.info( "Finished building ForSt keyed state-backend at local base path: {}, remote base path: {}.", optionsContainer.getLocalBasePath(), @@ -251,15 +270,19 @@ public ForStKeyedStateBackend build() throws BackendBuildingException { valueDeserializerView, db, kvStateInformation, + registeredPQStates, columnFamilyOptionsFactory, defaultColumnFamilyHandle, snapshotStrategy, + priorityQueueFactory, cancelStreamRegistryForBackend, - nativeMetricMonitor); + nativeMetricMonitor, + keyContext); } private ForStRestoreOperation getForStRestoreOperation( - LinkedHashMap kvStateInformation) { + LinkedHashMap kvStateInformation, + LinkedHashMap> registeredPQStates) { // Currently, ForStDB does not support mixing local-dir and remote-dir, and ForStDB will // concatenates the dfs directory with the local directory as working dir when using flink // env. We expect to directly use the dfs directory in flink env or local directory as @@ -299,7 +322,27 @@ private ForStRestoreOperation getForStRestoreOperation( customInitializationMetrics, CollectionUtil.checkedSubTypeCast( restoreStateHandles, IncrementalRemoteKeyedStateHandle.class)); + } else if (priorityQueueConfig.getPriorityQueueStateType() + == ForStStateBackend.PriorityQueueStateType.HEAP) { + // Note: This branch can be touched after ForSt Support canonical savepoint, + // Timers are stored as raw keyed state instead of managed keyed state now. + return new ForStHeapTimersFullRestoreOperation<>( + keyGroupRange, + numberOfKeyGroups, + userCodeClassLoader, + kvStateInformation, + registeredPQStates, + createHeapQueueFactory(), + keySerializerProvider, + instanceForStPath, + optionsContainer.getDbOptions(), + columnFamilyOptionsFactory, + nativeMetricOptions, + metricGroup, + restoreStateHandles, + cancelStreamRegistry); } + // TODO: Support Restoring throw new UnsupportedOperationException("Not support restoring yet for ForStStateBackend"); } @@ -351,4 +394,8 @@ private ForStRestoreOperation getForStRestoreOperation( } return snapshotStrategy; } + + private HeapPriorityQueueSetFactory createHeapQueueFactory() { + return new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128); + } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java index f8ddc75d54c71..6e61ecd3920b2 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java @@ -362,6 +362,7 @@ public ForStKeyedStateBackend createAsyncKeyedStateBackend( parameters.getKeySerializer(), parameters.getNumberOfKeyGroups(), parameters.getKeyGroupRange(), + priorityQueueConfig, parameters.getMetricGroup(), parameters.getCustomInitializationMetrics(), parameters.getStateHandles(), diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHeapTimersFullRestoreOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHeapTimersFullRestoreOperation.java new file mode 100644 index 0000000000000..aa673a69658f3 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHeapTimersFullRestoreOperation.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.restore; + +import org.apache.flink.core.fs.ICloseableRegistry; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.CompositeKeySerializationUtils; +import org.apache.flink.runtime.state.KeyExtractorFunction; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.Keyed; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.PriorityComparable; +import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo; +import org.apache.flink.runtime.state.StateSerializerProvider; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType; +import org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation; +import org.apache.flink.runtime.state.restore.KeyGroup; +import org.apache.flink.runtime.state.restore.KeyGroupEntry; +import org.apache.flink.runtime.state.restore.SavepointRestoreResult; +import org.apache.flink.runtime.state.restore.ThrowingIterator; +import org.apache.flink.state.forst.ForStDBWriteBatchWrapper; +import org.apache.flink.state.forst.ForStKeyedStateBackend; +import org.apache.flink.state.forst.ForStNativeMetricOptions; +import org.apache.flink.util.StateMigrationException; + +import org.forstdb.ColumnFamilyHandle; +import org.forstdb.ColumnFamilyOptions; +import org.forstdb.DBOptions; +import org.forstdb.RocksDBException; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +/** Encapsulates the process of restoring a ForStDB instance from a full snapshot. */ +public class ForStHeapTimersFullRestoreOperation implements ForStRestoreOperation { + private final FullSnapshotRestoreOperation savepointRestoreOperation; + + private final LinkedHashMap> + registeredPQStates; + private final HeapPriorityQueueSetFactory priorityQueueFactory; + private final int numberOfKeyGroups; + private final DataInputDeserializer deserializer = new DataInputDeserializer(); + + private final ForStHandle rocksHandle; + private final KeyGroupRange keyGroupRange; + private final int keyGroupPrefixBytes; + private final ICloseableRegistry cancelStreamRegistryForRestore; + + public ForStHeapTimersFullRestoreOperation( + KeyGroupRange keyGroupRange, + int numberOfKeyGroups, + ClassLoader userCodeClassLoader, + Map kvStateInformation, + LinkedHashMap> registeredPQStates, + HeapPriorityQueueSetFactory priorityQueueFactory, + StateSerializerProvider keySerializerProvider, + File instanceRocksDBPath, + DBOptions dbOptions, + Function columnFamilyOptionsFactory, + ForStNativeMetricOptions nativeMetricOptions, + MetricGroup metricGroup, + @Nonnull Collection restoreStateHandles, + ICloseableRegistry cancelStreamRegistryForRestore) { + this.rocksHandle = + new ForStHandle( + kvStateInformation, + instanceRocksDBPath, + dbOptions, + columnFamilyOptionsFactory, + nativeMetricOptions, + metricGroup); + this.savepointRestoreOperation = + new FullSnapshotRestoreOperation<>( + keyGroupRange, + userCodeClassLoader, + restoreStateHandles, + keySerializerProvider); + this.registeredPQStates = registeredPQStates; + this.priorityQueueFactory = priorityQueueFactory; + this.numberOfKeyGroups = numberOfKeyGroups; + this.keyGroupRange = keyGroupRange; + this.keyGroupPrefixBytes = + CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix( + numberOfKeyGroups); + this.cancelStreamRegistryForRestore = cancelStreamRegistryForRestore; + } + + /** Restores all key-groups data that is referenced by the passed state handles. */ + @Override + public ForStRestoreResult restore() + throws IOException, StateMigrationException, RocksDBException { + rocksHandle.openDB(); + try (ThrowingIterator restore = + savepointRestoreOperation.restore()) { + while (restore.hasNext()) { + applyRestoreResult(restore.next()); + } + } + return new ForStRestoreResult( + this.rocksHandle.getDb(), + this.rocksHandle.getDefaultColumnFamilyHandle(), + this.rocksHandle.getNativeMetricMonitor(), + -1, + null, + null); + } + + private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult) + throws IOException, RocksDBException, StateMigrationException { + List restoredMetaInfos = + savepointRestoreResult.getStateMetaInfoSnapshots(); + Map columnFamilyHandles = new HashMap<>(); + Map> restoredPQStates = new HashMap<>(); + for (int i = 0; i < restoredMetaInfos.size(); i++) { + StateMetaInfoSnapshot restoredMetaInfo = restoredMetaInfos.get(i); + if (restoredMetaInfo.getBackendStateType() == BackendStateType.PRIORITY_QUEUE) { + String stateName = restoredMetaInfo.getName(); + HeapPriorityQueueSnapshotRestoreWrapper queueWrapper = + registeredPQStates.computeIfAbsent( + stateName, + key -> + createInternal( + new RegisteredPriorityQueueStateBackendMetaInfo<>( + restoredMetaInfo))); + restoredPQStates.put(i, queueWrapper); + } else { + ForStKeyedStateBackend.ForStKvStateInfo registeredStateCFHandle = + this.rocksHandle.getOrRegisterStateColumnFamilyHandle( + null, restoredMetaInfo); + columnFamilyHandles.put(i, registeredStateCFHandle.columnFamilyHandle); + } + } + + try (ThrowingIterator keyGroups = savepointRestoreResult.getRestoredKeyGroups()) { + restoreKVStateData(keyGroups, columnFamilyHandles, restoredPQStates); + } + } + + /** + * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state + * handle. + */ + private void restoreKVStateData( + ThrowingIterator keyGroups, + Map columnFamilies, + Map> restoredPQStates) + throws IOException, RocksDBException, StateMigrationException { + // for all key-groups in the current state handle... + try (ForStDBWriteBatchWrapper writeBatchWrapper = + new ForStDBWriteBatchWrapper(this.rocksHandle.getDb(), 0); + Closeable ignored = + cancelStreamRegistryForRestore.registerCloseableTemporarily( + writeBatchWrapper.getCancelCloseable())) { + HeapPriorityQueueSnapshotRestoreWrapper restoredPQ = null; + ColumnFamilyHandle handle = null; + while (keyGroups.hasNext()) { + KeyGroup keyGroup = keyGroups.next(); + try (ThrowingIterator groupEntries = keyGroup.getKeyGroupEntries()) { + int oldKvStateId = -1; + while (groupEntries.hasNext()) { + KeyGroupEntry groupEntry = groupEntries.next(); + int kvStateId = groupEntry.getKvStateId(); + if (kvStateId != oldKvStateId) { + oldKvStateId = kvStateId; + handle = columnFamilies.get(kvStateId); + restoredPQ = getRestoredPQ(restoredPQStates, kvStateId); + } + if (restoredPQ != null) { + restoreQueueElement(restoredPQ, groupEntry); + } else if (handle != null) { + writeBatchWrapper.put( + handle, groupEntry.getKey(), groupEntry.getValue()); + } else { + throw new IllegalStateException("Unknown state id: " + kvStateId); + } + } + } + } + } + } + + private void restoreQueueElement( + HeapPriorityQueueSnapshotRestoreWrapper restoredPQ, + KeyGroupEntry groupEntry) + throws IOException { + deserializer.setBuffer(groupEntry.getKey()); + deserializer.skipBytesToRead(keyGroupPrefixBytes); + HeapPriorityQueueElement queueElement = + restoredPQ.getMetaInfo().getElementSerializer().deserialize(deserializer); + restoredPQ.getPriorityQueue().add(queueElement); + } + + @SuppressWarnings("unchecked") + private HeapPriorityQueueSnapshotRestoreWrapper getRestoredPQ( + Map> restoredPQStates, + int kvStateId) { + return (HeapPriorityQueueSnapshotRestoreWrapper) + restoredPQStates.get(kvStateId); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private & Keyed> + HeapPriorityQueueSnapshotRestoreWrapper createInternal( + RegisteredPriorityQueueStateBackendMetaInfo metaInfo) { + + final String stateName = metaInfo.getName(); + final HeapPriorityQueueSet priorityQueue = + priorityQueueFactory.create(stateName, metaInfo.getElementSerializer()); + + return new HeapPriorityQueueSnapshotRestoreWrapper<>( + priorityQueue, + metaInfo, + KeyExtractorFunction.forKeyedObjects(), + keyGroupRange, + numberOfKeyGroups); + } + + @Override + public void close() throws Exception { + this.rocksHandle.close(); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java index 79e7b82989e2f..1bcb924ce493c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java @@ -38,7 +38,6 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.function.ThrowingConsumer; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.Collections; @@ -223,7 +222,6 @@ void testCheckpointDrain() throws Exception { } } - @Disabled("Support Timer for AsyncKeyedStateBackend") @Test void testTimerServiceIsAsync() throws Exception { try (KeyedOneInputStreamOperatorV2TestHarness, String> diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 9593fa8541878..ef99125a6de6c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -2361,6 +2361,13 @@ public InternalTimeServiceManager internalTimerServiceManager() { return timeServiceManager != null ? spy(timeServiceManager) : null; } + @Override + public InternalTimeServiceManager asyncInternalTimerServiceManager() { + InternalTimeServiceManager timeServiceManager = + controller.internalTimerServiceManager(); + return timeServiceManager != null ? spy(timeServiceManager) : null; + } + @Override public CloseableIterable rawOperatorStateInputs() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java index 51e4fd501da22..8f7e1e1af1e37 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java @@ -37,13 +37,14 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider; import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl; import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StatePartitionStreamProvider; @@ -239,7 +240,8 @@ private void testOperatorStatesSnapshotRestoreInternal(final int mode) throws Ex @Override public InternalTimeServiceManager create( TaskIOMetricGroup taskIOMetricGroup, - CheckpointableKeyedStateBackend keyedStatedBackend, + PriorityQueueSetFactory factory, + KeyGroupRange keyGroupRange, ClassLoader userClassloader, KeyContext keyContext, ProcessingTimeService processingTimeService,