Skip to content

Commit

Permalink
Backport to branch(3) : Fix scan with limit behavior in DynamoDB adap…
Browse files Browse the repository at this point in the history
…ter (#2303)

Co-authored-by: Toshihiro Suzuki <[email protected]>
  • Loading branch information
feeblefakie and brfrn169 authored Oct 30, 2024
1 parent 523cc12 commit 29bbd7f
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 84 deletions.
30 changes: 22 additions & 8 deletions core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,34 @@
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

@NotThreadSafe
public class QueryScanner implements Scanner {

private final PaginatedRequest request;
private final ResultInterpreter resultInterpreter;

private Iterator<Map<String, AttributeValue>> itemsIterator;
@Nullable private Integer remainingLimit;
@Nullable private Map<String, AttributeValue> lastEvaluatedKey;
private int totalResultCount;

private ScannerIterator scannerIterator;

@SuppressFBWarnings("EI_EXPOSE_REP2")
public QueryScanner(PaginatedRequest request, ResultInterpreter resultInterpreter) {
public QueryScanner(PaginatedRequest request, int limit, ResultInterpreter resultInterpreter) {
this.request = request;
this.resultInterpreter = resultInterpreter;

handleResponse(request.execute());
if (limit > 0) {
remainingLimit = limit;
handleResponse(request.execute(limit));
} else {
remainingLimit = null;
handleResponse(request.execute());
}

this.resultInterpreter = resultInterpreter;
}

@Override
Expand All @@ -49,18 +58,23 @@ private boolean hasNext() {
return true;
}
if (lastEvaluatedKey != null) {
handleResponse(request.execute(lastEvaluatedKey));
if (remainingLimit != null) {
handleResponse(request.execute(lastEvaluatedKey, remainingLimit));
} else {
handleResponse(request.execute(lastEvaluatedKey));
}
return itemsIterator.hasNext();
}
return false;
}

private void handleResponse(PaginatedRequestResponse response) {
List<Map<String, AttributeValue>> items = response.items();
totalResultCount += items.size();
if (remainingLimit != null) {
remainingLimit -= items.size();
}
itemsIterator = items.iterator();
if ((request.limit() == null || totalResultCount < request.limit())
&& response.hasLastEvaluatedKey()) {
if ((remainingLimit == null || remainingLimit > 0) && response.hasLastEvaluatedKey()) {
lastEvaluatedKey = response.lastEvaluatedKey();
} else {
lastEvaluatedKey = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,16 @@ private Scanner executeScanWithIndex(Selection selection, TableMetadata tableMet

builder.expressionAttributeNames(expressionAttributeNames);

int limit = 0;
if (selection instanceof Scan) {
Scan scan = (Scan) selection;
if (scan.getLimit() > 0) {
builder.limit(scan.getLimit());
}
limit = scan.getLimit();
}

com.scalar.db.storage.dynamo.request.QueryRequest request =
new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build());
return new QueryScanner(
request, new ResultInterpreter(selection.getProjections(), tableMetadata));
request, limit, new ResultInterpreter(selection.getProjections(), tableMetadata));
}

private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
Expand All @@ -171,10 +171,6 @@ private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
}
}

if (scan.getLimit() > 0) {
builder.limit(scan.getLimit());
}

if (!scan.getProjections().isEmpty()) {
Map<String, String> expressionAttributeNames = new HashMap<>();
projectionExpression(builder, scan, expressionAttributeNames);
Expand All @@ -184,20 +180,17 @@ private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
if (scan.getConsistency() != Consistency.EVENTUAL) {
builder.consistentRead(true);
}

com.scalar.db.storage.dynamo.request.QueryRequest queryRequest =
new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build());
return new QueryScanner(
queryRequest, new ResultInterpreter(scan.getProjections(), tableMetadata));
queryRequest, scan.getLimit(), new ResultInterpreter(scan.getProjections(), tableMetadata));
}

private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) {
DynamoOperation dynamoOperation = new DynamoOperation(scan, tableMetadata);
ScanRequest.Builder builder = ScanRequest.builder().tableName(dynamoOperation.getTableName());

if (scan.getLimit() > 0) {
builder.limit(scan.getLimit());
}

if (!scan.getProjections().isEmpty()) {
Map<String, String> expressionAttributeNames = new HashMap<>();
projectionExpression(builder, scan, expressionAttributeNames);
Expand All @@ -207,10 +200,13 @@ private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) {
if (scan.getConsistency() != Consistency.EVENTUAL) {
builder.consistentRead(true);
}

com.scalar.db.storage.dynamo.request.ScanRequest requestWrapper =
new com.scalar.db.storage.dynamo.request.ScanRequest(client, builder.build());
return new QueryScanner(
requestWrapper, new ResultInterpreter(scan.getProjections(), tableMetadata));
requestWrapper,
scan.getLimit(),
new ResultInterpreter(scan.getProjections(), tableMetadata));
}

private void projectionExpression(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,34 @@ public interface PaginatedRequest {
/**
* Execute the request
*
* @return the request response
* @return the response
*/
PaginatedRequestResponse execute();

/**
* Execute the request with limit
*
* @param limit the maximum number of items to evaluate (not necessarily the number of matching
* items)
* @return the response
*/
PaginatedRequestResponse execute(int limit);

/**
* Execute the request that will be evaluated starting from the given start key
*
* @param exclusiveStartKey The primary key of the first item that this operation will evaluate.
* @return the request response
* @param exclusiveStartKey the primary key of the first item that this operation will evaluate.
* @return the response
*/
PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveStartKey);

/**
* Returns the request limit
* Execute the request that will be evaluated starting from the given start key with limit
*
* @return the request limit
* @param exclusiveStartKey the primary key of the first item that this operation will evaluate.
* @param limit the maximum number of items to evaluate (not necessarily the number of matching
* items)
* @return the response
*/
Integer limit();
PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveStartKey, int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ public QueryRequest(
this.dynamoRequest = dynamoRequest;
}

@Override
public PaginatedRequestResponse execute() {
QueryResponse response = client.query(dynamoRequest);

return new PaginatedRequestResponse(
response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey());
}

@Override
public PaginatedRequestResponse execute(int limit) {
QueryRequest request =
new QueryRequest(this.client, this.dynamoRequest.toBuilder().limit(limit).build());
return request.execute();
}

@Override
public PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveStartKey) {
QueryRequest request =
Expand All @@ -29,15 +44,16 @@ public PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveSta
}

@Override
public PaginatedRequestResponse execute() {
QueryResponse response = client.query(dynamoRequest);

return new PaginatedRequestResponse(
response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey());
}

@Override
public Integer limit() {
return dynamoRequest.limit();
public PaginatedRequestResponse execute(
Map<String, AttributeValue> exclusiveStartKey, int limit) {
QueryRequest request =
new QueryRequest(
this.client,
this.dynamoRequest
.toBuilder()
.exclusiveStartKey(exclusiveStartKey)
.limit(limit)
.build());
return request.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ public ScanRequest(
this.dynamoRequest = dynamoRequest;
}

@Override
public PaginatedRequestResponse execute() {
ScanResponse response = client.scan(dynamoRequest);

return new PaginatedRequestResponse(
response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey());
}

@Override
public PaginatedRequestResponse execute(int limit) {
ScanRequest request =
new ScanRequest(this.client, this.dynamoRequest.toBuilder().limit(limit).build());
return request.execute();
}

@Override
public PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveStartKey) {
ScanRequest request =
Expand All @@ -29,15 +44,16 @@ public PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveSta
}

@Override
public PaginatedRequestResponse execute() {
ScanResponse response = client.scan(dynamoRequest);

return new PaginatedRequestResponse(
response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey());
}

@Override
public Integer limit() {
return dynamoRequest.limit();
public PaginatedRequestResponse execute(
Map<String, AttributeValue> exclusiveStartKey, int limit) {
ScanRequest request =
new ScanRequest(
this.client,
this.dynamoRequest
.toBuilder()
.exclusiveStartKey(exclusiveStartKey)
.limit(limit)
.build());
return request.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ public void one_ShouldReturnResult() {
Map<String, AttributeValue> item = Collections.emptyMap();
List<Map<String, AttributeValue>> items = Arrays.asList(item, item, item);
when(request.execute()).thenReturn(response);
when(request.limit()).thenReturn(null);
when(response.items()).thenReturn(items);
when(resultInterpreter.interpret(item)).thenReturn(result);

QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter);

// Act
Optional<Result> actual1 = queryScanner.one();
Expand All @@ -71,11 +70,10 @@ public void all_ShouldReturnResults() {
Map<String, AttributeValue> item = Collections.emptyMap();
List<Map<String, AttributeValue>> items = Arrays.asList(item, item, item);
when(request.execute()).thenReturn(response);
when(request.limit()).thenReturn(null);
when(response.items()).thenReturn(items);
when(resultInterpreter.interpret(item)).thenReturn(result);

QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter);

// Act
List<Result> results1 = queryScanner.all();
Expand All @@ -100,9 +98,8 @@ public void iterator_ShouldReturnResults() {
when(response.items()).thenReturn(items);
when(resultInterpreter.interpret(item)).thenReturn(result);
when(request.execute()).thenReturn(response);
when(request.limit()).thenReturn(null);

QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter);

// Act
Iterator<Result> iterator = queryScanner.iterator();
Expand Down Expand Up @@ -134,9 +131,8 @@ public void one_ResponseWithLastEvaluatedKey_ShouldReturnResults() {
when(resultInterpreter.interpret(item)).thenReturn(result);
when(request.execute()).thenReturn(response);
when(request.execute(lastEvaluatedKey)).thenReturn(response);
when(request.limit()).thenReturn(null);

QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter);

// Act
Optional<Result> actual1 = queryScanner.one();
Expand Down Expand Up @@ -164,26 +160,27 @@ public void one_ResponseWithLastEvaluatedKey_ShouldReturnResults() {
@Test
public void one_RequestWithLimitAndResponseWithLastEvaluatedKey_ShouldReturnResults() {
// Arrange
int limit = 3;

Map<String, AttributeValue> item = Collections.emptyMap();
List<Map<String, AttributeValue>> items = Arrays.asList(item, item);
List<Map<String, AttributeValue>> items1 = Arrays.asList(item, item);
List<Map<String, AttributeValue>> items2 = Collections.singletonList(item);
Map<String, AttributeValue> lastEvaluatedKey = Collections.emptyMap();

when(request.limit()).thenReturn(4);
when(response.items()).thenReturn(items);
when(response.items()).thenReturn(items1).thenReturn(items2);
when(response.hasLastEvaluatedKey()).thenReturn(true);
when(response.lastEvaluatedKey()).thenReturn(lastEvaluatedKey);
when(request.execute()).thenReturn(response);
when(request.execute(lastEvaluatedKey)).thenReturn(response);
when(request.execute(limit)).thenReturn(response);
when(request.execute(lastEvaluatedKey, limit - items1.size())).thenReturn(response);
when(resultInterpreter.interpret(item)).thenReturn(result);

QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
QueryScanner queryScanner = new QueryScanner(request, limit, resultInterpreter);

// Act
Optional<Result> actual1 = queryScanner.one();
Optional<Result> actual2 = queryScanner.one();
Optional<Result> actual3 = queryScanner.one();
Optional<Result> actual4 = queryScanner.one();
Optional<Result> actual5 = queryScanner.one();

// Assert
assertThat(actual1).isPresent();
Expand All @@ -192,12 +189,10 @@ public void one_RequestWithLimitAndResponseWithLastEvaluatedKey_ShouldReturnResu
assertThat(actual2.get()).isEqualTo(result);
assertThat(actual3).isPresent();
assertThat(actual3.get()).isEqualTo(result);
assertThat(actual4).isPresent();
assertThat(actual4.get()).isEqualTo(result);
assertThat(actual5).isNotPresent();
assertThat(actual4).isNotPresent();

verify(resultInterpreter, times(4)).interpret(item);
verify(request).execute(lastEvaluatedKey);
verify(request).execute();
verify(resultInterpreter, times(limit)).interpret(item);
verify(request).execute(limit);
verify(request).execute(lastEvaluatedKey, limit - items1.size());
}
}
Loading

0 comments on commit 29bbd7f

Please sign in to comment.