Skip to content

Commit

Permalink
Implement aggregate count API for MongoCollection (#84)
Browse files Browse the repository at this point in the history
* Implement aggregate count API for MongoCollection
  • Loading branch information
suresh-prakash authored Jun 28, 2022
1 parent e1195c4 commit f38193f
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hypertrace.core.documentstore.utils.Utils.convertJsonToMap;
import static org.hypertrace.core.documentstore.utils.Utils.createDocumentsFromResource;
import static org.hypertrace.core.documentstore.utils.Utils.readFileFromResource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;

Expand Down Expand Up @@ -52,7 +53,6 @@
import org.hypertrace.core.documentstore.query.Sort;
import org.hypertrace.core.documentstore.query.SortingSpec;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
Expand Down Expand Up @@ -103,6 +103,7 @@ public void testFindAll() throws IOException {

Iterator<Document> resultDocs = collection.find(query);
assertSizeEqual(resultDocs, "mongo/collection_data.json");
assertSizeEqual(query, "mongo/collection_data.json");
}

@Test
Expand Down Expand Up @@ -137,6 +138,7 @@ public void testFindSimple() throws IOException {

Iterator<Document> resultDocs = collection.find(query);
assertDocsEqual(resultDocs, "mongo/simple_filter_response.json");
assertSizeEqual(query, "mongo/simple_filter_response.json");
}

@Test
Expand All @@ -163,6 +165,7 @@ public void testFindWithDuplicateSelections() throws IOException {

Iterator<Document> resultDocs = collection.find(query);
assertDocsEqual(resultDocs, "mongo/simple_filter_response.json");
assertSizeEqual(query, "mongo/simple_filter_response.json");
}

@Test
Expand Down Expand Up @@ -202,6 +205,7 @@ public void testFindWithSortingAndPagination() throws IOException {

Iterator<Document> resultDocs = collection.find(query);
assertDocsEqual(resultDocs, "mongo/filter_with_sorting_and_pagination_response.json");
assertSizeEqual(query, "mongo/filter_with_sorting_and_pagination_response.json");
}

@Test
Expand Down Expand Up @@ -243,6 +247,7 @@ public void testFindWithDuplicateSortingAndPagination() throws IOException {

Iterator<Document> resultDocs = collection.find(query);
assertDocsEqual(resultDocs, "mongo/filter_with_sorting_and_pagination_response.json");
assertSizeEqual(query, "mongo/filter_with_sorting_and_pagination_response.json");
}

@Test
Expand Down Expand Up @@ -279,6 +284,7 @@ public void testFindWithNestedFields() throws IOException {

Iterator<Document> resultDocs = collection.find(query);
assertDocsEqual(resultDocs, "mongo/filter_on_nested_fields_response.json");
assertSizeEqual(query, "mongo/filter_on_nested_fields_response.json");
}

@Test
Expand All @@ -287,6 +293,7 @@ public void testAggregateEmpty() throws IOException {

Iterator<Document> resultDocs = collection.aggregate(query);
assertSizeEqual(resultDocs, "mongo/collection_data.json");
assertSizeEqual(query, "mongo/collection_data.json");
}

@Test
Expand All @@ -298,6 +305,7 @@ public void testAggregateSimple() throws IOException {

Iterator<Document> resultDocs = collection.aggregate(query);
assertDocsEqual(resultDocs, "mongo/count_response.json");
assertSizeEqual(query, "mongo/count_response.json");
}

@Test
Expand All @@ -310,6 +318,7 @@ public void testAggregateWithDuplicateSelections() throws IOException {

Iterator<Document> resultDocs = collection.aggregate(query);
assertDocsEqual(resultDocs, "mongo/count_response.json");
assertSizeEqual(query, "mongo/count_response.json");
}

@Test
Expand Down Expand Up @@ -346,6 +355,7 @@ public void testAggregateWithFiltersAndOrdering() throws IOException {

Iterator<Document> resultDocs = collection.aggregate(query);
assertDocsEqual(resultDocs, "mongo/sum_response.json");
assertSizeEqual(query, "mongo/sum_response.json");
}

@Test
Expand Down Expand Up @@ -385,6 +395,7 @@ public void testAggregateWithFiltersAndDuplicateOrderingAndDuplicateAggregations

Iterator<Document> resultDocs = collection.aggregate(query);
assertDocsEqual(resultDocs, "mongo/sum_response.json");
assertSizeEqual(query, "mongo/sum_response.json");
}

@Test
Expand All @@ -403,6 +414,7 @@ public void testAggregateWithNestedFields() throws IOException {

Iterator<Document> resultDocs = collection.aggregate(query);
assertDocsEqual(resultDocs, "mongo/aggregate_on_nested_fields_response.json");
assertSizeEqual(query, "mongo/aggregate_on_nested_fields_response.json");
}

@Test
Expand All @@ -417,6 +429,7 @@ public void testAggregateWithoutAggregationAlias() {
.build();

assertThrows(IllegalArgumentException.class, () -> collection.aggregate(query));
assertThrows(IllegalArgumentException.class, () -> collection.count(query));
}

@Test
Expand All @@ -441,6 +454,7 @@ public void testAggregateWithUnsupportedExpressionNesting() {
.build();

assertThrows(UnsupportedOperationException.class, () -> collection.aggregate(query));
assertThrows(UnsupportedOperationException.class, () -> collection.count(query));
}

@Test
Expand All @@ -467,6 +481,7 @@ public void testAggregateWithMultipleGroupingLevels() throws IOException {

Iterator<Document> resultDocs = collection.aggregate(query);
assertDocsEqual(resultDocs, "mongo/multi_level_grouping_response.json");
assertSizeEqual(query, "mongo/multi_level_grouping_response.json");
}

@Test
Expand All @@ -487,6 +502,7 @@ public void testDistinctCount() throws IOException {

Iterator<Document> resultDocs = collection.aggregate(query);
assertDocsEqual(resultDocs, "mongo/distinct_count_response.json");
assertSizeEqual(query, "mongo/distinct_count_response.json");
}

@Test
Expand All @@ -506,6 +522,7 @@ public void testUnnestAndAggregate() throws IOException {

Iterator<Document> iterator = collection.aggregate(query);
assertDocsEqual(iterator, "mongo/aggregate_on_nested_array_reponse.json");
assertSizeEqual(query, "mongo/aggregate_on_nested_array_reponse.json");
}

@Test
Expand All @@ -520,6 +537,7 @@ public void testUnnestAndAggregate_preserveEmptyTrue() throws IOException {

Iterator<Document> iterator = collection.aggregate(query);
assertDocsEqual(iterator, "mongo/unwind_preserving_empty_array_response.json");
assertSizeEqual(query, "mongo/unwind_preserving_empty_array_response.json");
}

@Test
Expand All @@ -534,6 +552,7 @@ public void testUnnestAndAggregate_preserveEmptyFalse() throws IOException {

Iterator<Document> iterator = collection.aggregate(query);
assertDocsEqual(iterator, "mongo/unwind_not_preserving_empty_array_response.json");
assertSizeEqual(query, "mongo/unwind_not_preserving_empty_array_response.json");
}

@Test
Expand All @@ -553,6 +572,7 @@ public void testUnnest() throws IOException {

Iterator<Document> iterator = collection.aggregate(query);
assertDocsEqual(iterator, "mongo/unwind_response.json");
assertSizeEqual(query, "mongo/unwind_response.json");
}

@Test
Expand Down Expand Up @@ -581,6 +601,7 @@ public void testFilterAndUnnest() throws IOException {

Iterator<Document> iterator = collection.aggregate(query);
assertDocsEqual(iterator, "mongo/unwind_filter_response.json");
assertSizeEqual(query, "mongo/unwind_filter_response.json");
}

private static void assertDocsEqual(Iterator<Document> documents, String filePath)
Expand All @@ -593,7 +614,7 @@ private static void assertDocsEqual(Iterator<Document> documents, String filePat
actual.add(convertDocumentToMap(documents.next()));
}

Assertions.assertEquals(expected, actual);
assertEquals(expected, actual);
}

private static void assertSizeEqual(Iterator<Document> documents, String filePath)
Expand All @@ -606,6 +627,13 @@ private static void assertSizeEqual(Iterator<Document> documents, String filePat
documents.next();
}

Assertions.assertEquals(expected, actual);
assertEquals(expected, actual);
}

private static void assertSizeEqual(final Query query, final String filePath) throws IOException {
final long actualSize = collection.count(query);
final String fileContent = readFileFromResource(filePath).orElseThrow();
final long expectedSize = convertJsonToMap(fileContent).size();
assertEquals(expectedSize, actualSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ public interface Collection {
*/
long total(Query query);

/**
* Count the result-set size of executing the given query. Note that this method is a generic
* version of {@link #count()} and {@link #total(Query)}
*
* @param query The query definition whose result-set size is to be determined
* @return The number of documents conforming to the input query
*/
long count(final org.hypertrace.core.documentstore.query.Query query);

/**
* @param documents to be upserted in bulk
* @return true if the operation succeeded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,11 @@ public CloseableIterator<Document> aggregate(
return convertToDocumentIterator(queryExecutor.aggregate(query));
}

@Override
public long count(org.hypertrace.core.documentstore.query.Query query) {
return queryExecutor.count(query);
}

@Override
public boolean delete(Key key) {
DeleteResult deleteResult = collection.deleteOne(this.selectionCriteriaForKey(key));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package org.hypertrace.core.documentstore.mongo;

import static java.lang.Long.parseLong;
import static java.util.Collections.singleton;
import static java.util.function.Predicate.not;
import static org.hypertrace.core.documentstore.mongo.MongoPaginationHelper.applyPagination;
import static org.hypertrace.core.documentstore.mongo.MongoPaginationHelper.getLimitClause;
import static org.hypertrace.core.documentstore.mongo.MongoPaginationHelper.getSkipClause;
import static org.hypertrace.core.documentstore.mongo.clause.MongoCountClauseSupplier.COUNT_ALIAS;
import static org.hypertrace.core.documentstore.mongo.clause.MongoCountClauseSupplier.getCountClause;
import static org.hypertrace.core.documentstore.mongo.parser.MongoFilterTypeExpressionParser.getFilter;
import static org.hypertrace.core.documentstore.mongo.parser.MongoFilterTypeExpressionParser.getFilterClause;
import static org.hypertrace.core.documentstore.mongo.parser.MongoGroupTypeExpressionParser.getGroupClause;
Expand All @@ -21,6 +24,7 @@
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.bson.conversions.Bson;
Expand Down Expand Up @@ -79,6 +83,29 @@ public MongoCursor<BasicDBObject> aggregate(final Query originalQuery) {
return iterable.cursor();
}

public long count(final Query originalQuery) {
final Query query = transformAndLog(originalQuery);

final List<BasicDBObject> pipeline =
Stream.concat(
AGGREGATE_PIPELINE_FUNCTIONS.stream()
.flatMap(function -> function.apply(query).stream()),
Stream.of(getCountClause()))
.filter(not(BasicDBObject::isEmpty))
.collect(Collectors.toList());

logPipeline(pipeline);
final AggregateIterable<BasicDBObject> iterable = collection.aggregate(pipeline);

try (final MongoCursor<BasicDBObject> cursor = iterable.cursor()) {
if (cursor.hasNext()) {
return parseLong(cursor.next().get(COUNT_ALIAS).toString());
}
}

return 0;
}

private void logClauses(
final Query query,
final Bson projection,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.hypertrace.core.documentstore.mongo.clause;

import static org.hypertrace.core.documentstore.mongo.MongoUtils.PREFIX;

import com.mongodb.BasicDBObject;

public class MongoCountClauseSupplier {
public static final String COUNT_ALIAS = "count";
private static final String COUNT_CLAUSE = PREFIX + "count";

public static BasicDBObject getCountClause() {
return new BasicDBObject(COUNT_CLAUSE, COUNT_ALIAS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,11 @@ public CloseableIterator<Document> aggregate(
throw new UnsupportedOperationException();
}

@Override
public long count(org.hypertrace.core.documentstore.query.Query query) {
throw new UnsupportedOperationException();
}

@VisibleForTesting
protected PreparedStatement buildPreparedStatement(String sqlQuery, Params params)
throws SQLException, RuntimeException {
Expand Down

0 comments on commit f38193f

Please sign in to comment.