Skip to content

Commit

Permalink
[FLINK-36376] More friendly error or warn message for misconfigured s…
Browse files Browse the repository at this point in the history
…tatebackend with async state processing
  • Loading branch information
Zakelly committed Sep 26, 2024
1 parent 82582b3 commit 6f27db8
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
Expand All @@ -48,6 +49,9 @@
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -63,6 +67,9 @@
public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStreamOperator<OUT>
implements AsyncStateProcessingOperator {

private static final Logger LOG =
LoggerFactory.getLogger(AbstractAsyncStateStreamOperator.class);

private AsyncExecutionController asyncExecutionController;

private RecordContext currentProcessingContext;
Expand Down Expand Up @@ -98,6 +105,13 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
inFlightRecordsLimit,
asyncKeyedStateBackend);
asyncKeyedStateBackend.setup(asyncExecutionController);
if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) {
LOG.warn(
"A normal KeyedStateBackend({}) is used when enabling the async state "
+ "processing. Parallel asynchronous processing does not work. "
+ "All state access will be processed synchronously.",
stateHandler.getKeyedStateBackend());
}
} else if (stateHandler.getKeyedStateBackend() != null) {
throw new UnsupportedOperationException(
"Current State Backend doesn't support async access, AsyncExecutionController could not work");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
Expand All @@ -45,6 +46,9 @@
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import static org.apache.flink.util.Preconditions.checkState;
Expand All @@ -59,6 +63,9 @@
public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractStreamOperatorV2<OUT>
implements AsyncStateProcessingOperator {

private static final Logger LOG =
LoggerFactory.getLogger(AbstractAsyncStateStreamOperatorV2.class);

private final Environment environment;
private AsyncExecutionController asyncExecutionController;

Expand Down Expand Up @@ -95,6 +102,13 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana
inFlightRecordsLimit,
asyncKeyedStateBackend);
asyncKeyedStateBackend.setup(asyncExecutionController);
if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) {
LOG.warn(
"A normal KeyedStateBackend({}) is used when enabling the async state "
+ "processing. Parallel asynchronous processing does not work. "
+ "All state access will be processed synchronously.",
stateHandler.getKeyedStateBackend());
}
} else if (stateHandler.getKeyedStateBackend() != null) {
throw new UnsupportedOperationException(
"Current State Backend doesn't support async access, AsyncExecutionController could not work");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Implementation of the {@link org.apache.flink.api.common.functions.RuntimeContext}, for streaming
Expand All @@ -72,6 +73,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
private final ProcessingTimeService processingTimeService;
private @Nullable KeyedStateStore keyedStateStore;
private @Nullable KeyedStateStoreV2 keyedStateStoreV2;
private SupportKeyedStateApiSet supportKeyedStateApiSet;
private final ExternalResourceInfoProvider externalResourceInfoProvider;

@VisibleForTesting
Expand Down Expand Up @@ -110,6 +112,8 @@ public StreamingRuntimeContext(
this.operatorUniqueID = checkNotNull(operatorID).toString();
this.processingTimeService = processingTimeService;
this.keyedStateStore = keyedStateStore;
// By default, support state v1
this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V1;
this.externalResourceInfoProvider = externalResourceInfoProvider;
}

Expand All @@ -119,6 +123,8 @@ public void setKeyedStateStore(@Nullable KeyedStateStore keyedStateStore) {

public void setKeyedStateStoreV2(@Nullable KeyedStateStoreV2 keyedStateStoreV2) {
this.keyedStateStoreV2 = keyedStateStoreV2;
// Only if the keyedStateStoreV2 is set, this context is switch to support state v2
this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V2;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -245,6 +251,12 @@ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> statePro
private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(
StateDescriptor<?, ?> stateDescriptor) {
checkNotNull(stateDescriptor, "The state properties must not be null");
checkState(
supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V1,
"Current operator integrates the logic of async processing, "
+ "thus only support state v2 APIs. Please use StateDescriptor under "
+ "'org.apache.flink.runtime.state.v2' or make current operator extend "
+ "from AbstractStreamOperator/AbstractStreamOperatorV2.");
checkNotNull(
keyedStateStore,
String.format(
Expand Down Expand Up @@ -294,6 +306,12 @@ org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingSta
private KeyedStateStoreV2 checkPreconditionsAndGetKeyedStateStoreV2(
org.apache.flink.runtime.state.v2.StateDescriptor<?> stateDescriptor) {
checkNotNull(stateDescriptor, "The state properties must not be null");
checkState(
supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V2,
"Current operator does not integrate the logic of async processing, "
+ "thus only support state v1 APIs. Please use StateDescriptor under "
+ "'org.apache.flink.runtime.state' or make current operator extend from "
+ "AbstractAsyncStateStreamOperator/AbstractAsyncStateStreamOperatorV2.");
checkNotNull(
keyedStateStoreV2,
String.format(
Expand All @@ -312,4 +330,13 @@ private KeyedStateStoreV2 checkPreconditionsAndGetKeyedStateStoreV2(
public boolean isCheckpointingEnabled() {
return streamConfig.isCheckpointingEnabled();
}

/**
* Currently, we only support one keyed state api set. This is determined by the stream
* operator. This will be set via {@link #setKeyedStateStore} or {@link #setKeyedStateStoreV2}.
*/
private enum SupportKeyedStateApiSet {
STATE_V1,
STATE_V2
}
}

0 comments on commit 6f27db8

Please sign in to comment.