diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java new file mode 100644 index 0000000000000..7bd8400192c80 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.core.state.InternalStateFuture; + +import org.rocksdb.ColumnFamilyHandle; + +import java.io.IOException; + +/** + * The Get access request for ForStDB. + * + * @param The type of key in get access request. + * @param The type of value returned by get request. + */ +public class ForStDBGetRequest { + + private final K key; + private final ForStInnerTable table; + private final InternalStateFuture future; + + private ForStDBGetRequest(K key, ForStInnerTable table, InternalStateFuture future) { + this.key = key; + this.table = table; + this.future = future; + } + + public byte[] buildSerializedKey() throws IOException { + return table.serializeKey(key); + } + + public ColumnFamilyHandle getColumnFamilyHandle() { + return table.getColumnFamilyHandle(); + } + + public void completeStateFuture(byte[] bytesValue) throws IOException { + if (bytesValue == null) { + future.complete(null); + return; + } + V value = table.deserializeValue(bytesValue); + future.complete(value); + } + + static ForStDBGetRequest of( + K key, ForStInnerTable table, InternalStateFuture future) { + return new ForStDBGetRequest<>(key, table, future); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBOperation.java index 236ef83185568..48ed6c5b4bb65 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBOperation.java @@ -26,16 +26,14 @@ * Data access operation to ForStDB. This interface is used to encapsulate the DB access operations * formed after grouping state access. For more information about “Grouping state access”, please * refer to FLIP-426. - * - * @param The type of output for DB access. */ @Internal -public interface ForStDBOperation { +public interface ForStDBOperation { /** * Process the ForStDB access requests. * - * @return Processing result. + * @return The future which indicates whether the operation is completed. */ - CompletableFuture process(); + CompletableFuture process(); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java new file mode 100644 index 0000000000000..6e9821aeaa547 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.core.state.InternalStateFuture; + +import org.rocksdb.ColumnFamilyHandle; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * The Put access request for ForStDB. + * + * @param The type of key in put access request. + * @param The type of value in put access request. + */ +public class ForStDBPutRequest { + + private final K key; + @Nullable private final V value; + private final ForStInnerTable table; + private final InternalStateFuture future; + + private ForStDBPutRequest( + K key, V value, ForStInnerTable table, InternalStateFuture future) { + this.key = key; + this.value = value; + this.table = table; + this.future = future; + } + + public boolean valueIsNull() { + return value == null; + } + + public ColumnFamilyHandle getColumnFamilyHandle() { + return table.getColumnFamilyHandle(); + } + + public byte[] buildSerializedKey() throws IOException { + return table.serializeKey(key); + } + + public byte[] buildSerializedValue() throws IOException { + assert value != null; + return table.serializeValue(value); + } + + public void completeStateFuture() { + future.complete(null); + } + + /** + * If the value of the ForStDBPutRequest is null, then the request will signify the deletion of + * the data associated with that key. + */ + static ForStDBPutRequest of( + K key, + @Nullable V value, + ForStInnerTable table, + InternalStateFuture future) { + return new ForStDBPutRequest<>(key, value, table, future); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java index 8faa5599e44a3..ce74b574b7b91 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java @@ -22,7 +22,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -31,48 +30,38 @@ /** * The general-purpose multiGet operation implementation for ForStDB, which simulates multiGet by * calling the Get API multiple times with multiple threads. - * - * @param The type of key in get access request. - * @param The type of value in get access request. */ -public class ForStGeneralMultiGetOperation implements ForStDBOperation> { +public class ForStGeneralMultiGetOperation implements ForStDBOperation { private static final Logger LOG = LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class); private final RocksDB db; - private final List> batchRequest; + private final List> batchRequest; private final Executor executor; ForStGeneralMultiGetOperation( - RocksDB db, List> batchRequest, Executor executor) { + RocksDB db, List> batchRequest, Executor executor) { this.db = db; this.batchRequest = batchRequest; this.executor = executor; } @Override - public CompletableFuture> process() { + public CompletableFuture process() { - CompletableFuture> future = new CompletableFuture<>(); - @SuppressWarnings("unchecked") - V[] result = (V[]) new Object[batchRequest.size()]; - Arrays.fill(result, null); + CompletableFuture future = new CompletableFuture<>(); AtomicInteger counter = new AtomicInteger(batchRequest.size()); for (int i = 0; i < batchRequest.size(); i++) { - GetRequest request = batchRequest.get(i); - final int index = i; + ForStDBGetRequest request = batchRequest.get(i); executor.execute( () -> { try { - ForStInnerTable table = request.table; - byte[] key = table.serializeKey(request.key); - byte[] value = db.get(table.getColumnFamilyHandle(), key); - if (value != null) { - result[index] = table.deserializeValue(value); - } + byte[] key = request.buildSerializedKey(); + byte[] value = db.get(request.getColumnFamilyHandle(), key); + request.completeStateFuture(value); } catch (Exception e) { LOG.warn( "Error when process general multiGet operation for forStDB", e); @@ -80,26 +69,11 @@ public CompletableFuture> process() { } finally { if (counter.decrementAndGet() == 0 && !future.isCompletedExceptionally()) { - future.complete(Arrays.asList(result)); + future.complete(null); } } }); } return future; } - - /** The Get access request for ForStDB. */ - static class GetRequest { - final K key; - final ForStInnerTable table; - - private GetRequest(K key, ForStInnerTable table) { - this.key = key; - this.table = table; - } - - static GetRequest of(K key, ForStInnerTable table) { - return new GetRequest<>(key, table); - } - } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java index a0c71f572cd9d..90e811c7ccbf6 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java @@ -22,26 +22,19 @@ import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; -import javax.annotation.Nullable; - import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; -/** - * The writeBatch operation implementation for ForStDB. - * - * @param The type of key in put access request. - * @param The type of value in put access request. - */ -public class ForStWriteBatchOperation implements ForStDBOperation { +/** The writeBatch operation implementation for ForStDB. */ +public class ForStWriteBatchOperation implements ForStDBOperation { private static final int PER_RECORD_ESTIMATE_BYTES = 100; private final RocksDB db; - private final List> batchRequest; + private final List> batchRequest; private final WriteOptions writeOptions; @@ -49,7 +42,7 @@ public class ForStWriteBatchOperation implements ForStDBOperation { ForStWriteBatchOperation( RocksDB db, - List> batchRequest, + List> batchRequest, WriteOptions writeOptions, Executor executor) { this.db = db; @@ -64,46 +57,27 @@ public CompletableFuture process() { () -> { try (WriteBatch writeBatch = new WriteBatch(batchRequest.size() * PER_RECORD_ESTIMATE_BYTES)) { - for (PutRequest request : batchRequest) { - ForStInnerTable table = request.table; - if (request.value == null) { + for (ForStDBPutRequest request : batchRequest) { + if (request.valueIsNull()) { // put(key, null) == delete(key) writeBatch.delete( - table.getColumnFamilyHandle(), - table.serializeKey(request.key)); + request.getColumnFamilyHandle(), + request.buildSerializedKey()); } else { writeBatch.put( - table.getColumnFamilyHandle(), - table.serializeKey(request.key), - table.serializeValue(request.value)); + request.getColumnFamilyHandle(), + request.buildSerializedKey(), + request.buildSerializedValue()); } } db.write(writeOptions, writeBatch); + for (ForStDBPutRequest request : batchRequest) { + request.completeStateFuture(); + } } catch (Exception e) { throw new CompletionException("Error while adding data to ForStDB", e); } }, executor); } - - /** The Put access request for ForStDB. */ - static class PutRequest { - final K key; - @Nullable final V value; - final ForStInnerTable table; - - private PutRequest(K key, V value, ForStInnerTable table) { - this.key = key; - this.value = value; - this.table = table; - } - - /** - * If the value of the PutRequest is null, then the request will signify the deletion of the - * data associated with that key. - */ - static PutRequest of(K key, @Nullable V value, ForStInnerTable table) { - return new PutRequest<>(key, value, table); - } - } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java index 1e5939bcd01f9..ff0e0246353a4 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java @@ -19,6 +19,7 @@ package org.apache.flink.state.forst; import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.configuration.ConfigConstants; @@ -42,6 +43,10 @@ import org.rocksdb.RocksDB; import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; /** Base class for {@link ForStDBOperation} tests. */ @@ -104,4 +109,45 @@ protected ForStValueState buildForStValueState(String stateName valueSerializerView, valueDeserializerView); } + + static class TestStateFuture implements InternalStateFuture { + + public CompletableFuture future = new CompletableFuture<>(); + + @Override + public StateFuture thenApply(Function fn) { + throw new UnsupportedOperationException(); + } + + @Override + public StateFuture thenAccept(Consumer action) { + throw new UnsupportedOperationException(); + } + + @Override + public StateFuture thenCompose( + Function> action) { + throw new UnsupportedOperationException(); + } + + @Override + public StateFuture thenCombine( + StateFuture other, BiFunction fn) { + throw new UnsupportedOperationException(); + } + + @Override + public void complete(T result) { + future.complete(result); + } + + public T getCompletedResult() throws Exception { + return future.get(); + } + + @Override + public void thenSyncAccept(Consumer action) { + throw new UnsupportedOperationException(); + } + } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java index 3b7bdb3978526..5b8263ec25e9d 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java @@ -18,6 +18,8 @@ package org.apache.flink.state.forst; +import org.apache.flink.api.java.tuple.Tuple2; + import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -25,7 +27,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static org.apache.flink.state.forst.ForStGeneralMultiGetOperation.GetRequest; import static org.assertj.core.api.Assertions.assertThat; /** Unit test for {@link ForStGeneralMultiGetOperation}. */ @@ -35,30 +36,35 @@ public class ForStGeneralMultiGetOperationTest extends ForStDBOperationTestBase public void testValueStateMultiGet() throws Exception { ForStValueState valueState1 = buildForStValueState("test-multiGet-1"); ForStValueState valueState2 = buildForStValueState("test-multiGet-2"); - List, String>> batchGetRequest = new ArrayList<>(); - List resultCheckList = new ArrayList<>(); + List> batchGetRequest = new ArrayList<>(); + List>> resultCheckList = new ArrayList<>(); int keyNum = 1000; for (int i = 0; i < keyNum; i++) { - GetRequest, String> request = - GetRequest.of(buildContextKey(i), ((i % 2 == 0) ? valueState1 : valueState2)); + TestStateFuture future = new TestStateFuture<>(); + ForStValueState table = ((i % 2 == 0) ? valueState1 : valueState2); + ForStDBGetRequest, String> request = + ForStDBGetRequest.of(buildContextKey(i), table, future); batchGetRequest.add(request); + String value = (i % 10 != 0 ? String.valueOf(i) : null); - resultCheckList.add(value); + resultCheckList.add(Tuple2.of(value, future)); if (value == null) { continue; } - byte[] keyBytes = request.table.serializeKey(request.key); - byte[] valueBytes = request.table.serializeValue(value); - db.put(request.table.getColumnFamilyHandle(), keyBytes, valueBytes); + byte[] keyBytes = request.buildSerializedKey(); + byte[] valueBytes = table.serializeValue(value); + db.put(request.getColumnFamilyHandle(), keyBytes, valueBytes); } ExecutorService executor = Executors.newFixedThreadPool(4); - ForStGeneralMultiGetOperation, String> generalMultiGetOperation = - new ForStGeneralMultiGetOperation<>(db, batchGetRequest, executor); - List result = generalMultiGetOperation.process().get(); + ForStGeneralMultiGetOperation generalMultiGetOperation = + new ForStGeneralMultiGetOperation(db, batchGetRequest, executor); + generalMultiGetOperation.process().get(); - assertThat(result).isEqualTo(resultCheckList); + for (Tuple2> tuple : resultCheckList) { + assertThat(tuple.f1.getCompletedResult()).isEqualTo(tuple.f0); + } executor.shutdownNow(); } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java index 38abac4a70959..19e11684fbf62 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStWriteBatchOperationTest.java @@ -26,8 +26,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static org.apache.flink.state.forst.ForStWriteBatchOperation.PutRequest; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** Unit test for {@link ForStWriteBatchOperation}. */ public class ForStWriteBatchOperationTest extends ForStDBOperationTestBase { @@ -36,65 +36,78 @@ public class ForStWriteBatchOperationTest extends ForStDBOperationTestBase { public void testValueStateWriteBatch() throws Exception { ForStValueState valueState1 = buildForStValueState("test-write-batch-1"); ForStValueState valueState2 = buildForStValueState("test-write-batch-2"); - List, String>> batchPutRequest = new ArrayList<>(); + List> batchPutRequest = new ArrayList<>(); int keyNum = 100; for (int i = 0; i < keyNum; i++) { batchPutRequest.add( - PutRequest.of( + ForStDBPutRequest.of( buildContextKey(i), String.valueOf(i), - ((i % 2 == 0) ? valueState1 : valueState2))); + ((i % 2 == 0) ? valueState1 : valueState2), + new TestStateFuture<>())); } ExecutorService executor = Executors.newFixedThreadPool(2); - ForStWriteBatchOperation, String> writeBatchOperation = - new ForStWriteBatchOperation<>(db, batchPutRequest, new WriteOptions(), executor); + ForStWriteBatchOperation writeBatchOperation = + new ForStWriteBatchOperation(db, batchPutRequest, new WriteOptions(), executor); writeBatchOperation.process().get(); // check data correctness - for (PutRequest, String> request : batchPutRequest) { - ForStInnerTable, String> table = request.table; - byte[] keyBytes = table.serializeKey(request.key); - byte[] valueBytes = db.get(table.getColumnFamilyHandle(), keyBytes); - assertThat(table.deserializeValue(valueBytes)).isEqualTo(request.value); + for (ForStDBPutRequest request : batchPutRequest) { + byte[] keyBytes = request.buildSerializedKey(); + byte[] valueBytes = db.get(request.getColumnFamilyHandle(), keyBytes); + assertArrayEquals(valueBytes, request.buildSerializedValue()); } } @Test public void testWriteBatchWithNullValue() throws Exception { ForStValueState valueState = buildForStValueState("test-write-batch"); - List, String>> batchPutRequest = new ArrayList<>(); + List> batchPutRequest = new ArrayList<>(); // 1. write some data without null value int keyNum = 100; for (int i = 0; i < keyNum; i++) { - batchPutRequest.add(PutRequest.of(buildContextKey(i), String.valueOf(i), valueState)); + batchPutRequest.add( + ForStDBPutRequest.of( + buildContextKey(i), + String.valueOf(i), + valueState, + new TestStateFuture<>())); } ExecutorService executor = Executors.newFixedThreadPool(2); - ForStWriteBatchOperation, String> writeBatchOperation = - new ForStWriteBatchOperation<>(db, batchPutRequest, new WriteOptions(), executor); + ForStWriteBatchOperation writeBatchOperation = + new ForStWriteBatchOperation(db, batchPutRequest, new WriteOptions(), executor); writeBatchOperation.process().get(); // 2. update data with null value batchPutRequest.clear(); for (int i = 0; i < keyNum; i++) { if (i % 8 == 0) { - batchPutRequest.add(PutRequest.of(buildContextKey(i), null, valueState)); + batchPutRequest.add( + ForStDBPutRequest.of( + buildContextKey(i), null, valueState, new TestStateFuture<>())); } else { batchPutRequest.add( - PutRequest.of(buildContextKey(i), String.valueOf(i * 2), valueState)); + ForStDBPutRequest.of( + buildContextKey(i), + String.valueOf(i * 2), + valueState, + new TestStateFuture<>())); } } - ForStWriteBatchOperation, String> writeBatchOperation2 = - new ForStWriteBatchOperation<>(db, batchPutRequest, new WriteOptions(), executor); + ForStWriteBatchOperation writeBatchOperation2 = + new ForStWriteBatchOperation(db, batchPutRequest, new WriteOptions(), executor); writeBatchOperation2.process().get(); // 3. check data correctness - for (PutRequest, String> request : batchPutRequest) { - ForStInnerTable, String> table = request.table; - byte[] keyBytes = table.serializeKey(request.key); - byte[] valueBytes = db.get(table.getColumnFamilyHandle(), keyBytes); - String value = (valueBytes == null) ? null : table.deserializeValue(valueBytes); - assertThat(value).isEqualTo(request.value); + for (ForStDBPutRequest request : batchPutRequest) { + byte[] keyBytes = request.buildSerializedKey(); + byte[] valueBytes = db.get(request.getColumnFamilyHandle(), keyBytes); + if (valueBytes == null) { + assertTrue(request.valueIsNull()); + } else { + assertArrayEquals(valueBytes, request.buildSerializedValue()); + } } } }