diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index afe380158a23e..1e4b39326e5af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.v2.StateDescriptor; +import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; @@ -48,6 +49,9 @@ import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.util.function.ThrowingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nonnull; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -63,6 +67,9 @@ public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator implements AsyncStateProcessingOperator { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractAsyncStateStreamOperator.class); + private AsyncExecutionController asyncExecutionController; private RecordContext currentProcessingContext; @@ -98,6 +105,13 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) inFlightRecordsLimit, asyncKeyedStateBackend); asyncKeyedStateBackend.setup(asyncExecutionController); + if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) { + LOG.warn( + "A normal KeyedStateBackend({}) is used when enabling the async state " + + "processing. Parallel asynchronous processing does not work. " + + "All state access will be processed synchronously.", + stateHandler.getKeyedStateBackend()); + } } else if (stateHandler.getKeyedStateBackend() != null) { throw new UnsupportedOperationException( "Current State Backend doesn't support async access, AsyncExecutionController could not work"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index f3322a463015c..01e86a378a01e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.v2.StateDescriptor; +import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimerService; @@ -45,6 +46,9 @@ import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.util.function.ThrowingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nonnull; import static org.apache.flink.util.Preconditions.checkState; @@ -59,6 +63,9 @@ public abstract class AbstractAsyncStateStreamOperatorV2 extends AbstractStreamOperatorV2 implements AsyncStateProcessingOperator { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractAsyncStateStreamOperatorV2.class); + private final Environment environment; private AsyncExecutionController asyncExecutionController; @@ -95,6 +102,13 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana inFlightRecordsLimit, asyncKeyedStateBackend); asyncKeyedStateBackend.setup(asyncExecutionController); + if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) { + LOG.warn( + "A normal KeyedStateBackend({}) is used when enabling the async state " + + "processing. Parallel asynchronous processing does not work. " + + "All state access will be processed synchronously.", + stateHandler.getKeyedStateBackend()); + } } else if (stateHandler.getKeyedStateBackend() != null) { throw new UnsupportedOperationException( "Current State Backend doesn't support async access, AsyncExecutionController could not work"); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index a3db48cbd7188..32a895170a122 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -56,6 +56,7 @@ import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * Implementation of the {@link org.apache.flink.api.common.functions.RuntimeContext}, for streaming @@ -72,6 +73,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { private final ProcessingTimeService processingTimeService; private @Nullable KeyedStateStore keyedStateStore; private @Nullable KeyedStateStoreV2 keyedStateStoreV2; + private SupportKeyedStateApiSet supportKeyedStateApiSet; private final ExternalResourceInfoProvider externalResourceInfoProvider; @VisibleForTesting @@ -110,6 +112,8 @@ public StreamingRuntimeContext( this.operatorUniqueID = checkNotNull(operatorID).toString(); this.processingTimeService = processingTimeService; this.keyedStateStore = keyedStateStore; + // By default, support state v1 + this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V1; this.externalResourceInfoProvider = externalResourceInfoProvider; } @@ -119,6 +123,8 @@ public void setKeyedStateStore(@Nullable KeyedStateStore keyedStateStore) { public void setKeyedStateStoreV2(@Nullable KeyedStateStoreV2 keyedStateStoreV2) { this.keyedStateStoreV2 = keyedStateStoreV2; + // Only if the keyedStateStoreV2 is set, this context is switch to support state v2 + this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V2; } // ------------------------------------------------------------------------ @@ -245,6 +251,12 @@ public MapState getMapState(MapStateDescriptor statePro private KeyedStateStore checkPreconditionsAndGetKeyedStateStore( StateDescriptor stateDescriptor) { checkNotNull(stateDescriptor, "The state properties must not be null"); + checkState( + supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V1, + "Current operator integrates the logic of async processing, " + + "thus only support state v2 APIs. Please use StateDescriptor under " + + "'org.apache.flink.runtime.state.v2' or make current operator extend " + + "from AbstractStreamOperator/AbstractStreamOperatorV2."); checkNotNull( keyedStateStore, String.format( @@ -294,6 +306,12 @@ org.apache.flink.api.common.state.v2.AggregatingState getAggregatingSta private KeyedStateStoreV2 checkPreconditionsAndGetKeyedStateStoreV2( org.apache.flink.runtime.state.v2.StateDescriptor stateDescriptor) { checkNotNull(stateDescriptor, "The state properties must not be null"); + checkState( + supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V2, + "Current operator does not integrate the logic of async processing, " + + "thus only support state v1 APIs. Please use StateDescriptor under " + + "'org.apache.flink.runtime.state' or make current operator extend from " + + "AbstractAsyncStateStreamOperator/AbstractAsyncStateStreamOperatorV2."); checkNotNull( keyedStateStoreV2, String.format( @@ -312,4 +330,13 @@ private KeyedStateStoreV2 checkPreconditionsAndGetKeyedStateStoreV2( public boolean isCheckpointingEnabled() { return streamConfig.isCheckpointingEnabled(); } + + /** + * Currently, we only support one keyed state api set. This is determined by the stream + * operator. This will be set via {@link #setKeyedStateStore} or {@link #setKeyedStateStoreV2}. + */ + private enum SupportKeyedStateApiSet { + STATE_V1, + STATE_V2 + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index d0eb27d065989..dd4b0ba783b67 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -96,7 +96,7 @@ void testValueStateInstantiation() throws Exception { final AtomicReference descriptorCapture = new AtomicReference<>(); - StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); + StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false); ValueStateDescriptor descr = new ValueStateDescriptor<>("name", TaskInfo.class); context.getState(descr); @@ -117,7 +117,7 @@ void testReducingStateInstantiation() throws Exception { final AtomicReference descriptorCapture = new AtomicReference<>(); - StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); + StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false); @SuppressWarnings("unchecked") ReduceFunction reducer = (ReduceFunction) mock(ReduceFunction.class); @@ -143,7 +143,7 @@ void testAggregatingStateInstantiation() throws Exception { final AtomicReference descriptorCapture = new AtomicReference<>(); - StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); + StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false); @SuppressWarnings("unchecked") AggregateFunction aggregate = @@ -172,7 +172,7 @@ void testListStateInstantiation() throws Exception { final AtomicReference descriptorCapture = new AtomicReference<>(); - StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); + StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false); ListStateDescriptor descr = new ListStateDescriptor<>("name", TaskInfo.class); context.getListState(descr); @@ -213,7 +213,7 @@ void testMapStateInstantiation() throws Exception { final AtomicReference descriptorCapture = new AtomicReference<>(); - StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); + StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false); MapStateDescriptor descr = new MapStateDescriptor<>("name", String.class, TaskInfo.class); @@ -257,7 +257,7 @@ void testV2ValueStateInstantiation() throws Exception { final AtomicReference descriptorCapture = new AtomicReference<>(); - StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); + StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true); org.apache.flink.runtime.state.v2.ValueStateDescriptor descr = new org.apache.flink.runtime.state.v2.ValueStateDescriptor<>( "name", TypeInformation.of(TaskInfo.class), serializerConfig); @@ -281,7 +281,7 @@ void testV2ListStateInstantiation() throws Exception { final AtomicReference descriptorCapture = new AtomicReference<>(); - StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); + StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true); org.apache.flink.runtime.state.v2.ListStateDescriptor descr = new org.apache.flink.runtime.state.v2.ListStateDescriptor<>( "name", TypeInformation.of(TaskInfo.class), serializerConfig); @@ -305,7 +305,7 @@ void testV2MapStateInstantiation() throws Exception { final AtomicReference descriptorCapture = new AtomicReference<>(); - StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); + StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true); org.apache.flink.runtime.state.v2.MapStateDescriptor descr = new org.apache.flink.runtime.state.v2.MapStateDescriptor<>( "name", @@ -333,7 +333,7 @@ void testV2ReducingStateInstantiation() throws Exception { final AtomicReference descriptorCapture = new AtomicReference<>(); - StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); + StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true); @SuppressWarnings("unchecked") ReduceFunction reducer = (ReduceFunction) mock(ReduceFunction.class); @@ -363,7 +363,7 @@ void testV2AggregatingStateInstantiation() throws Exception { final AtomicReference descriptorCapture = new AtomicReference<>(); - StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); + StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true); @SuppressWarnings("unchecked") AggregateFunction aggregate = @@ -405,11 +405,13 @@ private StreamingRuntimeContext createRuntimeContext() throws Exception { } private StreamingRuntimeContext createRuntimeContext( - AtomicReference descriptorCapture, ExecutionConfig config) throws Exception { + AtomicReference descriptorCapture, ExecutionConfig config, boolean stateV2) + throws Exception { return createDescriptorCapturingMockOp( descriptorCapture, config, - MockEnvironment.builder().setExecutionConfig(config).build()) + MockEnvironment.builder().setExecutionConfig(config).build(), + stateV2) .getRuntimeContext(); } @@ -428,7 +430,8 @@ private StreamingRuntimeContext createRuntimeContext(AbstractStreamOperator o private static AbstractStreamOperator createDescriptorCapturingMockOp( final AtomicReference ref, final ExecutionConfig config, - Environment environment) + Environment environment, + boolean stateV2) throws Exception { AbstractStreamOperator operator = @@ -490,9 +493,12 @@ public TypeSerializer createSerializer( any(org.apache.flink.runtime.state.v2.StateDescriptor.class)); operator.initializeState(streamTaskStateManager); - operator.getRuntimeContext().setKeyedStateStore(keyedStateStore); - operator.getRuntimeContext() - .setKeyedStateStoreV2(new DefaultKeyedStateStoreV2(asyncKeyedStateBackend)); + if (!stateV2) { + operator.getRuntimeContext().setKeyedStateStore(keyedStateStore); + } else { + operator.getRuntimeContext() + .setKeyedStateStoreV2(new DefaultKeyedStateStoreV2(asyncKeyedStateBackend)); + } return operator; }