Skip to content

Commit

Permalink
[FLINK-34978][State] Introduce Asynchronous State APIs (apache#24595)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly authored and masteryhx committed Apr 2, 2024
1 parent d271495 commit f38d8ca
Show file tree
Hide file tree
Showing 9 changed files with 594 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.state.v2;

import org.apache.flink.annotation.Experimental;

/**
* Base interface for partitioned state that supports adding elements and inspecting the current
* state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
*
* <p>Every method declared here is asynchronous: it hands back a {@link StateFuture} rather than
* the result itself.
*
* <p>The state is accessed and modified by user functions, and checkpointed consistently by the
* system as part of the distributed snapshots.
*
* <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the key of
* the current element. That way, the system can handle stream and state partitioning consistently
* together.
*
* @param <IN> Type of the value that can be added to the state.
* @param <OUT> Type of the value that can be retrieved from the state.
*/
@Experimental
public interface AppendingState<IN, OUT> extends State {

/**
* Returns the current value for the state asynchronously. When the state is not partitioned the
* returned value is the same for all inputs in a given operator instance. If state partitioning
* is applied, the value returned depends on the current operator input, as the operator
* maintains an independent state for each partition.
*
* <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, this method should still return a
* {@link StateFuture}; the value wrapped by that future should be {@code null}.
*
* @return The {@link StateFuture} wrapping the state value corresponding to the current input,
* or wrapping {@code null} if the state is empty.
*/
StateFuture<OUT> asyncGet();

/**
* Updates the state accessible by {@link #asyncGet()} asynchronously by adding the given value
* to the list of values. The next time {@link #asyncGet()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* <p>A {@code null} value is not allowed to be passed in.
*
* @param value The new value to add to the state; must not be {@code null}.
* @return The {@link StateFuture} that will trigger the callback when the add finishes.
*/
StateFuture<Void> asyncAdd(IN value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.state.v2;

import org.apache.flink.annotation.Experimental;

import java.util.List;

/**
* {@link State} interface for partitioned list state in operators. The state is accessed and
* modified by user functions, and checkpointed consistently by the system as part of the
* distributed snapshots.
*
* <p>The state can be a keyed list state or an operator list state.
*
* <p>When it is a keyed list state, it is accessed by functions applied on a {@code KeyedStream}.
* The key is automatically supplied by the system, so the function always sees the value mapped to
* the key of the current element. That way, the system can handle stream and state partitioning
* consistently together.
*
* <p>When it is an operator list state, the list is a collection of state items that are
* independent from each other and eligible for redistribution across operator instances in case of
* changed operator parallelism.
*
* <p>Reads go through the inherited {@link #asyncGet()}, which for this interface yields a {@link
* StateIterator} over the elements rather than a {@link List}.
*
* @param <T> Type of values that this list state keeps.
*/
@Experimental
public interface ListState<T> extends MergingState<T, StateIterator<T>> {

/**
* Replaces the existing values of the state accessible by {@link #asyncGet()} with the given
* list of values, asynchronously. The next time {@link #asyncGet()} is called (for the same
* state partition) the returned state will represent the updated list.
*
* <p>If an empty list is passed in, the state value will be null.
*
* <p>A {@code null} list, or a list containing any {@code null} element, is not allowed.
*
* @param values The new values for the state; must not be {@code null} nor contain {@code
* null}.
* @return The {@link StateFuture} that will trigger the callback when the update finishes.
*/
StateFuture<Void> asyncUpdate(List<T> values);

/**
* Appends the given values to the existing list of values of the state accessible by {@link
* #asyncGet()}, asynchronously. The next time {@link #asyncGet()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* <p>If an empty list is passed in, the state value remains unchanged.
*
* <p>A {@code null} list, or a list containing any {@code null} element, is not allowed.
*
* @param values The new values to be added to the state; must not be {@code null} nor contain
* {@code null}.
* @return The {@link StateFuture} that will trigger the callback when the add finishes.
*/
StateFuture<Void> asyncAddAll(List<T> values);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.state.v2;

import org.apache.flink.annotation.Experimental;

import java.util.Map;

/**
* {@link State} interface for partitioned key-value state. The key-value pair can be added, updated
* and retrieved.
*
* <p>Every method declared here is asynchronous: it hands back a {@link StateFuture} rather than
* the result itself.
*
* <p>The state is accessed and modified by user functions, and checkpointed consistently by the
* system as part of the distributed snapshots.
*
* <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the key of
* the current element. That way, the system can handle stream and state partitioning consistently
* together.
*
* @param <UK> Type of the keys in the state.
* @param <UV> Type of the values in the state.
*/
@Experimental
public interface MapState<UK, UV> extends State {

/**
* Returns the current value associated with the given key asynchronously. When the state is not
* partitioned the returned value is the same for all inputs in a given operator instance. If
* state partitioning is applied, the value returned depends on the current operator input, as
* the operator maintains an independent state for each partition.
*
* @param key The key of the mapping to look up.
* @return The {@link StateFuture} that will return the value corresponding to the current
* input. When there is no corresponding value for this key, the future will return {@code
* null}.
*/
StateFuture<UV> asyncGet(UK key);

/**
* Updates the current value associated with the given key asynchronously. When the state is not
* partitioned the value is updated for all inputs in a given operator instance. If state
* partitioning is applied, the updated value depends on the current operator input, as the
* operator maintains an independent state for each partition. When a {@code null} value is
* provided, the state for the given key will be removed.
*
* @param key The key that will be updated.
* @param value The new value for the key; {@code null} removes the mapping for the key.
* @return The {@link StateFuture} that will trigger the callback when the update finishes.
*/
StateFuture<Void> asyncPut(UK key, UV value);

/**
* Updates all of the mappings from the given map into the state asynchronously. When the state
* is not partitioned the value is updated for all inputs in a given operator instance. If state
* partitioning is applied, the updated mapping depends on the current operator input, as the
* operator maintains an independent state for each partition. When a {@code null} value is
* provided within the map, the state for the corresponding key will be removed.
*
* <p>If an empty map is passed in, the state value remains unchanged.
*
* <p>A {@code null} map is not allowed.
*
* @param map The mappings to be stored in this state; must not be {@code null}.
* @return The {@link StateFuture} that will trigger the callback when the update finishes.
*/
StateFuture<Void> asyncPutAll(Map<UK, UV> map);

/**
* Deletes the mapping of the given key from the state asynchronously. When the state is not
* partitioned the deleted value is the same for all inputs in a given operator instance. If
* state partitioning is applied, the value deleted depends on the current operator input, as
* the operator maintains an independent state for each partition.
*
* @param key The key of the mapping.
* @return The {@link StateFuture} that will trigger the callback when the removal finishes.
*/
StateFuture<Void> asyncRemove(UK key);

/**
* Returns whether a mapping exists for the given key, asynchronously. When the state is not
* partitioned the returned value is the same for all inputs in a given operator instance. If
* state partitioning is applied, the value returned depends on the current operator input, as
* the operator maintains an independent state for each partition.
*
* @param key The key of the mapping.
* @return The {@link StateFuture} that will return true if there exists a mapping whose key
* equals the given key.
*/
StateFuture<Boolean> asyncContains(UK key);

/**
* Returns the current iterator for all the mappings of this state asynchronously. When the
* state is not partitioned the returned iterator is the same for all inputs in a given operator
* instance. If state partitioning is applied, the iterator returned depends on the current
* operator input, as the operator maintains an independent state for each partition.
*
* @return The {@link StateFuture} that will return the mapping iterator corresponding to the
* current input.
*/
StateFuture<StateIterator<Map.Entry<UK, UV>>> asyncEntries();

/**
* Returns the current iterator for all the keys of this state asynchronously. When the state is
* not partitioned the returned iterator is the same for all inputs in a given operator
* instance. If state partitioning is applied, the iterator returned depends on the current
* operator input, as the operator maintains an independent state for each partition.
*
* @return The {@link StateFuture} that will return the key iterator corresponding to the
* current input.
*/
StateFuture<StateIterator<UK>> asyncKeys();

/**
* Returns the current iterator for all the values of this state asynchronously. When the state
* is not partitioned the returned iterator is the same for all inputs in a given operator
* instance. If state partitioning is applied, the iterator returned depends on the current
* operator input, as the operator maintains an independent state for each partition.
*
* @return The {@link StateFuture} that will return the value iterator corresponding to the
* current input.
*/
StateFuture<StateIterator<UV>> asyncValues();

/**
* Returns whether this state contains no key-value mappings asynchronously. When the state is
* not partitioned the returned value is the same for all inputs in a given operator instance.
* If state partitioning is applied, the value returned depends on the current operator input,
* as the operator maintains an independent state for each partition.
*
* @return The {@link StateFuture} that will return true if there is no key-value mapping,
* otherwise false.
*/
StateFuture<Boolean> asyncIsEmpty();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.state.v2;

import org.apache.flink.annotation.Experimental;

/**
* Extension of {@link AppendingState} that allows merging of state. That is, two instances of
* {@link MergingState} can be combined into a single instance that contains all the information of
* the two merged states.
*
* <p>This interface declares no methods of its own; it only marks an {@link AppendingState} as
* mergeable.
*
* @param <IN> Type of the value that can be added to the state.
* @param <OUT> Type of the value that can be retrieved from the state.
*/
@Experimental
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.state.v2;

import org.apache.flink.annotation.Experimental;

/**
* Interface that different types of partitioned state must implement.
*
* <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the key of
* the current element. That way, the system can handle stream and state partitioning consistently
* together.
*/
@Experimental
public interface State {

/**
* Removes the value mapped under the current key asynchronously.
*
* @return The {@link StateFuture} that will trigger the callback when the clear finishes.
*/
StateFuture<Void> asyncClear();
}
Loading

0 comments on commit f38d8ca

Please sign in to comment.