diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java new file mode 100644 index 0000000000000..513e6453b419d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2; + +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; + +import java.util.List; + +/** + * A default implementation of {@link ListState} which delegates all async requests to {@link + * AsyncExecutionController}. + * + * @param The type of key the state is associated to. + * @param The type of values kept internally in state. + */ +public class InternalListState extends InternalKeyedState implements ListState { + + public InternalListState( + AsyncExecutionController asyncExecutionController, + ListStateDescriptor valueStateDescriptor) { + super(asyncExecutionController, valueStateDescriptor); + } + + @Override + public StateFuture> asyncGet() { + return handleRequest(StateRequestType.MERGING_GET, null); + } + + @Override + public StateFuture asyncAdd(V value) { + return handleRequest(StateRequestType.MERGING_ADD, value); + } + + @Override + public StateFuture asyncUpdate(List values) { + return handleRequest(StateRequestType.LIST_UPDATE, values); + } + + @Override + public StateFuture asyncAddAll(List values) { + return handleRequest(StateRequestType.LIST_ADD_ALL, values); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java new file mode 100644 index 0000000000000..7e3a975ccd93c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2; + +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** + * {@link StateDescriptor} for {@link ListState}. This can be used to create partitioned list state + * internally. + * + * @param The type of each value that the list state can hold. + */ +public class ListStateDescriptor extends StateDescriptor { + + /** + * Creates a new {@code ListStateDescriptor} with the given stateId and type. + * + * @param stateId The (unique) stateId for the state. + * @param typeInfo The type of the values in the state. + */ + public ListStateDescriptor(String stateId, TypeInformation typeInfo) { + super(stateId, typeInfo); + } + + /** + * Creates a new {@code ListStateDescriptor} with the given stateId and type. + * + * @param stateId The (unique) stateId for the state. + * @param typeInfo The type of the values in the state. + * @param serializerConfig The serializer related config used to generate {@code + * TypeSerializer}. + */ + public ListStateDescriptor( + String stateId, TypeInformation typeInfo, SerializerConfig serializerConfig) { + super(stateId, typeInfo, serializerConfig); + } + + @Override + public Type getType() { + return Type.LIST; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java new file mode 100644 index 0000000000000..b911cfb2a1874 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +/** Tests for {@link InternalListState}. */ +public class InternalListStateTest extends InternalKeyedStateTestBase { + + @Test + @SuppressWarnings({"unchecked", "rawtypes"}) + public void testEachOperation() { + ListStateDescriptor descriptor = + new ListStateDescriptor<>("testState", BasicTypeInfo.INT_TYPE_INFO); + InternalListState listState = new InternalListState<>(aec, descriptor); + aec.setCurrentContext(aec.buildContext("test", "test")); + + listState.asyncClear(); + validateRequestRun(listState, StateRequestType.CLEAR, null); + + listState.asyncGet(); + validateRequestRun(listState, StateRequestType.MERGING_GET, null); + + listState.asyncAdd(1); + validateRequestRun(listState, StateRequestType.MERGING_ADD, 1); + + List list = new ArrayList<>(); + listState.asyncUpdate(list); + validateRequestRun(listState, StateRequestType.LIST_UPDATE, list); + + list = new ArrayList<>(); + listState.asyncAddAll(list); + validateRequestRun(listState, StateRequestType.LIST_ADD_ALL, list); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ListStateDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ListStateDescriptorTest.java new file mode 100644 index 0000000000000..9df21b4f1c9f2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ListStateDescriptorTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ListStateDescriptor}. */ +public class ListStateDescriptorTest { + + @Test + void testHashCodeAndEquals() throws Exception { + final String name = "testName"; + + ListStateDescriptor original = + new ListStateDescriptor<>(name, BasicTypeInfo.INT_TYPE_INFO); + ListStateDescriptor same = + new ListStateDescriptor<>(name, BasicTypeInfo.INT_TYPE_INFO); + ListStateDescriptor sameBySerializer = + new ListStateDescriptor<>(name, BasicTypeInfo.INT_TYPE_INFO); + + // test that hashCode() works on state descriptors with initialized and uninitialized + // serializers + assertThat(same).hasSameHashCodeAs(original); + assertThat(sameBySerializer).hasSameHashCodeAs(original); + + assertThat(same).isEqualTo(original); + assertThat(sameBySerializer).isEqualTo(original); + + // equality with a clone + ListStateDescriptor clone = CommonTestUtils.createCopySerializable(original); + assertThat(clone).isEqualTo(original); + } +}