Skip to content

Commit

Permalink
half way -2
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Nov 11, 2024
1 parent c713d25 commit 89302a7
Show file tree
Hide file tree
Showing 15 changed files with 462 additions and 586 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public RegisteredKeyValueStateBackendMetaInfo(
StateSnapshotTransformFactory.noTransform());
}

@SuppressWarnings("unchecked")
public RegisteredKeyValueStateBackendMetaInfo(
@Nonnull StateDescriptor.Type stateType,
@Nonnull String name,
Expand All @@ -76,9 +75,7 @@ public RegisteredKeyValueStateBackendMetaInfo(
stateType,
name,
StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
StateSerializerProvider.fromNewRegisteredSerializer(
(TypeSerializer<S>)
TtlAwareSerializer.wrapTtlAwareSerializer(stateSerializer)),
StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer, true),
stateSnapshotTransformFactory);
}

Expand All @@ -98,12 +95,10 @@ public RegisteredKeyValueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot sna
StateSerializerProvider.fromPreviousSerializerSnapshot(
(TypeSerializerSnapshot<S>)
Preconditions.checkNotNull(
new TtlAwareSerializerSnapshotWrapper<>(
snapshot.getTypeSerializerSnapshot(
StateMetaInfoSnapshot
.CommonSerializerKeys
.VALUE_SERIALIZER))
.getTtlAwareSerializerSnapshot())),
snapshot.getTypeSerializerSnapshot(
StateMetaInfoSnapshot.CommonSerializerKeys
.VALUE_SERIALIZER)),
true),
StateSnapshotTransformFactory.noTransform());

Preconditions.checkState(
Expand Down Expand Up @@ -151,6 +146,12 @@ public TypeSerializer<S> getStateSerializer() {
return stateSerializerProvider.currentSchemaSerializer();
}

@Nonnull
public TypeSerializer<S> getTtlAwareStateSerializer() {
TypeSerializer<S> serializer = stateSerializerProvider.currentSchemaSerializer();
return (TtlAwareSerializer<S, ?>) TtlAwareSerializer.wrapTtlAwareSerializer(serializer);
}

@Nonnull
public TypeSerializerSchemaCompatibility<S> updateStateSerializer(
TypeSerializer<S> newStateSerializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializerSnapshotWrapper;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -96,9 +98,14 @@ public abstract class StateSerializerProvider<T> {
* @param <T> the type of the state.
* @return a new {@link StateSerializerProvider}.
*/
public static <T> StateSerializerProvider<T> fromPreviousSerializerSnapshot(
TypeSerializerSnapshot<T> stateSerializerSnapshot, boolean mayContainsTtl) {
return new LazilyRegisteredStateSerializerProvider<>(stateSerializerSnapshot, mayContainsTtl);
}

public static <T> StateSerializerProvider<T> fromPreviousSerializerSnapshot(
TypeSerializerSnapshot<T> stateSerializerSnapshot) {
return new LazilyRegisteredStateSerializerProvider<>(stateSerializerSnapshot);
return fromPreviousSerializerSnapshot(stateSerializerSnapshot, false);
}

/**
Expand All @@ -112,9 +119,14 @@ public static <T> StateSerializerProvider<T> fromPreviousSerializerSnapshot(
* @param <T> the type of the state.
* @return a new {@link StateSerializerProvider}.
*/
public static <T> StateSerializerProvider<T> fromNewRegisteredSerializer(
TypeSerializer<T> registeredStateSerializer, boolean mayContainsTtl) {
return new EagerlyRegisteredStateSerializerProvider<>(registeredStateSerializer, mayContainsTtl);
}

public static <T> StateSerializerProvider<T> fromNewRegisteredSerializer(
TypeSerializer<T> registeredStateSerializer) {
return new EagerlyRegisteredStateSerializerProvider<>(registeredStateSerializer);
return fromNewRegisteredSerializer(registeredStateSerializer, false);
}

private StateSerializerProvider(@Nonnull TypeSerializer<T> stateSerializer) {
Expand Down Expand Up @@ -287,9 +299,12 @@ protected final void invalidateCurrentSchemaSerializerAccess() {
private static class LazilyRegisteredStateSerializerProvider<T>
extends StateSerializerProvider<T> {

private boolean mayContainsTtl;

LazilyRegisteredStateSerializerProvider(
TypeSerializerSnapshot<T> previousSerializerSnapshot) {
TypeSerializerSnapshot<T> previousSerializerSnapshot, boolean mayContainsTtl) {
super(Preconditions.checkNotNull(previousSerializerSnapshot));
this.mayContainsTtl = mayContainsTtl;
}

@Nonnull
Expand All @@ -303,10 +318,21 @@ public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredStat
"A serializer has already been registered for the state; re-registration is not allowed.");
}

TypeSerializerSchemaCompatibility<T> result =
newSerializer
.snapshotConfiguration()
.resolveSchemaCompatibility(previousSerializerSnapshot);
TypeSerializerSchemaCompatibility<T> result;
if (mayContainsTtl) {
result =
TtlAwareSerializer.wrapTtlAwareSerializer(newSerializer)
.snapshotConfiguration()
.resolveSchemaCompatibility(
new TtlAwareSerializerSnapshotWrapper(
previousSerializerSnapshot)
.getTtlAwareSerializerSnapshot());
} else {
result =
newSerializer
.snapshotConfiguration()
.resolveSchemaCompatibility(previousSerializerSnapshot);
}
if (result.isIncompatible()) {
invalidateCurrentSchemaSerializerAccess();
}
Expand Down Expand Up @@ -335,8 +361,11 @@ public TypeSerializerSchemaCompatibility<T> setPreviousSerializerSnapshotForRest
private static class EagerlyRegisteredStateSerializerProvider<T>
extends StateSerializerProvider<T> {

EagerlyRegisteredStateSerializerProvider(TypeSerializer<T> registeredStateSerializer) {
boolean mayContainsTtl;

EagerlyRegisteredStateSerializerProvider(TypeSerializer<T> registeredStateSerializer, boolean mayContainsTtl) {
super(Preconditions.checkNotNull(registeredStateSerializer));
this.mayContainsTtl = mayContainsTtl;
}

@Nonnull
Expand All @@ -359,10 +388,21 @@ public TypeSerializerSchemaCompatibility<T> setPreviousSerializerSnapshotForRest

this.previousSerializerSnapshot = previousSerializerSnapshot;

TypeSerializerSchemaCompatibility<T> result =
Preconditions.checkNotNull(registeredSerializer)
.snapshotConfiguration()
.resolveSchemaCompatibility(previousSerializerSnapshot);
TypeSerializerSchemaCompatibility<T> result;
if (mayContainsTtl) {
result =
TtlAwareSerializer.wrapTtlAwareSerializer(registeredSerializer)
.snapshotConfiguration()
.resolveSchemaCompatibility(
new TtlAwareSerializerSnapshotWrapper(
previousSerializerSnapshot)
.getTtlAwareSerializerSnapshot());
} else {
result =
Preconditions.checkNotNull(registeredSerializer)
.snapshotConfiguration()
.resolveSchemaCompatibility(previousSerializerSnapshot);
}
if (result.isIncompatible()) {
invalidateCurrentSchemaSerializerAccess();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,167 +27,30 @@
import org.apache.flink.util.function.SupplierWithException;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* This class wraps a {@link TypeSerializer} with ttl awareness. It will return true when the
* wrapped {@link TypeSerializer} is instance of {@link TtlStateFactory.TtlSerializer}. Also, it
* wraps the value migration process between TtlSerializer and non-ttl typeSerializer.
* The list version of TtlAwareSerializer.
* @param <T>
*/
public class TtlAwareSerializer<T, ORI extends TypeSerializer<T>> extends TypeSerializer<T> {
public class TtlAwareListSerializer<T> extends TtlAwareSerializer<List<T>, ListSerializer<T>> {

private final boolean isTtlEnabled;

private final ORI typeSerializer;

public TtlAwareSerializer(ORI typeSerializer) {
this.typeSerializer = typeSerializer;
this.isTtlEnabled = typeSerializer instanceof TtlStateFactory.TtlSerializer;
}

@Override
public boolean isImmutableType() {
return typeSerializer.isImmutableType();
}

@Override
public TypeSerializer<T> duplicate() {
return new TtlAwareSerializer<>(typeSerializer.duplicate());
}

@Override
public T createInstance() {
return typeSerializer.createInstance();
}

@Override
public T copy(T from) {
return typeSerializer.copy(from);
}

@Override
public T copy(T from, T reuse) {
return typeSerializer.copy(from, reuse);
}

@Override
public int getLength() {
return typeSerializer.getLength();
}

@Override
public void serialize(T record, DataOutputView target) throws IOException {
typeSerializer.serialize(record, target);
}

@Override
public T deserialize(DataInputView source) throws IOException {
return typeSerializer.deserialize(source);
}

@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
return typeSerializer.deserialize(reuse, source);
public TtlAwareListSerializer(ListSerializer<T> typeSerializer) {
super(typeSerializer);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TtlAwareSerializer<?> that = (TtlAwareSerializer<?>) o;
return isTtlEnabled == that.isTtlEnabled
&& Objects.equals(typeSerializer, that.typeSerializer);
}

@Override
public int hashCode() {
return Objects.hash(isTtlEnabled, typeSerializer);
}
// ------------------------------------------------------------------------
// ListSerializer specific properties
// ------------------------------------------------------------------------

/**
* Gets the serializer for the elements of the list.
*
* @return The serializer for the elements of the list
*/
@SuppressWarnings("unchecked")
public void migrateValueFromPriorSerializer(
TtlAwareSerializer<T> priorTtlAwareSerializer,
SupplierWithException<T, IOException> inputSupplier,
DataOutputView target,
TtlTimeProvider ttlTimeProvider)
throws IOException {
T outputRecord;
if (this.isTtlEnabled()) {
outputRecord =
priorTtlAwareSerializer.isTtlEnabled
? inputSupplier.get()
: (T)
new TtlValue<>(
inputSupplier.get(),
ttlTimeProvider.currentTimestamp());
} else {
outputRecord =
priorTtlAwareSerializer.isTtlEnabled
? ((TtlValue<T>) inputSupplier.get()).getUserValue()
: inputSupplier.get();
}
this.serialize(outputRecord, target);
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
typeSerializer.copy(source, target);
}

public boolean isTtlEnabled() {
return isTtlEnabled;
}

public TypeSerializer<T> getOriginalTypeSerializer() {
return typeSerializer;
}

@Override
public TypeSerializerSnapshot<T> snapshotConfiguration() {
return new TtlAwareSerializerSnapshot<>(
typeSerializer.snapshotConfiguration(), isTtlEnabled);
}

public static boolean isSerializerTtlEnabled(TypeSerializer<?> typeSerializer) {
TypeSerializer<?> wrappedTypeSerializer = wrapTtlAwareSerializer(typeSerializer);
return wrappedTypeSerializer instanceof TtlAwareSerializer
&& ((TtlAwareSerializer<?, ?>) wrappedTypeSerializer).isTtlEnabled();
}

public static boolean needTtlStateMigration(
TypeSerializer<?> previousSerializer, TypeSerializer<?> newSerializer) {
return TtlAwareSerializer.isSerializerTtlEnabled(previousSerializer)
!= TtlAwareSerializer.isSerializerTtlEnabled(newSerializer);
}

public static TypeSerializer<?> wrapTtlAwareSerializer(TypeSerializer<?> typeSerializer) {
if (typeSerializer instanceof TtlAwareSerializer) {
return typeSerializer;
}

if (typeSerializer instanceof ListSerializer) {
return ((ListSerializer<?>) typeSerializer).getElementSerializer()
instanceof TtlAwareSerializer
? typeSerializer
: new ListSerializer<>(
new TtlAwareSerializer<>(
((ListSerializer<?>) typeSerializer).getElementSerializer()));
}

if (typeSerializer instanceof MapSerializer) {
return ((MapSerializer<?, ?>) typeSerializer).getValueSerializer()
instanceof TtlAwareSerializer
? typeSerializer
: new MapSerializer<>(
((MapSerializer<?, ?>) typeSerializer).getKeySerializer(),
new TtlAwareSerializer<>(
((MapSerializer<?, ?>) typeSerializer).getValueSerializer()));
}

return new TtlAwareSerializer<>(typeSerializer);
public TtlAwareSerializer<T, TypeSerializer<T>> getElementSerializer() {
return (TtlAwareSerializer<T, TypeSerializer<T>>) TtlAwareSerializer.wrapTtlAwareSerializer(getOriginalTypeSerializer().getElementSerializer());
}
}
Loading

0 comments on commit 89302a7

Please sign in to comment.