diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index afe380158a23ee..1e4b39326e5afc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.v2.StateDescriptor; +import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; @@ -48,6 +49,9 @@ import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.util.function.ThrowingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nonnull; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -63,6 +67,9 @@ public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator implements AsyncStateProcessingOperator { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractAsyncStateStreamOperator.class); + private AsyncExecutionController asyncExecutionController; private RecordContext currentProcessingContext; @@ -98,6 +105,13 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) inFlightRecordsLimit, asyncKeyedStateBackend); asyncKeyedStateBackend.setup(asyncExecutionController); + if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) { + LOG.warn( + "A normal KeyedStateBackend({}) is used when enabling the async state " + + "processing. Parallel asynchronous processing does not work. " + + "All state access will be processed synchronously.", + stateHandler.getKeyedStateBackend()); + } } else if (stateHandler.getKeyedStateBackend() != null) { throw new UnsupportedOperationException( "Current State Backend doesn't support async access, AsyncExecutionController could not work"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index f3322a463015cd..01e86a378a01e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.v2.StateDescriptor; +import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimerService; @@ -45,6 +46,9 @@ import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.util.function.ThrowingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nonnull; import static org.apache.flink.util.Preconditions.checkState; @@ -59,6 +63,9 @@ public abstract class AbstractAsyncStateStreamOperatorV2 extends AbstractStreamOperatorV2 implements AsyncStateProcessingOperator { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractAsyncStateStreamOperatorV2.class); + private final Environment environment; private AsyncExecutionController asyncExecutionController; @@ -95,6 +102,13 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana inFlightRecordsLimit, asyncKeyedStateBackend); asyncKeyedStateBackend.setup(asyncExecutionController); + if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) { + LOG.warn( + "A normal KeyedStateBackend({}) is used when enabling the async state " + + "processing. Parallel asynchronous processing does not work. " + + "All state access will be processed synchronously.", + stateHandler.getKeyedStateBackend()); + } } else if (stateHandler.getKeyedStateBackend() != null) { throw new UnsupportedOperationException( "Current State Backend doesn't support async access, AsyncExecutionController could not work"); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index a3db48cbd7188d..32a895170a1224 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -56,6 +56,7 @@ import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * Implementation of the {@link org.apache.flink.api.common.functions.RuntimeContext}, for streaming @@ -72,6 +73,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { private final ProcessingTimeService processingTimeService; private @Nullable KeyedStateStore keyedStateStore; private @Nullable KeyedStateStoreV2 keyedStateStoreV2; + private SupportKeyedStateApiSet supportKeyedStateApiSet; private final ExternalResourceInfoProvider externalResourceInfoProvider; @VisibleForTesting @@ -110,6 +112,8 @@ public StreamingRuntimeContext( this.operatorUniqueID = checkNotNull(operatorID).toString(); this.processingTimeService = processingTimeService; this.keyedStateStore = keyedStateStore; + // By default, support state v1 + this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V1; this.externalResourceInfoProvider = externalResourceInfoProvider; } @@ -119,6 +123,8 @@ public void setKeyedStateStore(@Nullable KeyedStateStore keyedStateStore) { public void setKeyedStateStoreV2(@Nullable KeyedStateStoreV2 keyedStateStoreV2) { this.keyedStateStoreV2 = keyedStateStoreV2; + // Only if the keyedStateStoreV2 is set, this context is switch to support state v2 + this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V2; } // ------------------------------------------------------------------------ @@ -245,6 +251,12 @@ public MapState getMapState(MapStateDescriptor statePro private KeyedStateStore checkPreconditionsAndGetKeyedStateStore( StateDescriptor stateDescriptor) { checkNotNull(stateDescriptor, "The state properties must not be null"); + checkState( + supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V1, + "Current operator integrates the logic of async processing, " + + "thus only support state v2 APIs. Please use StateDescriptor under " + + "'org.apache.flink.runtime.state.v2' or make current operator extend " + + "from AbstractStreamOperator/AbstractStreamOperatorV2."); checkNotNull( keyedStateStore, String.format( @@ -294,6 +306,12 @@ org.apache.flink.api.common.state.v2.AggregatingState getAggregatingSta private KeyedStateStoreV2 checkPreconditionsAndGetKeyedStateStoreV2( org.apache.flink.runtime.state.v2.StateDescriptor stateDescriptor) { checkNotNull(stateDescriptor, "The state properties must not be null"); + checkState( + supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V2, + "Current operator does not integrate the logic of async processing, " + + "thus only support state v1 APIs. Please use StateDescriptor under " + + "'org.apache.flink.runtime.state' or make current operator extend from " + + "AbstractAsyncStateStreamOperator/AbstractAsyncStateStreamOperatorV2."); checkNotNull( keyedStateStoreV2, String.format( @@ -312,4 +330,13 @@ private KeyedStateStoreV2 checkPreconditionsAndGetKeyedStateStoreV2( public boolean isCheckpointingEnabled() { return streamConfig.isCheckpointingEnabled(); } + + /** + * Currently, we only support one keyed state api set. This is determined by the stream + * operator. This will be set via {@link #setKeyedStateStore} or {@link #setKeyedStateStoreV2}. + */ + private enum SupportKeyedStateApiSet { + STATE_V1, + STATE_V2 + } }