diff --git a/libs/arrow/src/main/java/org/opensearch/arrow/StreamManager.java b/libs/arrow/src/main/java/org/opensearch/arrow/StreamManager.java index 900765ed8c0ff..6bd0fa234b7e0 100644 --- a/libs/arrow/src/main/java/org/opensearch/arrow/StreamManager.java +++ b/libs/arrow/src/main/java/org/opensearch/arrow/StreamManager.java @@ -20,9 +20,6 @@ */ @ExperimentalApi public abstract class StreamManager implements AutoCloseable { - - public abstract void setFlightClient(Object flightClient); - private final ConcurrentHashMap streams; /** diff --git a/modules/arrow-flight/src/main/java/org/opensearch/flight/FlightService.java b/modules/arrow-flight/src/main/java/org/opensearch/flight/FlightService.java index 4b84819999997..a05642999f52f 100644 --- a/modules/arrow-flight/src/main/java/org/opensearch/flight/FlightService.java +++ b/modules/arrow-flight/src/main/java/org/opensearch/flight/FlightService.java @@ -117,7 +117,6 @@ protected void doStart() { final Location location = Location.forGrpcInsecure(host, port); server = FlightServer.builder(allocator, location, producer).build(); client = FlightClient.builder(allocator, location).build(); - streamManager.setFlightClient(client); server.start(); logger.info("Arrow Flight server started successfully"); } catch (IOException e) { diff --git a/modules/arrow-flight/src/main/java/org/opensearch/flight/FlightStreamManager.java b/modules/arrow-flight/src/main/java/org/opensearch/flight/FlightStreamManager.java index 2d1a53957b12b..101a7fc535b5e 100644 --- a/modules/arrow-flight/src/main/java/org/opensearch/flight/FlightStreamManager.java +++ b/modules/arrow-flight/src/main/java/org/opensearch/flight/FlightStreamManager.java @@ -25,7 +25,7 @@ */ public class FlightStreamManager extends StreamManager { - private FlightClient flightClient; + private final FlightClient flightClient; /** * Constructs a new FlightStreamManager. 
@@ -42,12 +42,6 @@ public FlightStreamManager(FlightClient flightClient) { * @param ticket The StreamTicket identifying the desired stream. * @return The VectorSchemaRoot associated with the given ticket. */ - @Override - public void setFlightClient(Object flightClient) { - assert flightClient instanceof FlightClient; - this.flightClient = (FlightClient) flightClient; - } - @Override public VectorSchemaRoot getVectorSchemaRoot(StreamTicket ticket) { // TODO: for remote streams, register streams in cluster state with node details @@ -58,6 +52,7 @@ public VectorSchemaRoot getVectorSchemaRoot(StreamTicket ticket) { @Override public StreamTicket generateUniqueTicket() { + // return new StreamTicket("123".getBytes()) {}; return new StreamTicket(UUID.randomUUID().toString().getBytes()) {}; } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java b/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java index f600c233b9273..4c9b592abe250 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java @@ -499,7 +499,7 @@ ReducedQueryPhase reducedQueryPhase( for (SearchPhaseResult entry : queryResults) { QuerySearchResult result = entry.queryResult(); if (entry instanceof StreamSearchResult) { - tickets.addAll(((StreamSearchResult)entry).getFlightTickets()); + tickets.addAll(((StreamSearchResult) entry).getFlightTickets()); } from = result.from(); // sorted queries can set the size to 0 if they have enough competitive hits. 
@@ -728,7 +728,7 @@ public static final class ReducedQueryPhase { this.from = from; this.isEmptyResult = isEmptyResult; this.sortValueFormats = sortValueFormats; - this.osTickets = osTickets; + this.osTickets = osTickets; } /** diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index f26f8c2a75555..8c30d37076cc9 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -60,7 +60,6 @@ import org.opensearch.search.query.QuerySearchRequest; import org.opensearch.search.query.QuerySearchResult; import org.opensearch.search.query.ScrollQuerySearchResult; -import org.opensearch.search.query.StreamQueryResponse; import org.opensearch.search.stream.StreamSearchResult; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteClusterService; @@ -244,7 +243,6 @@ public void sendExecuteQuery( // we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request // this used to be the QUERY_AND_FETCH which doesn't exist anymore. 
- if (request.isStreamRequest()) { Writeable.Reader reader = StreamSearchResult::new; final ActionListener handler = responseWrapper.apply(connection, listener); diff --git a/server/src/main/java/org/opensearch/action/search/SearchType.java b/server/src/main/java/org/opensearch/action/search/SearchType.java index a8ada789adf22..a8e75c5f89113 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchType.java +++ b/server/src/main/java/org/opensearch/action/search/SearchType.java @@ -89,7 +89,7 @@ public static SearchType fromId(byte id) { } else if (id == 1 || id == 3) { // TODO this bwc layer can be removed once this is back-ported to 5.3 QUERY_AND_FETCH is removed // now return QUERY_THEN_FETCH; - } else if (id == 5) { + } else if (id == 5) { return STREAM; } else { throw new IllegalArgumentException("No search type for [" + id + "]"); diff --git a/server/src/main/java/org/opensearch/action/search/StreamAsyncAction.java b/server/src/main/java/org/opensearch/action/search/StreamAsyncAction.java index 67b6c7c11ce81..59f443ad7ccc4 100644 --- a/server/src/main/java/org/opensearch/action/search/StreamAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/StreamAsyncAction.java @@ -33,30 +33,22 @@ package org.opensearch.action.search; import org.apache.logging.log4j.Logger; -import org.apache.lucene.search.TopFieldDocs; +import org.opensearch.arrow.StreamManager; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.common.util.concurrent.AbstractRunnable; -import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.core.action.ActionListener; -import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchHits; import org.opensearch.search.SearchPhaseResult; -import org.opensearch.search.SearchShardTarget; -import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.internal.AliasFilter; import 
org.opensearch.search.internal.InternalSearchResponse; -import org.opensearch.search.internal.SearchContext; -import org.opensearch.search.internal.ShardSearchRequest; -import org.opensearch.search.profile.SearchProfileShardResults; -import org.opensearch.search.query.QuerySearchResult; import org.opensearch.search.stream.OSTicket; import org.opensearch.search.stream.StreamSearchResult; -import org.opensearch.search.suggest.Suggest; +import org.opensearch.search.stream.join.Join; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.transport.Transport; -import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -66,14 +58,58 @@ import java.util.function.BiFunction; /** - * Async transport action for query then fetch + * Stream at coordinator layer * * @opensearch.internal */ class StreamAsyncAction extends SearchQueryThenFetchAsyncAction { - public StreamAsyncAction(Logger logger, SearchTransportService searchTransportService, BiFunction nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, Map> indexRoutings, SearchPhaseController searchPhaseController, Executor executor, QueryPhaseResultConsumer resultConsumer, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, ClusterState clusterState, SearchTask task, SearchResponse.Clusters clusters, SearchRequestContext searchRequestContext, Tracer tracer) { - super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, resultConsumer, request, listener, shardsIts, timeProvider, clusterState, task, clusters, searchRequestContext, tracer); + private final StreamManager streamManager; + private final Join join; + + public StreamAsyncAction( + Logger logger, + SearchTransportService searchTransportService, + BiFunction nodeIdToConnection, + Map 
aliasFilter, + Map concreteIndexBoosts, + Map> indexRoutings, + SearchPhaseController searchPhaseController, + Executor executor, + QueryPhaseResultConsumer resultConsumer, + SearchRequest request, + ActionListener listener, + GroupShardsIterator shardsIts, + TransportSearchAction.SearchTimeProvider timeProvider, + ClusterState clusterState, + SearchTask task, + SearchResponse.Clusters clusters, + SearchRequestContext searchRequestContext, + Tracer tracer, + StreamManager streamManager + ) { + super( + logger, + searchTransportService, + nodeIdToConnection, + aliasFilter, + concreteIndexBoosts, + indexRoutings, + searchPhaseController, + executor, + resultConsumer, + request, + listener, + shardsIts, + timeProvider, + clusterState, + task, + clusters, + searchRequestContext, + tracer + ); + this.streamManager = streamManager; + this.join = searchRequestContext.getRequest().source().getJoin(); } @Override @@ -82,7 +118,8 @@ protected SearchPhase getNextPhase(final SearchPhaseResults r } class StreamSearchReducePhase extends SearchPhase { - private SearchPhaseContext context; + private final SearchPhaseContext context; + protected StreamSearchReducePhase(String name, SearchPhaseContext context) { super(name); this.context = context; @@ -92,24 +129,77 @@ protected StreamSearchReducePhase(String name, SearchPhaseContext context) { public void run() { context.execute(new StreamReduceAction(context, this)); } - }; + } class StreamReduceAction extends AbstractRunnable { - private SearchPhaseContext context; + private final SearchPhaseContext context; private SearchPhase phase; + StreamReduceAction(SearchPhaseContext context, SearchPhase phase) { this.context = context; - } + @Override protected void doRun() throws Exception { + List tickets = new ArrayList<>(); for (SearchPhaseResult entry : results.getAtomicArray().asList()) { if (entry instanceof StreamSearchResult) { tickets.addAll(((StreamSearchResult) entry).getFlightTickets()); + ((StreamSearchResult) 
entry).getFlightTickets().forEach(osTicket -> { + System.out.println("Ticket: " + new String(osTicket.getBytes(), StandardCharsets.UTF_8)); + // VectorSchemaRoot root = streamManager.getVectorSchemaRoot(osTicket); + // System.out.println("Number of rows: " + root.getRowCount()); + }); } } - InternalSearchResponse internalSearchResponse = new InternalSearchResponse(SearchHits.empty(),null, null, null, false, false, 1, Collections.emptyList(), tickets); + + // shard/table, schema + + // ticket should contain which IndexShard it comes from + // based on the search request, perform join using these tickets + + // join operate on 2 indexes using condition + // join contain already contain the schema, or at least hold the schema data + + // StreamTicket joinResult = streamManager.registerStream((allocator) -> new ArrowStreamProvider.Task() { + // @Override + // public VectorSchemaRoot init(BufferAllocator allocator) { + // IntVector docIDVector = new IntVector("docID", allocator); + // FieldVector[] vectors = new FieldVector[]{ + // docIDVector + // }; + // VectorSchemaRoot root = new VectorSchemaRoot(Arrays.asList(vectors)); + // return root; + // } + // + // public void run(VectorSchemaRoot root, ArrowStreamProvider.FlushSignal flushSignal) { + // // TODO perform join algo + // IntVector docIDVector = (IntVector) root.getVector("docID"); + // for (int i = 0; i < 10; i++) { + // docIDVector.set(i, i); + // } + // root.setRowCount(10); + // flushSignal.awaitConsumption(1000); + // } + // + // @Override + // public void onCancel() { + // + // } + // }); + + InternalSearchResponse internalSearchResponse = new InternalSearchResponse( + SearchHits.empty(), + null, + null, + null, + false, + false, + 1, + Collections.emptyList(), + List.of(new OSTicket("456".getBytes(), null)) + ); context.sendSearchResponse(internalSearchResponse, results.getAtomicArray()); } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java 
b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 5df862dd78017..c1f637276fee3 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -124,7 +124,8 @@ import java.util.stream.StreamSupport; import static org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; -import static org.opensearch.action.search.SearchType.*; +import static org.opensearch.action.search.SearchType.DFS_QUERY_THEN_FETCH; +import static org.opensearch.action.search.SearchType.QUERY_THEN_FETCH; import static org.opensearch.search.sort.FieldSortBuilder.hasPrimaryFieldSort; /** @@ -1324,7 +1325,8 @@ private AbstractSearchAsyncAction searchAsyncAction task, clusters, searchRequestContext, - tracer + tracer, + searchService.getStreamManager() ); break; default: diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 235537240cfc5..4f0462f0b5cdd 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -129,11 +129,7 @@ public class FeatureFlags { ); public static final String ARROW_STREAMS = "opensearch.experimental.feature.arrow.streams.enabled"; - public static final Setting ARROW_STREAMS_SETTING = Setting.boolSetting( - ARROW_STREAMS, - true, - Property.NodeScope - ); + public static final Setting ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, true, Property.NodeScope); private static final List> ALL_FEATURE_FLAG_SETTINGS = List.of( REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7244490ee0589..9868ce4ddfe02 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ 
b/server/src/main/java/org/opensearch/node/Node.java @@ -314,7 +314,9 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toList; -import static org.opensearch.common.util.FeatureFlags.*; +import static org.opensearch.common.util.FeatureFlags.ARROW_STREAMS_SETTING; +import static org.opensearch.common.util.FeatureFlags.BACKGROUND_TASK_EXECUTION_EXPERIMENTAL; +import static org.opensearch.common.util.FeatureFlags.TELEMETRY; import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath; import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED; @@ -1364,12 +1366,13 @@ protected Node( throw new IllegalStateException( String.format( Locale.ROOT, - "Only one StreamManagerPlugin can be installed. Found: %d", streamManagerPlugins.size() + "Only one StreamManagerPlugin can be installed. Found: %d", + streamManagerPlugins.size() ) ); } - if(!streamManagerPlugins.isEmpty()) { + if (!streamManagerPlugins.isEmpty()) { streamManager = streamManagerPlugins.get(0).getStreamManager(); logger.info("StreamManager initialized"); } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java index 05465e32631fd..2fa5b4f70f5f8 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java @@ -140,6 +140,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC }; } + static String[] addString(String[] originalArray, String newString) { + String[] newArray = new String[originalArray.length + 1]; + System.arraycopy(originalArray, 0, newArray, 0, originalArray.length); + newArray[newArray.length - 1] = newString; + return newArray; + } + /** * Parses the 
rest request on top of the SearchRequest, preserving values that are not overridden by the rest request. * @@ -163,6 +170,10 @@ public static void parseSearchRequest( searchRequest.source().parseXContent(requestContentParser, true); } + if (searchRequest.source().getJoin() != null) { + searchRequest.indices(addString(searchRequest.indices(), searchRequest.source().getJoin().getIndex())); + } + final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize()); searchRequest.setBatchedReduceSize(batchedReduceSize); if (request.hasParam("pre_filter_shard_size")) { diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 0757f125eb216..d4966635422ea 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -146,7 +146,7 @@ final class DefaultSearchContext extends SearchContext { private final IndexShard indexShard; private final ClusterService clusterService; private final IndexService indexService; - private final StreamManager streamManager; + private final StreamManager streamManager; private final ContextIndexSearcher searcher; private final DfsSearchResult dfsResult; private final QuerySearchResult queryResult; diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index ddb247a40507e..af59655f8ddcc 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -404,6 +404,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final TaskResourceTrackingService taskResourceTrackingService; private final StreamManager streamManager; + public StreamManager getStreamManager() { + return streamManager; + } + public 
SearchService( ClusterService clusterService, IndicesService indicesService, @@ -585,7 +589,7 @@ protected ReaderContext removeReaderContext(long id) { } @Override - protected void doStart() { } + protected void doStart() {} @Override protected void doStop() { @@ -869,7 +873,6 @@ public void executeQueryPhase( }, wrapFailureListener(listener, readerContext, markAsUsed)); } - public void executeStreamPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest()); final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest()); @@ -1761,7 +1764,8 @@ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException } private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefreshPending) throws IOException { - assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType(); + assert request.searchType() == SearchType.QUERY_THEN_FETCH || request.searchType() == SearchType.STREAM : "unexpected search type: " + + request.searchType(); final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null; final Releasable markAsUsed = readerContext != null ? 
readerContext.markAsUsed(getKeepAlive(request)) : () -> {}; try (Releasable ignored = markAsUsed) { diff --git a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java index f494103b625d3..c4374e23abbec 100644 --- a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java @@ -74,6 +74,8 @@ import org.opensearch.search.sort.SortBuilder; import org.opensearch.search.sort.SortBuilders; import org.opensearch.search.sort.SortOrder; +import org.opensearch.search.stream.join.Join; +import org.opensearch.search.stream.join.JoinFieldParser; import org.opensearch.search.suggest.SuggestBuilder; import java.io.IOException; @@ -164,6 +166,7 @@ public static HighlightBuilder highlight() { private QueryBuilder queryBuilder; private Join join; + public Join getJoin() { return join; } @@ -1398,7 +1401,7 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th derivedFieldsObject = parser.map(); } else if (JOIN_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { join = JoinFieldParser.parse(parser); - } else { + } else { throw new ParsingException( parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].", diff --git a/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java b/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java index baf93cb6cf741..8f53612e98727 100644 --- a/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java +++ b/server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java @@ -114,7 +114,7 @@ public InternalSearchResponse(StreamInput in) throws IOException { in.readOptionalWriteable(SearchProfileShardResults::new), in.readVInt(), readSearchExtBuildersOnOrAfter(in), - (in.readBoolean()? 
in.readList(OSTicket::new): null) + (in.readBoolean() ? in.readList(OSTicket::new) : null) ); } diff --git a/server/src/main/java/org/opensearch/search/lookup/SearchLookup.java b/server/src/main/java/org/opensearch/search/lookup/SearchLookup.java index 0a53e30ce2ac3..6d8e3330bc042 100644 --- a/server/src/main/java/org/opensearch/search/lookup/SearchLookup.java +++ b/server/src/main/java/org/opensearch/search/lookup/SearchLookup.java @@ -51,7 +51,7 @@ * @opensearch.api */ @PublicApi(since = "1.0.0") -public class /**/SearchLookup { +public class /**/ SearchLookup { /** * The maximum depth of field dependencies. * When a runtime field's doc values depends on another runtime field's doc values, diff --git a/server/src/main/java/org/opensearch/search/query/StreamSearchPhase.java b/server/src/main/java/org/opensearch/search/query/StreamSearchPhase.java index 24aa7c7dea41e..2f8966da2f7a5 100644 --- a/server/src/main/java/org/opensearch/search/query/StreamSearchPhase.java +++ b/server/src/main/java/org/opensearch/search/query/StreamSearchPhase.java @@ -9,9 +9,12 @@ package org.opensearch.search.query; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.search.Collector; @@ -19,21 +22,33 @@ import org.opensearch.arrow.ArrowStreamProvider; import org.opensearch.arrow.StreamManager; import org.opensearch.arrow.StreamTicket; -import org.opensearch.arrow.query.ArrowDocIdCollector; +import org.opensearch.index.mapper.MappedFieldType; +import 
org.opensearch.index.query.QueryShardContext; import org.opensearch.search.SearchContextSourcePrinter; import org.opensearch.search.aggregations.AggregationProcessor; +import org.opensearch.search.fetch.subphase.FieldAndFormat; import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.SearchProfileShardResults; import org.opensearch.search.stream.OSTicket; import org.opensearch.search.stream.StreamSearchResult; +import org.opensearch.search.stream.collector.ArrowCollector; +import org.opensearch.search.stream.collector.ArrowFieldAdaptor; +import org.opensearch.search.stream.join.Join; import java.io.IOException; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import static org.opensearch.search.stream.collector.ArrowFieldAdaptor.getArrowType; + +/** + * Produce stream from a shard search + */ public class StreamSearchPhase extends QueryPhase { private static final Logger LOGGER = LogManager.getLogger(StreamSearchPhase.class); @@ -59,7 +74,9 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep } } - + /** + * Default implementation of {@link QueryPhaseSearcher}. 
+ */ public static class DefaultStreamSearchPhaseSearcher extends DefaultQueryPhaseSearcher { @Override @@ -108,35 +125,79 @@ private boolean searchWithCollector( boolean timeoutSet ) { + // TODO bowen safe check doc values using index reader + + String searchIndex = searchContext.shardTarget().getIndex(); + List fields = searchContext.fetchFieldsContext().fields(); + Join join = searchContext.request().source().getJoin(); + if (join != null) { + String secondIndex = join.getIndex(); + List secondIndexFields = join.getFields(); + if (searchIndex.equals(secondIndex)) { + fields = secondIndexFields; + } + } + + // map from OpenSearch field to Arrow Field type + List arrowFieldAdaptors = new ArrayList<>(); + fields.forEach(field -> { + QueryShardContext shardContext = searchContext.getQueryShardContext(); + MappedFieldType fieldType = shardContext.fieldMapper(field.field); + ArrowType arrowType = getArrowType(fieldType.typeName()); + System.out.println("field: " + field.field + " type: " + arrowType); + arrowFieldAdaptors.add(new ArrowFieldAdaptor(field.field, arrowType, fieldType.typeName())); + }); + QuerySearchResult queryResult = searchContext.queryResult(); + StreamManager streamManager = searchContext.streamManager(); if (streamManager == null) { throw new RuntimeException("StreamManager not setup"); } + + // For each shard search, we open one stream StreamTicket ticket = streamManager.registerStream((allocator -> new ArrowStreamProvider.Task() { @Override public VectorSchemaRoot init(BufferAllocator allocator) { - IntVector docIDVector = new IntVector("docID", allocator); - FieldVector[] vectors = new FieldVector[]{ - docIDVector - }; - VectorSchemaRoot root = new VectorSchemaRoot(Arrays.asList(vectors)); - return root; + Map arrowFields = new HashMap<>(); + + Field docIdField = new Field("docId", FieldType.notNullable(new ArrowType.Int(32, true)), null); + arrowFields.put("docId", docIdField); + Field scoreField = new Field( + "score", + FieldType.nullable(new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), + null + ); + arrowFields.put("score", scoreField); + + arrowFieldAdaptors.forEach(field -> { + Field arrowField = new Field(field.getFieldName(), FieldType.nullable(field.getArrowType()), null); + arrowFields.put(field.getFieldName(), arrowField); + }); + + Schema schema = new Schema(arrowFields.values()); + System.out.println("Schema: " + schema); + return VectorSchemaRoot.create(schema, allocator); } @Override public void run(VectorSchemaRoot root, ArrowStreamProvider.FlushSignal flushSignal) { try { Collector collector = QueryCollectorContext.createQueryCollector(collectors); - final ArrowDocIdCollector arrowDocIdCollector = new ArrowDocIdCollector(collector, root, flushSignal, 1000); + final ArrowCollector arrowCollector = new ArrowCollector(collector, arrowFieldAdaptors, root, 20, flushSignal); try { - searcher.search(query, arrowDocIdCollector); + // TODO arrow collect with query + // TODO arrow random access doc values + System.out.println("run query"); + searcher.search(query, arrowCollector); } catch (EarlyTerminatingCollector.EarlyTerminationException e) { - // EarlyTerminationException is not caught in ContextIndexSearcher to allow force termination of collection. Postcollection + // EarlyTerminationException is not caught in ContextIndexSearcher to allow force termination of collection. + // Postcollection // still needs to be processed for Aggregations when early termination takes place. 
- searchContext.bucketCollectorProcessor().processPostCollection(arrowDocIdCollector); + searchContext.bucketCollectorProcessor().processPostCollection(arrowCollector); queryResult.terminatedEarly(true); } + if (searchContext.isSearchTimedOut()) { assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set"; if (searchContext.request().allowPartialSearchResults() == false) { @@ -144,7 +205,8 @@ public void run(VectorSchemaRoot root, ArrowStreamProvider.FlushSignal flushSign } queryResult.searchTimedOut(true); } - if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) { + if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER + && queryResult.terminatedEarly() == null) { queryResult.terminatedEarly(false); } @@ -162,7 +224,7 @@ public void onCancel() { } })); StreamSearchResult streamSearchResult = searchContext.streamSearchResult(); - streamSearchResult.flights(List.of(new OSTicket(ticket.getBytes()))); + streamSearchResult.flights(List.of(new OSTicket(ticket.getBytes(), searchContext.shardTarget()))); return false; } } diff --git a/server/src/main/java/org/opensearch/search/stream/OSTicket.java b/server/src/main/java/org/opensearch/search/stream/OSTicket.java index 7b5e700290985..32ec1e02a5986 100644 --- a/server/src/main/java/org/opensearch/search/stream/OSTicket.java +++ b/server/src/main/java/org/opensearch/search/stream/OSTicket.java @@ -15,19 +15,26 @@ import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.search.SearchShardTarget; import java.io.IOException; import java.nio.charset.StandardCharsets; +/** + * OpenSearch Ticket + */ @ExperimentalApi public class OSTicket extends StreamTicket implements Writeable, ToXContentFragment { - public OSTicket(byte[] bytes) { + SearchShardTarget shard; + + public OSTicket(byte[] bytes, 
SearchShardTarget shard) { super(bytes); + this.shard = shard; } public OSTicket(StreamInput in) throws IOException { - this(in.readByteArray()); + this(in.readByteArray(), new SearchShardTarget(in)); } @Override @@ -39,5 +46,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { out.writeByteArray(this.getBytes()); + shard.writeTo(out); } } diff --git a/server/src/main/java/org/opensearch/search/query/StreamQueryResponse.java b/server/src/main/java/org/opensearch/search/stream/StreamQueryResponse.java similarity index 84% rename from server/src/main/java/org/opensearch/search/query/StreamQueryResponse.java rename to server/src/main/java/org/opensearch/search/stream/StreamQueryResponse.java index 12f46e0be483e..a949cd0996a94 100644 --- a/server/src/main/java/org/opensearch/search/query/StreamQueryResponse.java +++ b/server/src/main/java/org/opensearch/search/stream/StreamQueryResponse.java @@ -6,14 +6,16 @@ * compatible open source license. */ -package org.opensearch.search.query; +package org.opensearch.search.stream; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.transport.TransportResponse; import org.opensearch.search.SearchPhaseResult; import java.io.IOException; +/** + * Stream query response. 
+ */ public class StreamQueryResponse extends SearchPhaseResult { @Override public void writeTo(StreamOutput out) throws IOException { diff --git a/server/src/main/java/org/opensearch/search/stream/StreamSearchResult.java b/server/src/main/java/org/opensearch/search/stream/StreamSearchResult.java index 8e9ae3c9b29d7..5366ce938d6eb 100644 --- a/server/src/main/java/org/opensearch/search/stream/StreamSearchResult.java +++ b/server/src/main/java/org/opensearch/search/stream/StreamSearchResult.java @@ -9,7 +9,6 @@ package org.opensearch.search.stream; import org.opensearch.common.annotation.ExperimentalApi; - import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.search.SearchPhaseResult; @@ -21,6 +20,9 @@ import java.io.IOException; import java.util.List; +/** + * Search phase result for streaming search. + */ @ExperimentalApi public class StreamSearchResult extends SearchPhaseResult { private List flightTickets; @@ -67,7 +69,7 @@ public void setShardIndex(int shardIndex) { @Override public QuerySearchResult queryResult() { - return queryResult; + return queryResult; } public List getFlightTickets() { diff --git a/server/src/main/java/org/opensearch/search/stream/collector/ArrowCollector.java b/server/src/main/java/org/opensearch/search/stream/collector/ArrowCollector.java new file mode 100644 index 0000000000000..2a1f156eaab57 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/stream/collector/ArrowCollector.java @@ -0,0 +1,171 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search.stream.collector; + +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.FilterCollector; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.BytesRef; +import org.opensearch.arrow.ArrowStreamProvider; +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Arrow collector for OpenSearch field values + */ +@ExperimentalApi +public class ArrowCollector extends FilterCollector { + + List fields; + private final VectorSchemaRoot root; + private final ArrowStreamProvider.FlushSignal flushSignal; + private final int batchSize; + + public ArrowCollector( + Collector in, + List fields, + VectorSchemaRoot root, + int batchSize, + ArrowStreamProvider.FlushSignal flushSignal + ) { + super(in); + this.fields = fields; + this.root = root; + this.batchSize = batchSize; + this.flushSignal = flushSignal; + } + + @Override + public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException { + System.out.println("getLeafCollector"); + + Map docValueIterators = new HashMap<>(); + Map vectors = new HashMap<>(); + // TODO: the vector we get from root may not work with concurrent segment search?
+ // looks fine if the segment search is executed sequentially + vectors.put("docId", root.getVector("docId")); + vectors.put("score", root.getVector("score")); + fields.forEach(field -> { + try { + ArrowFieldAdaptor.DocValuesType dv = field.getDocValues(context.reader()); + docValueIterators.put(field.fieldName, dv); + vectors.put(field.fieldName, root.getVector(field.fieldName)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + final int[] currentRow = { 0 }; + return new LeafCollector() { + + private Scorable scorer; + + @Override + public void collect(int docId) throws IOException { + // innerLeafCollector.collect(docId); + + System.out.println("collect docId " + docId); + System.out.println("current row " + currentRow[0]); + + FieldVector docIDVector = vectors.get("docId"); + ((IntVector) docIDVector).setSafe(currentRow[0], docId); + + FieldVector scoreVector = vectors.get("score"); + ((Float4Vector) scoreVector).setSafe(currentRow[0], scorer.score()); + System.out.println("set score " + scorer.score()); + + // read from the Lucene field values + for (Map.Entry entry : docValueIterators.entrySet()) { + + String field = entry.getKey(); + + ArrowFieldAdaptor.DocValuesType dv = entry.getValue(); + boolean numeric = false; + SortedNumericDocValues numericDocValues = null; + SortedSetDocValues sortedDocValues = null; + if (dv instanceof ArrowFieldAdaptor.NumericDocValuesType) { + numericDocValues = ((ArrowFieldAdaptor.NumericDocValuesType) dv).getNumericDocValues(); + numeric = true; + } else if (dv instanceof ArrowFieldAdaptor.SortedDocValuesType) { + sortedDocValues = ((ArrowFieldAdaptor.SortedDocValuesType) dv).getSortedDocValues(); + } + + FieldVector vector = vectors.get(field); + + if (numeric) { + if (numericDocValues.advanceExact(docId)) { + long value = numericDocValues.nextValue(); + ((BigIntVector) vector).setSafe(currentRow[0], value); + System.out.println("set numeric value " + value); + } + } else { + if 
(sortedDocValues.advanceExact(docId)) { + long ord = sortedDocValues.nextOrd(); + BytesRef keyword = sortedDocValues.lookupOrd(ord); + ((VarCharVector) vector).setSafe(currentRow[0], keyword.utf8ToString().getBytes()); + System.out.println("set string value " + keyword.utf8ToString()); + } + } + } + + currentRow[0]++; + if (currentRow[0] >= batchSize) { + root.setRowCount(batchSize); + flushSignal.awaitConsumption(1000); + System.out.println("flushed when batch size hit"); + currentRow[0] = 0; + } + } + + @Override + public void finish() throws IOException { + if (currentRow[0] > 0) { + root.setRowCount(currentRow[0]); + flushSignal.awaitConsumption(1000); + System.out.println("finish flush"); + currentRow[0] = 0; + } + } + + @Override + public void setScorer(Scorable scorable) throws IOException { + // innerLeafCollector.setScorer(scorable); + this.scorer = scorable; + } + }; + } + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public void setWeight(Weight weight) { + if (this.in != null) { + this.in.setWeight(weight); + } + } +} diff --git a/server/src/main/java/org/opensearch/arrow/query/ArrowDocIdCollector.java b/server/src/main/java/org/opensearch/search/stream/collector/ArrowDocIdCollector.java similarity index 87% rename from server/src/main/java/org/opensearch/arrow/query/ArrowDocIdCollector.java rename to server/src/main/java/org/opensearch/search/stream/collector/ArrowDocIdCollector.java index 0790f4e9044f6..2cb5e52993db7 100644 --- a/server/src/main/java/org/opensearch/arrow/query/ArrowDocIdCollector.java +++ b/server/src/main/java/org/opensearch/search/stream/collector/ArrowDocIdCollector.java @@ -6,8 +6,10 @@ * compatible open source license. 
*/ -package org.opensearch.arrow.query; -import org.apache.arrow.vector .*; +package org.opensearch.search.stream.collector; + +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.Collector; import org.apache.lucene.search.FilterCollector; @@ -19,11 +21,14 @@ import java.io.IOException; +/** + * A collector that collects document IDs and stores them in an Arrow vector. + */ public class ArrowDocIdCollector extends FilterCollector { private final VectorSchemaRoot root; private final ArrowStreamProvider.FlushSignal flushSignal; private final int batchSize; - private IntVector docIDVector; + private final IntVector docIDVector; private int currentRow; public ArrowDocIdCollector(Collector in, VectorSchemaRoot root, ArrowStreamProvider.FlushSignal flushSignal, int batchSize) { @@ -47,10 +52,9 @@ public ScoreMode scoreMode() { return ScoreMode.TOP_DOCS; } - @Override public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException { - LeafCollector inner = (this.in == null ? null: super.getLeafCollector(context)); + LeafCollector inner = (this.in == null ? null : super.getLeafCollector(context)); return new LeafCollector() { @Override public void setScorer(Scorable scorer) throws IOException { diff --git a/server/src/main/java/org/opensearch/search/stream/collector/ArrowFieldAdaptor.java b/server/src/main/java/org/opensearch/search/stream/collector/ArrowFieldAdaptor.java new file mode 100644 index 0000000000000..c9c6fbdd78dd9 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/stream/collector/ArrowFieldAdaptor.java @@ -0,0 +1,152 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search.stream.collector; + +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Adaptor class to map an OpenSearch field to an Arrow field. + */ +@ExperimentalApi +public class ArrowFieldAdaptor { + public String getFieldName() { + return fieldName; + } + + public ArrowType getArrowType() { + return arrowType; + } + + String fieldName; + ArrowType arrowType; + String fieldType; + + public ArrowFieldAdaptor(String fieldName, ArrowType arrowType, String fieldType) { + this.fieldName = fieldName; + this.arrowType = arrowType; + this.fieldType = fieldType; + } + + /** + * Gets the appropriate DocValues for this field based on its OpenSearch field type. + * + * @param leafReader The LeafReader to get DocValues from + * @return The appropriate DocValues object for this field + * @throws IOException If there's an error reading from the index + * @throws IllegalArgumentException If the field type is not supported + */ + public DocValuesType getDocValues(LeafReader leafReader) throws IOException { + switch (fieldType.toLowerCase()) { + case "long": + case "integer": + case "short": + case "byte": + case "double": + case "float": + case "date": + return new NumericDocValuesType(leafReader.getSortedNumericDocValues(fieldName)); + + case "keyword": + case "ip": + case "boolean": + return new SortedDocValuesType(leafReader.getSortedSetDocValues(fieldName)); + + default: + throw new IllegalArgumentException("Unsupported field type: " + fieldType); + } + } + + // TODO: can we reuse Lucene's DocValuesType here?
+ /** + * Interface for DocValues types supported by Arrow. + */ + @ExperimentalApi + public interface DocValuesType {} + + /** + * Class for NumericDocValues types supported by Arrow. + */ + public static class NumericDocValuesType implements DocValuesType { + private final SortedNumericDocValues numericDocValues; + + public NumericDocValuesType(SortedNumericDocValues numericDocValues) { + this.numericDocValues = numericDocValues; + } + + public SortedNumericDocValues getNumericDocValues() { + return numericDocValues; + } + } + + /** + * Class for SortedDocValues types supported by Arrow. + */ + public static class SortedDocValuesType implements DocValuesType { + private final SortedSetDocValues sortedDocValues; + + public SortedDocValuesType(SortedSetDocValues sortedDocValues) { + this.sortedDocValues = sortedDocValues; + } + + public SortedSetDocValues getSortedDocValues() { + return sortedDocValues; + } + } + + private static final Map typeMap; + + static { + typeMap = new HashMap<>(); + + // Numeric types + typeMap.put("long", new ArrowType.Int(64, true)); + typeMap.put("integer", new ArrowType.Int(32, true)); + typeMap.put("short", new ArrowType.Int(16, true)); + typeMap.put("byte", new ArrowType.Int(8, true)); + typeMap.put("double", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)); + typeMap.put("float", new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)); + + // Date type + typeMap.put("date", new ArrowType.Date(DateUnit.MILLISECOND)); + + // Keyword type (assuming it's represented as a string in Arrow) + typeMap.put("keyword", new ArrowType.Utf8()); + + // Boolean type + typeMap.put("boolean", new ArrowType.Bool()); + + // IP type (assuming it's represented as a fixed size binary in Arrow) + typeMap.put("ip", new ArrowType.FixedSizeBinary(16)); // IPv6 address length + } + + /** + * Gets the appropriate ArrowType for a given OpenSearch field type. 
+ * + * @param openSearchType The OpenSearch field type + * @return The corresponding ArrowType + * @throws IllegalArgumentException If the field type is not supported + */ + public static ArrowType getArrowType(String openSearchType) { + ArrowType arrowType = typeMap.get(openSearchType.toLowerCase()); + if (arrowType == null) { + throw new IllegalArgumentException("Unsupported OpenSearch type: " + openSearchType); + } + return arrowType; + } +} diff --git a/server/src/main/java/org/opensearch/search/stream/collector/package-info.java b/server/src/main/java/org/opensearch/search/stream/collector/package-info.java new file mode 100644 index 0000000000000..d9e6a22bcf143 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/stream/collector/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * The arrow related classes + */ +package org.opensearch.search.stream.collector; diff --git a/server/src/main/java/org/opensearch/search/builder/Join.java b/server/src/main/java/org/opensearch/search/stream/join/Join.java similarity index 96% rename from server/src/main/java/org/opensearch/search/builder/Join.java rename to server/src/main/java/org/opensearch/search/stream/join/Join.java index f17b0a21d7433..2301b05e880bd 100644 --- a/server/src/main/java/org/opensearch/search/builder/Join.java +++ b/server/src/main/java/org/opensearch/search/stream/join/Join.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.search.builder; +package org.opensearch.search.stream.join; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.xcontent.XContentParser; @@ -16,6 +16,9 @@ import java.io.IOException; import java.util.List; +/** + * Represents a join condition in a search query. 
+ */ @ExperimentalApi public class Join { private String index; @@ -99,5 +102,3 @@ void parseCondition(XContentParser parser) throws IOException { setCondition(condition); } } - - diff --git a/server/src/main/java/org/opensearch/search/builder/JoinCondition.java b/server/src/main/java/org/opensearch/search/stream/join/JoinCondition.java similarity index 90% rename from server/src/main/java/org/opensearch/search/builder/JoinCondition.java rename to server/src/main/java/org/opensearch/search/stream/join/JoinCondition.java index d9c638d163f45..607a1b8afcc40 100644 --- a/server/src/main/java/org/opensearch/search/builder/JoinCondition.java +++ b/server/src/main/java/org/opensearch/search/stream/join/JoinCondition.java @@ -6,10 +6,13 @@ * compatible open source license. */ -package org.opensearch.search.builder; +package org.opensearch.search.stream.join; import org.opensearch.common.annotation.ExperimentalApi; +/** + * Represents a join condition in a search query. + */ @ExperimentalApi public class JoinCondition { private String leftField; diff --git a/server/src/main/java/org/opensearch/search/builder/JoinFieldParser.java b/server/src/main/java/org/opensearch/search/stream/join/JoinFieldParser.java similarity index 96% rename from server/src/main/java/org/opensearch/search/builder/JoinFieldParser.java rename to server/src/main/java/org/opensearch/search/stream/join/JoinFieldParser.java index d2e1d1e75c1e4..04fd1278d8746 100644 --- a/server/src/main/java/org/opensearch/search/builder/JoinFieldParser.java +++ b/server/src/main/java/org/opensearch/search/stream/join/JoinFieldParser.java @@ -6,7 +6,7 @@ * compatible open source license. 
*/ -package org.opensearch.search.builder; +package org.opensearch.search.stream.join; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.search.fetch.subphase.FieldAndFormat; @@ -17,6 +17,9 @@ import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; +/** + * Parser for {@link Join} objects. + */ public class JoinFieldParser { public static Join parse(XContentParser parser) throws IOException { @@ -68,5 +71,4 @@ private static void parseJoinField(Join join, String currentFieldName, XContentP } } - } diff --git a/server/src/main/java/org/opensearch/search/stream/join/package-info.java b/server/src/main/java/org/opensearch/search/stream/join/package-info.java new file mode 100644 index 0000000000000..2f436088240ae --- /dev/null +++ b/server/src/main/java/org/opensearch/search/stream/join/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * stream join + */ +package org.opensearch.search.stream.join; diff --git a/server/src/main/java/org/opensearch/search/stream/package-info.java b/server/src/main/java/org/opensearch/search/stream/package-info.java new file mode 100644 index 0000000000000..c78ffe4c6bc3b --- /dev/null +++ b/server/src/main/java/org/opensearch/search/stream/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * The OpenSearch search stream package. 
+ */ +package org.opensearch.search.stream; diff --git a/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java b/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java index 0d5ecbf8134c5..c744f2592e24f 100644 --- a/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java +++ b/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java @@ -435,7 +435,7 @@ public void execute() { query = geoShapeQuery("geopoint", new Rectangle(0.0, 55.0, 55.0, 0.0)).toQuery(queryShardContext); topDocs = searcher.search(query, 10); assertEquals(4, topDocs.totalHits.value); - } + } } } diff --git a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java index 34aeb466ae360..954e9b22c3c2a 100644 --- a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java @@ -556,8 +556,8 @@ public void testArrow() throws Exception { final int numDocs = scaledRandomIntBetween(100, 200); for (int i = 0; i < numDocs; ++i) { Document doc = new Document(); - doc.add(new StringField("joinField", Integer.toString(i%10), Store.NO)); - doc.add(new SortedSetDocValuesField("joinField", new BytesRef(Integer.toString(i%10)))); + doc.add(new StringField("joinField", Integer.toString(i % 10), Store.NO)); + doc.add(new SortedSetDocValuesField("joinField", new BytesRef(Integer.toString(i % 10)))); w.addDocument(doc); } w.close();