diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java index 53e7573a1185a..f85927c4d26fb 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java @@ -87,16 +87,18 @@ public void resetExtra() { public byte[] getOrCreateSerializedKey( FunctionWithException, byte[], IOException> serializeKeyFunc) throws IOException { - if (recordContext.getExtra() != null) { - return (byte[]) recordContext.getExtra(); + byte[] serializedKey = (byte[]) recordContext.getExtra(); + if (serializedKey != null) { + return serializedKey; } synchronized (recordContext) { - if (recordContext.getExtra() == null) { - byte[] serializedKey = serializeKeyFunc.apply(this); + serializedKey = (byte[]) recordContext.getExtra(); + if (serializedKey == null) { + serializedKey = serializeKeyFunc.apply(this); recordContext.setExtra(serializedKey); } } - return (byte[]) recordContext.getExtra(); + return serializedKey; } @Override diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMapCheckRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMapCheckRequest.java index 003fad1010b61..1c799253c2fdb 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMapCheckRequest.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMapCheckRequest.java @@ -37,6 +37,8 @@ */ public class ForStDBMapCheckRequest extends ForStDBGetRequest { + private static final byte[] VALID_PLACEHOLDER = new byte[0]; + /** Number of bytes required to prefix the key groups. 
*/ private final int keyGroupPrefixBytes; @@ -60,7 +62,7 @@ public void process(RocksDB db) throws RocksDBException, IOException { try (RocksIterator iter = db.newIterator(getColumnFamilyHandle())) { iter.seek(key); if (iter.isValid() && startWithKeyPrefix(key, iter.key(), keyGroupPrefixBytes)) { - completeStateFuture(new byte[0]); + completeStateFuture(VALID_PLACEHOLDER); } else { completeStateFuture(null); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java index 7b5da10e9604e..d1ddfa9db54cf 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java @@ -29,6 +29,8 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.asyncprocessing.StateRequestType; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.v2.AbstractListState; import org.apache.flink.runtime.state.v2.ListStateDescriptor; import org.apache.flink.util.Preconditions; @@ -70,6 +72,9 @@ public class ForStListState extends AbstractListState /** The data inputStream used for value deserializer, which should be thread-safe. */ private final ThreadLocal valueDeserializerView; + /** Whether to enable the reuse of the serialized key (and namespace). 
*/ + private final boolean enableKeyReuse; + public ForStListState( StateRequestHandler stateRequestHandler, ColumnFamilyHandle columnFamily, @@ -86,6 +91,11 @@ public ForStListState( this.namespaceSerializer = ThreadLocal.withInitial(namespaceSerializerInitializer); this.valueSerializerView = ThreadLocal.withInitial(valueSerializerViewInitializer); this.valueDeserializerView = ThreadLocal.withInitial(valueDeserializerViewInitializer); + // We only enable key reuse for the most common namespace across all states. + this.enableKeyReuse = + (defaultNamespace instanceof VoidNamespace) + && (namespaceSerializerInitializer.get() + instanceof VoidNamespaceSerializer); } @Override @@ -95,15 +105,12 @@ public ColumnFamilyHandle getColumnFamilyHandle() { @Override public byte[] serializeKey(ContextKey contextKey) throws IOException { - return contextKey.getOrCreateSerializedKey( - ctxKey -> { - SerializedCompositeKeyBuilder builder = serializedKeyBuilder.get(); - builder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup()); - N namespace = contextKey.getNamespace(); - return builder.buildCompositeKeyNamespace( - namespace == null ? 
defaultNamespace : namespace, - namespaceSerializer.get()); - }); + return ForStSerializerUtils.serializeKeyAndNamespace( + contextKey, + serializedKeyBuilder.get(), + defaultNamespace, + namespaceSerializer.get(), + enableKeyReuse); } @Override diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java index 2a2595cdfd66d..05b448bc74313 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapState.java @@ -114,21 +114,16 @@ public ColumnFamilyHandle getColumnFamilyHandle() { @Override public byte[] serializeKey(ContextKey contextKey) throws IOException { - contextKey.resetExtra(); - return contextKey.getOrCreateSerializedKey( - ctxKey -> { - SerializedCompositeKeyBuilder builder = serializedKeyBuilder.get(); - builder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup()); - N namespace = contextKey.getNamespace(); - builder.setNamespace( - namespace == null ? defaultNamespace : namespace, - namespaceSerializer.get()); - if (contextKey.getUserKey() == null) { // value get - return builder.build(); - } - UK userKey = (UK) contextKey.getUserKey(); // map get - return builder.buildCompositeKeyUserKey(userKey, userKeySerializer); - }); + SerializedCompositeKeyBuilder builder = serializedKeyBuilder.get(); + builder.setKeyAndKeyGroup(contextKey.getRawKey(), contextKey.getKeyGroup()); + N namespace = contextKey.getNamespace(); + builder.setNamespace( + namespace == null ? 
defaultNamespace : namespace, namespaceSerializer.get()); + if (contextKey.getUserKey() == null) { // value get + return builder.build(); + } + UK userKey = (UK) contextKey.getUserKey(); // map get + return builder.buildCompositeKeyUserKey(userKey, userKeySerializer); } @Override diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java index 9861a2b451c66..f640da7f9c162 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.asyncprocessing.StateRequestType; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.v2.AbstractReducingState; import org.apache.flink.runtime.state.v2.ReducingStateDescriptor; import org.apache.flink.util.Preconditions; @@ -64,6 +66,9 @@ public class ForStReducingState extends AbstractReducingState /** The data inputStream used for value deserializer, which should be thread-safe. */ private final ThreadLocal valueDeserializerView; + /** Whether to enable the reuse of the serialized key (and namespace). 
*/ + private final boolean enableKeyReuse; + public ForStReducingState( StateRequestHandler stateRequestHandler, ColumnFamilyHandle columnFamily, @@ -80,6 +85,11 @@ public ForStReducingState( this.namespaceSerializer = ThreadLocal.withInitial(namespaceSerializerInitializer); this.valueSerializerView = ThreadLocal.withInitial(valueSerializerViewInitializer); this.valueDeserializerView = ThreadLocal.withInitial(valueDeserializerViewInitializer); + // We only enable key reuse for the most common namespace across all states. + this.enableKeyReuse = + (defaultNamespace instanceof VoidNamespace) + && (namespaceSerializerInitializer.get() + instanceof VoidNamespaceSerializer); } @Override @@ -89,15 +99,12 @@ public ColumnFamilyHandle getColumnFamilyHandle() { @Override public byte[] serializeKey(ContextKey contextKey) throws IOException { - return contextKey.getOrCreateSerializedKey( - ctxKey -> { - SerializedCompositeKeyBuilder builder = serializedKeyBuilder.get(); - builder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup()); - N namespace = ctxKey.getNamespace(); - return builder.buildCompositeKeyNamespace( - namespace == null ? defaultNamespace : namespace, - namespaceSerializer.get()); - }); + return ForStSerializerUtils.serializeKeyAndNamespace( + contextKey, + serializedKeyBuilder.get(), + defaultNamespace, + namespaceSerializer.get(), + enableKeyReuse); } @Override diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSerializerUtils.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSerializerUtils.java new file mode 100644 index 0000000000000..d600b9b324d54 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSerializerUtils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; + +import java.io.IOException; + +/** Utilities for serializing keys in ForSt. */ +public class ForStSerializerUtils { + + /** + * Serializes the key and namespace, without a user key. + * + * @param contextKey the context key of current request + * @param builder key builder + * @param defaultNamespace default namespace of the state + * @param namespaceSerializer the namespace serializer + * @param enableKeyReuse whether to enable key reuse + */ + public static byte[] serializeKeyAndNamespace( + ContextKey contextKey, + SerializedCompositeKeyBuilder builder, + N defaultNamespace, + TypeSerializer namespaceSerializer, + boolean enableKeyReuse) + throws IOException { + N namespace = contextKey.getNamespace(); + namespace = (namespace == null ? defaultNamespace : namespace); + if (enableKeyReuse && namespace == defaultNamespace) { + // key reuse. + return contextKey.getOrCreateSerializedKey( + ctxKey -> { + builder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup()); + return builder.buildCompositeKeyNamespace( + defaultNamespace, namespaceSerializer); + }); + } else { + // no key reuse, serialize again. 
+ builder.setKeyAndKeyGroup(contextKey.getRawKey(), contextKey.getKeyGroup()); + return builder.buildCompositeKeyNamespace(namespace, namespaceSerializer); + } + } + + private ForStSerializerUtils() {} +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java index 519f463468626..15dcd72cec68c 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java @@ -28,6 +28,8 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.asyncprocessing.StateRequestType; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.v2.AbstractValueState; import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.util.Preconditions; @@ -64,6 +66,9 @@ public class ForStValueState extends AbstractValueState /** The data inputStream used for value deserializer, which should be thread-safe. */ private final ThreadLocal valueDeserializerView; + /** Whether to enable the reuse of the serialized key (and namespace). 
*/ + private final boolean enableKeyReuse; + public ForStValueState( StateRequestHandler stateRequestHandler, ColumnFamilyHandle columnFamily, @@ -80,6 +85,11 @@ public ForStValueState( this.namespaceSerializer = ThreadLocal.withInitial(namespaceSerializerInitializer); this.valueSerializerView = ThreadLocal.withInitial(valueSerializerViewInitializer); this.valueDeserializerView = ThreadLocal.withInitial(valueDeserializerViewInitializer); + // We only enable key reuse for the most common namespace across all states. + this.enableKeyReuse = + (defaultNamespace instanceof VoidNamespace) + && (namespaceSerializerInitializer.get() + instanceof VoidNamespaceSerializer); } @Override @@ -89,15 +99,12 @@ public ColumnFamilyHandle getColumnFamilyHandle() { @Override public byte[] serializeKey(ContextKey contextKey) throws IOException { - return contextKey.getOrCreateSerializedKey( - ctxKey -> { - SerializedCompositeKeyBuilder builder = serializedKeyBuilder.get(); - builder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup()); - N namespace = contextKey.getNamespace(); - return builder.buildCompositeKeyNamespace( - namespace == null ? defaultNamespace : namespace, - namespaceSerializer.get()); - }); + return ForStSerializerUtils.serializeKeyAndNamespace( + contextKey, + serializedKeyBuilder.get(), + defaultNamespace, + namespaceSerializer.get(), + enableKeyReuse); } @Override