diff --git a/dev-support/findbugs-exclude.xml b/dev-support/findbugs-exclude.xml index 1f18a83d..01eef014 100644 --- a/dev-support/findbugs-exclude.xml +++ b/dev-support/findbugs-exclude.xml @@ -15,7 +15,8 @@ - + + diff --git a/scripts/format-all.sh b/scripts/format-all.sh index 9209c354..ae95b433 100755 --- a/scripts/format-all.sh +++ b/scripts/format-all.sh @@ -24,13 +24,14 @@ PROJECT_DIR=$(dirname "${SCRIPT_DIR}") cd "${PROJECT_DIR}" || exit 1 SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java + src/main/java/com/xiaomi/infra/pegasus/client/request/batch/*.java + src/main/java/com/xiaomi/infra/pegasus/client/request/range/*.java src/main/java/com/xiaomi/infra/pegasus/metrics/*.java src/main/java/com/xiaomi/infra/pegasus/rpc/*.java src/main/java/com/xiaomi/infra/pegasus/rpc/async/*.java src/main/java/com/xiaomi/infra/pegasus/operator/*.java src/main/java/com/xiaomi/infra/pegasus/tools/*.java src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/*.java - src/main/java/com/xiaomi/infra/pegasus/client/request/*.java src/main/java/com/xiaomi/infra/pegasus/base/*.java src/main/java/com/xiaomi/infra/pegasus/example/*.java src/main/java/com/xiaomi/infra/pegasus/security/*.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index 3af2de70..55f3a3eb 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -3,6 +3,8 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.client.request.batch.Batch; +import com.xiaomi.infra.pegasus.client.request.batch.BatchWithResponse; import java.util.*; import org.apache.commons.lang3.tuple.Pair; @@ -111,8 +113,7 @@ public PegasusTableInterface openTable(String tableName, TableOptions tableOptio * Batch get values of different keys. Will terminate immediately if any error occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)} + * more. The latest batch operation please see {@link BatchWithResponse#commit(List, List)} * @param tableName table name * @param keys hashKey and sortKey pair list. * @param values output values; should be created by caller; if succeed, the size of values will @@ -131,8 +132,7 @@ public void batchGet(String tableName, List> keys, List> values) throws PException; + @Deprecated public boolean multiGet( String tableName, byte[] hashKey, @@ -220,8 +224,7 @@ public boolean multiGet( * occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)} + * more. The latest batch operation please see {@link BatchWithResponse#commit(List, List)} * @param tableName table name * @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all * sortKeys under the hashKey. @@ -242,8 +245,7 @@ public void batchMultiGet( * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commitWaitAllComplete(List, - * List)} + * BatchWithResponse#commitWaitAllComplete(List, List)} * @param tableName table name * @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all * sortKeys under the hashKey. @@ -307,8 +309,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value, * Batch set lots of values. Will terminate immediately if any error occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commit(List)} + * more. The latest batch operation please see {@link Batch#commit(List)} * @param tableName TableHandler name * @param items list of items. * @throws PException throws exception if any error occurs. @@ -322,8 +323,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value, * Batch set lots of values. Will wait for all requests done even if some error occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)} + * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)} * @param tableName table name * @param items list of items. * @param results output results; should be created by caller; after call done, the size of @@ -360,8 +360,7 @@ public void multiSet(String tableName, byte[] hashKey, List * occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commit(List)} + * more. The latest batch operation please see {@link Batch#commit(List)} * @param tableName TableHandler name * @param items list of items. * @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0. @@ -381,8 +380,7 @@ public void batchMultiSet(String tableName, List items, int ttlSeco * error occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)} + * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)} * @param tableName table name * @param items list of items. * @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0. @@ -420,8 +418,7 @@ public int batchMultiSet2(String tableName, List items, List items, List> keys, List> keys, List>> key * if some error occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)} + * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)} * @param tableName table name * @param keys List{hashKey,List{sortKey}} * @param results output results; should be created by caller; after call done, the size of diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index f2f69654..ce830a0b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -7,6 +7,7 @@ import com.xiaomi.infra.pegasus.base.blob; import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.gpid; +import com.xiaomi.infra.pegasus.client.request.range.GetRange; import com.xiaomi.infra.pegasus.operator.*; import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.Table; @@ -43,6 +44,10 @@ public PegasusTable(PegasusClient client, Table table) { this.metaList = client.getMetaList(); } + public int getDefaultTimeout() { + return defaultTimeout; + } + @Override public Future asyncExist(byte[] hashKey, byte[] sortKey, int timeout) { final DefaultPromise promise = table.newPromise(); @@ -1173,16 +1178,13 @@ public int batchMultiGet2( public MultiGetSortKeysResult multiGetSortKeys( byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout) throws PException { if (timeout <= 0) timeout = defaultTimeout; - MultiGetSortKeysResult sortKeysResult = new MultiGetSortKeysResult(); - sortKeysResult.keys = new ArrayList<>(); - ScanOptions options = new ScanOptions(); - options.noValue = true; - ScanRangeResult result = scanRange(hashKey, null, null, options, maxFetchCount, timeout); - for (Pair, byte[]> pair : result.results) { - sortKeysResult.keys.add(pair.getLeft().getValue()); - } - sortKeysResult.allFetched = result.allFetched; - return sortKeysResult; + ScanOptions scanOptions = new ScanOptions(); + scanOptions.noValue = true; + GetRange getRange = new GetRange(this, hashKey, timeout); + return getRange + .withOptions(scanOptions) + .commitAndWait(maxFetchCount) + .convertMultiGetSortKeysResult(); } @Override @@ -1818,67 +1820,6 @@ public List getUnorderedScanners( return ret; } - /** - * {@linkplain #scanRange(byte[], byte[], byte[], ScanOptions, int, int)} result, if fetch all - * data for {startSortKey, stopSortKey}, ScanRangeResult.allFetched=true - */ - static class ScanRangeResult { - public List, byte[]>> results; - public boolean allFetched; - } - - /** - * get scan result for {startSortKey, stopSortKey} within hashKey - * - * @param hashKey used to decide which partition to put this k-v, - * @param startSortKey start sort key scan from if null or length == 0, means start from begin - * @param stopSortKey stop sort key scan to if null or length == 0, means stop to end - * @param options scan options like endpoint inclusive/exclusive - * @param maxFetchCount max count of k-v pairs to be fetched. if <=0 means fetch all data for - * {startSortKey, stopSortKey} - * @param timeout if exceed the timeout will throw timeout exception, if <=0, it is equal with - * "timeout" of config - * @return ScanRangeResult result{pair((hashKey, sortKey), value}, if fetch all data for - * {startSortKey, stopSortKey}, ScanRangeResult.allFetched=true - * @throws PException - */ - ScanRangeResult scanRange( - byte[] hashKey, - byte[] startSortKey, - byte[] stopSortKey, - ScanOptions options, - int maxFetchCount, - int timeout /*ms*/) - throws PException { - if (timeout <= 0) timeout = defaultTimeout; - long deadlineTime = System.currentTimeMillis() + timeout; - - PegasusScannerInterface pegasusScanner = - getScanner(hashKey, startSortKey, stopSortKey, options); - ScanRangeResult scanRangeResult = new ScanRangeResult(); - scanRangeResult.allFetched = false; - scanRangeResult.results = new ArrayList<>(); - if (System.currentTimeMillis() >= deadlineTime) { - throw PException.timeout( - metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException()); - } - - Pair, byte[]> pair; - while ((pair = pegasusScanner.next()) != null - && (maxFetchCount <= 0 || scanRangeResult.results.size() < maxFetchCount)) { - if (System.currentTimeMillis() >= deadlineTime) { - throw PException.timeout( - metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException()); - } - scanRangeResult.results.add(pair); - } - - if (pegasusScanner.next() == null) { - scanRangeResult.allFetched = true; - } - return scanRangeResult; - } - public void handleReplicaException( Request request, DefaultPromise promise, client_operator op, Table table, int timeout) { if (timeout <= 0) timeout = defaultTimeout; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java index f54fbec7..58276e3e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java @@ -3,6 +3,8 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.client.request.batch.Batch; +import com.xiaomi.infra.pegasus.client.request.batch.BatchWithResponse; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import java.util.List; @@ -219,6 +221,8 @@ public Future asyncMultiGet( /** * get multiple key-values under the same hashKey with sortKey range limited, async version * + * @deprecated the API may can't get all records, please use {@linkplain + * com.xiaomi.infra.pegasus.client.request.range.GetRange} * @param hashKey used to decide which partition the key may exist should not be null or empty. * @param startSortKey the start sort key. null means "". * @param stopSortKey the stop sort key. null or "" means fetch to the last sort key. @@ -238,6 +242,7 @@ public Future asyncMultiGet( * the same order as the listeners added. But listeners for different tables are not * guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncMultiGet( byte[] hashKey, byte[] startSortKey, @@ -247,6 +252,7 @@ public Future asyncMultiGet( int maxFetchSize, int timeout /*ms*/); + @Deprecated public Future asyncMultiGet( byte[] hashKey, byte[] startSortKey, @@ -730,8 +736,7 @@ public static interface TTLListener extends GenericFutureListener> keys, List values, int t * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commitWaitAllComplete(List, - * List)} + * BatchWithResponse#commitWaitAllComplete(List, List)} * @param keys hashKey and sortKey pair list. * @param results output results; should be created by caller; after call done, the size of * results will be same with keys; the results[i] is a Pair: - if Pair.left != null : means @@ -791,7 +795,10 @@ public MultiGetResult multiGet(byte[] hashKey, List sortKeys, int timeou * sync version of MultiGet, please refer to the async version {@link #asyncMultiGet(byte[], * byte[], byte[], MultiGetOptions, int, int, int)} and {@link #asyncMultiGet(byte[], byte[], * byte[], MultiGetOptions, int)} + * + * @deprecated The API may can't get all records */ + @Deprecated public MultiGetResult multiGet( byte[] hashKey, byte[] startSortKey, @@ -802,6 +809,7 @@ public MultiGetResult multiGet( int timeout /*ms*/) throws PException; + @Deprecated public MultiGetResult multiGet( byte[] hashKey, byte[] startSortKey, @@ -815,8 +823,7 @@ public MultiGetResult multiGet( * occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)} + * more. The latest batch operation please see {@link BatchWithResponse#commit(List, List)} * @param keys List{hashKey,List{sortKey}} * @param values output values; should be created by caller; if succeed, the size of values will * be same with keys; the data for keys[i] is stored in values[i]. @@ -837,8 +844,7 @@ public void batchMultiGet( * error occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)} + * more. The latest batch operation please see {@link BatchWithResponse#commit(List, List)} * @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all * sortKeys under the hashKey. * @param results output results; should be created by caller; after call done, the size of @@ -890,8 +896,7 @@ public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/ * Batch set lots of values. Will terminate immediately if any error occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commit(List)} + * more. The latest batch operation please see {@link Batch#commit(List)} * @param items list of items. * @param timeout how long will the operation timeout in milliseconds. if timeout > 0, it is a * timeout value for current op, else the timeout value in the configuration file will be @@ -907,8 +912,7 @@ public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/ * Batch set lots of values. Will wait for all requests done even if some error occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)} + * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)} * @param items list of items. * @param results output results; should be created by caller; after call done, the size of * results will be same with items; the results[i] is a PException: - if results[i] != null : @@ -942,8 +946,7 @@ public void multiSet(byte[] hashKey, List> values, int time * occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commit(List)} + * more. The latest batch operation please see {@link Batch#commit(List)} * @param items list of items. * @param ttlSeconds time to live in seconds, 0 means no ttl. * @param timeout how long will the operation timeout in milliseconds. if timeout > 0, it is a @@ -962,8 +965,7 @@ public void batchMultiSet(List items, int ttlSeconds, int timeout / * error occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)} + * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)} * @param items list of items. * @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0. * @param results output results; should be created by caller; after call done, the size of @@ -992,8 +994,7 @@ public int batchMultiSet2( * Batch delete values of different keys. Will terminate immediately if any error occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commit(List)} + * more. The latest batch operation please see {@link Batch#commit(List)} * @param keys hashKey and sortKey pair list. * @param timeout how long will the operation timeout in milliseconds. if timeout > 0, it is a * timeout value for current op, else the timeout value in the configuration file will be @@ -1010,8 +1011,7 @@ public int batchMultiSet2( * occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)} + * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)} * @param keys hashKey and sortKey pair list. * @param results output results; should be created by caller; after call done, the size of * results will be same with keys; the results[i] is a PException: - if results[i] != null : @@ -1062,8 +1062,7 @@ public void delRange( * error occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commit(List)} + * more. The latest batch operation please see {@link Batch#commit(List)} * @param keys List{hashKey,List{sortKey}} * @param timeout how long will the operation timeout in milliseconds. if timeout > 0, it is a * timeout value for current op, else the timeout value in the configuration file will be @@ -1081,8 +1080,7 @@ public void batchMultiDel(List>> keys, int timeout /*m * if some error occurs. * * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any - * more. The latest batch operation please see {@link - * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)} + * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)} * @param keys List{hashKey,List{sortKey}} * @param results output results; should be created by caller; after call done, the size of * results will be same with keys; the results[i] is a PException: - if results[i] != null : diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/AbstractBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/AbstractBatch.java similarity index 97% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/AbstractBatch.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/AbstractBatch.java index 20b65fe1..8e52dfae 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/AbstractBatch.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/AbstractBatch.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import com.xiaomi.infra.pegasus.client.FutureGroup; import com.xiaomi.infra.pegasus.client.PegasusTableInterface; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Batch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Batch.java similarity index 98% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/Batch.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Batch.java index 1e2b693c..068056c6 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Batch.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Batch.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import com.xiaomi.infra.pegasus.client.PException; import com.xiaomi.infra.pegasus.client.PegasusTableInterface; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchWithResponse.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/BatchWithResponse.java similarity index 98% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/BatchWithResponse.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/BatchWithResponse.java index 0de78de8..50de7d2e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchWithResponse.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/BatchWithResponse.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import com.xiaomi.infra.pegasus.client.PException; import com.xiaomi.infra.pegasus.client.PegasusTableInterface; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Delete.java similarity index 95% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Delete.java index 733dd4db..c70b7581 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Delete.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import java.io.Serializable; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/DeleteBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/DeleteBatch.java similarity index 95% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/DeleteBatch.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/DeleteBatch.java index 5c9059b3..a5634474 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/DeleteBatch.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/DeleteBatch.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import com.xiaomi.infra.pegasus.client.PegasusTableInterface; import io.netty.util.concurrent.Future; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Get.java similarity index 95% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Get.java index fd1cd23d..b0203ac5 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Get.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import java.io.Serializable; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/GetBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/GetBatch.java similarity index 95% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/GetBatch.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/GetBatch.java index 6791bdc9..048d3b51 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/GetBatch.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/GetBatch.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import com.xiaomi.infra.pegasus.client.PegasusTableInterface; import io.netty.util.concurrent.Future; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiDelete.java similarity index 96% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiDelete.java index c6ceedf1..073677a4 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiDelete.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import java.io.Serializable; import java.util.ArrayList; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDeleteBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiDeleteBatch.java similarity index 95% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDeleteBatch.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiDeleteBatch.java index 7c006755..d3a95391 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDeleteBatch.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiDeleteBatch.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import com.xiaomi.infra.pegasus.client.PegasusTableInterface; import io.netty.util.concurrent.Future; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiGet.java similarity index 96% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiGet.java index d14d8ae2..fbc43096 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiGet.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import java.io.Serializable; import java.util.ArrayList; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGetBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiGetBatch.java similarity index 96% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGetBatch.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiGetBatch.java index 7a940f53..c42bf621 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGetBatch.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiGetBatch.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import com.xiaomi.infra.pegasus.client.PegasusTableInterface; import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetResult; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiSet.java similarity index 97% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiSet.java index 28b641c9..7b128ba4 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiSet.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import java.io.Serializable; import java.util.ArrayList; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSetBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiSetBatch.java similarity index 95% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSetBatch.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiSetBatch.java index 9e5ec035..f8feedc7 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSetBatch.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiSetBatch.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import com.xiaomi.infra.pegasus.client.PegasusTableInterface; import io.netty.util.concurrent.Future; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Set.java similarity index 96% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Set.java index 40728fee..190db8c9 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Set.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import java.io.Serializable; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/SetBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/SetBatch.java similarity index 95% rename from src/main/java/com/xiaomi/infra/pegasus/client/request/SetBatch.java rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/SetBatch.java index d77213c8..4c2a1e7d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/request/SetBatch.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/SetBatch.java @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.xiaomi.infra.pegasus.client.request; +package com.xiaomi.infra.pegasus.client.request.batch; import com.xiaomi.infra.pegasus.client.PegasusTableInterface; import io.netty.util.concurrent.Future; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/range/GetRange.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/range/GetRange.java new file mode 100644 index 00000000..4aacece5 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/range/GetRange.java @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.xiaomi.infra.pegasus.client.request.range; + +import com.xiaomi.infra.pegasus.client.PException; +import com.xiaomi.infra.pegasus.client.PegasusScannerInterface; +import com.xiaomi.infra.pegasus.client.PegasusTableInterface; +import java.util.concurrent.*; +import org.apache.commons.lang3.tuple.Pair; + +public class GetRange extends Range { + + public GetRange(PegasusTableInterface table, byte[] hashKey, int timeout) { + super(table, hashKey, timeout); + } + + public ScanResult commitAndWait(int maxFetchCount) throws PException { + ScanResult scanResult; + try { + scanResult = + CompletableFuture.supplyAsync( + () -> { + ScanResult res = new ScanResult(); + try { + PegasusScannerInterface scanner = + table.getScanner(hashKey, startSortKey, stopSortKey, scanOptions); + Pair, byte[]> pair; + while ((pair = scanner.next()) != null + && (maxFetchCount <= 0 || res.results.size() < maxFetchCount)) { + res.results.add(pair); + } + if (pair == null) { + res.allFetched = true; + } + return res; + } catch (PException e) { + return new ScanResult(e); + } + }) + .get(scanOptions.timeoutMillis, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new PException(e); + } + + if (scanResult != null && scanResult.exception != null) { + throw scanResult.exception; + } + return scanResult; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/range/Range.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/range/Range.java new file mode 100644 index 00000000..cf61283a --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/range/Range.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.xiaomi.infra.pegasus.client.request.range; + +import com.xiaomi.infra.pegasus.client.PException; +import com.xiaomi.infra.pegasus.client.PegasusTable; +import com.xiaomi.infra.pegasus.client.PegasusTableInterface; +import com.xiaomi.infra.pegasus.client.ScanOptions; + +/** + * range operation abstract class based {@linkplain PegasusTableInterface#getScanner(byte[], byte[], + * byte[], ScanOptions)} it is implemented by {@linkplain DeleteRange} and {@linkplain GetRange} + * + * @param generic type for response + */ +public abstract class Range { + public PegasusTableInterface table; + protected byte[] hashKey; + protected int timeout; + + protected ScanOptions scanOptions = new ScanOptions(); + protected byte[] startSortKey; + protected byte[] stopSortKey; + + /** + * @param table table opened + * @param hashKey hashKey used to decide which partition to put this k-v + * @param timeout if exceed the timeout will throw timeout exception, if <=0, it is equal with + * "timeout" of config + */ + public Range(PegasusTableInterface table, byte[] hashKey, int timeout) { + this.table = table; + this.hashKey = hashKey; + this.timeout = timeout <= 0 ? ((PegasusTable) table).getDefaultTimeout() : timeout; + } + + /** + * @param maxRangeCount the max count of range operation result + * @return generic type for response + * @throws PException + */ + public abstract Response commitAndWait(int maxRangeCount) throws PException; + + /** + * set scan options + * + * @param scanOptions see {@linkplain ScanOptions} + * @return this + */ + public Range withOptions(ScanOptions scanOptions) { + this.scanOptions = scanOptions; + return this; + } + + /** + * set startSortKey + * + * @param startSortKey start sort key scan from if null or length == 0, means start from begin + * @param stopSortKey stop sort key scan to if null or length == 0, means stop to end + * @return this + */ + public Range withSortKeyRange(byte[] startSortKey, byte[] stopSortKey) { + this.startSortKey = startSortKey; + this.stopSortKey = stopSortKey; + return this; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/range/ScanResult.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/range/ScanResult.java new file mode 100644 index 00000000..1511f66b --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/range/ScanResult.java @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.xiaomi.infra.pegasus.client.request.range; + +import static com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetResult; +import static com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetSortKeysResult; + +import com.xiaomi.infra.pegasus.client.*; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.lang3.tuple.Pair; + +public class ScanResult { + public List, byte[]>> results; + public boolean allFetched; + public PException exception; + + public ScanResult() { + this(new ArrayList<>(), false); + } + + public ScanResult(List, byte[]>> results, boolean allFetched) { + this.results = results; + this.allFetched = allFetched; + } + + public ScanResult(PException exception) { + this.exception = exception; + } + + public MultiGetResult convertMultiGetResult() { + MultiGetResult multiGetResult = new MultiGetResult(); + if (results == null) { + return multiGetResult; + } + multiGetResult.values = new ArrayList<>(); + for (Pair, byte[]> pair : results) { + multiGetResult.values.add(Pair.of(pair.getLeft().getValue(), pair.getValue())); + } + multiGetResult.allFetched = allFetched; + return multiGetResult; + } + + public MultiGetSortKeysResult convertMultiGetSortKeysResult() { + MultiGetSortKeysResult multiGetSortKeysResult = new MultiGetSortKeysResult(); + if (results == null) { + return multiGetSortKeysResult; + } + multiGetSortKeysResult.keys = new ArrayList<>(); + for (Pair, byte[]> pair : results) { + multiGetSortKeysResult.keys.add(pair.getLeft().getValue()); + } + multiGetSortKeysResult.allFetched = allFetched; + return multiGetSortKeysResult; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/example/BatchSample.java b/src/main/java/com/xiaomi/infra/pegasus/example/BatchSample.java index d8c93664..9a9552af 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/example/BatchSample.java +++ b/src/main/java/com/xiaomi/infra/pegasus/example/BatchSample.java @@ -22,11 +22,11 @@ import com.xiaomi.infra.pegasus.client.PegasusClientFactory; import com.xiaomi.infra.pegasus.client.PegasusClientInterface; import com.xiaomi.infra.pegasus.client.PegasusTableInterface; -import com.xiaomi.infra.pegasus.client.request.BatchWithResponse; -import com.xiaomi.infra.pegasus.client.request.Get; -import com.xiaomi.infra.pegasus.client.request.GetBatch; -import com.xiaomi.infra.pegasus.client.request.Set; -import com.xiaomi.infra.pegasus.client.request.SetBatch; +import com.xiaomi.infra.pegasus.client.request.batch.BatchWithResponse; +import com.xiaomi.infra.pegasus.client.request.batch.Get; +import com.xiaomi.infra.pegasus.client.request.batch.GetBatch; +import com.xiaomi.infra.pegasus.client.request.batch.Set; +import com.xiaomi.infra.pegasus.client.request.batch.SetBatch; import io.netty.util.concurrent.Future; import java.util.ArrayList; import java.util.List; diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index 563e7899..740b4462 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -4,7 +4,6 @@ package com.xiaomi.infra.pegasus.client; /** @author qinzuoyan */ -import com.xiaomi.infra.pegasus.client.PegasusTable.ScanRangeResult; import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetSortKeysResult; import io.netty.util.concurrent.Future; import java.util.ArrayList; @@ -2632,8 +2631,8 @@ private void testWriteSizeLimit(PegasusClientInterface client) { } } - @Test // test for making sure return "maxFetchCount" if has "maxFetchCount" valid record - public void testScanRangeWithValueExpired() throws PException, InterruptedException { + @Test + public void testGetSortKeysWithExpiredValue() throws PException, InterruptedException { String tableName = "temp"; String hashKey = "hashKey"; // generate records: sortKeys=[expired_0....expired_999,persistent_0...persistent_9] @@ -2641,78 +2640,18 @@ public void testScanRangeWithValueExpired() throws PException, InterruptedExcept PegasusTable table = (PegasusTable) PegasusClientFactory.getSingletonClient().openTable(tableName); - // case A: scan all record - // case A1: scan all record: if persistent record count >= maxFetchCount, it must return - // maxFetchCount records - ScanRangeResult caseA1 = - table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(), 5, 0); - assertScanResult(0, 4, false, caseA1); - // case A2: scan all record: if persistent record count < maxFetchCount, it only return - // persistent count records - ScanRangeResult caseA2 = - table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(), 100, 0); - assertScanResult(0, 9, true, caseA2); - - // case B: scan limit record by "startSortKey" and "": - // case B1: scan limit record by "expired_0" and "", if persistent record count >= - // maxFetchCount, it must return maxFetchCount records - ScanRangeResult caseB1 = - table.scanRange( - hashKey.getBytes(), "expired_0".getBytes(), "".getBytes(), new ScanOptions(), 5, 0); - assertScanResult(0, 4, false, caseB1); - // case B2: scan limit record by "expired_0" and "", if persistent record count < maxFetchCount, - // it only return valid records - ScanRangeResult caseB2 = - table.scanRange( - hashKey.getBytes(), "expired_0".getBytes(), "".getBytes(), new ScanOptions(), 50, 0); - assertScanResult(0, 9, true, caseB2); - // case B3: scan limit record by "persistent_5" and "", if following persistent record count < - // maxFetchCount, it only return valid records - ScanRangeResult caseB3 = - table.scanRange( - hashKey.getBytes(), "persistent_5".getBytes(), "".getBytes(), new ScanOptions(), 50, 0); - assertScanResult(5, 9, true, caseB3); - // case B4: scan limit record by "persistent_5" and "", if following persistent record count > - // maxFetchCount, it only return valid records - ScanRangeResult caseB4 = - table.scanRange( - hashKey.getBytes(), "persistent_5".getBytes(), "".getBytes(), new ScanOptions(), 3, 0); - assertScanResult(5, 7, false, caseB4); - - // case C: scan limit record by "" and "stopSortKey": - // case C1: scan limit record by "" and "expired_7", if will return 0 record - ScanRangeResult caseC1 = - table.scanRange( - hashKey.getBytes(), "".getBytes(), "expired_7".getBytes(), new ScanOptions(), 3, 0); - Assert.assertTrue(caseC1.allFetched); - Assert.assertEquals(0, caseC1.results.size()); // among "" and "expired_7" has 0 valid record - // case C2: scan limit record by "" and "persistent_7", if valid record count < maxFetchCount, - // it only return valid record - ScanRangeResult caseC2 = - table.scanRange( - hashKey.getBytes(), "".getBytes(), "persistent_7".getBytes(), new ScanOptions(), 10, 0); - assertScanResult(0, 6, true, caseC2); - // case C3: scan limit record by "" and "persistent_7", if valid record count > maxFetchCount, - // it only return valid record - ScanRangeResult caseC3 = - table.scanRange( - hashKey.getBytes(), "".getBytes(), "persistent_7".getBytes(), new ScanOptions(), 2, 0); - assertScanResult(0, 1, false, caseC3); - - // case D: use multiGetSortKeys, which actually equal with case A but no value - // case D1: maxFetchCount > 0, return maxFetchCount valid record - MultiGetSortKeysResult caseD1 = table.multiGetSortKeys(hashKey.getBytes(), 5, -1, 0); - Assert.assertFalse(caseD1.allFetched); - Assert.assertEquals(5, caseD1.keys.size()); + MultiGetSortKeysResult result1 = table.multiGetSortKeys(hashKey.getBytes(), 5, -1, 0); + Assertions.assertFalse(result1.allFetched); + Assertions.assertEquals(5, result1.keys.size()); for (int i = 0; i <= 4; i++) { - Assertions.assertEquals("persistent_" + i, new String(caseD1.keys.get(i))); + Assertions.assertEquals("persistent_" + i, new String(result1.keys.get(i))); } // case D1: maxFetchCount < 0, return all valid record - MultiGetSortKeysResult caseD2 = table.multiGetSortKeys(hashKey.getBytes(), 10, -1, 0); - Assert.assertTrue(caseD2.allFetched); - Assert.assertEquals(10, caseD2.keys.size()); + MultiGetSortKeysResult result2 = table.multiGetSortKeys(hashKey.getBytes(), 10, -1, 0); + Assertions.assertTrue(result2.allFetched); + Assertions.assertEquals(10, result2.keys.size()); for (int i = 0; i <= 9; i++) { - Assertions.assertEquals("persistent_" + i, new String(caseD2.keys.get(i))); + Assertions.assertEquals("persistent_" + i, new String(result2.keys.get(i))); } } @@ -2742,20 +2681,4 @@ private void generateRecordsWithExpired( } PegasusClientFactory.closeSingletonClient(); } - - private void assertScanResult( - int startIndex, int stopIndex, boolean expectAllFetched, ScanRangeResult actuallyRes) { - Assertions.assertEquals(expectAllFetched, actuallyRes.allFetched); - Assertions.assertEquals(stopIndex - startIndex + 1, actuallyRes.results.size()); - for (int i = startIndex; i <= stopIndex; i++) { - Assertions.assertEquals( - "hashKey", new String(actuallyRes.results.get(i - startIndex).getLeft().getKey())); - Assertions.assertEquals( - "persistent_" + i, - new String(actuallyRes.results.get(i - startIndex).getLeft().getValue())); - Assertions.assertEquals( - "persistent_" + i + "_value", - new String(actuallyRes.results.get(i - startIndex).getRight())); - } - } } diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBatch.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBatch.java index 6f7d0e30..040e0ebf 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBatch.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBatch.java @@ -17,19 +17,19 @@ package com.xiaomi.infra.pegasus.client; import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetResult; -import com.xiaomi.infra.pegasus.client.request.BatchWithResponse; -import com.xiaomi.infra.pegasus.client.request.Delete; -import com.xiaomi.infra.pegasus.client.request.DeleteBatch; -import com.xiaomi.infra.pegasus.client.request.Get; -import com.xiaomi.infra.pegasus.client.request.GetBatch; -import com.xiaomi.infra.pegasus.client.request.MultiDelete; -import com.xiaomi.infra.pegasus.client.request.MultiDeleteBatch; -import com.xiaomi.infra.pegasus.client.request.MultiGet; -import com.xiaomi.infra.pegasus.client.request.MultiGetBatch; -import com.xiaomi.infra.pegasus.client.request.MultiSet; -import com.xiaomi.infra.pegasus.client.request.MultiSetBatch; -import com.xiaomi.infra.pegasus.client.request.Set; -import com.xiaomi.infra.pegasus.client.request.SetBatch; +import com.xiaomi.infra.pegasus.client.request.batch.BatchWithResponse; +import com.xiaomi.infra.pegasus.client.request.batch.Delete; +import com.xiaomi.infra.pegasus.client.request.batch.DeleteBatch; +import com.xiaomi.infra.pegasus.client.request.batch.Get; +import com.xiaomi.infra.pegasus.client.request.batch.GetBatch; +import com.xiaomi.infra.pegasus.client.request.batch.MultiDelete; +import com.xiaomi.infra.pegasus.client.request.batch.MultiDeleteBatch; +import com.xiaomi.infra.pegasus.client.request.batch.MultiGet; +import com.xiaomi.infra.pegasus.client.request.batch.MultiGetBatch; +import com.xiaomi.infra.pegasus.client.request.batch.MultiSet; +import com.xiaomi.infra.pegasus.client.request.batch.MultiSetBatch; +import com.xiaomi.infra.pegasus.client.request.batch.Set; +import com.xiaomi.infra.pegasus.client.request.batch.SetBatch; import io.netty.util.concurrent.Future; import java.util.ArrayList; import java.util.List; diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestRange.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestRange.java new file mode 100644 index 00000000..67355d02 --- /dev/null +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestRange.java @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.xiaomi.infra.pegasus.client; + +import com.xiaomi.infra.pegasus.client.request.range.GetRange; +import com.xiaomi.infra.pegasus.client.request.range.ScanResult; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestRange { + + @Test // test for making sure return "maxFetchCount" if has "maxFetchCount" valid record + public void testScanRangeWithValueExpired() + throws PException, InterruptedException, TimeoutException, ExecutionException { + String tableName = "temp"; + String hashKey = "hashKey"; + // generate records: sortKeys=[expired_0....expired_999,persistent_0...persistent_9] + generateRecordsWithExpired(tableName, hashKey, 1000, 10); + + PegasusTableInterface table = PegasusClientFactory.getSingletonClient().openTable(tableName); + GetRange getRange = new GetRange(table, hashKey.getBytes(), 0); + // case A: scan all record + // case A1: scan all record: if persistent record count >= maxFetchCount, it must return + // maxFetchCount records + ScanResult caseA1 = getRange.commitAndWait(5); + assertScanResult(0, 4, false, caseA1); + // case A2: scan all record: if persistent record count < maxFetchCount, it only return + // persistent count records + ScanResult caseA2 = getRange.commitAndWait(100); + assertScanResult(0, 9, true, caseA2); + + // case B: scan limit record by "startSortKey" and "": + // case B1: scan limit record by "expired_0" and "", if persistent record count >= + // maxFetchCount, it must return maxFetchCount records + ScanResult caseB1 = getRange.withSortKeyRange("expired_0".getBytes(), null).commitAndWait(5); + assertScanResult(0, 4, false, caseB1); + // case B2: scan limit record by "expired_0" and "", if persistent record count < maxFetchCount, + // it only return valid records + ScanResult caseB2 = getRange.withSortKeyRange("expired_0".getBytes(), null).commitAndWait(50); + assertScanResult(0, 9, true, caseB2); + // case B3: scan limit record by "persistent_5" and "", if following persistent record count < + // maxFetchCount, it only return valid records + ScanResult caseB3 = + getRange.withSortKeyRange("persistent_5".getBytes(), null).commitAndWait(50); + assertScanResult(5, 9, true, caseB3); + // case B4: scan limit record by "persistent_5" and "", if following persistent record count > + // maxFetchCount, it only return valid records + ScanResult caseB4 = getRange.withSortKeyRange("persistent_5".getBytes(), null).commitAndWait(3); + assertScanResult(5, 7, false, caseB4); + + // case C: scan limit record by "" and "stopSortKey": + // case C1: scan limit record by "" and "expired_7", if will return 0 record + ScanResult caseC1 = getRange.withSortKeyRange(null, "expired_7".getBytes()).commitAndWait(3); + Assertions.assertTrue(caseC1.allFetched); + Assertions.assertEquals( + 0, caseC1.results.size()); // among "" and "expired_7" has 0 valid record + // case C2: scan limit record by "" and "persistent_7", if valid record count < maxFetchCount, + // it only return valid record + ScanResult caseC2 = + getRange.withSortKeyRange(null, "persistent_7".getBytes()).commitAndWait(10); + assertScanResult(0, 6, true, caseC2); + // case C3: scan limit record by "" and "persistent_7", if valid record count > maxFetchCount, + // it only return valid record + ScanResult caseC3 = getRange.withSortKeyRange(null, "persistent_7".getBytes()).commitAndWait(2); + assertScanResult(0, 1, false, caseC3); + } + + private void generateRecordsWithExpired( + String tableName, String hashKey, int expiredCount, int persistentCount) + throws PException, InterruptedException { + PegasusClientInterface client = PegasusClientFactory.getSingletonClient(); + // assign prefix to make sure the expire record is stored front of persistent + String expiredSortKeyPrefix = "expired_"; + String persistentSortKeyPrefix = "persistent_"; + while (expiredCount-- > 0) { + client.set( + tableName, + hashKey.getBytes(), + (expiredSortKeyPrefix + expiredCount).getBytes(), + (expiredSortKeyPrefix + expiredCount + "_value").getBytes(), + 1); + } + // sleep to make sure the record is expired + Thread.sleep(1000); + while (persistentCount-- > 0) { + client.set( + tableName, + hashKey.getBytes(), + (persistentSortKeyPrefix + persistentCount).getBytes(), + (persistentSortKeyPrefix + persistentCount + "_value").getBytes()); + } + PegasusClientFactory.closeSingletonClient(); + } + + private void assertScanResult( + int startIndex, int stopIndex, boolean expectAllFetched, ScanResult actuallyRes) { + Assertions.assertEquals(expectAllFetched, actuallyRes.allFetched); + Assertions.assertEquals(stopIndex - startIndex + 1, actuallyRes.results.size()); + for (int i = startIndex; i <= stopIndex; i++) { + Assertions.assertEquals( + "hashKey", new String(actuallyRes.results.get(i - startIndex).getLeft().getKey())); + Assertions.assertEquals( + "persistent_" + i, + new String(actuallyRes.results.get(i - startIndex).getLeft().getValue())); + Assertions.assertEquals( + "persistent_" + i + "_value", + new String(actuallyRes.results.get(i - startIndex).getRight())); + } + } +}