Skip to content

Commit

Permalink
[FLINK-36944][State] Introduce serializer based state descriptor cons…
Browse files Browse the repository at this point in the history
…tructors
  • Loading branch information
Zakelly committed Dec 22, 2024
1 parent 5d00028 commit ec3a9c3
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import javax.annotation.Nonnull;

Expand Down Expand Up @@ -72,6 +73,21 @@ public AggregatingStateDescriptor(
this.aggregateFunction = checkNotNull(aggregateFunction);
}

/**
* Create a new {@code ReducingStateDescriptor} with the given stateId and the given type
* serializer.
*
* @param stateId The (unique) stateId for the state.
* @param serializer The type serializer for accumulator.
*/
public AggregatingStateDescriptor(
@Nonnull String stateId,
@Nonnull AggregateFunction<IN, ACC, OUT> aggregateFunction,
@Nonnull TypeSerializer<ACC> serializer) {
super(stateId, serializer);
this.aggregateFunction = checkNotNull(aggregateFunction);
}

/** Returns the Aggregate function for this state. */
public AggregateFunction<IN, ACC, OUT> getAggregateFunction() {
return aggregateFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import javax.annotation.Nonnull;

/**
* {@link StateDescriptor} for {@link ListState}. This can be used to create partitioned list state
Expand All @@ -36,7 +39,7 @@ public class ListStateDescriptor<T> extends StateDescriptor<T> {
* @param stateId The (unique) stateId for the state.
* @param typeInfo The type of the values in the state.
*/
public ListStateDescriptor(String stateId, TypeInformation<T> typeInfo) {
public ListStateDescriptor(@Nonnull String stateId, @Nonnull TypeInformation<T> typeInfo) {
super(stateId, typeInfo);
}

Expand All @@ -49,10 +52,23 @@ public ListStateDescriptor(String stateId, TypeInformation<T> typeInfo) {
* TypeSerializer}.
*/
public ListStateDescriptor(
String stateId, TypeInformation<T> typeInfo, SerializerConfig serializerConfig) {
@Nonnull String stateId,
@Nonnull TypeInformation<T> typeInfo,
SerializerConfig serializerConfig) {
super(stateId, typeInfo, serializerConfig);
}

/**
* Create a new {@code ListStateDescriptor} with the given stateId and the given type
* serializer.
*
* @param stateId The (unique) stateId for the state.
* @param serializer The type serializer for the values in the state.
*/
public ListStateDescriptor(@Nonnull String stateId, @Nonnull TypeSerializer<T> serializer) {
super(stateId, serializer);
}

@Override
public Type getType() {
return Type.LIST;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
* @param <UV> The type of the values that the map state can hold.
*/
public class MapStateDescriptor<UK, UV> extends StateDescriptor<UV> {
/** The type of the user key in the state. */
@Nonnull private final TypeInformation<UK> userKeyTypeInfo;

/** The serializer for the user key. */
@Nonnull private final TypeSerializer<UK> userKeySerializer;
Expand All @@ -48,9 +46,9 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<UV> {
* @param userValueTypeInfo The type of the values in the state.
*/
public MapStateDescriptor(
String stateId,
TypeInformation<UK> userKeyTypeInfo,
TypeInformation<UV> userValueTypeInfo) {
@Nonnull String stateId,
@Nonnull TypeInformation<UK> userKeyTypeInfo,
@Nonnull TypeInformation<UV> userValueTypeInfo) {
this(stateId, userKeyTypeInfo, userValueTypeInfo, new SerializerConfigImpl());
}

Expand All @@ -64,18 +62,27 @@ public MapStateDescriptor(
* TypeSerializer}.
*/
public MapStateDescriptor(
String stateId,
TypeInformation<UK> userKeyTypeInfo,
TypeInformation<UV> userValueTypeInfo,
@Nonnull String stateId,
@Nonnull TypeInformation<UK> userKeyTypeInfo,
@Nonnull TypeInformation<UV> userValueTypeInfo,
SerializerConfig serializerConfig) {
super(stateId, userValueTypeInfo, serializerConfig);
this.userKeyTypeInfo = userKeyTypeInfo;
this.userKeySerializer = userKeyTypeInfo.createSerializer(serializerConfig);
}

@Nonnull
public TypeInformation<UK> getUserKeyType() {
return userKeyTypeInfo;
/**
* Create a new {@code MapStateDescriptor} with the given stateId and the given type serializer.
*
* @param stateId The (unique) stateId for the state.
* @param userKeySerializer The serializer for the user keys in the state.
* @param userValueSerializer The serializer for the user values in the state.
*/
public MapStateDescriptor(
@Nonnull String stateId,
@Nonnull TypeSerializer<UK> userKeySerializer,
@Nonnull TypeSerializer<UV> userValueSerializer) {
super(stateId, userValueSerializer);
this.userKeySerializer = userKeySerializer;
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import javax.annotation.Nonnull;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -41,7 +44,9 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<T> {
* @param typeInfo The type of the values in the state.
*/
public ReducingStateDescriptor(
String name, ReduceFunction<T> reduceFunction, TypeInformation<T> typeInfo) {
@Nonnull String name,
@Nonnull ReduceFunction<T> reduceFunction,
@Nonnull TypeInformation<T> typeInfo) {
super(name, typeInfo);
this.reduceFunction = checkNotNull(reduceFunction);
}
Expand All @@ -54,14 +59,29 @@ public ReducingStateDescriptor(
* @param typeInfo The type of the values in the state.
*/
public ReducingStateDescriptor(
String name,
ReduceFunction<T> reduceFunction,
TypeInformation<T> typeInfo,
@Nonnull String name,
@Nonnull ReduceFunction<T> reduceFunction,
@Nonnull TypeInformation<T> typeInfo,
SerializerConfig serializerConfig) {
super(name, typeInfo, serializerConfig);
this.reduceFunction = checkNotNull(reduceFunction);
}

/**
* Create a new {@code ReducingStateDescriptor} with the given stateId and the given type
* serializer.
*
* @param stateId The (unique) stateId for the state.
* @param serializer The type serializer for the values in the state.
*/
public ReducingStateDescriptor(
@Nonnull String stateId,
@Nonnull ReduceFunction<T> reduceFunction,
@Nonnull TypeSerializer<T> serializer) {
super(stateId, serializer);
this.reduceFunction = checkNotNull(reduceFunction);
}

/** Returns the reduce function to be used for the reducing state. */
public ReduceFunction<T> getReduceFunction() {
return reduceFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ protected StateDescriptor(
this.typeSerializer = typeInfo.createSerializer(serializerConfig);
}

/**
* Create a new {@code StateDescriptor} with the given stateId and the given type serializer.
*
* @param stateId The stateId of the {@code StateDescriptor}.
* @param serializer The type serializer for the values in the state.
*/
protected StateDescriptor(@Nonnull String stateId, TypeSerializer<T> serializer) {
this.stateId = checkNotNull(stateId, "stateId must not be null");
this.typeSerializer = checkNotNull(serializer, "type serializer must not be null");
}

// ------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import javax.annotation.Nonnull;

/**
* {@link StateDescriptor} for {@link ValueState}. This can be used to create partitioned value
Expand All @@ -36,7 +39,7 @@ public class ValueStateDescriptor<T> extends StateDescriptor<T> {
* @param stateId The (unique) stateId for the state.
* @param typeInfo The type of the values in the state.
*/
public ValueStateDescriptor(String stateId, TypeInformation<T> typeInfo) {
public ValueStateDescriptor(@Nonnull String stateId, @Nonnull TypeInformation<T> typeInfo) {
super(stateId, typeInfo);
}

Expand All @@ -49,10 +52,23 @@ public ValueStateDescriptor(String stateId, TypeInformation<T> typeInfo) {
* TypeSerializer}.
*/
public ValueStateDescriptor(
String stateId, TypeInformation<T> typeInfo, SerializerConfig serializerConfig) {
@Nonnull String stateId,
@Nonnull TypeInformation<T> typeInfo,
SerializerConfig serializerConfig) {
super(stateId, typeInfo, serializerConfig);
}

/**
* Create a new {@code ValueStateDescriptor} with the given stateId and the given type
* serializer.
*
* @param stateId The (unique) stateId for the state.
* @param serializer The type serializer for the values in the state.
*/
public ValueStateDescriptor(@Nonnull String stateId, @Nonnull TypeSerializer<T> serializer) {
super(stateId, serializer);
}

@Override
public Type getType() {
return Type.VALUE;
Expand Down
Loading

0 comments on commit ec3a9c3

Please sign in to comment.