Skip to content

Commit

Permalink
[FLINK-35153][State] Internal async list state and corresponding stat…
Browse files Browse the repository at this point in the history
…e descriptor

This closes apache#24781
  • Loading branch information
Zakelly committed May 15, 2024
1 parent 4897072 commit 73a7e1c
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
Expand All @@ -44,6 +45,16 @@ public <T> ValueState<T> getValueState(@Nonnull ValueStateDescriptor<T> statePro
}
}

@Override
public <T> ListState<T> getListState(@Nonnull ListStateDescriptor<T> stateProperties) {
Preconditions.checkNotNull(stateProperties, "The state properties must not be null");
try {
return asyncKeyedStateBackend.createState(stateProperties);
} catch (Exception e) {
throw new RuntimeException("Error while getting state", e);
}
}

@Override
public <UK, UV> MapState<UK, UV> getMapState(
@Nonnull MapStateDescriptor<UK, UV> stateProperties) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;

import java.util.List;

/**
* A default implementation of {@link ListState} which delegates all async requests to {@link
* StateRequestHandler}.
*
* @param <K> The type of key the state is associated to.
* @param <V> The type of values kept internally in state.
*/
public class InternalListState<K, V> extends InternalKeyedState<K, V> implements ListState<V> {

public InternalListState(
StateRequestHandler stateRequestHandler, ListStateDescriptor<V> stateDescriptor) {
super(stateRequestHandler, stateDescriptor);
}

@Override
public StateFuture<StateIterator<V>> asyncGet() {
return handleRequest(StateRequestType.MERGING_GET, null);
}

@Override
public StateFuture<Void> asyncAdd(V value) {
return handleRequest(StateRequestType.MERGING_ADD, value);
}

@Override
public StateFuture<Void> asyncUpdate(List<V> values) {
return handleRequest(StateRequestType.LIST_UPDATE, values);
}

@Override
public StateFuture<Void> asyncAddAll(List<V> values) {
return handleRequest(StateRequestType.LIST_ADD_ALL, values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.v2;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.ValueState;
Expand All @@ -45,6 +46,19 @@ public interface KeyedStateStoreV2 {
*/
<T> ValueState<T> getValueState(@Nonnull ValueStateDescriptor<T> stateProperties);

/**
* Gets a handle to the system's key / value list state. This state is optimized for state that
* holds lists. One can adds elements to the list, or retrieve the list as a whole. This state
* is only accessible if the function is executed on a KeyedStream.
*
* @param stateProperties The descriptor defining the properties of the state.
* @param <T> The type of value stored in the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part os a KeyedStream).
*/
<T> ListState<T> getListState(@Nonnull ListStateDescriptor<T> stateProperties);

/**
* Gets a handle to the system's key/value map state. This state is only accessible if the
* function is executed on a KeyedStream.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/**
* {@link StateDescriptor} for {@link ListState}. This can be used to create partitioned list state
* internally.
*
* @param <T> The type of each value that the list state can hold.
*/
public class ListStateDescriptor<T> extends StateDescriptor<T> {

/**
* Creates a new {@code ListStateDescriptor} with the given stateId and type.
*
* @param stateId The (unique) stateId for the state.
* @param typeInfo The type of the values in the state.
*/
public ListStateDescriptor(String stateId, TypeInformation<T> typeInfo) {
super(stateId, typeInfo);
}

/**
* Creates a new {@code ListStateDescriptor} with the given stateId and type.
*
* @param stateId The (unique) stateId for the state.
* @param typeInfo The type of the values in the state.
* @param serializerConfig The serializer related config used to generate {@code
* TypeSerializer}.
*/
public ListStateDescriptor(
String stateId, TypeInformation<T> typeInfo, SerializerConfig serializerConfig) {
super(stateId, typeInfo, serializerConfig);
}

@Override
public Type getType() {
return Type.LIST;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

/** Tests for {@link InternalListState}. */
public class InternalListStateTest extends InternalKeyedStateTestBase {

@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void testEachOperation() {
ListStateDescriptor<Integer> descriptor =
new ListStateDescriptor<>("testState", BasicTypeInfo.INT_TYPE_INFO);
InternalListState<String, Integer> listState = new InternalListState<>(aec, descriptor);
aec.setCurrentContext(aec.buildContext("test", "test"));

listState.asyncClear();
validateRequestRun(listState, StateRequestType.CLEAR, null);

listState.asyncGet();
validateRequestRun(listState, StateRequestType.MERGING_GET, null);

listState.asyncAdd(1);
validateRequestRun(listState, StateRequestType.MERGING_ADD, 1);

List<Integer> list = new ArrayList<>();
listState.asyncUpdate(list);
validateRequestRun(listState, StateRequestType.LIST_UPDATE, list);

list = new ArrayList<>();
listState.asyncAddAll(list);
validateRequestRun(listState, StateRequestType.LIST_ADD_ALL, list);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.CommonTestUtils;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link ListStateDescriptor}. */
public class ListStateDescriptorTest {

@Test
void testHashCodeAndEquals() throws Exception {
final String name = "testName";

ListStateDescriptor<Integer> original =
new ListStateDescriptor<>(name, BasicTypeInfo.INT_TYPE_INFO);
ListStateDescriptor<Integer> same =
new ListStateDescriptor<>(name, BasicTypeInfo.INT_TYPE_INFO);
ListStateDescriptor<Integer> sameBySerializer =
new ListStateDescriptor<>(name, BasicTypeInfo.INT_TYPE_INFO);

// test that hashCode() works on state descriptors with initialized and uninitialized
// serializers
assertThat(same).hasSameHashCodeAs(original);
assertThat(sameBySerializer).hasSameHashCodeAs(original);

assertThat(same).isEqualTo(original);
assertThat(sameBySerializer).isEqualTo(original);

// equality with a clone
ListStateDescriptor<Integer> clone = CommonTestUtils.createCopySerializable(original);
assertThat(clone).isEqualTo(original);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,13 @@ public <T> org.apache.flink.api.common.state.v2.ValueState<T> getValueState(
return keyedStateStoreV2.getValueState(stateProperties);
}

public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
org.apache.flink.runtime.state.v2.ListStateDescriptor<T> stateProperties) {
KeyedStateStoreV2 keyedStateStoreV2 =
checkPreconditionsAndGetKeyedStateStoreV2(stateProperties);
return keyedStateStoreV2.getListState(stateProperties);
}

public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState(
org.apache.flink.runtime.state.v2.MapStateDescriptor<UK, UV> stateProperties) {
KeyedStateStoreV2 keyedStateStoreV2 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,30 @@ void testV2ValueStateInstantiation() throws Exception {
.isPositive();
}

@Test
void testV2ListStateInstantiation() throws Exception {
final ExecutionConfig config = new ExecutionConfig();
SerializerConfig serializerConfig = config.getSerializerConfig();
serializerConfig.registerKryoType(Path.class);

final AtomicReference<Object> descriptorCapture = new AtomicReference<>();

StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config);
org.apache.flink.runtime.state.v2.ListStateDescriptor<TaskInfo> descr =
new org.apache.flink.runtime.state.v2.ListStateDescriptor<>(
"name", TypeInformation.of(TaskInfo.class), serializerConfig);
context.getListState(descr);

org.apache.flink.runtime.state.v2.ListStateDescriptor<?> descrIntercepted =
(org.apache.flink.runtime.state.v2.ListStateDescriptor<?>) descriptorCapture.get();
TypeSerializer<?> serializer = descrIntercepted.getSerializer();

// check that the Path class is really registered, i.e., the execution config was applied
assertThat(serializer).isInstanceOf(KryoSerializer.class);
assertThat(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId())
.isPositive();
}

@Test
void testV2MapStateInstantiation() throws Exception {
final ExecutionConfig config = new ExecutionConfig();
Expand Down

0 comments on commit 73a7e1c

Please sign in to comment.