diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 3af58543dcfe0..19aee130b5197 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -291,13 +291,21 @@ public InternalStateFuture handleRequest( public OUT handleRequestSync( State state, StateRequestType type, @Nullable IN payload) { InternalStateFuture stateFuture = handleRequest(state, type, payload); - while (!stateFuture.isDone()) { - try { - mailboxExecutor.yield(); - } catch (InterruptedException e) { - LOG.warn("Error while waiting for state future to complete.", e); - throw new RuntimeException("Error while waiting for state future to complete.", e); + // Trigger since we are waiting the result. + triggerIfNeeded(true); + try { + while (!stateFuture.isDone()) { + if (!mailboxExecutor.tryYield()) { + // We force trigger the buffer if the executor is not fully loaded. + if (!stateExecutor.fullyLoaded()) { + triggerIfNeeded(true); + } + waitForNewMails(); + } } + } catch (InterruptedException ignored) { + // ignore the interrupted exception to avoid throwing fatal error when the task cancel + // or exit. } return stateFuture.get(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java index 504115a48fa30..9ef2d167c3f19 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java @@ -112,5 +112,8 @@ public enum StateRequestType { AGGREGATING_GET, /** Add element to aggregating state by {@link AggregatingState#asyncAdd(Object)}. */ - AGGREGATING_ADD + AGGREGATING_ADD, + + /** Defined by different state backends. */ + CUSTOMIZED } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractListState.java index 8631f9542379b..5c7a6ba6bf395 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractListState.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.asyncprocessing.StateRequestType; import org.apache.flink.runtime.state.v2.internal.InternalListState; +import java.util.Collection; import java.util.List; /** @@ -83,4 +84,16 @@ public void update(List values) { public void addAll(List values) { handleRequestSync(StateRequestType.LIST_ADD_ALL, values); } + + @Override + public StateFuture asyncMergeNamespaces(N target, Collection sources) { + throw new UnsupportedOperationException( + getClass() + " has not implement the asyncMergeNamespaces()."); + } + + @Override + public void mergeNamespaces(N target, Collection sources) { + throw new UnsupportedOperationException( + getClass() + " has not implement the mergeNamespaces()."); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ListStateAdaptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ListStateAdaptor.java index 7b1b494eefd90..df76566ce515f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ListStateAdaptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ListStateAdaptor.java @@ -23,6 +23,7 @@ import org.apache.flink.core.state.StateFutureUtils; import org.apache.flink.runtime.state.v2.internal.InternalListState; +import java.util.Collection; import java.util.List; /** @@ -114,4 +115,23 @@ public void add(V value) { throw new RuntimeException("Error while adding value to raw ListState", e); } } + + @Override + public StateFuture asyncMergeNamespaces(N target, Collection sources) { + try { + delegatedState.mergeNamespaces(target, sources); + } catch (Exception e) { + throw new RuntimeException("Error while merging namespaces in raw ListState", e); + } + return StateFutureUtils.completedVoidFuture(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) { + try { + delegatedState.mergeNamespaces(target, sources); + } catch (Exception e) { + throw new RuntimeException("Error while merging namespaces in raw ListState", e); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAggregatingState.java index bc469825d85a3..39e4a4bffc666 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAggregatingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAggregatingState.java @@ -31,4 +31,4 @@ public interface InternalAggregatingState extends InternalMergingState, AggregatingState, - InternalKeyedState {} + InternalStateAccessible {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAppendingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAppendingState.java index 7e938c6fbe9b7..6b9b323d8febe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAppendingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAppendingState.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.state.v2.internal; import org.apache.flink.api.common.state.v2.AppendingState; -import org.apache.flink.api.common.state.v2.StateFuture; /** * This class defines the internal interface for appending state. @@ -31,32 +30,4 @@ * @param Type of the value that can be retrieved from the state by synchronous interface. */ public interface InternalAppendingState - extends InternalKeyedState, AppendingState { - /** - * Get internally stored value. - * - * @return internally stored value. - */ - StateFuture asyncGetInternal(); - - /** - * Update internally stored value. - * - * @param valueToStore new value to store. - */ - StateFuture asyncUpdateInternal(SV valueToStore); - - /** - * Get internally stored value. - * - * @return internally stored value. - */ - SV getInternal(); - - /** - * Update internally stored value. - * - * @param valueToStore new value to store. - */ - void updateInternal(SV valueToStore); -} + extends InternalKeyedState, AppendingState {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalListState.java index efeccc7539abb..7c0e027239210 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalListState.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.StateIterator; /** * This class defines the internal interface for list state. @@ -29,4 +30,5 @@ * @param The type of the intermediate state. */ @Internal -public interface InternalListState extends InternalKeyedState, ListState {} +public interface InternalListState + extends InternalMergingState, Iterable>, ListState {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalReducingState.java index bdebbfc2b2dd9..43e2eea794fd2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalReducingState.java @@ -27,4 +27,6 @@ * @param Type of the value in the operator state. */ public interface InternalReducingState - extends InternalAggregatingState, ReducingState {} + extends InternalAggregatingState, + ReducingState, + InternalStateAccessible {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalStateAccessible.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalStateAccessible.java new file mode 100644 index 0000000000000..376227436e9ab --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalStateAccessible.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.internal; + +import org.apache.flink.api.common.state.v2.StateFuture; + +public interface InternalStateAccessible { + /** + * Get internally stored value. + * + * @return internally stored value. + */ + StateFuture asyncGetInternal(); + + /** + * Update internally stored value. + * + * @param valueToStore new value to store. + */ + StateFuture asyncUpdateInternal(SV valueToStore); + + /** + * Get internally stored value. + * + * @return internally stored value. + */ + SV getInternal(); + + /** + * Update internally stored value. + * + * @param valueToStore new value to store. + */ + void updateInternal(SV valueToStore); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java index 7cd52632866eb..c41b1d76b5b3d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java @@ -123,6 +123,16 @@ private List> withTs(List values) { return withTs; } + @Override + public StateFuture asyncMergeNamespaces(N target, Collection sources) { + return original.asyncMergeNamespaces(target, sources); + } + + @Override + public void mergeNamespaces(N target, Collection sources) { + original.mergeNamespaces(target, sources); + } + private class IteratorWithCleanup implements Iterator { private final Iterator> originalIterator; private boolean anyUnexpired = false; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMultiRawMergePutRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMultiRawMergePutRequest.java new file mode 100644 index 0000000000000..1f4aa8482f793 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBMultiRawMergePutRequest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.core.state.InternalStateFuture; + +import org.forstdb.RocksDB; +import org.forstdb.RocksDBException; + +import java.io.IOException; +import java.util.Collection; + +/** + * The Put access request for ForStDB. + * + * @param The type of key in put access request. + * @param The type of namespace in put access request. + * @param The type of value in put access request. + */ +public class ForStDBMultiRawMergePutRequest extends ForStDBPutRequest { + + final Collection rawValue; + + ForStDBMultiRawMergePutRequest( + ContextKey key, + Collection value, + ForStInnerTable table, + InternalStateFuture future) { + super(key, null, true, table, future); + this.rawValue = value; + } + + @Override + public void process(ForStDBWriteBatchWrapper writeBatchWrapper, RocksDB db) + throws IOException, RocksDBException { + byte[] key = buildSerializedKey(); + for (byte[] value : rawValue) { + writeBatchWrapper.merge(table.getColumnFamilyHandle(), key, value); + } + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBRawGetRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBRawGetRequest.java new file mode 100644 index 0000000000000..4aa1a6ddc4a07 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBRawGetRequest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.core.state.InternalStateFuture; + +import java.io.IOException; +import java.util.List; + +/** + * The Get access request for ForStDB. + * + * @param The type of key in get access request. + * @param The type of namespace in get access request. + * @param The type of value returned by get request. + */ +public class ForStDBRawGetRequest extends ForStDBGetRequest, byte[]> { + + ForStDBRawGetRequest( + ContextKey key, + ForStInnerTable> table, + InternalStateFuture future) { + super(key, table, future); + } + + @Override + public void completeStateFuture(byte[] bytesValue) throws IOException { + future.complete(bytesValue); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java index 8bca44b4a671a..f3bfc29cb1767 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListState.java @@ -19,11 +19,14 @@ package org.apache.flink.state.forst; import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.state.v2.StateIterator; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.core.state.StateFutureUtils; import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.asyncprocessing.StateRequest; import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; @@ -33,12 +36,14 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.v2.AbstractListState; import org.apache.flink.runtime.state.v2.ListStateDescriptor; -import org.apache.flink.util.Preconditions; import org.forstdb.ColumnFamilyHandle; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.function.Supplier; @@ -129,15 +134,25 @@ public List deserializeValue(byte[] valueBytes) throws IOException { @SuppressWarnings("unchecked") @Override - public ForStDBGetRequest, StateIterator> buildDBGetRequest( + public ForStDBGetRequest, ?> buildDBGetRequest( StateRequest stateRequest) { - Preconditions.checkArgument(stateRequest.getRequestType() == StateRequestType.LIST_GET); ContextKey contextKey = new ContextKey<>( (RecordContext) stateRequest.getRecordContext(), (N) stateRequest.getNamespace()); - return new ForStDBListGetRequest<>( - contextKey, this, (InternalStateFuture>) stateRequest.getFuture()); + switch (stateRequest.getRequestType()) { + case LIST_GET: + return new ForStDBListGetRequest<>( + contextKey, + this, + (InternalStateFuture>) stateRequest.getFuture()); + case CUSTOMIZED: + // must be LIST_GET_RAW + return new ForStDBRawGetRequest<>( + contextKey, this, (InternalStateFuture) stateRequest.getFuture()); + default: + throw new UnsupportedOperationException(); + } } @SuppressWarnings("unchecked") @@ -166,6 +181,14 @@ public ForStDBPutRequest> buildDBPutRequest( value = (List) stateRequest.getPayload(); merge = true; break; + case CUSTOMIZED: + // must be LIST_ADD_ALL_RAW + return new ForStDBMultiRawMergePutRequest<>( + contextKey, + ((Tuple2>) stateRequest.getPayload()) + .f1, + this, + (InternalStateFuture) stateRequest.getFuture()); default: throw new IllegalArgumentException(); } @@ -177,4 +200,86 @@ public ForStDBPutRequest> buildDBPutRequest( contextKey, value, this, (InternalStateFuture) stateRequest.getFuture()); } } + + @Override + public StateFuture asyncMergeNamespaces(N target, Collection sources) { + if (sources == null || sources.isEmpty()) { + return StateFutureUtils.completedVoidFuture(); + } + // phase 1: read from the sources and target + List> futures = new ArrayList<>(sources.size()); + for (N source : sources) { + if (source != null) { + setCurrentNamespace(source); + futures.add( + handleRequest( + StateRequestType.CUSTOMIZED, + Tuple2.of(ForStStateRequestType.LIST_GET_RAW, null))); + } + } + // phase 2: merge the sources to the target + return StateFutureUtils.combineAll(futures) + .thenCompose( + values -> { + List> updateFutures = + new ArrayList<>(sources.size() + 1); + List validValues = new ArrayList<>(sources.size()); + Iterator valueIterator = values.iterator(); + for (N source : sources) { + byte[] value = valueIterator.next(); + if (value != null) { + validValues.add(value); + setCurrentNamespace(source); + updateFutures.add(asyncClear()); + } + } + if (!validValues.isEmpty()) { + setCurrentNamespace(target); + updateFutures.add( + handleRequest( + StateRequestType.CUSTOMIZED, + Tuple2.of( + ForStStateRequestType.MERGE_ALL_RAW, + validValues))); + } + return StateFutureUtils.combineAll(updateFutures); + }) + .thenAccept(ignores -> {}); + } + + @Override + public void mergeNamespaces(N target, Collection sources) { + if (sources == null || sources.isEmpty()) { + return; + } + try { + // merge the sources to the target + List validValues = new ArrayList<>(sources.size()); + for (N source : sources) { + if (source != null) { + setCurrentNamespace(source); + byte[] oldValue = + handleRequestSync( + StateRequestType.CUSTOMIZED, + Tuple2.of(ForStStateRequestType.LIST_GET_RAW, null)); + + if (oldValue != null) { + setCurrentNamespace(source); + clear(); + validValues.add(oldValue); + } + } + } + + // if something came out of merging the sources, merge it or write it to the target + if (!validValues.isEmpty()) { + setCurrentNamespace(target); + handleRequestSync( + StateRequestType.CUSTOMIZED, + Tuple2.of(ForStStateRequestType.MERGE_ALL_RAW, validValues)); + } + } catch (Exception e) { + throw new RuntimeException("merge namespace fail.", e); + } + } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java index 3cbe3dd005ff7..98b3a6438e31c 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java @@ -18,6 +18,7 @@ package org.apache.flink.state.forst; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.asyncprocessing.StateRequest; import org.apache.flink.runtime.asyncprocessing.StateRequestContainer; import org.apache.flink.runtime.asyncprocessing.StateRequestType; @@ -120,12 +121,43 @@ private void convertStateRequestsToForStDBRequests(StateRequest stat + " doesn't yet support the clear method."); } } + case CUSTOMIZED: + { + handleCustomizedStateRequests(stateRequest); + return; + } default: throw new UnsupportedOperationException( "Unsupported state request type:" + stateRequestType); } } + @SuppressWarnings("unchecked") + private void handleCustomizedStateRequests(StateRequest stateRequest) { + Tuple2 payload = + (Tuple2) stateRequest.getPayload(); + ForStStateRequestType requestType = payload.f0; + switch (requestType) { + case LIST_GET_RAW: + { + ForStListState forStListState = + (ForStListState) stateRequest.getState(); + dbGetRequests.add(forStListState.buildDBGetRequest(stateRequest)); + return; + } + case MERGE_ALL_RAW: + { + ForStListState forStListState = + (ForStListState) stateRequest.getState(); + dbPutRequests.add(forStListState.buildDBPutRequest(stateRequest)); + return; + } + default: + throw new UnsupportedOperationException( + "Unsupported customized state request type:" + requestType); + } + } + public List> pollDbGetRequests() { return dbGetRequests; } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestType.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestType.java new file mode 100644 index 0000000000000..33c9568957ca1 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestType.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +/** + * ForSt customized state request other than {@link + * org.apache.flink.runtime.asyncprocessing.StateRequestType}. + */ +public enum ForStStateRequestType { + /** Get the list in raw bytes without deserialization. */ + LIST_GET_RAW, + + /** Merge a list of raw bytes. */ + MERGE_ALL_RAW +} diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStListStateTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStListStateTest.java new file mode 100644 index 0000000000000..68552b89c66fe --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStListStateTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.state.v2.ListStateDescriptor; +import org.apache.flink.runtime.state.v2.internal.InternalListState; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ForStListState}. */ +public class ForStListStateTest extends ForStStateTestBase { + + @Test + public void testMergeNamespace() throws Exception { + ListStateDescriptor descriptor = + new ListStateDescriptor<>("testState", BasicTypeInfo.INT_TYPE_INFO); + InternalListState listState = + keyedBackend.createState(1, IntSerializer.INSTANCE, descriptor); + + setCurrentContext("test", "test"); + for (int i = 0; i < 10; i++) { + listState.setCurrentNamespace(i); + listState.asyncAdd(i); + } + drain(); + + setCurrentContext("test", "test"); + for (int i = 0; i < 10; i++) { + listState.setCurrentNamespace(i); + Iterable list = listState.get(); + assertThat(list).containsExactly(i); + } + drain(); + + // 1~20 + ArrayList namespaces = new ArrayList<>(); + for (int i = 1; i < 20; i++) { + namespaces.add(i); + } + + setCurrentContext("test", "test"); + listState + .asyncMergeNamespaces(0, namespaces) + .thenAccept( + (e) -> { + listState.setCurrentNamespace(0); + Iterable list = listState.get(); + assertThat(list).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + for (int i = 1; i < 10; i++) { + listState.setCurrentNamespace(i); + assertThat(listState.get()).isNullOrEmpty(); + } + }); + drain(); + + // test sync method + setCurrentContext("test", "test"); + for (int i = 10; i < 20; i++) { + listState.setCurrentNamespace(i); + listState.add(i); + } + drain(); + + setCurrentContext("test", "test"); + listState.mergeNamespaces(0, namespaces); + listState.setCurrentNamespace(0); + Iterable list = listState.get(); + assertThat(list) + .containsExactly( + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19); + + for (int i = 1; i < 20; i++) { + listState.setCurrentNamespace(i); + assertThat(listState.get()).isNullOrEmpty(); + } + drain(); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateTestBase.java new file mode 100644 index 0000000000000..06b9a601045b7 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateTestBase.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; +import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl; +import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; + +import static org.apache.flink.state.forst.ForStTestUtils.createKeyedStateBackend; + +/** + * A base class for all forst state tests, providing building the AEC and StateBackend logic, and + * some tool methods. + */ +public class ForStStateTestBase { + + protected ForStKeyedStateBackend keyedBackend; + + protected AsyncExecutionController aec; + + protected MailboxExecutor mailboxExecutor; + + protected RecordContext context; + + @BeforeEach + public void setup(@TempDir File temporaryFolder) throws IOException { + FileSystem.initialize(new Configuration(), null); + Configuration configuration = new Configuration(); + configuration.set(ForStOptions.REMOTE_DIRECTORY, temporaryFolder.toURI().toString()); + ForStStateBackend forStStateBackend = + new ForStStateBackend().configure(configuration, null); + + keyedBackend = + createKeyedStateBackend( + forStStateBackend, + getMockEnvironment(temporaryFolder), + StringSerializer.INSTANCE); + + mailboxExecutor = + new MailboxExecutorImpl( + new TaskMailboxImpl(), 0, StreamTaskActionExecutor.IMMEDIATE); + + aec = + new AsyncExecutionController<>( + mailboxExecutor, + (a, b) -> {}, + keyedBackend.createStateExecutor(), + 1, + 100, + 0, + 1, + null); + keyedBackend.setup(aec); + } + + @AfterEach + void tearDown() throws Exception { + keyedBackend.close(); + } + + protected void setCurrentContext(Object record, String key) { + context = aec.buildContext(record, key); + context.retain(); + aec.setCurrentContext(context); + } + + protected void drain() { + context.release(); + aec.drainInflightRecords(0); + } + + private static MockEnvironment getMockEnvironment(File tempDir) { + return MockEnvironment.builder() + .setUserCodeClassLoader(ForStStateBackendConfigTest.class.getClassLoader()) + .setTaskManagerRuntimeInfo( + new TestingTaskManagerRuntimeInfo(new Configuration(), tempDir)) + .build(); + } +}