Skip to content

Commit

Permalink
[FLINK-35161][state] Refactor ForStGeneralMultiGetOperation and ForSt…
Browse files Browse the repository at this point in the history
…WriteBatchOperation
  • Loading branch information
ljz2051 authored and Zakelly committed May 7, 2024
1 parent ffa3869 commit 88f6d06
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 120 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.forst;

import org.apache.flink.core.state.InternalStateFuture;

import org.rocksdb.ColumnFamilyHandle;

import java.io.IOException;

/**
* The Get access request for ForStDB.
*
* @param <K> The type of key in get access request.
* @param <V> The type of value returned by get request.
*/
public class ForStDBGetRequest<K, V> {

private final K key;
private final ForStInnerTable<K, V> table;
private final InternalStateFuture<V> future;

private ForStDBGetRequest(K key, ForStInnerTable<K, V> table, InternalStateFuture<V> future) {
this.key = key;
this.table = table;
this.future = future;
}

public byte[] buildSerializedKey() throws IOException {
return table.serializeKey(key);
}

public ColumnFamilyHandle getColumnFamilyHandle() {
return table.getColumnFamilyHandle();
}

public void completeStateFuture(byte[] bytesValue) throws IOException {
if (bytesValue == null) {
future.complete(null);
return;
}
V value = table.deserializeValue(bytesValue);
future.complete(value);
}

static <K, V> ForStDBGetRequest<K, V> of(
K key, ForStInnerTable<K, V> table, InternalStateFuture<V> future) {
return new ForStDBGetRequest<>(key, table, future);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,14 @@
* Data access operation to ForStDB. This interface is used to encapsulate the DB access operations
* formed after grouping state access. For more information about “Grouping state access”, please
* refer to FLIP-426.
*
* @param <OUT> The type of output for DB access.
*/
@Internal
public interface ForStDBOperation<OUT> {
public interface ForStDBOperation {

/**
* Process the ForStDB access requests.
*
* @return Processing result.
* @return The future which indicates whether the operation is completed.
*/
CompletableFuture<OUT> process();
CompletableFuture<Void> process();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.forst;

import org.apache.flink.core.state.InternalStateFuture;

import org.rocksdb.ColumnFamilyHandle;

import javax.annotation.Nullable;

import java.io.IOException;

/**
* The Put access request for ForStDB.
*
* @param <K> The type of key in put access request.
* @param <V> The type of value in put access request.
*/
public class ForStDBPutRequest<K, V> {

private final K key;
@Nullable private final V value;
private final ForStInnerTable<K, V> table;
private final InternalStateFuture<Void> future;

private ForStDBPutRequest(
K key, V value, ForStInnerTable<K, V> table, InternalStateFuture<Void> future) {
this.key = key;
this.value = value;
this.table = table;
this.future = future;
}

public boolean valueIsNull() {
return value == null;
}

public ColumnFamilyHandle getColumnFamilyHandle() {
return table.getColumnFamilyHandle();
}

public byte[] buildSerializedKey() throws IOException {
return table.serializeKey(key);
}

public byte[] buildSerializedValue() throws IOException {
assert value != null;
return table.serializeValue(value);
}

public void completeStateFuture() {
future.complete(null);
}

/**
* If the value of the ForStDBPutRequest is null, then the request will signify the deletion of
* the data associated with that key.
*/
static <K, V> ForStDBPutRequest<K, V> of(
K key,
@Nullable V value,
ForStInnerTable<K, V> table,
InternalStateFuture<Void> future) {
return new ForStDBPutRequest<>(key, value, table, future);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
Expand All @@ -31,75 +30,50 @@
/**
* The general-purpose multiGet operation implementation for ForStDB, which simulates multiGet by
* calling the Get API multiple times with multiple threads.
*
* @param <K> The type of key in get access request.
* @param <V> The type of value in get access request.
*/
public class ForStGeneralMultiGetOperation<K, V> implements ForStDBOperation<List<V>> {
public class ForStGeneralMultiGetOperation implements ForStDBOperation {

private static final Logger LOG = LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class);

private final RocksDB db;

private final List<GetRequest<K, V>> batchRequest;
private final List<ForStDBGetRequest<?, ?>> batchRequest;

private final Executor executor;

ForStGeneralMultiGetOperation(
RocksDB db, List<GetRequest<K, V>> batchRequest, Executor executor) {
RocksDB db, List<ForStDBGetRequest<?, ?>> batchRequest, Executor executor) {
this.db = db;
this.batchRequest = batchRequest;
this.executor = executor;
}

@Override
public CompletableFuture<List<V>> process() {
public CompletableFuture<Void> process() {

CompletableFuture<List<V>> future = new CompletableFuture<>();
@SuppressWarnings("unchecked")
V[] result = (V[]) new Object[batchRequest.size()];
Arrays.fill(result, null);
CompletableFuture<Void> future = new CompletableFuture<>();

AtomicInteger counter = new AtomicInteger(batchRequest.size());
for (int i = 0; i < batchRequest.size(); i++) {
GetRequest<K, V> request = batchRequest.get(i);
final int index = i;
ForStDBGetRequest<?, ?> request = batchRequest.get(i);
executor.execute(
() -> {
try {
ForStInnerTable<K, V> table = request.table;
byte[] key = table.serializeKey(request.key);
byte[] value = db.get(table.getColumnFamilyHandle(), key);
if (value != null) {
result[index] = table.deserializeValue(value);
}
byte[] key = request.buildSerializedKey();
byte[] value = db.get(request.getColumnFamilyHandle(), key);
request.completeStateFuture(value);
} catch (Exception e) {
LOG.warn(
"Error when process general multiGet operation for forStDB", e);
future.completeExceptionally(e);
} finally {
if (counter.decrementAndGet() == 0
&& !future.isCompletedExceptionally()) {
future.complete(Arrays.asList(result));
future.complete(null);
}
}
});
}
return future;
}

/** The Get access request for ForStDB. */
static class GetRequest<K, V> {
final K key;
final ForStInnerTable<K, V> table;

private GetRequest(K key, ForStInnerTable<K, V> table) {
this.key = key;
this.table = table;
}

static <K, V> GetRequest<K, V> of(K key, ForStInnerTable<K, V> table) {
return new GetRequest<>(key, table);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,27 @@
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

import javax.annotation.Nullable;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

/**
* The writeBatch operation implementation for ForStDB.
*
* @param <K> The type of key in put access request.
* @param <V> The type of value in put access request.
*/
public class ForStWriteBatchOperation<K, V> implements ForStDBOperation<Void> {
/** The writeBatch operation implementation for ForStDB. */
public class ForStWriteBatchOperation implements ForStDBOperation {

private static final int PER_RECORD_ESTIMATE_BYTES = 100;

private final RocksDB db;

private final List<PutRequest<K, V>> batchRequest;
private final List<ForStDBPutRequest<?, ?>> batchRequest;

private final WriteOptions writeOptions;

private final Executor executor;

ForStWriteBatchOperation(
RocksDB db,
List<PutRequest<K, V>> batchRequest,
List<ForStDBPutRequest<?, ?>> batchRequest,
WriteOptions writeOptions,
Executor executor) {
this.db = db;
Expand All @@ -64,46 +57,27 @@ public CompletableFuture<Void> process() {
() -> {
try (WriteBatch writeBatch =
new WriteBatch(batchRequest.size() * PER_RECORD_ESTIMATE_BYTES)) {
for (PutRequest<K, V> request : batchRequest) {
ForStInnerTable<K, V> table = request.table;
if (request.value == null) {
for (ForStDBPutRequest<?, ?> request : batchRequest) {
if (request.valueIsNull()) {
// put(key, null) == delete(key)
writeBatch.delete(
table.getColumnFamilyHandle(),
table.serializeKey(request.key));
request.getColumnFamilyHandle(),
request.buildSerializedKey());
} else {
writeBatch.put(
table.getColumnFamilyHandle(),
table.serializeKey(request.key),
table.serializeValue(request.value));
request.getColumnFamilyHandle(),
request.buildSerializedKey(),
request.buildSerializedValue());
}
}
db.write(writeOptions, writeBatch);
for (ForStDBPutRequest<?, ?> request : batchRequest) {
request.completeStateFuture();
}
} catch (Exception e) {
throw new CompletionException("Error while adding data to ForStDB", e);
}
},
executor);
}

/** The Put access request for ForStDB. */
static class PutRequest<K, V> {
final K key;
@Nullable final V value;
final ForStInnerTable<K, V> table;

private PutRequest(K key, V value, ForStInnerTable<K, V> table) {
this.key = key;
this.value = value;
this.table = table;
}

/**
* If the value of the PutRequest is null, then the request will signify the deletion of the
* data associated with that key.
*/
static <K, V> PutRequest<K, V> of(K key, @Nullable V value, ForStInnerTable<K, V> table) {
return new PutRequest<>(key, value, table);
}
}
}
Loading

0 comments on commit 88f6d06

Please sign in to comment.