Skip to content

Commit

Permalink
chore | Add support for providing filters in spanJoiner (#181)
Browse files Browse the repository at this point in the history
* Add support for providing filters in spanJoiner

* Resolve PR reviews
  • Loading branch information
AnandShivansh authored Jun 7, 2024
1 parent 4202d01 commit 36bd206
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
import lombok.AllArgsConstructor;
import lombok.Value;
Expand Down Expand Up @@ -86,26 +86,32 @@ private class DefaultSpanJoiner implements SpanJoiner {

@Override
public <T> Single<Map<T, Span>> joinSpan(
Collection<T> joinSources, SpanIdGetter<T> spanIdGetter) {
Collection<T> joinSources,
SpanIdGetter<T> spanIdGetter,
Collection<FilterArgument> filterArguments) {
Function<T, Single<List<String>>> idsGetter =
source -> spanIdGetter.getSpanId(source).map(List::of);
return this.joinSpans(joinSources, idsGetter, SPAN_KEY).map(this::reduceMap);
return this.joinSpans(joinSources, idsGetter, filterArguments, SPAN_KEY).map(this::reduceMap);
}

@Override
public <T> Single<ListMultimap<T, Span>> joinSpans(
Collection<T> joinSources, MultipleSpanIdGetter<T> multipleSpanIdGetter) {
return this.joinSpans(joinSources, multipleSpanIdGetter::getSpanIds, SPANS_KEY);
Collection<T> joinSources,
MultipleSpanIdGetter<T> multipleSpanIdGetter,
Collection<FilterArgument> filterArguments) {
return this.joinSpans(
joinSources, multipleSpanIdGetter::getSpanIds, filterArguments, SPANS_KEY);
}

private <T> Single<ListMultimap<T, Span>> joinSpans(
Collection<T> joinSources,
Function<T, Single<List<String>>> idsGetter,
Collection<FilterArgument> filterArguments,
String joinSpanKey) {
return this.buildSourceToIdsMap(joinSources, idsGetter)
.flatMap(
sourceToSpanIdsMap ->
this.buildSpanRequest(sourceToSpanIdsMap, joinSpanKey)
this.buildSpanRequest(sourceToSpanIdsMap, joinSpanKey, filterArguments)
.flatMap(spanDao::getSpans)
.map(this::buildSpanIdToSpanMap)
.map(
Expand Down Expand Up @@ -152,15 +158,16 @@ private Map<String, Span> buildSpanIdToSpanMap(SpanResultSet resultSet) {
}

private <T> Single<SpanRequest> buildSpanRequest(
ListMultimap<T, String> sourceToSpanIdsMultimap, String joinSpanKey) {
ListMultimap<T, String> sourceToSpanIdsMultimap,
String joinSpanKey,
Collection<FilterArgument> filterArguments) {
Collection<String> spanIds =
sourceToSpanIdsMultimap.values().stream()
.distinct()
.collect(Collectors.toUnmodifiableList());
List<SelectedField> selectedFields = getSelections(joinSpanKey);
return buildSpanIdsFilter(context, spanIds)
.flatMap(
filterArguments -> buildSpanRequest(spanIds.size(), filterArguments, selectedFields));
return buildSpanIdsFilter(context, spanIds, filterArguments)
.flatMap(filters -> buildSpanRequest(spanIds.size(), filters, selectedFields));
}

private Single<SpanRequest> buildSpanRequest(
Expand All @@ -182,8 +189,14 @@ private Single<SpanRequest> buildSpanRequest(
}

private Single<List<AttributeAssociation<FilterArgument>>> buildSpanIdsFilter(
GraphQlRequestContext context, Collection<String> spanIds) {
return filterRequestBuilder.build(context, SPAN, Set.of(new SpanIdFilter(spanIds)));
GraphQlRequestContext context,
Collection<String> spanIds,
Collection<FilterArgument> filterArguments) {
return filterRequestBuilder.build(
context,
SPAN,
Stream.concat(filterArguments.stream(), Stream.of(new SpanIdFilter(spanIds)))
.collect(Collectors.toUnmodifiableList()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument;
import org.hypertrace.core.graphql.span.schema.Span;

public interface SpanJoiner {
Expand All @@ -16,21 +17,40 @@ public interface SpanJoiner {
new SpanJoiner() {
@Override
public <T> Single<Map<T, Span>> joinSpan(
Collection<T> joinSources, SpanIdGetter<T> spanIdGetter) {
Collection<T> joinSources,
SpanIdGetter<T> spanIdGetter,
Collection<FilterArgument> filterArguments) {
return Single.just(Collections.emptyMap());
}

@Override
public <T> Single<ListMultimap<T, Span>> joinSpans(
Collection<T> joinSources, MultipleSpanIdGetter<T> multipleSpanIdGetter) {
Collection<T> joinSources,
MultipleSpanIdGetter<T> multipleSpanIdGetter,
Collection<FilterArgument> filterArguments) {
return Single.just(ArrayListMultimap.create());
}
};

<T> Single<Map<T, Span>> joinSpan(Collection<T> joinSources, SpanIdGetter<T> spanIdGetter);
default <T> Single<Map<T, Span>> joinSpan(
Collection<T> joinSources, SpanIdGetter<T> spanIdGetter) {
return joinSpan(joinSources, spanIdGetter, Collections.emptyList());
}

default <T> Single<ListMultimap<T, Span>> joinSpans(
Collection<T> joinSources, MultipleSpanIdGetter<T> multipleSpanIdGetter) {
return joinSpans(joinSources, multipleSpanIdGetter, Collections.emptyList());
}

<T> Single<Map<T, Span>> joinSpan(
Collection<T> joinSources,
SpanIdGetter<T> spanIdGetter,
Collection<FilterArgument> filterArguments);

<T> Single<ListMultimap<T, Span>> joinSpans(
Collection<T> joinSources, MultipleSpanIdGetter<T> multipleSpanIdGetter);
Collection<T> joinSources,
MultipleSpanIdGetter<T> multipleSpanIdGetter,
Collection<FilterArgument> filterArguments);

@FunctionalInterface
interface SpanIdGetter<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import static org.hypertrace.core.graphql.atttributes.scopes.HypertraceCoreAttributeScopeString.SPAN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -15,6 +15,7 @@
import graphql.schema.DataFetchingFieldSelectionSet;
import graphql.schema.SelectedField;
import io.reactivex.rxjava3.core.Single;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -88,7 +89,7 @@ void fetchSpans() {
mockSelectionSet,
SelectionQuery.builder().selectionPath(List.of("pathToSpan", "span")).build()))
.thenReturn(Stream.of(mock(SelectedField.class), mock(SelectedField.class)));
when(mockFilterRequestBuilder.build(eq(mockRequestContext), eq(SPAN), anySet()))
when(mockFilterRequestBuilder.build(eq(mockRequestContext), eq(SPAN), anyList()))
.thenReturn(Single.just(List.of(mockFilter)));

when(mockResultSetRequestBuilder.build(
Expand All @@ -112,7 +113,10 @@ void fetchSpans() {
List.of("pathToSpan"))
.blockingGet();
assertEquals(
expected, joiner.joinSpan(joinSources, new TestJoinSourceIdGetter()).blockingGet());
expected,
joiner
.joinSpan(joinSources, new TestJoinSourceIdGetter(), Collections.emptyList())
.blockingGet());
}

@Test
Expand All @@ -129,7 +133,7 @@ void fetchMultipleSpans() {
mockSelectionSet,
SelectionQuery.builder().selectionPath(List.of("pathToSpans", "spans")).build()))
.thenReturn(Stream.of(mock(SelectedField.class), mock(SelectedField.class)));
when(mockFilterRequestBuilder.build(eq(mockRequestContext), eq(SPAN), anySet()))
when(mockFilterRequestBuilder.build(eq(mockRequestContext), eq(SPAN), anyList()))
.thenReturn(Single.just(List.of(mockFilter)));

when(mockResultSetRequestBuilder.build(
Expand All @@ -155,7 +159,9 @@ void fetchMultipleSpans() {
.blockingGet();
assertEquals(
expected,
joiner.joinSpans(joinSources, new TestMultipleJoinSourceIdGetter()).blockingGet());
joiner
.joinSpans(joinSources, new TestMultipleJoinSourceIdGetter(), Collections.emptyList())
.blockingGet());
}

private void mockResult(List<Span> spans) {
Expand Down

0 comments on commit 36bd206

Please sign in to comment.