Skip to content

Commit

Permalink
[FLINK-35161][state] Implement StateExecutor for ForStStateBackend
Browse files Browse the repository at this point in the history
This closes apache#24739.
  • Loading branch information
ljz2051 authored and Zakelly committed May 7, 2024
1 parent 88f6d06 commit ea4112a
Show file tree
Hide file tree
Showing 16 changed files with 599 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -252,8 +252,14 @@ void triggerIfNeeded(boolean force) {
if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
return;
}
List<StateRequest<?, ?, ?>> toRun = stateRequestsBuffer.popActive(batchSize);
stateExecutor.executeBatchRequests(toRun);

Optional<StateRequestContainer> toRun =
stateRequestsBuffer.popActive(
batchSize, () -> stateExecutor.createStateRequestContainer());
if (!toRun.isPresent() || toRun.get().isEmpty()) {
return;
}
stateExecutor.executeBatchRequests(toRun.get());
stateRequestsBuffer.advanceSeq();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,18 @@ public interface StateExecutor {
/**
* Execute a batch of state requests.
*
* @param processingRequests the given batch of processing requests
* @param stateRequestContainer The StateRequestContainer which holds the given batch of
* processing requests.
* @return A future can determine whether execution has completed.
*/
CompletableFuture<Boolean> executeBatchRequests(
Iterable<StateRequest<?, ?, ?>> processingRequests);
CompletableFuture<Void> executeBatchRequests(StateRequestContainer stateRequestContainer);

/**
* Create a {@link StateRequestContainer} which is used to hold the batched {@link
* StateRequest}.
*/
StateRequestContainer createStateRequestContainer();

/** Shutdown the StateExecutor, and new committed state execution requests will be rejected. */
void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class StateRequest<K, IN, OUT> implements Serializable {
/** The record context of this request. */
private final RecordContext<K> context;

StateRequest(
public StateRequest(
@Nullable State state,
StateRequestType type,
@Nullable IN payload,
Expand All @@ -64,25 +64,25 @@ public class StateRequest<K, IN, OUT> implements Serializable {
this.context = context;
}

StateRequestType getRequestType() {
public StateRequestType getRequestType() {
return type;
}

@Nullable
IN getPayload() {
public IN getPayload() {
return payload;
}

@Nullable
State getState() {
public State getState() {
return state;
}

public InternalStateFuture<OUT> getFuture() {
return stateFuture;
}

RecordContext<K> getRecordContext() {
public RecordContext<K> getRecordContext() {
return context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,18 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* A buffer to hold state requests to execute state requests in batch, which can only be manipulated
Expand Down Expand Up @@ -198,18 +197,20 @@ int activeQueueSize() {
* Try to pop state requests from active queue, if the size of active queue is less than N,
* return all the requests in active queue.
*
* @param n the number of state requests to pop.
* @return A list of state requests.
* @param n The number of state requests to pop.
* @param requestContainerInitializer Initializer for the stateRequest container
* @return A StateRequestContainer which holds the popped state requests.
*/
List<StateRequest<?, ?, ?>> popActive(int n) {
Optional<StateRequestContainer> popActive(
int n, Supplier<StateRequestContainer> requestContainerInitializer) {
final int count = Math.min(n, activeQueue.size());
if (count <= 0) {
return Collections.emptyList();
return Optional.empty();
}
ArrayList<StateRequest<?, ?, ?>> ret = new ArrayList<>(count);
StateRequestContainer stateRequestContainer = requestContainerInitializer.get();
for (int i = 0; i < count; i++) {
ret.add(activeQueue.pop());
stateRequestContainer.offer(activeQueue.pop());
}
return ret;
return Optional.of(stateRequestContainer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing;

/**
* A container which is used to hold {@link StateRequest}s. The role of {@code
* StateRequestContainer} is to serve as an intermediary carrier for data transmission between the
* runtime layer and the state layer. It stores the stateRequest from the runtime layer, which is
* then processed by the state layer.
*
* <p>Notice that the {@code StateRequestContainer} may not be thread-safe.
*/
public interface StateRequestContainer {

/** Preserve a stateRequest into the {@code StateRequestContainer}. */
void offer(StateRequest<?, ?, ?> stateRequest);

/** Returns whether the container is empty. */
boolean isEmpty();
}
Original file line number Diff line number Diff line change
Expand Up @@ -552,10 +552,12 @@ public TestStateExecutor() {}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public CompletableFuture<Boolean> executeBatchRequests(
Iterable<StateRequest<?, ?, ?>> processingRequests) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
for (StateRequest request : processingRequests) {
public CompletableFuture<Void> executeBatchRequests(
StateRequestContainer stateRequestContainer) {
Preconditions.checkArgument(stateRequestContainer instanceof MockStateRequestContainer);
CompletableFuture<Void> future = new CompletableFuture<>();
for (StateRequest request :
((MockStateRequestContainer) stateRequestContainer).getStateRequestList()) {
if (request.getRequestType() == StateRequestType.VALUE_GET) {
Preconditions.checkState(request.getState() != null);
TestValueState state = (TestValueState) request.getState();
Expand All @@ -574,8 +576,16 @@ public CompletableFuture<Boolean> executeBatchRequests(
throw new UnsupportedOperationException("Unsupported request type");
}
}
future.complete(true);
future.complete(null);
return future;
}

@Override
public StateRequestContainer createStateRequestContainer() {
return new MockStateRequestContainer();
}

@Override
public void shutdown() {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing;

import java.util.ArrayList;
import java.util.List;

/** The mocked {@link StateRequestContainer} for testing. */
public class MockStateRequestContainer implements StateRequestContainer {

private final List<StateRequest<?, ?, ?>> stateRequestList = new ArrayList<>();

@Override
public void offer(StateRequest<?, ?, ?> stateRequest) {
stateRequestList.add(stateRequest);
}

@Override
public boolean isEmpty() {
return stateRequestList.isEmpty();
}

public List<StateRequest<?, ?, ?>> getStateRequestList() {
return stateRequestList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.state.forst;

import org.apache.flink.runtime.asyncprocessing.StateRequest;

import org.rocksdb.ColumnFamilyHandle;

import java.io.IOException;
Expand Down Expand Up @@ -63,4 +65,22 @@ public interface ForStInnerTable<K, V> {
* @throws IOException Thrown if the deserialization encountered an I/O related error.
*/
V deserializeValue(byte[] value) throws IOException;

/**
* Build a {@link ForStDBGetRequest} that belong to this {@code ForStInnerTable} with the given
* stateRequest.
*
* @param stateRequest The given stateRequest.
* @return The corresponding ForSt GetRequest.
*/
ForStDBGetRequest<K, V> buildDBGetRequest(StateRequest<?, ?, ?> stateRequest);

/**
* Build a {@link ForStDBPutRequest} that belong to {@code ForStInnerTable} with the given
* stateRequest.
*
* @param stateRequest The given stateRequest.
* @return The corresponding ForSt PutRequest.
*/
ForStDBPutRequest<K, V> buildDBPutRequest(StateRequest<?, ?, ?> stateRequest);
}
Loading

0 comments on commit ea4112a

Please sign in to comment.