Skip to content

Commit

Permalink
Add sliced scroll support to ChunkCommand (#18205)
Browse files Browse the repository at this point in the history
* Add sliced scroll support to ChunkCommand

* method to check if shardInfo is primary & started

---------

Co-authored-by: Maxwell <[email protected]>
Co-authored-by: Maxwell Anipah <[email protected]>
  • Loading branch information
3 people authored Feb 19, 2024
1 parent 4da7c08 commit 38b0487
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@
*/
package org.graylog.storage.elasticsearch7;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.graylog.plugins.views.search.searchfilters.db.UsedSearchFiltersToQueryStringsMapper;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.query.BoolQueryBuilder;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.query.QueryBuilder;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.index.query.QueryBuilders;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.search.builder.SearchSourceBuilder;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.search.slice.SliceBuilder;
import org.graylog2.indexer.searches.ChunkCommand;
import org.graylog2.indexer.searches.SearchesConfig;
import org.graylog2.indexer.searches.Sorting;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.streams.Stream;

import jakarta.inject.Inject;
import jakarta.inject.Named;

import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -67,6 +67,8 @@ public SearchSourceBuilder create(final ChunkCommand chunkCommand) {
searchSourceBuilder.fetchSource(chunkCommand.fields().toArray(new String[0]), new String[0]);
chunkCommand.batchSize()
.ifPresent(batchSize -> searchSourceBuilder.size(Math.toIntExact(batchSize)));
chunkCommand.sliceParams()
.ifPresent(sliceParams -> searchSourceBuilder.slice(new SliceBuilder(sliceParams.id(), sliceParams.max())));
return searchSourceBuilder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@
*/
package org.graylog.storage.opensearch2;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.graylog.plugins.views.search.searchfilters.db.UsedSearchFiltersToQueryStringsMapper;
import org.graylog.shaded.opensearch2.org.opensearch.index.query.BoolQueryBuilder;
import org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilder;
import org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders;
import org.graylog.shaded.opensearch2.org.opensearch.search.builder.SearchSourceBuilder;
import org.graylog.shaded.opensearch2.org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.graylog.shaded.opensearch2.org.opensearch.search.slice.SliceBuilder;
import org.graylog2.indexer.searches.ChunkCommand;
import org.graylog2.indexer.searches.SearchesConfig;
import org.graylog2.indexer.searches.Sorting;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.streams.Stream;

import jakarta.inject.Inject;
import jakarta.inject.Named;

import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -67,6 +67,8 @@ public SearchSourceBuilder create(final ChunkCommand chunkCommand) {
searchSourceBuilder.fetchSource(chunkCommand.fields().toArray(new String[0]), new String[0]);
chunkCommand.batchSize()
.ifPresent(batchSize -> searchSourceBuilder.size(Math.toIntExact(batchSize)));
chunkCommand.sliceParams()
.ifPresent(sliceParams -> searchSourceBuilder.slice(new SliceBuilder(sliceParams.id(), sliceParams.max())));
return searchSourceBuilder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.util.function.Function;

public record ShardsInfo(String index, int shard, ShardType shardType, State state, long docs, String store, String ip, String node ) {
public static boolean isStartedPrimaryShard(ShardsInfo shardsInfo) {
return shardsInfo.shardType() == ShardsInfo.ShardType.PRIMARY && shardsInfo.state() == ShardsInfo.State.STARTED;
}

public static ShardsInfo create(JsonNode jsonNode) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import org.graylog.plugins.views.search.searchfilters.model.UsedSearchFilter;
import org.graylog2.plugin.indexer.searches.timeranges.TimeRange;

Expand Down Expand Up @@ -50,11 +51,27 @@ public abstract class ChunkCommand {
public abstract Optional<TimeRange> range();

public abstract OptionalInt limit();

public abstract OptionalInt offset();

public abstract List<String> fields();

public abstract OptionalLong batchSize();

public abstract Optional<SliceParams> sliceParams();

public abstract boolean highlight();

public abstract Builder toBuilder();

public record SliceParams(int id, int max) {
public SliceParams {
Preconditions.checkArgument(id >= 0 && id <= 9, "slice id must be between 0 and 9");
Preconditions.checkArgument(max >= 2 && max <= 10, "slice max must be between 2 and 10");
Preconditions.checkArgument(max > id, "max must be greater than id");
}
}

public static Builder builder() {
return new AutoValue_ChunkCommand.Builder()
.query("")
Expand All @@ -66,16 +83,33 @@ public static Builder builder() {
@AutoValue.Builder
public static abstract class Builder {
public abstract Builder query(String query);

public abstract Builder indices(Set<String> indices);

public abstract Builder streams(Set<String> streams);

public abstract Builder sorting(Sorting sorting);

public abstract Builder filter(@Nullable String filter);

public abstract Builder filters(List<UsedSearchFilter> filters);

public abstract Builder range(TimeRange range);

public abstract Builder limit(int limit);

public abstract Builder offset(int offset);

public abstract Builder fields(List<String> fields);

public abstract Builder batchSize(int batchSize);

public abstract Builder sliceParams(@Nullable SliceParams sliceParams);

public Builder sliceParams(int id, int max) {
return sliceParams(new SliceParams(id, max));
}

public abstract Builder highlight(boolean highlight);

public abstract ChunkCommand build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ public IndexSet getIndexSet() {
};
when(streamService.load(STREAM_ID)).thenReturn(stream);
CountResult result = searches.count("*", AbsoluteRange.create(
new DateTime(2015, 1, 1, 0, 0, DateTimeZone.UTC),
new DateTime(2015, 1, 2, 0, 0, DateTimeZone.UTC)),
new DateTime(2015, 1, 1, 0, 0, DateTimeZone.UTC),
new DateTime(2015, 1, 2, 0, 0, DateTimeZone.UTC)),
"streams:" + STREAM_ID);

assertThat(result.count()).isEqualTo(5L);
Expand All @@ -228,8 +228,8 @@ public void testCountWithInvalidFilter() throws Exception {
importFixture("org/graylog2/indexer/searches/SearchesIT.json");

CountResult result = searches.count("*", AbsoluteRange.create(
new DateTime(2015, 1, 1, 0, 0, DateTimeZone.UTC),
new DateTime(2015, 1, 2, 0, 0, DateTimeZone.UTC)),
new DateTime(2015, 1, 1, 0, 0, DateTimeZone.UTC),
new DateTime(2015, 1, 2, 0, 0, DateTimeZone.UTC)),
"foobar-not-a-filter");

assertThat(result.count()).isEqualTo(0L);
Expand Down Expand Up @@ -547,6 +547,26 @@ public void scrollReturnsMultipleChunksRespectingBatchSize() throws Exception {
assertThat(resultMessages).hasSize(10);
}

@Test
public void scrollWithChunkCommandAndSlicedScroll() {
importFixture("org/graylog2/indexer/searches/SearchesIT.json");

when(indexSetRegistry.getForIndices(Collections.singleton("graylog_0"))).thenReturn(Collections.singleton(indexSet));
var searchesAdapter = createSearchesAdapter();

var chunkCommand1 = ChunkCommand.builder()
.indices(Set.of("graylog_0"))
.sliceParams(0, 2)
.build();

final ChunkedResult scrollResult1 = searchesAdapter.scroll(chunkCommand1);
final ChunkedResult scrollResult2 = searchesAdapter.scroll(chunkCommand1.toBuilder().sliceParams(1, 2).build());

assertThat(scrollResult1).isNotNull();
assertThat(scrollResult2).isNotNull();
assertThat(scrollResult1.totalHits() + scrollResult2.totalHits()).isEqualTo(10L);
}

@Test
public void scrollReturnsMultipleChunksRespectingLimit() throws Exception {
importFixture("org/graylog2/indexer/searches/SearchesIT.json");
Expand Down

0 comments on commit 38b0487

Please sign in to comment.