diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/AppendingState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/AppendingState.java new file mode 100644 index 0000000000000..f9fc195a830a7 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/AppendingState.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +/** + * Base interface for partitioned state that supports adding elements and inspecting the current + * state. Elements can either be kept in a buffer (list-like) or aggregated into one value. + * + *

The state is accessed and modified by user functions, and checkpointed consistently by the + * system as part of the distributed snapshots. + * + *

The state is only accessible by functions applied on a {@code KeyedStream}. The key is + * automatically supplied by the system, so the function always sees the value mapped to the key of + * the current element. That way, the system can handle stream and state partitioning consistently + * together. + * + * @param Type of the value that can be added to the state. + * @param Type of the value that can be retrieved from the state. + */ +@Experimental +public interface AppendingState extends State { + + /** + * Returns the current value for the state. When the state is not partitioned the returned value + * is the same for all inputs in a given operator instance. If state partitioning is applied, + * the value returned depends on the current operator input, as the operator maintains an + * independent state for each partition. + * + *

NOTE TO IMPLEMENTERS: if the state is empty, then this method should return {@code + * null} wrapped by a StateFuture. + * + * @return The operator state value corresponding to the current input or {@code null} wrapped + * by a {@link StateFuture} if the state is empty. + */ + StateFuture asyncGet(); + + /** + * Updates the operator state accessible by {@link #asyncGet()} by adding the given value to the + * list of values. The next time {@link #asyncGet()} is called (for the same state partition) + * the returned state will represent the updated list. + * + *

null value is not allowed to be passed in. + * + * @param value The new value for the state. + */ + StateFuture asyncAdd(IN value); +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/ListState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/ListState.java new file mode 100644 index 0000000000000..ef982551d4f20 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/ListState.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; + +/** + * {@link State} interface for partitioned list state in Operations. The state is accessed and + * modified by user functions, and checkpointed consistently by the system as part of the + * distributed snapshots. + * + *

The state can be a keyed list state or an operator list state. + * + *

When it is a keyed list state, it is accessed by functions applied on a {@code KeyedStream}. + * The key is automatically supplied by the system, so the function always sees the value mapped to + * the key of the current element. That way, the system can handle stream and state partitioning + * consistently together. + * + *

When it is an operator list state, the list is a collection of state items that are + * independent from each other and eligible for redistribution across operator instances in case of + * changed operator parallelism. + * + * @param Type of values that this list state keeps. + */ +@Experimental +public interface ListState extends MergingState> { + + /** + * Updates the operator state accessible by {@link #asyncGet()} by updating existing values to + * the given list of values. The next time {@link #asyncGet()} is called (for the same state + * partition) the returned state will represent the updated list. + * + *

If an empty list is passed in, the state value will be null. + * + *

A null list, or a list containing any null element, is not allowed. + * + * @param values The new values for the state. + */ + StateFuture asyncUpdate(List values); + + /** + * Updates the operator state accessible by {@link #asyncGet()} by adding the given values to + * the existing list of values. The next time {@link #asyncGet()} is called (for the same state + * partition) the returned state will represent the updated list. + * + *

If an empty list is passed in, the state value remains unchanged. + * + *

A null list, or a list containing any null element, is not allowed. + * + * @param values The new values to be added to the state. + */ + StateFuture asyncAddAll(List values); +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java new file mode 100644 index 0000000000000..897d68255df32 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +import java.util.Map; + +/** + * {@link State} interface for partitioned key-value state. The key-value pair can be added, updated + * and retrieved. + * + *

The state is accessed and modified by user functions, and checkpointed consistently by the + * system as part of the distributed snapshots. + * + *

The state is only accessible by functions applied on a {@code KeyedStream}. The key is + * automatically supplied by the system, so the function always sees the value mapped to the key of + * the current element. That way, the system can handle stream and state partitioning consistently + * together. + * + * @param Type of the keys in the state. + * @param Type of the values in the state. + */ +@Experimental +public interface MapState extends State { + + /** + * Returns the current value associated with the given key asynchronously. When the state is not + * partitioned the returned value is the same for all inputs in a given operator instance. If + * state partitioning is applied, the value returned depends on the current operator input, as + * the operator maintains an independent state for each partition. + * + * @return The {@link StateFuture} that will return value corresponding to the current input. + * When no corresponding value for this key, the future will return {@code null}. + */ + StateFuture asyncGet(UK key); + + /** + * Update the current value associated with the given key asynchronously. When the state is not + * partitioned the value is updated for all inputs in a given operator instance. If state + * partitioning is applied, the updated value depends on the current operator input, as the + * operator maintains an independent state for each partition. When a {@code null} value is + * provided, the state for the given key will be removed. + * + * @param key The key that will be updated. + * @param value The new value for the key. + * @return The {@link StateFuture} that will trigger the callback when update finishes. + */ + StateFuture asyncPut(UK key, UV value); + + /** + * Update all of the mappings from the given map into the state asynchronously. When the state + * is not partitioned the value is updated for all inputs in a given operator instance. 
If state + * partitioning is applied, the updated mapping depends on the current operator input, as the + * operator maintains an independent state for each partition. When a {@code null} value is + * provided within the map, the state for the corresponding key will be removed. + * + *

If an empty map is passed in, the state value remains unchanged. + * + *

A null map is not allowed. + * + * @param map The mappings to be stored in this state. + * @return The {@link StateFuture} that will trigger the callback when update finishes. + */ + StateFuture asyncPutAll(Map map); + + /** + * Deletes the mapping of the given key from the state asynchronously. When the state is not + * partitioned the deleted value is the same for all inputs in a given operator instance. If + * state partitioning is applied, the value deleted depends on the current operator input, as + * the operator maintains an independent state for each partition. + * + * @param key The key of the mapping. + * @return The {@link StateFuture} that will trigger the callback when update finishes. + */ + StateFuture asyncRemove(UK key); + + /** + * Returns asynchronously whether a mapping exists for the given key. When the state is not + * partitioned the returned value is the same for all inputs in a given operator instance. If + * state partitioning is applied, the value returned depends on the current operator input, as + * the operator maintains an independent state for each partition. + * + * @param key The key of the mapping. + * @return The {@link StateFuture} that will return true if there exists a mapping whose key + * equals the given key. + */ + StateFuture asyncContains(UK key); + + /** + * Returns the current iterator for all the mappings of this state asynchronously. When the + * state is not partitioned the returned iterator is the same for all inputs in a given operator + * instance. If state partitioning is applied, the iterator returned depends on the current + * operator input, as the operator maintains an independent state for each partition. + * + * @return The {@link StateFuture} that will return the mapping iterator corresponding to the + * current input. + */ + StateFuture>> asyncEntries(); + + /** + * Returns the current iterator for all the keys of this state asynchronously. 
When the state is + * not partitioned the returned iterator is the same for all inputs in a given operator + * instance. If state partitioning is applied, the iterator returned depends on the current + * operator input, as the operator maintains an independent state for each partition. + * + * @return The {@link StateFuture} that will return key iterator corresponding to the current + * input. + */ + StateFuture> asyncKeys(); + + /** + * Returns the current iterator for all the values of this state asynchronously. When the state + * is not partitioned the returned iterator is the same for all inputs in a given operator + * instance. If state partitioning is applied, the iterator returned depends on the current + * operator input, as the operator maintains an independent state for each partition. + * + * @return The {@link StateFuture} that will return value iterator corresponding to the current + * input. + */ + StateFuture> asyncValues(); + + /** + * Returns whether this state contains no key-value mappings asynchronously. When the state is + * not partitioned the returned value is the same for all inputs in a given operator instance. + * If state partitioning is applied, the value returned depends on the current operator input, + * as the operator maintains an independent state for each partition. + * + * @return The {@link StateFuture} that will return true if there is no key-value mapping, + * otherwise false. + */ + StateFuture asyncIsEmpty(); +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MergingState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MergingState.java new file mode 100644 index 0000000000000..c2ab4fef6a3e8 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MergingState.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +/** + * Extension of {@link AppendingState} that allows merging of state. That is, two instances of + * {@link MergingState} can be combined into a single instance that contains all the information of + * the two merged states. + * + * @param Type of the value that can be added to the state. + * @param Type of the value that can be retrieved from the state. + */ +@Experimental +public interface MergingState extends AppendingState {} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/State.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/State.java new file mode 100644 index 0000000000000..c9be129f9c178 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/State.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +/** + * Interface that different types of partitioned state must implement. + * + *

The state is only accessible by functions applied on a {@code KeyedStream}. The key is + * automatically supplied by the system, so the function always sees the value mapped to the key of + * the current element. That way, the system can handle stream and state partitioning consistently + * together. + */ +@Experimental +public interface State { + + /** Removes the value mapped under the current key asynchronously. */ + StateFuture asyncClear(); +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java new file mode 100644 index 0000000000000..1f7b2f0d56984 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * StateFuture is a future that acts as a return value for async state interfaces. 
Note: All + * methods of this interface can ONLY be called within the task thread. + * + * @param The return type of this future. + */ +@Experimental +public interface StateFuture { + /** + * Returns a new StateFuture that, when this future completes normally, is executed with this + * future's result as the argument to the supplied function. + * + * @param fn the function to use to compute the value of the returned StateFuture. + * @param the function's return type. + * @return the new StateFuture. + */ + StateFuture thenApply(Function fn); + + /** + * Returns a new StateFuture that, when this future completes normally, is executed with this + * future's result as the argument to the supplied action. + * + * @param action the action to perform before completing the returned StateFuture. + * @return the new StateFuture. + */ + StateFuture thenAccept(Consumer action); + + /** + * Returns a new StateFuture that, when this future completes normally, is executed with this future + * as the argument to the supplied function. + * + * @param action the action to perform. + * @return the new StateFuture. + */ + StateFuture thenCompose(Function> action); + + /** + * Returns a new StateFuture that, when this and the other given future both complete normally, + * is executed with the two results as arguments to the supplied function. + * + * @param other the other StateFuture. + * @param fn the function to use to compute the value of the returned StateFuture. + * @param the type of the other StateFuture's result. + * @param the function's return type. + * @return the new StateFuture. 
+ */ + StateFuture thenCombine( + StateFuture other, BiFunction fn); +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFutureUtils.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFutureUtils.java new file mode 100644 index 0000000000000..8c942874684f2 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFutureUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +import java.util.Collection; + +/** A collection of utilities that expand the usage of {@link StateFuture}. */ +@Experimental +public class StateFutureUtils { + /** Returns a completed future that does nothing and returns null. */ + public static StateFuture completedVoidFuture() { + throw new UnsupportedOperationException("To be implemented."); + } + + /** Returns a completed future that does nothing and returns the provided result. 
*/ + public static StateFuture completedFuture(V result) { + throw new UnsupportedOperationException("To be implemented."); + } + + /** + * Creates a future that completes once all of the given futures have completed. Upon successful + * completion, the future returns the collection of the futures' results. + * + * @param futures The futures that make up the conjunction. No null entries are allowed, + * otherwise an IllegalArgumentException will be thrown. + * @return The StateFuture that completes once all given futures are complete. + */ + public static StateFuture> combineAll( + Collection> futures) { + throw new UnsupportedOperationException("To be implemented."); + } +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateIterator.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateIterator.java new file mode 100644 index 0000000000000..42b1ee42c5160 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateIterator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +import java.util.Collection; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Asynchronous iterators allow iterating over data that arrives asynchronously, on demand. + * + * @param The element type of this iterator. + */ +@Experimental +public interface StateIterator { + /** + * Asynchronously iterates over the data and calls the callback when data is ready. + * + * @param iterating the action to apply to the data when it is ready. The returned state future is used for + * chaining. + * @param the type of the inner returned StateFuture's result. + * @return the Future that will trigger when this iterator and all returned state futures get their + * results. + */ + StateFuture> onNext(Function> iterating); + + /** + * Asynchronously iterates over the data and calls the callback when data is ready. + * + * @param iterating the action to apply to the data when it is ready. + * @return the Future that will trigger when this iterator ends. + */ + StateFuture onNext(Consumer iterating); + + /** Returns synchronously whether this iterator is empty. */ + boolean isEmpty(); +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/ValueState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/ValueState.java new file mode 100644 index 0000000000000..5330c917dacd1 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/ValueState.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state.v2; + +import org.apache.flink.annotation.Experimental; + +/** + * {@link State} interface for partitioned single-value state. The value can be retrieved or + * updated. + * + *

The state is accessed and modified by user functions, and checkpointed consistently by the + * system as part of the distributed snapshots. + * + *

The state is only accessible by functions applied on a {@code KeyedStream}. The key is + * automatically supplied by the system, so the function always sees the value mapped to the key of + * the current element. That way, the system can handle stream and state partitioning consistently + * together. + * + * @param Type of the value in the state. + */ +@Experimental +public interface ValueState extends State { + /** + * Returns the current value for the state asynchronously. When the state is not partitioned the + * returned value is the same for all inputs in a given operator instance. If state partitioning + * is applied, the value returned depends on the current operator input, as the operator + * maintains an independent state for each partition. When no value was previously set using + * {@link #asyncUpdate(Object)}, the future will return {@code null} asynchronously. + * + * @return The {@link StateFuture} that will return the value corresponding to the current + * input. + */ + StateFuture asyncValue(); + + /** + * Updates the operator state accessible by {@link #asyncValue()} to the given value. The next + * time {@link #asyncValue()} is called (for the same state partition) the returned state will + * represent the updated value. When a partitioned state is updated with {@code null}, the state + * for the current key will be removed. + * + * @param value The new value for the state. + * @return The {@link StateFuture} that will trigger the callback when update finishes. + */ + StateFuture asyncUpdate(T value); +}