diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java index 42ff2dd24b1fd..977807fd734ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import javax.annotation.Nonnull; @@ -72,6 +73,21 @@ public AggregatingStateDescriptor( this.aggregateFunction = checkNotNull(aggregateFunction); } + /** + * Create a new {@code ReducingStateDescriptor} with the given stateId and the given type + * serializer. + * + * @param stateId The (unique) stateId for the state. + * @param serializer The type serializer for accumulator. + */ + public AggregatingStateDescriptor( + @Nonnull String stateId, + @Nonnull AggregateFunction aggregateFunction, + @Nonnull TypeSerializer serializer) { + super(stateId, serializer); + this.aggregateFunction = checkNotNull(aggregateFunction); + } + /** Returns the Aggregate function for this state. */ public AggregateFunction getAggregateFunction() { return aggregateFunction; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java index 7e3a975ccd93c..bd63f13a47ac8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java @@ -21,6 +21,9 @@ import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.state.v2.ListState; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import javax.annotation.Nonnull; /** * {@link StateDescriptor} for {@link ListState}. This can be used to create partitioned list state @@ -36,7 +39,7 @@ public class ListStateDescriptor extends StateDescriptor { * @param stateId The (unique) stateId for the state. * @param typeInfo The type of the values in the state. */ - public ListStateDescriptor(String stateId, TypeInformation typeInfo) { + public ListStateDescriptor(@Nonnull String stateId, @Nonnull TypeInformation typeInfo) { super(stateId, typeInfo); } @@ -49,10 +52,23 @@ public ListStateDescriptor(String stateId, TypeInformation typeInfo) { * TypeSerializer}. */ public ListStateDescriptor( - String stateId, TypeInformation typeInfo, SerializerConfig serializerConfig) { + @Nonnull String stateId, + @Nonnull TypeInformation typeInfo, + SerializerConfig serializerConfig) { super(stateId, typeInfo, serializerConfig); } + /** + * Create a new {@code ListStateDescriptor} with the given stateId and the given type + * serializer. + * + * @param stateId The (unique) stateId for the state. + * @param serializer The type serializer for the values in the state. + */ + public ListStateDescriptor(@Nonnull String stateId, @Nonnull TypeSerializer serializer) { + super(stateId, serializer); + } + @Override public Type getType() { return Type.LIST; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java index 64138e340ddf0..5c98e12736b2b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java @@ -34,8 +34,6 @@ * @param The type of the values that the map state can hold. */ public class MapStateDescriptor extends StateDescriptor { - /** The type of the user key in the state. */ - @Nonnull private final TypeInformation userKeyTypeInfo; /** The serializer for the user key. */ @Nonnull private final TypeSerializer userKeySerializer; @@ -48,9 +46,9 @@ public class MapStateDescriptor extends StateDescriptor { * @param userValueTypeInfo The type of the values in the state. */ public MapStateDescriptor( - String stateId, - TypeInformation userKeyTypeInfo, - TypeInformation userValueTypeInfo) { + @Nonnull String stateId, + @Nonnull TypeInformation userKeyTypeInfo, + @Nonnull TypeInformation userValueTypeInfo) { this(stateId, userKeyTypeInfo, userValueTypeInfo, new SerializerConfigImpl()); } @@ -64,18 +62,27 @@ public MapStateDescriptor( * TypeSerializer}. */ public MapStateDescriptor( - String stateId, - TypeInformation userKeyTypeInfo, - TypeInformation userValueTypeInfo, + @Nonnull String stateId, + @Nonnull TypeInformation userKeyTypeInfo, + @Nonnull TypeInformation userValueTypeInfo, SerializerConfig serializerConfig) { super(stateId, userValueTypeInfo, serializerConfig); - this.userKeyTypeInfo = userKeyTypeInfo; this.userKeySerializer = userKeyTypeInfo.createSerializer(serializerConfig); } - @Nonnull - public TypeInformation getUserKeyType() { - return userKeyTypeInfo; + /** + * Create a new {@code MapStateDescriptor} with the given stateId and the given type serializer. + * + * @param stateId The (unique) stateId for the state. + * @param userKeySerializer The serializer for the user keys in the state. + * @param userValueSerializer The serializer for the user values in the state. + */ + public MapStateDescriptor( + @Nonnull String stateId, + @Nonnull TypeSerializer userKeySerializer, + @Nonnull TypeSerializer userValueSerializer) { + super(stateId, userValueSerializer); + this.userKeySerializer = userKeySerializer; } @Nonnull diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java index 564930e66f390..d1b85162ee099 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java @@ -21,6 +21,9 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import javax.annotation.Nonnull; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -41,7 +44,9 @@ public class ReducingStateDescriptor extends StateDescriptor { * @param typeInfo The type of the values in the state. */ public ReducingStateDescriptor( - String name, ReduceFunction reduceFunction, TypeInformation typeInfo) { + @Nonnull String name, + @Nonnull ReduceFunction reduceFunction, + @Nonnull TypeInformation typeInfo) { super(name, typeInfo); this.reduceFunction = checkNotNull(reduceFunction); } @@ -54,14 +59,29 @@ public ReducingStateDescriptor( * @param typeInfo The type of the values in the state. */ public ReducingStateDescriptor( - String name, - ReduceFunction reduceFunction, - TypeInformation typeInfo, + @Nonnull String name, + @Nonnull ReduceFunction reduceFunction, + @Nonnull TypeInformation typeInfo, SerializerConfig serializerConfig) { super(name, typeInfo, serializerConfig); this.reduceFunction = checkNotNull(reduceFunction); } + /** + * Create a new {@code ReducingStateDescriptor} with the given stateId and the given type + * serializer. + * + * @param stateId The (unique) stateId for the state. + * @param serializer The type serializer for the values in the state. + */ + public ReducingStateDescriptor( + @Nonnull String stateId, + @Nonnull ReduceFunction reduceFunction, + @Nonnull TypeSerializer serializer) { + super(stateId, serializer); + this.reduceFunction = checkNotNull(reduceFunction); + } + /** Returns the reduce function to be used for the reducing state. */ public ReduceFunction getReduceFunction() { return reduceFunction; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java index 24ad4e3aee34e..0d04924fcf19e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java @@ -89,6 +89,17 @@ protected StateDescriptor( this.typeSerializer = typeInfo.createSerializer(serializerConfig); } + /** + * Create a new {@code StateDescriptor} with the given stateId and the given type serializer. + * + * @param stateId The stateId of the {@code StateDescriptor}. + * @param serializer The type serializer for the values in the state. + */ + protected StateDescriptor(@Nonnull String stateId, TypeSerializer serializer) { + this.stateId = checkNotNull(stateId, "stateId must not be null"); + this.typeSerializer = checkNotNull(serializer, "type serializer must not be null"); + } + // ------------------------------------------------------------------------ /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java index aa63e76ab5b1c..64ce817defdff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java @@ -21,6 +21,9 @@ import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.state.v2.ValueState; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import javax.annotation.Nonnull; /** * {@link StateDescriptor} for {@link ValueState}. This can be used to create partitioned value @@ -36,7 +39,7 @@ public class ValueStateDescriptor extends StateDescriptor { * @param stateId The (unique) stateId for the state. * @param typeInfo The type of the values in the state. */ - public ValueStateDescriptor(String stateId, TypeInformation typeInfo) { + public ValueStateDescriptor(@Nonnull String stateId, @Nonnull TypeInformation typeInfo) { super(stateId, typeInfo); } @@ -49,10 +52,23 @@ public ValueStateDescriptor(String stateId, TypeInformation typeInfo) { * TypeSerializer}. */ public ValueStateDescriptor( - String stateId, TypeInformation typeInfo, SerializerConfig serializerConfig) { + @Nonnull String stateId, + @Nonnull TypeInformation typeInfo, + SerializerConfig serializerConfig) { super(stateId, typeInfo, serializerConfig); } + /** + * Create a new {@code ValueStateDescriptor} with the given stateId and the given type + * serializer. + * + * @param stateId The (unique) stateId for the state. + * @param serializer The type serializer for the values in the state. + */ + public ValueStateDescriptor(@Nonnull String stateId, @Nonnull TypeSerializer serializer) { + super(stateId, serializer); + } + @Override public Type getType() { return Type.VALUE; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java index 1f3050734fbe6..e4b946dbeed36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java @@ -18,10 +18,8 @@ package org.apache.flink.runtime.state.v2.ttl; -import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.v2.State; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeSerializer; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -162,10 +160,8 @@ private IS createValueState() throws Exception { ? (ValueStateDescriptor>) stateDesc : new ValueStateDescriptor<>( stateDesc.getStateId(), - new TtlTypeInformation<>( - new TtlSerializer<>( - LongSerializer.INSTANCE, - stateDesc.getSerializer()))); + new TtlSerializer<>( + LongSerializer.INSTANCE, stateDesc.getSerializer())); return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor)); } @@ -177,10 +173,8 @@ private IS createListState() throws Exception { ? (ListStateDescriptor>) stateDesc : new ListStateDescriptor<>( stateDesc.getStateId(), - new TtlTypeInformation<>( - new TtlSerializer<>( - LongSerializer.INSTANCE, - listStateDesc.getSerializer()))); + new TtlSerializer<>( + LongSerializer.INSTANCE, listStateDesc.getSerializer())); return (IS) new TtlListState<>(createTtlStateContext(ttlDescriptor)); } @@ -192,11 +186,9 @@ private IS createMapState() throws Exception { ? (MapStateDescriptor>) stateDesc : new MapStateDescriptor<>( stateDesc.getStateId(), - mapStateDesc.getUserKeyType(), - new TtlTypeInformation<>( - new TtlSerializer<>( - LongSerializer.INSTANCE, - mapStateDesc.getSerializer()))); + mapStateDesc.getUserKeySerializer(), + new TtlSerializer<>( + LongSerializer.INSTANCE, mapStateDesc.getSerializer())); return (IS) new TtlMapState<>(createTtlStateContext(ttlDescriptor)); } @@ -212,10 +204,8 @@ private IS createReducingState() throws Exception { reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider), - new TtlTypeInformation<>( - new TtlSerializer<>( - LongSerializer.INSTANCE, - stateDesc.getSerializer()))); + new TtlSerializer<>( + LongSerializer.INSTANCE, stateDesc.getSerializer())); return (IS) new TtlReducingState<>(createTtlStateContext(ttlDescriptor)); } @@ -232,10 +222,8 @@ private IS createAggregatingState() throws Exception { : new AggregatingStateDescriptor<>( stateDesc.getStateId(), ttlAggregateFunction, - new TtlTypeInformation<>( - new TtlSerializer<>( - LongSerializer.INSTANCE, - stateDesc.getSerializer()))); + new TtlSerializer<>( + LongSerializer.INSTANCE, stateDesc.getSerializer())); return (IS) new TtlAggregatingState<>( createTtlStateContext(ttlDescriptor), ttlAggregateFunction); @@ -261,79 +249,6 @@ TtlStateContext createTtlStateContext(StateDescriptor ttlDescripto () -> {}); } - public static class TtlTypeInformation extends TypeInformation> { - - Class typeClass; - - TypeSerializer> typeSerializer; - - TtlTypeInformation(TypeSerializer> typeSerializer) { - this.typeSerializer = typeSerializer; - typeClass = TtlValue.class; - } - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 2; - } - - @Override - public int getTotalFields() { - return 2; - } - - @Override - public Class> getTypeClass() { - return (Class>) typeClass; - } - - @Override - public boolean isKeyType() { - return false; - } - - @Override - public TypeSerializer> createSerializer(SerializerConfig config) { - return typeSerializer; - } - - @Override - public String toString() { - return "TtlTypeInformation{}"; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - return typeSerializer.equals(((TtlTypeInformation) obj).typeSerializer); - } - - @Override - public int hashCode() { - return typeSerializer.hashCode(); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof TtlTypeInformation; - } - } - /** * Serializer for user state value with TTL. Visibility is public for usage with external tools. */ diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java index b9d08f504d33d..db768bcc18fab 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeutils.TypeInformationTestBase; import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; -import org.apache.flink.runtime.state.v2.ttl.TtlStateFactory; import org.apache.flink.table.dataview.ListViewTypeInfo; import org.apache.flink.table.dataview.MapViewTypeInfo; import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo; @@ -83,8 +82,7 @@ public void testTypeInfoTestCoverage() { BigDecimalTypeInfo.class.getName(), DecimalDataTypeInfo.class.getName(), GenericRecordAvroTypeInfo.class.getName(), - AvroTypeInfo.class.getName(), - TtlStateFactory.TtlTypeInformation.class.getName()); + AvroTypeInfo.class.getName()); // check if a test exists for each type information for (Class typeInfo : typeInfos) {