diff --git a/dev-support/findbugs-exclude.xml b/dev-support/findbugs-exclude.xml
index 1f18a83d..01eef014 100644
--- a/dev-support/findbugs-exclude.xml
+++ b/dev-support/findbugs-exclude.xml
@@ -15,7 +15,8 @@
-
+
+
diff --git a/scripts/format-all.sh b/scripts/format-all.sh
index 9209c354..ae95b433 100755
--- a/scripts/format-all.sh
+++ b/scripts/format-all.sh
@@ -24,13 +24,14 @@ PROJECT_DIR=$(dirname "${SCRIPT_DIR}")
cd "${PROJECT_DIR}" || exit 1
SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java
+ src/main/java/com/xiaomi/infra/pegasus/client/request/batch/*.java
+ src/main/java/com/xiaomi/infra/pegasus/client/request/range/*.java
src/main/java/com/xiaomi/infra/pegasus/metrics/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/async/*.java
src/main/java/com/xiaomi/infra/pegasus/operator/*.java
src/main/java/com/xiaomi/infra/pegasus/tools/*.java
src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/*.java
- src/main/java/com/xiaomi/infra/pegasus/client/request/*.java
src/main/java/com/xiaomi/infra/pegasus/base/*.java
src/main/java/com/xiaomi/infra/pegasus/example/*.java
src/main/java/com/xiaomi/infra/pegasus/security/*.java
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java
index 3af2de70..55f3a3eb 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java
@@ -3,6 +3,8 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;
+import com.xiaomi.infra.pegasus.client.request.batch.Batch;
+import com.xiaomi.infra.pegasus.client.request.batch.BatchWithResponse;
import java.util.*;
import org.apache.commons.lang3.tuple.Pair;
@@ -111,8 +113,7 @@ public PegasusTableInterface openTable(String tableName, TableOptions tableOptio
* Batch get values of different keys. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)}
+ * more. The latest batch operation please see {@link BatchWithResponse#commit(List, List)}
* @param tableName table name
* @param keys hashKey and sortKey pair list.
* @param values output values; should be created by caller; if succeed, the size of values will
@@ -131,8 +132,7 @@ public void batchGet(String tableName, List> keys, List> values)
throws PException;
+ @Deprecated
public boolean multiGet(
String tableName,
byte[] hashKey,
@@ -220,8 +224,7 @@ public boolean multiGet(
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)}
+ * more. The latest batch operation please see {@link BatchWithResponse#commit(List, List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all
* sortKeys under the hashKey.
@@ -242,8 +245,7 @@ public void batchMultiGet(
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commitWaitAllComplete(List,
- * List)}
+ * BatchWithResponse#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all
* sortKeys under the hashKey.
@@ -307,8 +309,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value,
* Batch set lots of values. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
+ * more. The latest batch operation please see {@link Batch#commit(List)}
* @param tableName TableHandler name
* @param items list of items.
* @throws PException throws exception if any error occurs.
@@ -322,8 +323,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value,
* Batch set lots of values. Will wait for all requests done even if some error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
+ * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param items list of items.
* @param results output results; should be created by caller; after call done, the size of
@@ -360,8 +360,7 @@ public void multiSet(String tableName, byte[] hashKey, List
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
+ * more. The latest batch operation please see {@link Batch#commit(List)}
* @param tableName TableHandler name
* @param items list of items.
* @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0.
@@ -381,8 +380,7 @@ public void batchMultiSet(String tableName, List items, int ttlSeco
* error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
+ * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param items list of items.
* @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0.
@@ -420,8 +418,7 @@ public int batchMultiSet2(String tableName, List items, List items, List> keys, List> keys, List>> key
* if some error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
+ * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param tableName table name
* @param keys List{hashKey,List{sortKey}}
* @param results output results; should be created by caller; after call done, the size of
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
index f2f69654..ce830a0b 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
@@ -7,6 +7,7 @@
import com.xiaomi.infra.pegasus.base.blob;
import com.xiaomi.infra.pegasus.base.error_code;
import com.xiaomi.infra.pegasus.base.gpid;
+import com.xiaomi.infra.pegasus.client.request.range.GetRange;
import com.xiaomi.infra.pegasus.operator.*;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
@@ -43,6 +44,10 @@ public PegasusTable(PegasusClient client, Table table) {
this.metaList = client.getMetaList();
}
+ public int getDefaultTimeout() {
+ return defaultTimeout;
+ }
+
@Override
public Future asyncExist(byte[] hashKey, byte[] sortKey, int timeout) {
final DefaultPromise promise = table.newPromise();
@@ -1173,16 +1178,13 @@ public int batchMultiGet2(
public MultiGetSortKeysResult multiGetSortKeys(
byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout) throws PException {
if (timeout <= 0) timeout = defaultTimeout;
- MultiGetSortKeysResult sortKeysResult = new MultiGetSortKeysResult();
- sortKeysResult.keys = new ArrayList<>();
- ScanOptions options = new ScanOptions();
- options.noValue = true;
- ScanRangeResult result = scanRange(hashKey, null, null, options, maxFetchCount, timeout);
- for (Pair, byte[]> pair : result.results) {
- sortKeysResult.keys.add(pair.getLeft().getValue());
- }
- sortKeysResult.allFetched = result.allFetched;
- return sortKeysResult;
+ ScanOptions scanOptions = new ScanOptions();
+ scanOptions.noValue = true;
+ GetRange getRange = new GetRange(this, hashKey, timeout);
+ return getRange
+ .withOptions(scanOptions)
+ .commitAndWait(maxFetchCount)
+ .convertMultiGetSortKeysResult();
}
@Override
@@ -1818,67 +1820,6 @@ public List getUnorderedScanners(
return ret;
}
- /**
- * {@linkplain #scanRange(byte[], byte[], byte[], ScanOptions, int, int)} result, if fetch all
- * data for {startSortKey, stopSortKey}, ScanRangeResult.allFetched=true
- */
- static class ScanRangeResult {
- public List, byte[]>> results;
- public boolean allFetched;
- }
-
- /**
- * get scan result for {startSortKey, stopSortKey} within hashKey
- *
- * @param hashKey used to decide which partition to put this k-v,
- * @param startSortKey start sort key scan from if null or length == 0, means start from begin
- * @param stopSortKey stop sort key scan to if null or length == 0, means stop to end
- * @param options scan options like endpoint inclusive/exclusive
- * @param maxFetchCount max count of k-v pairs to be fetched. if <=0 means fetch all data for
- * {startSortKey, stopSortKey}
- * @param timeout if exceed the timeout will throw timeout exception, if <=0, it is equal with
- * "timeout" of config
- * @return ScanRangeResult result{pair((hashKey, sortKey), value}, if fetch all data for
- * {startSortKey, stopSortKey}, ScanRangeResult.allFetched=true
- * @throws PException
- */
- ScanRangeResult scanRange(
- byte[] hashKey,
- byte[] startSortKey,
- byte[] stopSortKey,
- ScanOptions options,
- int maxFetchCount,
- int timeout /*ms*/)
- throws PException {
- if (timeout <= 0) timeout = defaultTimeout;
- long deadlineTime = System.currentTimeMillis() + timeout;
-
- PegasusScannerInterface pegasusScanner =
- getScanner(hashKey, startSortKey, stopSortKey, options);
- ScanRangeResult scanRangeResult = new ScanRangeResult();
- scanRangeResult.allFetched = false;
- scanRangeResult.results = new ArrayList<>();
- if (System.currentTimeMillis() >= deadlineTime) {
- throw PException.timeout(
- metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException());
- }
-
- Pair, byte[]> pair;
- while ((pair = pegasusScanner.next()) != null
- && (maxFetchCount <= 0 || scanRangeResult.results.size() < maxFetchCount)) {
- if (System.currentTimeMillis() >= deadlineTime) {
- throw PException.timeout(
- metaList, table.getTableName(), new Request(hashKey), timeout, new TimeoutException());
- }
- scanRangeResult.results.add(pair);
- }
-
- if (pegasusScanner.next() == null) {
- scanRangeResult.allFetched = true;
- }
- return scanRangeResult;
- }
-
public void handleReplicaException(
Request request, DefaultPromise promise, client_operator op, Table table, int timeout) {
if (timeout <= 0) timeout = defaultTimeout;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java
index f54fbec7..58276e3e 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java
@@ -3,6 +3,8 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;
+import com.xiaomi.infra.pegasus.client.request.batch.Batch;
+import com.xiaomi.infra.pegasus.client.request.batch.BatchWithResponse;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.List;
@@ -219,6 +221,8 @@ public Future asyncMultiGet(
/**
* get multiple key-values under the same hashKey with sortKey range limited, async version
*
+ * @deprecated the API may can't get all records, please use {@linkplain
+ * com.xiaomi.infra.pegasus.client.request.range.GetRange}
* @param hashKey used to decide which partition the key may exist should not be null or empty.
* @param startSortKey the start sort key. null means "".
* @param stopSortKey the stop sort key. null or "" means fetch to the last sort key.
@@ -238,6 +242,7 @@ public Future asyncMultiGet(
* the same order as the listeners added. But listeners for different tables are not
* guaranteed to be dispatched in the same thread.
*/
+ @Deprecated
public Future asyncMultiGet(
byte[] hashKey,
byte[] startSortKey,
@@ -247,6 +252,7 @@ public Future asyncMultiGet(
int maxFetchSize,
int timeout /*ms*/);
+ @Deprecated
public Future asyncMultiGet(
byte[] hashKey,
byte[] startSortKey,
@@ -730,8 +736,7 @@ public static interface TTLListener extends GenericFutureListener> keys, List values, int t
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commitWaitAllComplete(List,
- * List)}
+ * BatchWithResponse#commitWaitAllComplete(List, List)}
* @param keys hashKey and sortKey pair list.
* @param results output results; should be created by caller; after call done, the size of
* results will be same with keys; the results[i] is a Pair: - if Pair.left != null : means
@@ -791,7 +795,10 @@ public MultiGetResult multiGet(byte[] hashKey, List sortKeys, int timeou
* sync version of MultiGet, please refer to the async version {@link #asyncMultiGet(byte[],
* byte[], byte[], MultiGetOptions, int, int, int)} and {@link #asyncMultiGet(byte[], byte[],
* byte[], MultiGetOptions, int)}
+ *
+ * @deprecated The API may can't get all records
*/
+ @Deprecated
public MultiGetResult multiGet(
byte[] hashKey,
byte[] startSortKey,
@@ -802,6 +809,7 @@ public MultiGetResult multiGet(
int timeout /*ms*/)
throws PException;
+ @Deprecated
public MultiGetResult multiGet(
byte[] hashKey,
byte[] startSortKey,
@@ -815,8 +823,7 @@ public MultiGetResult multiGet(
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)}
+ * more. The latest batch operation please see {@link BatchWithResponse#commit(List, List)}
* @param keys List{hashKey,List{sortKey}}
* @param values output values; should be created by caller; if succeed, the size of values will
* be same with keys; the data for keys[i] is stored in values[i].
@@ -837,8 +844,7 @@ public void batchMultiGet(
* error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.BatchWithResponse#commit(List, List)}
+ * more. The latest batch operation please see {@link BatchWithResponse#commit(List, List)}
* @param keys List{hashKey,List{sortKey}}; if List{sortKey} is null or empty, means fetch all
* sortKeys under the hashKey.
* @param results output results; should be created by caller; after call done, the size of
@@ -890,8 +896,7 @@ public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/
* Batch set lots of values. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
+ * more. The latest batch operation please see {@link Batch#commit(List)}
* @param items list of items.
* @param timeout how long will the operation timeout in milliseconds. if timeout > 0, it is a
* timeout value for current op, else the timeout value in the configuration file will be
@@ -907,8 +912,7 @@ public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/
* Batch set lots of values. Will wait for all requests done even if some error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
+ * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param items list of items.
* @param results output results; should be created by caller; after call done, the size of
* results will be same with items; the results[i] is a PException: - if results[i] != null :
@@ -942,8 +946,7 @@ public void multiSet(byte[] hashKey, List> values, int time
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
+ * more. The latest batch operation please see {@link Batch#commit(List)}
* @param items list of items.
* @param ttlSeconds time to live in seconds, 0 means no ttl.
* @param timeout how long will the operation timeout in milliseconds. if timeout > 0, it is a
@@ -962,8 +965,7 @@ public void batchMultiSet(List items, int ttlSeconds, int timeout /
* error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
+ * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param items list of items.
* @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0.
* @param results output results; should be created by caller; after call done, the size of
@@ -992,8 +994,7 @@ public int batchMultiSet2(
* Batch delete values of different keys. Will terminate immediately if any error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
+ * more. The latest batch operation please see {@link Batch#commit(List)}
* @param keys hashKey and sortKey pair list.
* @param timeout how long will the operation timeout in milliseconds. if timeout > 0, it is a
* timeout value for current op, else the timeout value in the configuration file will be
@@ -1010,8 +1011,7 @@ public int batchMultiSet2(
* occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
+ * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param keys hashKey and sortKey pair list.
* @param results output results; should be created by caller; after call done, the size of
* results will be same with keys; the results[i] is a PException: - if results[i] != null :
@@ -1062,8 +1062,7 @@ public void delRange(
* error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commit(List)}
+ * more. The latest batch operation please see {@link Batch#commit(List)}
* @param keys List{hashKey,List{sortKey}}
* @param timeout how long will the operation timeout in milliseconds. if timeout > 0, it is a
* timeout value for current op, else the timeout value in the configuration file will be
@@ -1081,8 +1080,7 @@ public void batchMultiDel(List>> keys, int timeout /*m
* if some error occurs.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
- * more. The latest batch operation please see {@link
- * com.xiaomi.infra.pegasus.client.request.Batch#commitWaitAllComplete(List, List)}
+ * more. The latest batch operation please see {@link Batch#commitWaitAllComplete(List, List)}
* @param keys List{hashKey,List{sortKey}}
* @param results output results; should be created by caller; after call done, the size of
* results will be same with keys; the results[i] is a PException: - if results[i] != null :
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/AbstractBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/AbstractBatch.java
similarity index 97%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/AbstractBatch.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/AbstractBatch.java
index 20b65fe1..8e52dfae 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/AbstractBatch.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/AbstractBatch.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import com.xiaomi.infra.pegasus.client.FutureGroup;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Batch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Batch.java
similarity index 98%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/Batch.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Batch.java
index 1e2b693c..068056c6 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Batch.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Batch.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchWithResponse.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/BatchWithResponse.java
similarity index 98%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/BatchWithResponse.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/BatchWithResponse.java
index 0de78de8..50de7d2e 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchWithResponse.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/BatchWithResponse.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Delete.java
similarity index 95%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Delete.java
index 733dd4db..c70b7581 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Delete.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import java.io.Serializable;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/DeleteBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/DeleteBatch.java
similarity index 95%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/DeleteBatch.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/DeleteBatch.java
index 5c9059b3..a5634474 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/DeleteBatch.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/DeleteBatch.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
import io.netty.util.concurrent.Future;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Get.java
similarity index 95%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Get.java
index fd1cd23d..b0203ac5 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Get.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import java.io.Serializable;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/GetBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/GetBatch.java
similarity index 95%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/GetBatch.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/GetBatch.java
index 6791bdc9..048d3b51 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/GetBatch.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/GetBatch.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
import io.netty.util.concurrent.Future;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiDelete.java
similarity index 96%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiDelete.java
index c6ceedf1..073677a4 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiDelete.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import java.io.Serializable;
import java.util.ArrayList;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDeleteBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiDeleteBatch.java
similarity index 95%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDeleteBatch.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiDeleteBatch.java
index 7c006755..d3a95391 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDeleteBatch.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiDeleteBatch.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
import io.netty.util.concurrent.Future;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiGet.java
similarity index 96%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiGet.java
index d14d8ae2..fbc43096 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiGet.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import java.io.Serializable;
import java.util.ArrayList;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGetBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiGetBatch.java
similarity index 96%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGetBatch.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiGetBatch.java
index 7a940f53..c42bf621 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGetBatch.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiGetBatch.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetResult;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiSet.java
similarity index 97%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiSet.java
index 28b641c9..7b128ba4 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiSet.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import java.io.Serializable;
import java.util.ArrayList;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSetBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiSetBatch.java
similarity index 95%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSetBatch.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiSetBatch.java
index 9e5ec035..f8feedc7 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSetBatch.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/MultiSetBatch.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
import io.netty.util.concurrent.Future;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Set.java
similarity index 96%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Set.java
index 40728fee..190db8c9 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/Set.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import java.io.Serializable;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/SetBatch.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/SetBatch.java
similarity index 95%
rename from src/main/java/com/xiaomi/infra/pegasus/client/request/SetBatch.java
rename to src/main/java/com/xiaomi/infra/pegasus/client/request/batch/SetBatch.java
index d77213c8..4c2a1e7d 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/request/SetBatch.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/batch/SetBatch.java
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package com.xiaomi.infra.pegasus.client.request;
+package com.xiaomi.infra.pegasus.client.request.batch;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
import io.netty.util.concurrent.Future;
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/range/GetRange.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/range/GetRange.java
new file mode 100644
index 00000000..4aacece5
--- /dev/null
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/range/GetRange.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.xiaomi.infra.pegasus.client.request.range;
+
+import com.xiaomi.infra.pegasus.client.PException;
+import com.xiaomi.infra.pegasus.client.PegasusScannerInterface;
+import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
+import java.util.concurrent.*;
+import org.apache.commons.lang3.tuple.Pair;
+
+public class GetRange extends Range {
+
+ public GetRange(PegasusTableInterface table, byte[] hashKey, int timeout) {
+ super(table, hashKey, timeout);
+ }
+
+ public ScanResult commitAndWait(int maxFetchCount) throws PException {
+ ScanResult scanResult;
+ try {
+ scanResult =
+ CompletableFuture.supplyAsync(
+ () -> {
+ ScanResult res = new ScanResult();
+ try {
+ PegasusScannerInterface scanner =
+ table.getScanner(hashKey, startSortKey, stopSortKey, scanOptions);
+ Pair, byte[]> pair;
+ while ((pair = scanner.next()) != null
+ && (maxFetchCount <= 0 || res.results.size() < maxFetchCount)) {
+ res.results.add(pair);
+ }
+ if (pair == null) {
+ res.allFetched = true;
+ }
+ return res;
+ } catch (PException e) {
+ return new ScanResult(e);
+ }
+ })
+ .get(scanOptions.timeoutMillis, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new PException(e);
+ }
+
+ if (scanResult != null && scanResult.exception != null) {
+ throw scanResult.exception;
+ }
+ return scanResult;
+ }
+}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/range/Range.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/range/Range.java
new file mode 100644
index 00000000..cf61283a
--- /dev/null
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/range/Range.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.xiaomi.infra.pegasus.client.request.range;
+
+import com.xiaomi.infra.pegasus.client.PException;
+import com.xiaomi.infra.pegasus.client.PegasusTable;
+import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
+import com.xiaomi.infra.pegasus.client.ScanOptions;
+
+/**
+ * range operation abstract class based {@linkplain PegasusTableInterface#getScanner(byte[], byte[],
+ * byte[], ScanOptions)} it is implemented by {@linkplain DeleteRange} and {@linkplain GetRange}
+ *
+ * @param generic type for response
+ */
+public abstract class Range {
+ public PegasusTableInterface table;
+ protected byte[] hashKey;
+ protected int timeout;
+
+ protected ScanOptions scanOptions = new ScanOptions();
+ protected byte[] startSortKey;
+ protected byte[] stopSortKey;
+
+ /**
+ * @param table table opened
+ * @param hashKey hashKey used to decide which partition to put this k-v
+ * @param timeout if exceed the timeout will throw timeout exception, if <=0, it is equal with
+ * "timeout" of config
+ */
+ public Range(PegasusTableInterface table, byte[] hashKey, int timeout) {
+ this.table = table;
+ this.hashKey = hashKey;
+ this.timeout = timeout <= 0 ? ((PegasusTable) table).getDefaultTimeout() : timeout;
+ }
+
+ /**
+ * @param maxRangeCount the max count of range operation result
+ * @return generic type for response
+ * @throws PException
+ */
+ public abstract Response commitAndWait(int maxRangeCount) throws PException;
+
+ /**
+ * set scan options
+ *
+ * @param scanOptions see {@linkplain ScanOptions}
+ * @return this
+ */
+ public Range withOptions(ScanOptions scanOptions) {
+ this.scanOptions = scanOptions;
+ return this;
+ }
+
+ /**
+ * set startSortKey
+ *
+ * @param startSortKey start sort key scan from if null or length == 0, means start from begin
+ * @param stopSortKey stop sort key scan to if null or length == 0, means stop to end
+ * @return this
+ */
+ public Range withSortKeyRange(byte[] startSortKey, byte[] stopSortKey) {
+ this.startSortKey = startSortKey;
+ this.stopSortKey = stopSortKey;
+ return this;
+ }
+}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/range/ScanResult.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/range/ScanResult.java
new file mode 100644
index 00000000..1511f66b
--- /dev/null
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/range/ScanResult.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.xiaomi.infra.pegasus.client.request.range;
+
+import static com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetResult;
+import static com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetSortKeysResult;
+
+import com.xiaomi.infra.pegasus.client.*;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+
+public class ScanResult {
+ public List, byte[]>> results;
+ public boolean allFetched;
+ public PException exception;
+
+ public ScanResult() {
+ this(new ArrayList<>(), false);
+ }
+
+ public ScanResult(List, byte[]>> results, boolean allFetched) {
+ this.results = results;
+ this.allFetched = allFetched;
+ }
+
+ public ScanResult(PException exception) {
+ this.exception = exception;
+ }
+
+ public MultiGetResult convertMultiGetResult() {
+ MultiGetResult multiGetResult = new MultiGetResult();
+ if (results == null) {
+ return multiGetResult;
+ }
+ multiGetResult.values = new ArrayList<>();
+ for (Pair, byte[]> pair : results) {
+ multiGetResult.values.add(Pair.of(pair.getLeft().getValue(), pair.getValue()));
+ }
+ multiGetResult.allFetched = allFetched;
+ return multiGetResult;
+ }
+
+ public MultiGetSortKeysResult convertMultiGetSortKeysResult() {
+ MultiGetSortKeysResult multiGetSortKeysResult = new MultiGetSortKeysResult();
+ if (results == null) {
+ return multiGetSortKeysResult;
+ }
+ multiGetSortKeysResult.keys = new ArrayList<>();
+ for (Pair, byte[]> pair : results) {
+ multiGetSortKeysResult.keys.add(pair.getLeft().getValue());
+ }
+ multiGetSortKeysResult.allFetched = allFetched;
+ return multiGetSortKeysResult;
+ }
+}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/example/BatchSample.java b/src/main/java/com/xiaomi/infra/pegasus/example/BatchSample.java
index d8c93664..9a9552af 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/example/BatchSample.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/example/BatchSample.java
@@ -22,11 +22,11 @@
import com.xiaomi.infra.pegasus.client.PegasusClientFactory;
import com.xiaomi.infra.pegasus.client.PegasusClientInterface;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
-import com.xiaomi.infra.pegasus.client.request.BatchWithResponse;
-import com.xiaomi.infra.pegasus.client.request.Get;
-import com.xiaomi.infra.pegasus.client.request.GetBatch;
-import com.xiaomi.infra.pegasus.client.request.Set;
-import com.xiaomi.infra.pegasus.client.request.SetBatch;
+import com.xiaomi.infra.pegasus.client.request.batch.BatchWithResponse;
+import com.xiaomi.infra.pegasus.client.request.batch.Get;
+import com.xiaomi.infra.pegasus.client.request.batch.GetBatch;
+import com.xiaomi.infra.pegasus.client.request.batch.Set;
+import com.xiaomi.infra.pegasus.client.request.batch.SetBatch;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.List;
diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java
index 563e7899..740b4462 100644
--- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java
+++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java
@@ -4,7 +4,6 @@
package com.xiaomi.infra.pegasus.client;
/** @author qinzuoyan */
-import com.xiaomi.infra.pegasus.client.PegasusTable.ScanRangeResult;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetSortKeysResult;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
@@ -2632,8 +2631,8 @@ private void testWriteSizeLimit(PegasusClientInterface client) {
}
}
- @Test // test for making sure return "maxFetchCount" if has "maxFetchCount" valid record
- public void testScanRangeWithValueExpired() throws PException, InterruptedException {
+ @Test
+ public void testGetSortKeysWithExpiredValue() throws PException, InterruptedException {
String tableName = "temp";
String hashKey = "hashKey";
// generate records: sortKeys=[expired_0....expired_999,persistent_0...persistent_9]
@@ -2641,78 +2640,18 @@ public void testScanRangeWithValueExpired() throws PException, InterruptedExcept
PegasusTable table =
(PegasusTable) PegasusClientFactory.getSingletonClient().openTable(tableName);
- // case A: scan all record
- // case A1: scan all record: if persistent record count >= maxFetchCount, it must return
- // maxFetchCount records
- ScanRangeResult caseA1 =
- table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(), 5, 0);
- assertScanResult(0, 4, false, caseA1);
- // case A2: scan all record: if persistent record count < maxFetchCount, it only return
- // persistent count records
- ScanRangeResult caseA2 =
- table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(), 100, 0);
- assertScanResult(0, 9, true, caseA2);
-
- // case B: scan limit record by "startSortKey" and "":
- // case B1: scan limit record by "expired_0" and "", if persistent record count >=
- // maxFetchCount, it must return maxFetchCount records
- ScanRangeResult caseB1 =
- table.scanRange(
- hashKey.getBytes(), "expired_0".getBytes(), "".getBytes(), new ScanOptions(), 5, 0);
- assertScanResult(0, 4, false, caseB1);
- // case B2: scan limit record by "expired_0" and "", if persistent record count < maxFetchCount,
- // it only return valid records
- ScanRangeResult caseB2 =
- table.scanRange(
- hashKey.getBytes(), "expired_0".getBytes(), "".getBytes(), new ScanOptions(), 50, 0);
- assertScanResult(0, 9, true, caseB2);
- // case B3: scan limit record by "persistent_5" and "", if following persistent record count <
- // maxFetchCount, it only return valid records
- ScanRangeResult caseB3 =
- table.scanRange(
- hashKey.getBytes(), "persistent_5".getBytes(), "".getBytes(), new ScanOptions(), 50, 0);
- assertScanResult(5, 9, true, caseB3);
- // case B4: scan limit record by "persistent_5" and "", if following persistent record count >
- // maxFetchCount, it only return valid records
- ScanRangeResult caseB4 =
- table.scanRange(
- hashKey.getBytes(), "persistent_5".getBytes(), "".getBytes(), new ScanOptions(), 3, 0);
- assertScanResult(5, 7, false, caseB4);
-
- // case C: scan limit record by "" and "stopSortKey":
- // case C1: scan limit record by "" and "expired_7", if will return 0 record
- ScanRangeResult caseC1 =
- table.scanRange(
- hashKey.getBytes(), "".getBytes(), "expired_7".getBytes(), new ScanOptions(), 3, 0);
- Assert.assertTrue(caseC1.allFetched);
- Assert.assertEquals(0, caseC1.results.size()); // among "" and "expired_7" has 0 valid record
- // case C2: scan limit record by "" and "persistent_7", if valid record count < maxFetchCount,
- // it only return valid record
- ScanRangeResult caseC2 =
- table.scanRange(
- hashKey.getBytes(), "".getBytes(), "persistent_7".getBytes(), new ScanOptions(), 10, 0);
- assertScanResult(0, 6, true, caseC2);
- // case C3: scan limit record by "" and "persistent_7", if valid record count > maxFetchCount,
- // it only return valid record
- ScanRangeResult caseC3 =
- table.scanRange(
- hashKey.getBytes(), "".getBytes(), "persistent_7".getBytes(), new ScanOptions(), 2, 0);
- assertScanResult(0, 1, false, caseC3);
-
- // case D: use multiGetSortKeys, which actually equal with case A but no value
- // case D1: maxFetchCount > 0, return maxFetchCount valid record
- MultiGetSortKeysResult caseD1 = table.multiGetSortKeys(hashKey.getBytes(), 5, -1, 0);
- Assert.assertFalse(caseD1.allFetched);
- Assert.assertEquals(5, caseD1.keys.size());
+ MultiGetSortKeysResult result1 = table.multiGetSortKeys(hashKey.getBytes(), 5, -1, 0);
+ Assertions.assertFalse(result1.allFetched);
+ Assertions.assertEquals(5, result1.keys.size());
for (int i = 0; i <= 4; i++) {
- Assertions.assertEquals("persistent_" + i, new String(caseD1.keys.get(i)));
+ Assertions.assertEquals("persistent_" + i, new String(result1.keys.get(i)));
}
// case D1: maxFetchCount < 0, return all valid record
- MultiGetSortKeysResult caseD2 = table.multiGetSortKeys(hashKey.getBytes(), 10, -1, 0);
- Assert.assertTrue(caseD2.allFetched);
- Assert.assertEquals(10, caseD2.keys.size());
+ MultiGetSortKeysResult result2 = table.multiGetSortKeys(hashKey.getBytes(), 10, -1, 0);
+ Assertions.assertTrue(result2.allFetched);
+ Assertions.assertEquals(10, result2.keys.size());
for (int i = 0; i <= 9; i++) {
- Assertions.assertEquals("persistent_" + i, new String(caseD2.keys.get(i)));
+ Assertions.assertEquals("persistent_" + i, new String(result2.keys.get(i)));
}
}
@@ -2742,20 +2681,4 @@ private void generateRecordsWithExpired(
}
PegasusClientFactory.closeSingletonClient();
}
-
- private void assertScanResult(
- int startIndex, int stopIndex, boolean expectAllFetched, ScanRangeResult actuallyRes) {
- Assertions.assertEquals(expectAllFetched, actuallyRes.allFetched);
- Assertions.assertEquals(stopIndex - startIndex + 1, actuallyRes.results.size());
- for (int i = startIndex; i <= stopIndex; i++) {
- Assertions.assertEquals(
- "hashKey", new String(actuallyRes.results.get(i - startIndex).getLeft().getKey()));
- Assertions.assertEquals(
- "persistent_" + i,
- new String(actuallyRes.results.get(i - startIndex).getLeft().getValue()));
- Assertions.assertEquals(
- "persistent_" + i + "_value",
- new String(actuallyRes.results.get(i - startIndex).getRight()));
- }
- }
}
diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBatch.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBatch.java
index 6f7d0e30..040e0ebf 100644
--- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBatch.java
+++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBatch.java
@@ -17,19 +17,19 @@
package com.xiaomi.infra.pegasus.client;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetResult;
-import com.xiaomi.infra.pegasus.client.request.BatchWithResponse;
-import com.xiaomi.infra.pegasus.client.request.Delete;
-import com.xiaomi.infra.pegasus.client.request.DeleteBatch;
-import com.xiaomi.infra.pegasus.client.request.Get;
-import com.xiaomi.infra.pegasus.client.request.GetBatch;
-import com.xiaomi.infra.pegasus.client.request.MultiDelete;
-import com.xiaomi.infra.pegasus.client.request.MultiDeleteBatch;
-import com.xiaomi.infra.pegasus.client.request.MultiGet;
-import com.xiaomi.infra.pegasus.client.request.MultiGetBatch;
-import com.xiaomi.infra.pegasus.client.request.MultiSet;
-import com.xiaomi.infra.pegasus.client.request.MultiSetBatch;
-import com.xiaomi.infra.pegasus.client.request.Set;
-import com.xiaomi.infra.pegasus.client.request.SetBatch;
+import com.xiaomi.infra.pegasus.client.request.batch.BatchWithResponse;
+import com.xiaomi.infra.pegasus.client.request.batch.Delete;
+import com.xiaomi.infra.pegasus.client.request.batch.DeleteBatch;
+import com.xiaomi.infra.pegasus.client.request.batch.Get;
+import com.xiaomi.infra.pegasus.client.request.batch.GetBatch;
+import com.xiaomi.infra.pegasus.client.request.batch.MultiDelete;
+import com.xiaomi.infra.pegasus.client.request.batch.MultiDeleteBatch;
+import com.xiaomi.infra.pegasus.client.request.batch.MultiGet;
+import com.xiaomi.infra.pegasus.client.request.batch.MultiGetBatch;
+import com.xiaomi.infra.pegasus.client.request.batch.MultiSet;
+import com.xiaomi.infra.pegasus.client.request.batch.MultiSetBatch;
+import com.xiaomi.infra.pegasus.client.request.batch.Set;
+import com.xiaomi.infra.pegasus.client.request.batch.SetBatch;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.List;
diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestRange.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestRange.java
new file mode 100644
index 00000000..67355d02
--- /dev/null
+++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestRange.java
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.xiaomi.infra.pegasus.client;
+
+import com.xiaomi.infra.pegasus.client.request.range.GetRange;
+import com.xiaomi.infra.pegasus.client.request.range.ScanResult;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestRange {
+
+ @Test // test for making sure return "maxFetchCount" if has "maxFetchCount" valid record
+ public void testScanRangeWithValueExpired()
+ throws PException, InterruptedException, TimeoutException, ExecutionException {
+ String tableName = "temp";
+ String hashKey = "hashKey";
+ // generate records: sortKeys=[expired_0....expired_999,persistent_0...persistent_9]
+ generateRecordsWithExpired(tableName, hashKey, 1000, 10);
+
+ PegasusTableInterface table = PegasusClientFactory.getSingletonClient().openTable(tableName);
+ GetRange getRange = new GetRange(table, hashKey.getBytes(), 0);
+ // case A: scan all record
+ // case A1: scan all record: if persistent record count >= maxFetchCount, it must return
+ // maxFetchCount records
+ ScanResult caseA1 = getRange.commitAndWait(5);
+ assertScanResult(0, 4, false, caseA1);
+ // case A2: scan all record: if persistent record count < maxFetchCount, it only return
+ // persistent count records
+ ScanResult caseA2 = getRange.commitAndWait(100);
+ assertScanResult(0, 9, true, caseA2);
+
+ // case B: scan limit record by "startSortKey" and "":
+ // case B1: scan limit record by "expired_0" and "", if persistent record count >=
+ // maxFetchCount, it must return maxFetchCount records
+ ScanResult caseB1 = getRange.withSortKeyRange("expired_0".getBytes(), null).commitAndWait(5);
+ assertScanResult(0, 4, false, caseB1);
+ // case B2: scan limit record by "expired_0" and "", if persistent record count < maxFetchCount,
+ // it only return valid records
+ ScanResult caseB2 = getRange.withSortKeyRange("expired_0".getBytes(), null).commitAndWait(50);
+ assertScanResult(0, 9, true, caseB2);
+ // case B3: scan limit record by "persistent_5" and "", if following persistent record count <
+ // maxFetchCount, it only return valid records
+ ScanResult caseB3 =
+ getRange.withSortKeyRange("persistent_5".getBytes(), null).commitAndWait(50);
+ assertScanResult(5, 9, true, caseB3);
+ // case B4: scan limit record by "persistent_5" and "", if following persistent record count >
+ // maxFetchCount, it only return valid records
+ ScanResult caseB4 = getRange.withSortKeyRange("persistent_5".getBytes(), null).commitAndWait(3);
+ assertScanResult(5, 7, false, caseB4);
+
+ // case C: scan limit record by "" and "stopSortKey":
+ // case C1: scan limit record by "" and "expired_7", if will return 0 record
+ ScanResult caseC1 = getRange.withSortKeyRange(null, "expired_7".getBytes()).commitAndWait(3);
+ Assertions.assertTrue(caseC1.allFetched);
+ Assertions.assertEquals(
+ 0, caseC1.results.size()); // among "" and "expired_7" has 0 valid record
+ // case C2: scan limit record by "" and "persistent_7", if valid record count < maxFetchCount,
+ // it only return valid record
+ ScanResult caseC2 =
+ getRange.withSortKeyRange(null, "persistent_7".getBytes()).commitAndWait(10);
+ assertScanResult(0, 6, true, caseC2);
+ // case C3: scan limit record by "" and "persistent_7", if valid record count > maxFetchCount,
+ // it only return valid record
+ ScanResult caseC3 = getRange.withSortKeyRange(null, "persistent_7".getBytes()).commitAndWait(2);
+ assertScanResult(0, 1, false, caseC3);
+ }
+
+ private void generateRecordsWithExpired(
+ String tableName, String hashKey, int expiredCount, int persistentCount)
+ throws PException, InterruptedException {
+ PegasusClientInterface client = PegasusClientFactory.getSingletonClient();
+ // assign prefix to make sure the expire record is stored front of persistent
+ String expiredSortKeyPrefix = "expired_";
+ String persistentSortKeyPrefix = "persistent_";
+ while (expiredCount-- > 0) {
+ client.set(
+ tableName,
+ hashKey.getBytes(),
+ (expiredSortKeyPrefix + expiredCount).getBytes(),
+ (expiredSortKeyPrefix + expiredCount + "_value").getBytes(),
+ 1);
+ }
+ // sleep to make sure the record is expired
+ Thread.sleep(1000);
+ while (persistentCount-- > 0) {
+ client.set(
+ tableName,
+ hashKey.getBytes(),
+ (persistentSortKeyPrefix + persistentCount).getBytes(),
+ (persistentSortKeyPrefix + persistentCount + "_value").getBytes());
+ }
+ PegasusClientFactory.closeSingletonClient();
+ }
+
+ private void assertScanResult(
+ int startIndex, int stopIndex, boolean expectAllFetched, ScanResult actuallyRes) {
+ Assertions.assertEquals(expectAllFetched, actuallyRes.allFetched);
+ Assertions.assertEquals(stopIndex - startIndex + 1, actuallyRes.results.size());
+ for (int i = startIndex; i <= stopIndex; i++) {
+ Assertions.assertEquals(
+ "hashKey", new String(actuallyRes.results.get(i - startIndex).getLeft().getKey()));
+ Assertions.assertEquals(
+ "persistent_" + i,
+ new String(actuallyRes.results.get(i - startIndex).getLeft().getValue()));
+ Assertions.assertEquals(
+ "persistent_" + i + "_value",
+ new String(actuallyRes.results.get(i - startIndex).getRight()));
+ }
+ }
+}