Skip to content

Commit

Permalink
[FLINK-35411] Support process write state requests within coordinator…
Browse files Browse the repository at this point in the history
… thread
  • Loading branch information
Zakelly committed Sep 23, 2024
1 parent 870d6a6 commit a182b9b
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public StateExecutor createStateExecutor() {
}
StateExecutor stateExecutor =
new ForStStateExecutor(
optionsContainer.isWriteInline(),
optionsContainer.getReadIoParallelism(),
optionsContainer.getWriteIoParallelism(),
db,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,27 @@ public class ForStOptions {
+ "of rocksdb timer service, but consumes more heap memory at the same time.",
TIMER_SERVICE_FACTORY.key(), ForStDB.name()));

public static final ConfigOption<Boolean> EXECUTOR_WRITE_IO_INLINE =
ConfigOptions.key("state.backend.forst.executor.inline-write")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to let write requests executed within the coordinator thread.");

public static final ConfigOption<Integer> EXECUTOR_READ_IO_PARALLELISM =
ConfigOptions.key("state.backend.forst.memory.executor-read-io-parallelism")
ConfigOptions.key("state.backend.forst.executor.read-io-parallelism")
.intType()
.defaultValue(3)
.withDescription(
"The number of threads used for read IO operations in the executor.");

public static final ConfigOption<Integer> EXECUTOR_WRITE_IO_PARALLELISM =
ConfigOptions.key("state.backend.forst.memory.executor-write-io-parallelism")
ConfigOptions.key("state.backend.forst.executor.write-io-parallelism")
.intType()
.defaultValue(1)
.withDescription(
"The number of threads used for write IO operations in the executor.");
"The number of threads used for write IO operations in the executor."
+ " Only valid when '"
+ EXECUTOR_WRITE_IO_INLINE.key()
+ "' is false.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ public Path getDbPath() {
}
}

public boolean isWriteInline() {
return configuration.get(ForStOptions.EXECUTOR_WRITE_IO_INLINE);
}

public int getReadIoParallelism() {
return configuration.get(ForStOptions.EXECUTOR_READ_IO_PARALLELISM);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,46 @@ public class ForStStateExecutor implements StateExecutor {
private final AtomicLong ongoing;

public ForStStateExecutor(
int readIoParallelism, int writeIoParallelism, RocksDB db, WriteOptions writeOptions) {
Preconditions.checkState(readIoParallelism > 0 || writeIoParallelism > 0);
this.coordinatorThread =
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory("ForSt-StateExecutor-Coordinator"));
if (readIoParallelism <= 0 || writeIoParallelism <= 0) {
this.readThreadCount = Math.max(readIoParallelism, writeIoParallelism);
this.readThreads =
Executors.newFixedThreadPool(
readThreadCount, new ExecutorThreadFactory("ForSt-StateExecutor-IO"));
this.writeThreads = null;
} else {
boolean isWriteInline,
int readIoParallelism,
int writeIoParallelism,
RocksDB db,
WriteOptions writeOptions) {
if (isWriteInline) {
Preconditions.checkState(readIoParallelism > 0);
this.coordinatorThread =
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory("ForSt-StateExecutor-Coordinator-And-Write"));
this.readThreadCount = readIoParallelism;
this.readThreads =
Executors.newFixedThreadPool(
readIoParallelism,
new ExecutorThreadFactory("ForSt-StateExecutor-read-IO"));
this.writeThreads =
Executors.newFixedThreadPool(
writeIoParallelism,
new ExecutorThreadFactory("ForSt-StateExecutor-write-IO"));
org.apache.flink.util.concurrent.Executors.newDirectExecutorService();
} else {
Preconditions.checkState(readIoParallelism > 0 || writeIoParallelism > 0);
this.coordinatorThread =
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory("ForSt-StateExecutor-Coordinator"));
if (readIoParallelism <= 0 || writeIoParallelism <= 0) {
this.readThreadCount = Math.max(readIoParallelism, writeIoParallelism);
this.readThreads =
Executors.newFixedThreadPool(
readThreadCount,
new ExecutorThreadFactory("ForSt-StateExecutor-IO"));
this.writeThreads = readThreads;
} else {
this.readThreadCount = readIoParallelism;
this.readThreads =
Executors.newFixedThreadPool(
readIoParallelism,
new ExecutorThreadFactory("ForSt-StateExecutor-read-IO"));
this.writeThreads =
Executors.newFixedThreadPool(
writeIoParallelism,
new ExecutorThreadFactory("ForSt-StateExecutor-write-IO"));
}
}
this.db = db;
this.writeOptions = writeOptions;
Expand Down Expand Up @@ -122,17 +141,6 @@ public CompletableFuture<Void> executeBatchRequests(
() -> {
long startTime = System.currentTimeMillis();
List<CompletableFuture<Void>> futures = new ArrayList<>(3);
List<ForStDBPutRequest<?, ?, ?>> putRequests =
stateRequestClassifier.pollDbPutRequests();
if (!putRequests.isEmpty()) {
ForStWriteBatchOperation writeOperations =
new ForStWriteBatchOperation(
db,
putRequests,
writeOptions,
writeThreads == null ? readThreads : writeThreads);
futures.add(writeOperations.process());
}

if (!getRequests.isEmpty()) {
ForStGeneralMultiGetOperation getOperations =
Expand All @@ -152,6 +160,15 @@ public CompletableFuture<Void> executeBatchRequests(
futures.add(iterOperations.process());
}

List<ForStDBPutRequest<?, ?, ?>> putRequests =
stateRequestClassifier.pollDbPutRequests();
if (!putRequests.isEmpty()) {
ForStWriteBatchOperation writeOperations =
new ForStWriteBatchOperation(
db, putRequests, writeOptions, writeThreads);
futures.add(writeOperations.process());
}

FutureUtils.combineAll(futures)
.thenAcceptAsync(
(e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase {
@SuppressWarnings("unchecked")
void testExecuteValueStateRequest() throws Exception {
ForStStateExecutor forStStateExecutor =
new ForStStateExecutor(3, 1, db, new WriteOptions());
new ForStStateExecutor(false, 3, 1, db, new WriteOptions());
ForStValueState<Integer, VoidNamespace, String> state1 =
buildForStValueState("value-state-1");
ForStValueState<Integer, VoidNamespace, String> state2 =
Expand Down Expand Up @@ -131,7 +131,7 @@ void testExecuteValueStateRequest() throws Exception {
@Test
void testExecuteMapStateRequest() throws Exception {
ForStStateExecutor forStStateExecutor =
new ForStStateExecutor(3, 1, db, new WriteOptions());
new ForStStateExecutor(false, 3, 1, db, new WriteOptions());
ForStMapState<Integer, VoidNamespace, String, String> state =
buildForStMapState("map-state");
StateRequestContainer stateRequestContainer =
Expand Down

0 comments on commit a182b9b

Please sign in to comment.