Skip to content

Commit

Permalink
[FLINK-35356][State] State descriptor and implementation for async re…
Browse files Browse the repository at this point in the history
…ducing state
  • Loading branch information
Zakelly committed May 16, 2024
1 parent b87ead7 commit a8938c8
Show file tree
Hide file tree
Showing 13 changed files with 351 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.state.v2;

import org.apache.flink.annotation.Experimental;

/**
* {@link State} interface for reducing state. Elements can be added to the state, they will be
* combined using a reduce function. The current state can be inspected.
*
* <p>The state is accessed and modified by user functions, and checkpointed consistently by the
* system as part of the distributed snapshots.
*
* <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the key of
* the current element. That way, the system can handle stream and state partitioning consistently
* together.
*
* @param <T> Type of the value in the operator state
*/
@Experimental
public interface ReducingState<T> extends MergingState<T, T> {}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.MergingState;
import org.apache.flink.api.common.state.v2.ReducingState;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.ValueState;

Expand All @@ -29,10 +29,7 @@

/**
* The type of processing request for {@link State} from **users' perspective**. Each interface of
* {@link State} and its sub-interfaces will have a corresponding enum entry. Given that there is an
* inheritance between state interfaces, only the methods on parent/base interface class have
* corresponding entries, and we omit the ones inherited from parent interface class. e.g. There is
* only {@link #MERGING_GET} but no {@code LIST_GET}.
* {@link State} and its sub-interfaces will have a corresponding enum entry.
*
* <p>TODO: Serialization and Deserialization.
*/
Expand All @@ -54,11 +51,11 @@ public enum StateRequestType {
/** Update value to current partition, {@link ValueState#asyncUpdate(Object)}. */
VALUE_UPDATE,

/** Get from merging state, {@link MergingState#asyncGet()}. */
MERGING_GET,
/** Get from list state, {@link ListState#asyncGet()}. */
LIST_GET,

/** Add value to merging state, {@link MergingState#asyncAdd(Object)}. */
MERGING_ADD,
/** Add value to list state, {@link ListState#asyncAdd(Object)}. */
LIST_ADD,

/** Put a list to current partition, {@link ListState#asyncUpdate(List)}. */
LIST_UPDATE,
Expand Down Expand Up @@ -102,5 +99,11 @@ public enum StateRequestType {
MAP_IS_EMPTY,

/** Continuously load elements for one iterator. */
ITERATOR_LOADING
ITERATOR_LOADING,

/** Get from reducing state, {@link ReducingState#asyncGet()}. */
REDUCING_GET,

/** Add element into reducing state, {@link ReducingState#asyncAdd(Object)}. */
REDUCING_ADD
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.ReducingState;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -65,4 +66,15 @@ public <UK, UV> MapState<UK, UV> getMapState(
throw new RuntimeException("Error while getting state", e);
}
}

@Override
public <T> ReducingState<T> getReducingState(
@Nonnull ReducingStateDescriptor<T> stateProperties) {
Preconditions.checkNotNull(stateProperties, "The state properties must not be null");
try {
return asyncKeyedStateBackend.createState(stateProperties);
} catch (Exception e) {
throw new RuntimeException("Error while getting state", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public InternalListState(

@Override
public StateFuture<StateIterator<V>> asyncGet() {
return handleRequest(StateRequestType.MERGING_GET, null);
return handleRequest(StateRequestType.LIST_GET, null);
}

@Override
public StateFuture<Void> asyncAdd(V value) {
return handleRequest(StateRequestType.MERGING_ADD, value);
return handleRequest(StateRequestType.LIST_ADD, value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.v2.ReducingState;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;

/**
* A default implementation of {@link ReducingState} which delegates all async requests to {@link
* StateRequestHandler}.
*
* @param <K> The type of key the state is associated to.
* @param <V> The type of values kept internally in state.
*/
public class InternalReducingState<K, V> extends InternalKeyedState<K, V>
implements ReducingState<V> {

protected final ReduceFunction<V> reduceFunction;

public InternalReducingState(
StateRequestHandler stateRequestHandler, ReducingStateDescriptor<V> stateDescriptor) {
super(stateRequestHandler, stateDescriptor);
this.reduceFunction = stateDescriptor.getReduceFunction();
}

@Override
public StateFuture<V> asyncGet() {
return handleRequest(StateRequestType.REDUCING_GET, null);
}

@Override
public StateFuture<Void> asyncAdd(V value) {
return handleRequest(StateRequestType.REDUCING_ADD, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.ReducingState;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.ValueState;

Expand Down Expand Up @@ -71,4 +72,18 @@ public interface KeyedStateStoreV2 {
* function (function is not part of a KeyedStream).
*/
<UK, UV> MapState<UK, UV> getMapState(@Nonnull MapStateDescriptor<UK, UV> stateProperties);

/**
* Gets a handle to the system's key/value reducing state. This state is optimized for state
* that aggregates values.
*
* <p>This state is only accessible if the function is executed on a KeyedStream.
*
* @param stateProperties The descriptor defining the properties of the stats.
* @param <T> The type of value stored in the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
<T> ReducingState<T> getReducingState(@Nonnull ReducingStateDescriptor<T> stateProperties);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* {@link StateDescriptor} for {@link org.apache.flink.api.common.state.v2.ReducingState}.
*
* @param <T> The type of the values that can be added to the state.
*/
public class ReducingStateDescriptor<T> extends StateDescriptor<T> {

private final ReduceFunction<T> reduceFunction;

/**
* Creates a new {@code ReducingStateDescriptor} with the given name and default value.
*
* @param name The (unique) name for the state.
* @param reduceFunction The {@code ReduceFunction} used to aggregate the state.
* @param typeInfo The type of the values in the state.
*/
public ReducingStateDescriptor(
String name, ReduceFunction<T> reduceFunction, TypeInformation<T> typeInfo) {
super(name, typeInfo, null);
this.reduceFunction = checkNotNull(reduceFunction);
}

/**
* Creates a new {@code ReducingStateDescriptor} with the given name and default value.
*
* @param name The (unique) name for the state.
* @param reduceFunction The {@code ReduceFunction} used to aggregate the state.
* @param typeInfo The type of the values in the state.
*/
public ReducingStateDescriptor(
String name,
ReduceFunction<T> reduceFunction,
TypeInformation<T> typeInfo,
SerializerConfig serializerConfig) {
super(name, typeInfo, serializerConfig);
this.reduceFunction = checkNotNull(reduceFunction);
}

/** Returns the reduce function to be used for the reducing state. */
public ReduceFunction<T> getReduceFunction() {
return reduceFunction;
}

@Override
public Type getType() {
return Type.REDUCING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public enum Type {
VALUE,
LIST,
REDUCING,
FOLDING,
AGGREGATING,
MAP
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ public void testEachOperation() {
validateRequestRun(listState, StateRequestType.CLEAR, null);

listState.asyncGet();
validateRequestRun(listState, StateRequestType.MERGING_GET, null);
validateRequestRun(listState, StateRequestType.LIST_GET, null);

listState.asyncAdd(1);
validateRequestRun(listState, StateRequestType.MERGING_ADD, 1);
validateRequestRun(listState, StateRequestType.LIST_ADD, 1);

List<Integer> list = new ArrayList<>();
listState.asyncUpdate(list);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;

import org.junit.jupiter.api.Test;

/** Tests for {@link InternalReducingState}. */
public class InternalReducingStateTest extends InternalKeyedStateTestBase {

@Test
@SuppressWarnings({"unchecked"})
public void testEachOperation() {
ReduceFunction<Integer> reducer = Integer::sum;
ReducingStateDescriptor<Integer> descriptor =
new ReducingStateDescriptor<>("testState", reducer, BasicTypeInfo.INT_TYPE_INFO);
InternalReducingState<String, Integer> reducingState =
new InternalReducingState<>(aec, descriptor);
aec.setCurrentContext(aec.buildContext("test", "test"));

reducingState.asyncClear();
validateRequestRun(reducingState, StateRequestType.CLEAR, null);

reducingState.asyncGet();
validateRequestRun(reducingState, StateRequestType.REDUCING_GET, null);

reducingState.asyncAdd(1);
validateRequestRun(reducingState, StateRequestType.REDUCING_ADD, 1);
}
}
Loading

0 comments on commit a8938c8

Please sign in to comment.