Skip to content

Commit

Permalink
[FLINK-36376] More friendly error or warn message for misconfigured s…
Browse files Browse the repository at this point in the history
…tatebackend with async state processing
  • Loading branch information
Zakelly committed Sep 26, 2024
1 parent 82582b3 commit ada22a6
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
Expand All @@ -48,6 +49,9 @@
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -63,6 +67,9 @@
public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStreamOperator<OUT>
implements AsyncStateProcessingOperator {

private static final Logger LOG =
LoggerFactory.getLogger(AbstractAsyncStateStreamOperator.class);

private AsyncExecutionController asyncExecutionController;

private RecordContext currentProcessingContext;
Expand Down Expand Up @@ -98,6 +105,13 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
inFlightRecordsLimit,
asyncKeyedStateBackend);
asyncKeyedStateBackend.setup(asyncExecutionController);
if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) {
LOG.warn(
"A normal KeyedStateBackend({}) is used when enabling the async state "
+ "processing. Parallel asynchronous processing does not work. "
+ "All state access will be processed synchronously.",
stateHandler.getKeyedStateBackend());
}
} else if (stateHandler.getKeyedStateBackend() != null) {
throw new UnsupportedOperationException(
"Current State Backend doesn't support async access, AsyncExecutionController could not work");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
Expand All @@ -45,6 +46,9 @@
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import static org.apache.flink.util.Preconditions.checkState;
Expand All @@ -59,6 +63,9 @@
public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractStreamOperatorV2<OUT>
implements AsyncStateProcessingOperator {

private static final Logger LOG =
LoggerFactory.getLogger(AbstractAsyncStateStreamOperatorV2.class);

private final Environment environment;
private AsyncExecutionController asyncExecutionController;

Expand Down Expand Up @@ -95,6 +102,13 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana
inFlightRecordsLimit,
asyncKeyedStateBackend);
asyncKeyedStateBackend.setup(asyncExecutionController);
if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) {
LOG.warn(
"A normal KeyedStateBackend({}) is used when enabling the async state "
+ "processing. Parallel asynchronous processing does not work. "
+ "All state access will be processed synchronously.",
stateHandler.getKeyedStateBackend());
}
} else if (stateHandler.getKeyedStateBackend() != null) {
throw new UnsupportedOperationException(
"Current State Backend doesn't support async access, AsyncExecutionController could not work");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Implementation of the {@link org.apache.flink.api.common.functions.RuntimeContext}, for streaming
Expand All @@ -72,6 +73,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
private final ProcessingTimeService processingTimeService;
private @Nullable KeyedStateStore keyedStateStore;
private @Nullable KeyedStateStoreV2 keyedStateStoreV2;
private SupportKeyedStateApiSet supportKeyedStateApiSet;
private final ExternalResourceInfoProvider externalResourceInfoProvider;

@VisibleForTesting
Expand Down Expand Up @@ -110,6 +112,8 @@ public StreamingRuntimeContext(
this.operatorUniqueID = checkNotNull(operatorID).toString();
this.processingTimeService = processingTimeService;
this.keyedStateStore = keyedStateStore;
// By default, support state v1
this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V1;
this.externalResourceInfoProvider = externalResourceInfoProvider;
}

Expand All @@ -119,6 +123,8 @@ public void setKeyedStateStore(@Nullable KeyedStateStore keyedStateStore) {

public void setKeyedStateStoreV2(@Nullable KeyedStateStoreV2 keyedStateStoreV2) {
this.keyedStateStoreV2 = keyedStateStoreV2;
// Only if the keyedStateStoreV2 is set, this context is switch to support state v2
this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V2;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -245,6 +251,12 @@ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> statePro
private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(
StateDescriptor<?, ?> stateDescriptor) {
checkNotNull(stateDescriptor, "The state properties must not be null");
checkState(
supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V1,
"Current operator integrates the logic of async processing, "
+ "thus only support state v2 APIs. Please use StateDescriptor under "
+ "'org.apache.flink.runtime.state.v2' or make current operator extend "
+ "from AbstractStreamOperator/AbstractStreamOperatorV2.");
checkNotNull(
keyedStateStore,
String.format(
Expand Down Expand Up @@ -294,6 +306,12 @@ org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingSta
private KeyedStateStoreV2 checkPreconditionsAndGetKeyedStateStoreV2(
org.apache.flink.runtime.state.v2.StateDescriptor<?> stateDescriptor) {
checkNotNull(stateDescriptor, "The state properties must not be null");
checkState(
supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V2,
"Current operator does not integrate the logic of async processing, "
+ "thus only support state v1 APIs. Please use StateDescriptor under "
+ "'org.apache.flink.runtime.state' or make current operator extend from "
+ "AbstractAsyncStateStreamOperator/AbstractAsyncStateStreamOperatorV2.");
checkNotNull(
keyedStateStoreV2,
String.format(
Expand All @@ -312,4 +330,13 @@ private KeyedStateStoreV2 checkPreconditionsAndGetKeyedStateStoreV2(
public boolean isCheckpointingEnabled() {
return streamConfig.isCheckpointingEnabled();
}

/**
* Currently, we only support one keyed state api set. This is determined by the stream
* operator. This will be set via {@link #setKeyedStateStore} or {@link #setKeyedStateStoreV2}.
*/
private enum SupportKeyedStateApiSet {
STATE_V1,
STATE_V2
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void testValueStateInstantiation() throws Exception {

final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false);
ValueStateDescriptor<TaskInfo> descr = new ValueStateDescriptor<>("name", TaskInfo.class);
context.getState(descr);

Expand All @@ -117,7 +117,7 @@ void testReducingStateInstantiation() throws Exception {

final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false);

@SuppressWarnings("unchecked")
ReduceFunction<TaskInfo> reducer = (ReduceFunction<TaskInfo>) mock(ReduceFunction.class);
Expand All @@ -143,7 +143,7 @@ void testAggregatingStateInstantiation() throws Exception {

final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false);

@SuppressWarnings("unchecked")
AggregateFunction<String, TaskInfo, String> aggregate =
Expand Down Expand Up @@ -172,7 +172,7 @@ void testListStateInstantiation() throws Exception {

final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false);

ListStateDescriptor<TaskInfo> descr = new ListStateDescriptor<>("name", TaskInfo.class);
context.getListState(descr);
Expand Down Expand Up @@ -213,7 +213,7 @@ void testMapStateInstantiation() throws Exception {

final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false);

MapStateDescriptor<String, TaskInfo> descr =
new MapStateDescriptor<>("name", String.class, TaskInfo.class);
Expand Down Expand Up @@ -257,7 +257,7 @@ void testV2ValueStateInstantiation() throws Exception {

final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true);
org.apache.flink.runtime.state.v2.ValueStateDescriptor<TaskInfo> descr =
new org.apache.flink.runtime.state.v2.ValueStateDescriptor<>(
"name", TypeInformation.of(TaskInfo.class), serializerConfig);
Expand All @@ -281,7 +281,7 @@ void testV2ListStateInstantiation() throws Exception {

final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true);
org.apache.flink.runtime.state.v2.ListStateDescriptor<TaskInfo> descr =
new org.apache.flink.runtime.state.v2.ListStateDescriptor<>(
"name", TypeInformation.of(TaskInfo.class), serializerConfig);
Expand All @@ -305,7 +305,7 @@ void testV2MapStateInstantiation() throws Exception {

final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true);
org.apache.flink.runtime.state.v2.MapStateDescriptor<String, TaskInfo> descr =
new org.apache.flink.runtime.state.v2.MapStateDescriptor<>(
"name",
Expand Down Expand Up @@ -333,7 +333,7 @@ void testV2ReducingStateInstantiation() throws Exception {

final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true);

@SuppressWarnings("unchecked")
ReduceFunction<TaskInfo> reducer = (ReduceFunction<TaskInfo>) mock(ReduceFunction.class);
Expand Down Expand Up @@ -363,7 +363,7 @@ void testV2AggregatingStateInstantiation() throws Exception {

final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true);

@SuppressWarnings("unchecked")
AggregateFunction<String, TaskInfo, String> aggregate =
Expand Down Expand Up @@ -405,11 +405,13 @@ private StreamingRuntimeContext createRuntimeContext() throws Exception {
}

private StreamingRuntimeContext createRuntimeContext(
AtomicReference<Object> descriptorCapture, ExecutionConfig config) throws Exception {
AtomicReference<Object> descriptorCapture, ExecutionConfig config, boolean stateV2)
throws Exception {
return createDescriptorCapturingMockOp(
descriptorCapture,
config,
MockEnvironment.builder().setExecutionConfig(config).build())
MockEnvironment.builder().setExecutionConfig(config).build(),
stateV2)
.getRuntimeContext();
}

Expand All @@ -428,7 +430,8 @@ private StreamingRuntimeContext createRuntimeContext(AbstractStreamOperator<?> o
private static AbstractStreamOperator<?> createDescriptorCapturingMockOp(
final AtomicReference<Object> ref,
final ExecutionConfig config,
Environment environment)
Environment environment,
boolean stateV2)
throws Exception {

AbstractStreamOperator<?> operator =
Expand Down Expand Up @@ -490,9 +493,12 @@ public <T> TypeSerializer<T> createSerializer(
any(org.apache.flink.runtime.state.v2.StateDescriptor.class));

operator.initializeState(streamTaskStateManager);
operator.getRuntimeContext().setKeyedStateStore(keyedStateStore);
operator.getRuntimeContext()
.setKeyedStateStoreV2(new DefaultKeyedStateStoreV2(asyncKeyedStateBackend));
if (!stateV2) {
operator.getRuntimeContext().setKeyedStateStore(keyedStateStore);
} else {
operator.getRuntimeContext()
.setKeyedStateStoreV2(new DefaultKeyedStateStoreV2(asyncKeyedStateBackend));
}

return operator;
}
Expand Down

0 comments on commit ada22a6

Please sign in to comment.