Skip to content

Commit

Permalink
[FLINK-35153][State] Internal async list state and corresponding stat…
Browse files Browse the repository at this point in the history
…e descriptor
  • Loading branch information
Zakelly committed Apr 18, 2024
1 parent 63dc6e9 commit 72c02da
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;

import java.util.List;

/**
* A default implementation of {@link ListState} which delegates all async requests to {@link
* AsyncExecutionController}.
*
* @param <K> The type of key the state is associated to.
* @param <V> The type of values kept internally in state.
*/
public class InternalListState<K, V> extends InternalKeyedState<K, V> implements ListState<V> {

public InternalListState(
AsyncExecutionController<K> asyncExecutionController,
ListStateDescriptor<V> valueStateDescriptor) {
super(asyncExecutionController, valueStateDescriptor);
}

@Override
public StateFuture<StateIterator<V>> asyncGet() {
return handleRequest(StateRequestType.MERGING_GET, null);
}

@Override
public StateFuture<Void> asyncAdd(V value) {
return handleRequest(StateRequestType.MERGING_ADD, value);
}

@Override
public StateFuture<Void> asyncUpdate(List<V> values) {
return handleRequest(StateRequestType.LIST_UPDATE, values);
}

@Override
public StateFuture<Void> asyncAddAll(List<V> values) {
return handleRequest(StateRequestType.LIST_ADD_ALL, values);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/**
* {@link StateDescriptor} for {@link ListState}. This can be used to create partitioned list state
* internally.
*
* @param <T> The type of each value that the list state can hold.
*/
public class ListStateDescriptor<T> extends StateDescriptor<T> {

/**
* Creates a new {@code ListStateDescriptor} with the given stateId and type.
*
* @param stateId The (unique) stateId for the state.
* @param typeInfo The type of the values in the state.
*/
public ListStateDescriptor(String stateId, TypeInformation<T> typeInfo) {
super(stateId, typeInfo);
}

/**
* Creates a new {@code ListStateDescriptor} with the given stateId and type.
*
* @param stateId The (unique) stateId for the state.
* @param typeInfo The type of the values in the state.
* @param serializerConfig The serializer related config used to generate {@code
* TypeSerializer}.
*/
public ListStateDescriptor(
String stateId, TypeInformation<T> typeInfo, SerializerConfig serializerConfig) {
super(stateId, typeInfo, serializerConfig);
}

@Override
public Type getType() {
return Type.LIST;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

/** Tests for {@link InternalListState}. */
public class InternalListStateTest extends InternalKeyedStateTestBase {

@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void testEachOperation() {
ListStateDescriptor<Integer> descriptor =
new ListStateDescriptor<>("testState", BasicTypeInfo.INT_TYPE_INFO);
InternalListState<String, Integer> listState = new InternalListState<>(aec, descriptor);
aec.setCurrentContext(aec.buildContext("test", "test"));

listState.asyncClear();
validateRequestRun(listState, StateRequestType.CLEAR, null);

listState.asyncGet();
validateRequestRun(listState, StateRequestType.MERGING_GET, null);

listState.asyncAdd(1);
validateRequestRun(listState, StateRequestType.MERGING_ADD, 1);

List<Integer> list = new ArrayList<>();
listState.asyncUpdate(list);
validateRequestRun(listState, StateRequestType.LIST_UPDATE, list);

list = new ArrayList<>();
listState.asyncAddAll(list);
validateRequestRun(listState, StateRequestType.LIST_ADD_ALL, list);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.CommonTestUtils;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link ListStateDescriptor}. */
public class ListStateDescriptorTest {

@Test
void testHashCodeAndEquals() throws Exception {
final String name = "testName";

ListStateDescriptor<Integer> original =
new ListStateDescriptor<>(name, BasicTypeInfo.INT_TYPE_INFO);
ListStateDescriptor<Integer> same =
new ListStateDescriptor<>(name, BasicTypeInfo.INT_TYPE_INFO);
ListStateDescriptor<Integer> sameBySerializer =
new ListStateDescriptor<>(name, BasicTypeInfo.INT_TYPE_INFO);

// test that hashCode() works on state descriptors with initialized and uninitialized
// serializers
assertThat(same).hasSameHashCodeAs(original);
assertThat(sameBySerializer).hasSameHashCodeAs(original);

assertThat(same).isEqualTo(original);
assertThat(sameBySerializer).isEqualTo(original);

// equality with a clone
ListStateDescriptor<Integer> clone = CommonTestUtils.createCopySerializable(original);
assertThat(clone).isEqualTo(original);
}
}

0 comments on commit 72c02da

Please sign in to comment.