diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStoreV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStoreV2.java index fcb9d12997e91..89b599bf14c18 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStoreV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStoreV2.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.v2; +import org.apache.flink.api.common.state.v2.ListState; import org.apache.flink.api.common.state.v2.MapState; import org.apache.flink.api.common.state.v2.ValueState; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; @@ -44,6 +45,16 @@ public ValueState getValueState(@Nonnull ValueStateDescriptor statePro } } + @Override + public ListState getListState(@Nonnull ListStateDescriptor stateProperties) { + Preconditions.checkNotNull(stateProperties, "The state properties must not be null"); + try { + return asyncKeyedStateBackend.createState(stateProperties); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); + } + } + @Override public MapState getMapState( @Nonnull MapStateDescriptor stateProperties) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java new file mode 100644 index 0000000000000..3900f9b51a317 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalListState.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2; + +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; + +import java.util.List; + +/** + * A default implementation of {@link ListState} which delegates all async requests to {@link + * StateRequestHandler}. + * + * @param The type of key the state is associated to. + * @param The type of values kept internally in state. + */ +public class InternalListState extends InternalKeyedState implements ListState { + + public InternalListState( + StateRequestHandler stateRequestHandler, ListStateDescriptor stateDescriptor) { + super(stateRequestHandler, stateDescriptor); + } + + @Override + public StateFuture> asyncGet() { + return handleRequest(StateRequestType.MERGING_GET, null); + } + + @Override + public StateFuture asyncAdd(V value) { + return handleRequest(StateRequestType.MERGING_ADD, value); + } + + @Override + public StateFuture asyncUpdate(List values) { + return handleRequest(StateRequestType.LIST_UPDATE, values); + } + + @Override + public StateFuture asyncAddAll(List values) { + return handleRequest(StateRequestType.LIST_ADD_ALL, values); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStoreV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStoreV2.java index 04d75ea56dae2..6c46fdcb7584a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStoreV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStoreV2.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.ListState; import org.apache.flink.api.common.state.v2.MapState; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.state.v2.ValueState; @@ -45,6 +46,19 @@ public interface KeyedStateStoreV2 { */ ValueState getValueState(@Nonnull ValueStateDescriptor stateProperties); + /** + * Gets a handle to the system's key / value list state. This state is optimized for state that + * holds lists. One can adds elements to the list, or retrieve the list as a whole. This state + * is only accessible if the function is executed on a KeyedStream. + * + * @param stateProperties The descriptor defining the properties of the state. + * @param The type of value stored in the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part os a KeyedStream). + */ + ListState getListState(@Nonnull ListStateDescriptor stateProperties); + /** * Gets a handle to the system's key/value map state. This state is only accessible if the * function is executed on a KeyedStream. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java new file mode 100644 index 0000000000000..7e3a975ccd93c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2; + +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** + * {@link StateDescriptor} for {@link ListState}. This can be used to create partitioned list state + * internally. + * + * @param The type of each value that the list state can hold. + */ +public class ListStateDescriptor extends StateDescriptor { + + /** + * Creates a new {@code ListStateDescriptor} with the given stateId and type. + * + * @param stateId The (unique) stateId for the state. + * @param typeInfo The type of the values in the state. + */ + public ListStateDescriptor(String stateId, TypeInformation typeInfo) { + super(stateId, typeInfo); + } + + /** + * Creates a new {@code ListStateDescriptor} with the given stateId and type. + * + * @param stateId The (unique) stateId for the state. + * @param typeInfo The type of the values in the state. + * @param serializerConfig The serializer related config used to generate {@code + * TypeSerializer}. + */ + public ListStateDescriptor( + String stateId, TypeInformation typeInfo, SerializerConfig serializerConfig) { + super(stateId, typeInfo, serializerConfig); + } + + @Override + public Type getType() { + return Type.LIST; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java new file mode 100644 index 0000000000000..b911cfb2a1874 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/InternalListStateTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +/** Tests for {@link InternalListState}. */ +public class InternalListStateTest extends InternalKeyedStateTestBase { + + @Test + @SuppressWarnings({"unchecked", "rawtypes"}) + public void testEachOperation() { + ListStateDescriptor descriptor = + new ListStateDescriptor<>("testState", BasicTypeInfo.INT_TYPE_INFO); + InternalListState listState = new InternalListState<>(aec, descriptor); + aec.setCurrentContext(aec.buildContext("test", "test")); + + listState.asyncClear(); + validateRequestRun(listState, StateRequestType.CLEAR, null); + + listState.asyncGet(); + validateRequestRun(listState, StateRequestType.MERGING_GET, null); + + listState.asyncAdd(1); + validateRequestRun(listState, StateRequestType.MERGING_ADD, 1); + + List list = new ArrayList<>(); + listState.asyncUpdate(list); + validateRequestRun(listState, StateRequestType.LIST_UPDATE, list); + + list = new ArrayList<>(); + listState.asyncAddAll(list); + validateRequestRun(listState, StateRequestType.LIST_ADD_ALL, list); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ListStateDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ListStateDescriptorTest.java new file mode 100644 index 0000000000000..9df21b4f1c9f2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/ListStateDescriptorTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ListStateDescriptor}. */ +public class ListStateDescriptorTest { + + @Test + void testHashCodeAndEquals() throws Exception { + final String name = "testName"; + + ListStateDescriptor original = + new ListStateDescriptor<>(name, BasicTypeInfo.INT_TYPE_INFO); + ListStateDescriptor same = + new ListStateDescriptor<>(name, BasicTypeInfo.INT_TYPE_INFO); + ListStateDescriptor sameBySerializer = + new ListStateDescriptor<>(name, BasicTypeInfo.INT_TYPE_INFO); + + // test that hashCode() works on state descriptors with initialized and uninitialized + // serializers + assertThat(same).hasSameHashCodeAs(original); + assertThat(sameBySerializer).hasSameHashCodeAs(original); + + assertThat(same).isEqualTo(original); + assertThat(sameBySerializer).isEqualTo(original); + + // equality with a clone + ListStateDescriptor clone = CommonTestUtils.createCopySerializable(original); + assertThat(clone).isEqualTo(original); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 150e001a3d0ef..e93ecc75bcb01 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -256,6 +256,13 @@ public org.apache.flink.api.common.state.v2.ValueState getValueState( return keyedStateStoreV2.getValueState(stateProperties); } + public org.apache.flink.api.common.state.v2.ListState getListState( + org.apache.flink.runtime.state.v2.ListStateDescriptor stateProperties) { + KeyedStateStoreV2 keyedStateStoreV2 = + checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); + return keyedStateStoreV2.getListState(stateProperties); + } + public org.apache.flink.api.common.state.v2.MapState getMapState( org.apache.flink.runtime.state.v2.MapStateDescriptor stateProperties) { KeyedStateStoreV2 keyedStateStoreV2 = diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index 7f1898ad55411..03efc44132808 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -273,6 +273,30 @@ void testV2ValueStateInstantiation() throws Exception { .isPositive(); } + @Test + void testV2ListStateInstantiation() throws Exception { + final ExecutionConfig config = new ExecutionConfig(); + SerializerConfig serializerConfig = config.getSerializerConfig(); + serializerConfig.registerKryoType(Path.class); + + final AtomicReference descriptorCapture = new AtomicReference<>(); + + StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config); + org.apache.flink.runtime.state.v2.ListStateDescriptor descr = + new org.apache.flink.runtime.state.v2.ListStateDescriptor<>( + "name", TypeInformation.of(TaskInfo.class), serializerConfig); + context.getListState(descr); + + org.apache.flink.runtime.state.v2.ListStateDescriptor descrIntercepted = + (org.apache.flink.runtime.state.v2.ListStateDescriptor) descriptorCapture.get(); + TypeSerializer serializer = descrIntercepted.getSerializer(); + + // check that the Path class is really registered, i.e., the execution config was applied + assertThat(serializer).isInstanceOf(KryoSerializer.class); + assertThat(((KryoSerializer) serializer).getKryo().getRegistration(Path.class).getId()) + .isPositive(); + } + @Test void testV2MapStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig();