diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index 4d4fae93463aee..9b46918f1d6c94 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -276,6 +276,7 @@ public StateExecutor createStateExecutor() { } StateExecutor stateExecutor = new ForStStateExecutor( + optionsContainer.isWriteInline(), optionsContainer.getReadIoParallelism(), optionsContainer.getWriteIoParallelism(), db, diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java index 7dc55903937a0b..782b3061da0e3a 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java @@ -157,17 +157,27 @@ public class ForStOptions { + "of rocksdb timer service, but consumes more heap memory at the same time.", TIMER_SERVICE_FACTORY.key(), ForStDB.name())); + public static final ConfigOption EXECUTOR_WRITE_IO_INLINE = + ConfigOptions.key("state.backend.forst.executor.inline-write") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to let write requests executed within the coordinator thread."); + public static final ConfigOption EXECUTOR_READ_IO_PARALLELISM = - ConfigOptions.key("state.backend.forst.memory.executor-read-io-parallelism") + ConfigOptions.key("state.backend.forst.executor.read-io-parallelism") .intType() .defaultValue(3) .withDescription( "The number of threads used for read IO operations in the executor."); public static final ConfigOption EXECUTOR_WRITE_IO_PARALLELISM = - ConfigOptions.key("state.backend.forst.memory.executor-write-io-parallelism") + ConfigOptions.key("state.backend.forst.executor.write-io-parallelism") .intType() .defaultValue(1) .withDescription( - "The number of threads used for write IO operations in the executor."); + "The number of threads used for write IO operations in the executor." + + " Only valid when '" + + EXECUTOR_WRITE_IO_INLINE.key() + + "' is false."); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index 5b48ef3711b7ed..0e72717bcee81f 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -277,6 +277,10 @@ public Path getDbPath() { } } + public boolean isWriteInline() { + return configuration.get(ForStOptions.EXECUTOR_WRITE_IO_INLINE); + } + public int getReadIoParallelism() { return configuration.get(ForStOptions.EXECUTOR_READ_IO_PARALLELISM); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java index 1366e688b59873..dcaf9915a6a82a 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java @@ -71,27 +71,46 @@ public class ForStStateExecutor implements StateExecutor { private final AtomicLong ongoing; public ForStStateExecutor( - int readIoParallelism, int writeIoParallelism, RocksDB db, WriteOptions writeOptions) { - Preconditions.checkState(readIoParallelism > 0 || writeIoParallelism > 0); - this.coordinatorThread = - Executors.newSingleThreadScheduledExecutor( - new ExecutorThreadFactory("ForSt-StateExecutor-Coordinator")); - if (readIoParallelism <= 0 || writeIoParallelism <= 0) { - this.readThreadCount = Math.max(readIoParallelism, writeIoParallelism); - this.readThreads = - Executors.newFixedThreadPool( - readThreadCount, new ExecutorThreadFactory("ForSt-StateExecutor-IO")); - this.writeThreads = null; - } else { + boolean isWriteInline, + int readIoParallelism, + int writeIoParallelism, + RocksDB db, + WriteOptions writeOptions) { + if (isWriteInline) { + Preconditions.checkState(readIoParallelism > 0); + this.coordinatorThread = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("ForSt-StateExecutor-Coordinator-And-Write")); this.readThreadCount = readIoParallelism; this.readThreads = Executors.newFixedThreadPool( readIoParallelism, new ExecutorThreadFactory("ForSt-StateExecutor-read-IO")); this.writeThreads = - Executors.newFixedThreadPool( - writeIoParallelism, - new ExecutorThreadFactory("ForSt-StateExecutor-write-IO")); + org.apache.flink.util.concurrent.Executors.newDirectExecutorService(); + } else { + Preconditions.checkState(readIoParallelism > 0 || writeIoParallelism > 0); + this.coordinatorThread = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("ForSt-StateExecutor-Coordinator")); + if (readIoParallelism <= 0 || writeIoParallelism <= 0) { + this.readThreadCount = Math.max(readIoParallelism, writeIoParallelism); + this.readThreads = + Executors.newFixedThreadPool( + readThreadCount, + new ExecutorThreadFactory("ForSt-StateExecutor-IO")); + this.writeThreads = readThreads; + } else { + this.readThreadCount = readIoParallelism; + this.readThreads = + Executors.newFixedThreadPool( + readIoParallelism, + new ExecutorThreadFactory("ForSt-StateExecutor-read-IO")); + this.writeThreads = + Executors.newFixedThreadPool( + writeIoParallelism, + new ExecutorThreadFactory("ForSt-StateExecutor-write-IO")); + } } this.db = db; this.writeOptions = writeOptions; @@ -122,17 +141,6 @@ public CompletableFuture executeBatchRequests( () -> { long startTime = System.currentTimeMillis(); List> futures = new ArrayList<>(3); - List> putRequests = - stateRequestClassifier.pollDbPutRequests(); - if (!putRequests.isEmpty()) { - ForStWriteBatchOperation writeOperations = - new ForStWriteBatchOperation( - db, - putRequests, - writeOptions, - writeThreads == null ? readThreads : writeThreads); - futures.add(writeOperations.process()); - } if (!getRequests.isEmpty()) { ForStGeneralMultiGetOperation getOperations = @@ -152,6 +160,15 @@ public CompletableFuture executeBatchRequests( futures.add(iterOperations.process()); } + List> putRequests = + stateRequestClassifier.pollDbPutRequests(); + if (!putRequests.isEmpty()) { + ForStWriteBatchOperation writeOperations = + new ForStWriteBatchOperation( + db, putRequests, writeOptions, writeThreads); + futures.add(writeOperations.process()); + } + FutureUtils.combineAll(futures) .thenAcceptAsync( (e) -> { diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java index df8170962a4053..486186788775cf 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java @@ -47,7 +47,7 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase { @SuppressWarnings("unchecked") void testExecuteValueStateRequest() throws Exception { ForStStateExecutor forStStateExecutor = - new ForStStateExecutor(3, 1, db, new WriteOptions()); + new ForStStateExecutor(false, 3, 1, db, new WriteOptions()); ForStValueState state1 = buildForStValueState("value-state-1"); ForStValueState state2 = @@ -131,7 +131,7 @@ void testExecuteValueStateRequest() throws Exception { @Test void testExecuteMapStateRequest() throws Exception { ForStStateExecutor forStStateExecutor = - new ForStStateExecutor(3, 1, db, new WriteOptions()); + new ForStStateExecutor(false, 3, 1, db, new WriteOptions()); ForStMapState state = buildForStMapState("map-state"); StateRequestContainer stateRequestContainer =