-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Star Tree Request/Response structure #227
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,7 +56,11 @@ | |
import org.opensearch.index.IndexSortConfig; | ||
import org.opensearch.index.analysis.IndexAnalyzers; | ||
import org.opensearch.index.cache.bitset.BitsetFilterCache; | ||
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; | ||
import org.opensearch.index.compositeindex.datacube.Metric; | ||
import org.opensearch.index.compositeindex.datacube.MetricStat; | ||
import org.opensearch.index.fielddata.IndexFieldData; | ||
import org.opensearch.index.mapper.CompositeDataCubeFieldType; | ||
import org.opensearch.index.mapper.ContentPath; | ||
import org.opensearch.index.mapper.DerivedFieldResolver; | ||
import org.opensearch.index.mapper.DerivedFieldResolverFactory; | ||
|
@@ -73,13 +77,17 @@ | |
import org.opensearch.script.ScriptContext; | ||
import org.opensearch.script.ScriptFactory; | ||
import org.opensearch.script.ScriptService; | ||
import org.opensearch.search.aggregations.AggregatorFactory; | ||
import org.opensearch.search.aggregations.metrics.SumAggregatorFactory; | ||
import org.opensearch.search.aggregations.support.AggregationUsageService; | ||
import org.opensearch.search.aggregations.support.ValuesSourceRegistry; | ||
import org.opensearch.search.lookup.SearchLookup; | ||
import org.opensearch.search.query.startree.StarTreeQuery; | ||
import org.opensearch.search.startree.OriginalOrStarTreeQuery; | ||
import org.opensearch.search.startree.StarTreeQuery; | ||
import org.opensearch.transport.RemoteClusterAware; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
|
@@ -90,6 +98,7 @@ | |
import java.util.function.LongSupplier; | ||
import java.util.function.Predicate; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
|
||
import static java.util.Collections.emptyList; | ||
import static java.util.Collections.emptyMap; | ||
|
@@ -529,6 +538,66 @@ private ParsedQuery toQuery(QueryBuilder queryBuilder, CheckedFunction<QueryBuil | |
} | ||
} | ||
|
||
public ParsedQuery toStarTreeQuery(CompositeIndexFieldInfo starTree, QueryBuilder queryBuilder, Query query) { | ||
Map<String, List<Predicate<Long>>> predicateMap = getStarTreePredicates(queryBuilder); | ||
StarTreeQuery starTreeQuery = new StarTreeQuery(starTree, predicateMap, null); | ||
OriginalOrStarTreeQuery originalOrStarTreeQuery = new OriginalOrStarTreeQuery(starTreeQuery, query); | ||
return new ParsedQuery(originalOrStarTreeQuery); | ||
} | ||
|
||
/** | ||
* Parse query body to star-tree predicates | ||
* @param queryBuilder | ||
* @return | ||
*/ | ||
private Map<String, List<Predicate<Long>>> getStarTreePredicates(QueryBuilder queryBuilder) { | ||
// Assuming the following variables have been initialized: | ||
Map<String, List<Predicate<Long>>> predicateMap = new HashMap<>(); | ||
|
||
// Check if the query builder is an instance of TermQueryBuilder | ||
if (queryBuilder instanceof TermQueryBuilder) { | ||
TermQueryBuilder tq = (TermQueryBuilder) queryBuilder; | ||
String field = tq.fieldName(); | ||
long inputQueryVal = Long.parseLong(tq.value().toString()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Convert to sortable long |
||
|
||
// Get or create the list of predicates for the given field | ||
List<Predicate<Long>> predicates = predicateMap.getOrDefault(field, new ArrayList<>()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we use predicates, we can't use binary search during star tree traversal. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Haven't really put much thought into this yet, let me revisit the part on how to better store information to use filters. For request/response parsing I had just inspired changes from previous POCs. |
||
|
||
// Create a predicate to match the input query value | ||
Predicate<Long> predicate = dimVal -> dimVal == inputQueryVal; | ||
predicates.add(predicate); | ||
|
||
// Put the predicates list back into the map | ||
predicateMap.put(field, predicates); | ||
} else { | ||
throw new IllegalArgumentException("The query is not a term query"); | ||
} | ||
return predicateMap; | ||
|
||
} | ||
|
||
public boolean validateStarTreeMetricSuport(CompositeDataCubeFieldType compositeIndexFieldInfo, AggregatorFactory aggregatorFactory) { | ||
String field = null; | ||
Map<String, List<MetricStat>> supportedMetrics = compositeIndexFieldInfo.getMetrics() | ||
.stream() | ||
.collect(Collectors.toMap(Metric::getField, Metric::getMetrics)); | ||
|
||
// Existing support only for MetricAggregators without sub-aggregations | ||
if (aggregatorFactory.getSubFactories().getFactories().length != 0) { | ||
return false; | ||
} | ||
|
||
// TODO: increment supported aggregation type | ||
if (aggregatorFactory instanceof SumAggregatorFactory) { | ||
field = ((SumAggregatorFactory) aggregatorFactory).getField(); | ||
if (supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(MetricStat.SUM)) { | ||
return true; | ||
} | ||
} | ||
|
||
return false; | ||
} | ||
|
||
public Index index() { | ||
return indexSettings.getIndex(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,13 +31,20 @@ | |
|
||
package org.opensearch.search.aggregations.metrics; | ||
|
||
import org.apache.lucene.index.LeafReaderContext; | ||
import org.apache.lucene.index.SegmentReader; | ||
import org.opensearch.common.lucene.Lucene; | ||
import org.opensearch.common.util.Comparators; | ||
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; | ||
import org.opensearch.index.codec.composite.CompositeIndexReader; | ||
import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; | ||
import org.opensearch.search.aggregations.Aggregator; | ||
import org.opensearch.search.internal.SearchContext; | ||
import org.opensearch.search.sort.SortOrder; | ||
|
||
import java.io.IOException; | ||
import java.util.Map; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
/** | ||
* Base class to aggregate all docs into a single numeric metric value. | ||
|
@@ -107,4 +114,14 @@ public BucketComparator bucketComparator(String key, SortOrder order) { | |
return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(key, lhs), metric(key, rhs), order == SortOrder.ASC); | ||
} | ||
} | ||
|
||
protected StarTreeValues getStarTreeValues(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException { | ||
SegmentReader reader = Lucene.segmentReader(ctx.reader()); | ||
if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) return null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to see if its better to load them as doubleValuesSource similar to how existing fields are loaded. And that too load the specific fields requested instead of loading the entire star tree values. ( for example in sum aggregator, we can fetch the doubleFieldData of a particular field of star tree metric , for eg : sum_status_metric can be loaded in ) Then you don't need to worry about conversion either. |
||
CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); | ||
StarTreeValues values = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree); | ||
final AtomicReference<StarTreeValues> aggrVal = new AtomicReference<>(null); | ||
|
||
return values; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,8 @@ | |
import org.opensearch.common.lease.Releasables; | ||
import org.opensearch.common.util.BigArrays; | ||
import org.opensearch.common.util.DoubleArray; | ||
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; | ||
import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; | ||
import org.opensearch.index.fielddata.SortedNumericDoubleValues; | ||
import org.opensearch.search.DocValueFormat; | ||
import org.opensearch.search.aggregations.Aggregator; | ||
|
@@ -45,6 +47,8 @@ | |
import org.opensearch.search.aggregations.support.ValuesSource; | ||
import org.opensearch.search.aggregations.support.ValuesSourceConfig; | ||
import org.opensearch.search.internal.SearchContext; | ||
import org.opensearch.search.startree.OriginalOrStarTreeQuery; | ||
import org.opensearch.search.startree.StarTreeQuery; | ||
|
||
import java.io.IOException; | ||
import java.util.Map; | ||
|
@@ -56,13 +60,13 @@ | |
*/ | ||
public class SumAggregator extends NumericMetricsAggregator.SingleValue { | ||
|
||
private final ValuesSource.Numeric valuesSource; | ||
private final DocValueFormat format; | ||
protected final ValuesSource.Numeric valuesSource; | ||
protected final DocValueFormat format; | ||
|
||
private DoubleArray sums; | ||
private DoubleArray compensations; | ||
protected DoubleArray sums; | ||
protected DoubleArray compensations; | ||
|
||
SumAggregator( | ||
public SumAggregator( | ||
String name, | ||
ValuesSourceConfig valuesSourceConfig, | ||
SearchContext context, | ||
|
@@ -86,6 +90,14 @@ public ScoreMode scoreMode() { | |
|
||
@Override | ||
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { | ||
if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) { | ||
StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery(); | ||
return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree()); | ||
} | ||
return getDefaultLeafCollector(ctx, sub); | ||
} | ||
|
||
private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { | ||
if (valuesSource == null) { | ||
return LeafBucketCollector.NO_OP_COLLECTOR; | ||
} | ||
|
@@ -118,6 +130,28 @@ public void collect(int doc, long bucket) throws IOException { | |
}; | ||
} | ||
|
||
private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) | ||
throws IOException { | ||
final BigArrays bigArrays = context.bigArrays(); | ||
final CompensatedSum kahanSummation = new CompensatedSum(0, 0); | ||
|
||
StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); | ||
|
||
// | ||
String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); | ||
|
||
return new LeafBucketCollectorBase(sub, starTreeValues) { | ||
@Override | ||
public void collect(int doc, long bucket) throws IOException { | ||
// TODO: Fix the response for collecting star tree sum | ||
sums = bigArrays.grow(sums, bucket + 1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we extract out the default implementation I really like the approach of reusing the existing aggregators. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I'll try and do that, I am inclined to on refactoring & re-using same implementations wherever possible. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); we can check if we are able to get sortedNumericDoubleValues , otherwise we need to convert to double for each doc |
||
compensations = bigArrays.grow(compensations, bucket + 1); | ||
compensations.set(bucket, kahanSummation.delta()); | ||
sums.set(bucket, kahanSummation.value()); | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public double metric(long owningBucketOrd) { | ||
if (valuesSource == null || owningBucketOrd >= sums.size()) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be converted to dimension field name of star tree?