From 261a2cdc0f4caa3ae3c4677bddff088e5714a7a4 Mon Sep 17 00:00:00 2001 From: Shivansh Anand Srivastava Date: Thu, 23 May 2024 03:13:37 +0530 Subject: [PATCH] Add SpanJoiner based on spanId --- .../core/graphql/span/SpanSchemaModule.java | 2 + .../span/joiner/DefaultSpanJoinerBuilder.java | 180 ++++++++++++++++++ .../core/graphql/span/joiner/SpanJoin.java | 13 ++ .../core/graphql/span/joiner/SpanJoiner.java | 27 +++ .../span/joiner/SpanJoinerBuilder.java | 15 ++ .../graphql/span/joiner/SpanJoinerModule.java | 20 ++ .../span/joiner/SpanJoinerBuilderTest.java | 165 ++++++++++++++++ 7 files changed, 422 insertions(+) create mode 100644 hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/DefaultSpanJoinerBuilder.java create mode 100644 hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoin.java create mode 100644 hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoiner.java create mode 100644 hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerBuilder.java create mode 100644 hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerModule.java create mode 100644 hypertrace-core-graphql-span-schema/src/test/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerBuilderTest.java diff --git a/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/SpanSchemaModule.java b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/SpanSchemaModule.java index bbc21668..f677143c 100644 --- a/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/SpanSchemaModule.java +++ b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/SpanSchemaModule.java @@ -4,6 +4,7 @@ import com.google.inject.multibindings.Multibinder; import org.hypertrace.core.graphql.common.request.ResultSetRequestBuilder; import org.hypertrace.core.graphql.span.dao.SpanDaoModule; +import org.hypertrace.core.graphql.span.joiner.SpanJoinerModule; import org.hypertrace.core.graphql.span.request.SpanRequestModule; import org.hypertrace.core.graphql.spi.schema.GraphQlSchemaFragment; @@ -17,5 +18,6 @@ protected void configure() { requireBinding(ResultSetRequestBuilder.class); install(new SpanDaoModule()); install(new SpanRequestModule()); + install(new SpanJoinerModule()); } } diff --git a/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/DefaultSpanJoinerBuilder.java b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/DefaultSpanJoinerBuilder.java new file mode 100644 index 00000000..bf4ea147 --- /dev/null +++ b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/DefaultSpanJoinerBuilder.java @@ -0,0 +1,180 @@ +package org.hypertrace.core.graphql.span.joiner; + +import static com.google.common.collect.ImmutableList.copyOf; +import static com.google.common.collect.Iterables.concat; +import static org.hypertrace.core.graphql.atttributes.scopes.HypertraceCoreAttributeScopeString.SPAN; +import static org.hypertrace.core.graphql.span.joiner.SpanJoin.SPAN_KEY; + +import graphql.schema.DataFetchingFieldSelectionSet; +import graphql.schema.SelectedField; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.inject.Inject; +import lombok.AllArgsConstructor; +import lombok.Value; +import lombok.experimental.Accessors; +import org.hypertrace.core.graphql.common.request.AttributeAssociation; +import org.hypertrace.core.graphql.common.request.AttributeRequest; +import org.hypertrace.core.graphql.common.request.FilterRequestBuilder; +import org.hypertrace.core.graphql.common.request.ResultSetRequest; +import org.hypertrace.core.graphql.common.request.ResultSetRequestBuilder; +import org.hypertrace.core.graphql.common.schema.arguments.TimeRangeArgument; +import org.hypertrace.core.graphql.common.schema.attributes.AttributeScope; +import org.hypertrace.core.graphql.common.schema.attributes.arguments.AttributeExpression; +import org.hypertrace.core.graphql.common.schema.id.Identifiable; +import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument; +import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterOperatorType; +import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterType; +import org.hypertrace.core.graphql.common.schema.results.arguments.order.OrderArgument; +import org.hypertrace.core.graphql.context.GraphQlRequestContext; +import org.hypertrace.core.graphql.span.dao.SpanDao; +import org.hypertrace.core.graphql.span.request.SpanRequest; +import org.hypertrace.core.graphql.span.schema.Span; +import org.hypertrace.core.graphql.span.schema.SpanResultSet; +import org.hypertrace.core.graphql.utils.schema.GraphQlSelectionFinder; +import org.hypertrace.core.graphql.utils.schema.SelectionQuery; + +public class DefaultSpanJoinerBuilder implements SpanJoinerBuilder { + + private static final int ZERO_OFFSET = 0; + + private final SpanDao spanDao; + private final GraphQlSelectionFinder selectionFinder; + private final ResultSetRequestBuilder resultSetRequestBuilder; + private final FilterRequestBuilder filterRequestBuilder; + + @Inject + DefaultSpanJoinerBuilder( + SpanDao spanDao, + GraphQlSelectionFinder selectionFinder, + ResultSetRequestBuilder resultSetRequestBuilder, + FilterRequestBuilder filterRequestBuilder) { + this.spanDao = spanDao; + this.selectionFinder = selectionFinder; + this.resultSetRequestBuilder = resultSetRequestBuilder; + this.filterRequestBuilder = filterRequestBuilder; + } + + @Override + public Single build( + GraphQlRequestContext context, + TimeRangeArgument timeRange, + DataFetchingFieldSelectionSet selectionSet, + List pathToSpanJoin) { + return Single.just( + new DefaultSpanJoiner( + context, timeRange, this.getSelections(selectionSet, pathToSpanJoin))); + } + + private List getSelections( + DataFetchingFieldSelectionSet selectionSet, List pathToSpanJoin) { + List fullPath = copyOf(concat(pathToSpanJoin, List.of(SPAN_KEY))); + return selectionFinder + .findSelections(selectionSet, SelectionQuery.builder().selectionPath(fullPath).build()) + .collect(Collectors.toUnmodifiableList()); + } + + @AllArgsConstructor + private class DefaultSpanJoiner implements SpanJoiner { + + private final GraphQlRequestContext context; + private final TimeRangeArgument timeRange; + private final List selectedFields; + + @Override + public Single> joinSpans( + Collection joinSources, SpanIdGetter spanIdGetter) { + return this.buildSourceToIdMap(joinSources, spanIdGetter).flatMap(this::joinSpans); + } + + private Single> joinSpans(Map sourceToSpanIdMap) { + return this.buildSpanRequest(sourceToSpanIdMap) + .flatMap(spanDao::getSpans) + .map(this::buildSpanIdToSpanMap) + .map(spanIdToSpanMap -> buildSourceToSpanMap(sourceToSpanIdMap, spanIdToSpanMap)); + } + + private Map buildSourceToSpanMap( + Map sourceToSpanIdMap, Map spanIdToSpanMap) { + return sourceToSpanIdMap.entrySet().stream() + .filter(entry -> spanIdToSpanMap.containsKey(entry.getValue())) + .collect( + Collectors.toUnmodifiableMap( + Entry::getKey, entry -> spanIdToSpanMap.get(entry.getValue()))); + } + + private Map buildSpanIdToSpanMap(SpanResultSet resultSet) { + return resultSet.results().stream() + .collect(Collectors.toUnmodifiableMap(Identifiable::id, Function.identity())); + } + + private Single buildSpanRequest(Map sourceToSpanIdMap) { + Collection spanIds = sourceToSpanIdMap.values(); + return buildSpanIdsFilter(spanIds) + .flatMap(filterArguments -> buildSpanRequest(spanIds.size(), filterArguments)); + } + + private Single buildSpanRequest( + int size, List> filterArguments) { + return resultSetRequestBuilder + .build( + context, + SPAN, + size, + ZERO_OFFSET, + timeRange, + Collections.emptyList(), + filterArguments, + selectedFields.stream(), + Optional.empty()) + .map(spanEventsRequest -> new SpanJoinRequest(context, spanEventsRequest)); + } + + private Single>> buildSpanIdsFilter( + Collection spanIds) { + return filterRequestBuilder.build(context, SPAN, Set.of(new SpanIdFilter(spanIds))); + } + + private Single> buildSourceToIdMap( + Collection joinSources, SpanIdGetter spanIdGetter) { + return Observable.fromIterable(joinSources) + .flatMapSingle(source -> this.maybeBuildMapEntry(source, spanIdGetter)) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + } + + private Single> maybeBuildMapEntry( + T source, SpanIdGetter spanIdGetter) { + return spanIdGetter.getSpanId(source).map(id -> Map.entry(source, id)); + } + } + + @Value + @Accessors(fluent = true) + private static class SpanIdFilter implements FilterArgument { + FilterType type = FilterType.ID; + String key = null; + AttributeExpression keyExpression = null; + FilterOperatorType operator = FilterOperatorType.IN; + Collection value; + AttributeScope idType = null; + String idScope = SPAN; + } + + @Value + @Accessors(fluent = true) + private static class SpanJoinRequest implements SpanRequest { + GraphQlRequestContext context; + ResultSetRequest spanEventsRequest; + Collection logEventAttributes = Collections.emptyList(); + boolean fetchTotal = false; + } +} diff --git a/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoin.java b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoin.java new file mode 100644 index 00000000..6611267b --- /dev/null +++ b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoin.java @@ -0,0 +1,13 @@ +package org.hypertrace.core.graphql.span.joiner; + +import graphql.annotations.annotationTypes.GraphQLField; +import graphql.annotations.annotationTypes.GraphQLName; +import org.hypertrace.core.graphql.span.schema.Span; + +public interface SpanJoin { + String SPAN_KEY = "span"; + + @GraphQLField + @GraphQLName(SPAN_KEY) + Span span(); +} diff --git a/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoiner.java b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoiner.java new file mode 100644 index 00000000..7cfe87b7 --- /dev/null +++ b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoiner.java @@ -0,0 +1,27 @@ +package org.hypertrace.core.graphql.span.joiner; + +import io.reactivex.rxjava3.core.Single; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import org.hypertrace.core.graphql.span.schema.Span; + +public interface SpanJoiner { + + /** A NOOP joiner */ + SpanJoiner NO_OP_JOINER = + new SpanJoiner() { + @Override + public Single> joinSpans( + Collection joinSources, SpanIdGetter spanIdGetter) { + return Single.just(Collections.emptyMap()); + } + }; + + Single> joinSpans(Collection joinSources, SpanIdGetter spanIdGetter); + + @FunctionalInterface + interface SpanIdGetter { + Single getSpanId(T source); + } +} diff --git a/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerBuilder.java b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerBuilder.java new file mode 100644 index 00000000..a06ccf45 --- /dev/null +++ b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerBuilder.java @@ -0,0 +1,15 @@ +package org.hypertrace.core.graphql.span.joiner; + +import graphql.schema.DataFetchingFieldSelectionSet; +import io.reactivex.rxjava3.core.Single; +import java.util.List; +import org.hypertrace.core.graphql.common.schema.arguments.TimeRangeArgument; +import org.hypertrace.core.graphql.context.GraphQlRequestContext; + +public interface SpanJoinerBuilder { + Single build( + GraphQlRequestContext context, + TimeRangeArgument timeRange, + DataFetchingFieldSelectionSet selectionSet, + List pathToSpanJoin); +} diff --git a/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerModule.java b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerModule.java new file mode 100644 index 00000000..162eb36b --- /dev/null +++ b/hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerModule.java @@ -0,0 +1,20 @@ +package org.hypertrace.core.graphql.span.joiner; + +import com.google.inject.AbstractModule; +import org.hypertrace.core.graphql.common.request.FilterRequestBuilder; +import org.hypertrace.core.graphql.common.request.ResultSetRequestBuilder; +import org.hypertrace.core.graphql.span.dao.SpanDao; +import org.hypertrace.core.graphql.utils.schema.GraphQlSelectionFinder; + +public class SpanJoinerModule extends AbstractModule { + + @Override + protected void configure() { + bind(SpanJoinerBuilder.class).to(DefaultSpanJoinerBuilder.class); + + requireBinding(SpanDao.class); + requireBinding(GraphQlSelectionFinder.class); + requireBinding(ResultSetRequestBuilder.class); + requireBinding(FilterRequestBuilder.class); + } +} diff --git a/hypertrace-core-graphql-span-schema/src/test/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerBuilderTest.java b/hypertrace-core-graphql-span-schema/src/test/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerBuilderTest.java new file mode 100644 index 00000000..d135e32a --- /dev/null +++ b/hypertrace-core-graphql-span-schema/src/test/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerBuilderTest.java @@ -0,0 +1,165 @@ +package org.hypertrace.core.graphql.span.joiner; + +import static java.util.Collections.emptyList; +import static java.util.Map.entry; +import static org.hypertrace.core.graphql.atttributes.scopes.HypertraceCoreAttributeScopeString.SPAN; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import graphql.schema.DataFetchingFieldSelectionSet; +import graphql.schema.SelectedField; +import io.reactivex.rxjava3.core.Single; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; +import lombok.Value; +import lombok.experimental.Accessors; +import org.hypertrace.core.graphql.common.request.AttributeAssociation; +import org.hypertrace.core.graphql.common.request.FilterRequestBuilder; +import org.hypertrace.core.graphql.common.request.ResultSetRequest; +import org.hypertrace.core.graphql.common.request.ResultSetRequestBuilder; +import org.hypertrace.core.graphql.common.schema.arguments.TimeRangeArgument; +import org.hypertrace.core.graphql.common.schema.attributes.arguments.AttributeExpression; +import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument; +import org.hypertrace.core.graphql.common.schema.results.arguments.order.OrderArgument; +import org.hypertrace.core.graphql.context.GraphQlRequestContext; +import org.hypertrace.core.graphql.log.event.schema.LogEventResultSet; +import org.hypertrace.core.graphql.span.dao.SpanDao; +import org.hypertrace.core.graphql.span.joiner.SpanJoiner.SpanIdGetter; +import org.hypertrace.core.graphql.span.request.SpanRequest; +import org.hypertrace.core.graphql.span.schema.Span; +import org.hypertrace.core.graphql.span.schema.SpanResultSet; +import org.hypertrace.core.graphql.utils.schema.GraphQlSelectionFinder; +import org.hypertrace.core.graphql.utils.schema.SelectionQuery; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class SpanJoinerBuilderTest { + + private static final String FIRST_SPAN_ID = "spanId1"; + private static final String SECOND_SPAN_ID = "spanId2"; + + @Mock private SpanDao mockSpanDao; + @Mock private GraphQlSelectionFinder mockSelectionFinder; + @Mock private ResultSetRequestBuilder mockResultSetRequestBuilder; + @Mock private FilterRequestBuilder mockFilterRequestBuilder; + @Mock private DataFetchingFieldSelectionSet mockSelectionSet; + @Mock private GraphQlRequestContext mockRequestContext; + @Mock private ResultSetRequest mockResultSetRequest; + @Mock private AttributeAssociation mockFilter; + @Mock private TimeRangeArgument mockTimeRangeArgument; + + private SpanJoinerBuilder spanJoinerBuilder; + + @BeforeEach + void setup() { + spanJoinerBuilder = + new DefaultSpanJoinerBuilder( + mockSpanDao, + mockSelectionFinder, + mockResultSetRequestBuilder, + mockFilterRequestBuilder); + } + + @Test + void fetchSpans() { + Span span1 = new TestSpan(FIRST_SPAN_ID); + Span span2 = new TestSpan(SECOND_SPAN_ID); + TestJoinSource joinSource1 = new TestJoinSource(FIRST_SPAN_ID); + TestJoinSource joinSource2 = new TestJoinSource(SECOND_SPAN_ID); + Map expected = + Map.ofEntries(entry(joinSource1, span1), entry(joinSource2, span2)); + List joinSources = List.of(joinSource1, joinSource2); + mockRequestedSelectionFields( + List.of(mock(SelectedField.class), mock(SelectedField.class)), "pathToSpan"); + mockRequestBuilding(); + mockResult(List.of(span1, span2)); + SpanJoiner joiner = + this.spanJoinerBuilder + .build( + this.mockRequestContext, + this.mockTimeRangeArgument, + this.mockSelectionSet, + List.of("pathToSpan")) + .blockingGet(); + assertEquals( + expected, joiner.joinSpans(joinSources, new TestJoinSourceIdGetter()).blockingGet()); + } + + private void mockRequestBuilding() { + when(mockFilterRequestBuilder.build(eq(mockRequestContext), eq(SPAN), anySet())) + .thenReturn(Single.just(List.of(mockFilter))); + + when(mockResultSetRequestBuilder.build( + eq(mockRequestContext), + eq(SPAN), + eq(2), + eq(0), + eq(mockTimeRangeArgument), + eq(emptyList()), + eq(List.of(mockFilter)), + any(Stream.class), + eq(Optional.empty()))) + .thenReturn(Single.just(mockResultSetRequest)); + } + + private void mockRequestedSelectionFields(List selectedFields, String location) { + when(mockSelectionFinder.findSelections( + mockSelectionSet, + SelectionQuery.builder().selectionPath(List.of(location, "span")).build())) + .thenReturn(selectedFields.stream()); + } + + private void mockResult(List spans) { + when(mockSpanDao.getSpans(any(SpanRequest.class))) + .thenAnswer(invocation -> Single.just(new TestSpanResultSet(spans))); + } + + @Value + private static class TestJoinSource { + String spanId; + } + + private static class TestJoinSourceIdGetter implements SpanIdGetter { + @Override + public Single getSpanId(TestJoinSource source) { + if (source.getSpanId() == null || source.getSpanId().isEmpty()) { + return Single.error(new IllegalArgumentException("Empty spanId")); + } + return Single.just(source.getSpanId()); + } + } + + @Value + @Accessors(fluent = true) + private static class TestSpanResultSet implements SpanResultSet { + List results; + long count = 0; + long total = 0; + } + + @Value + @Accessors(fluent = true) + private static class TestSpan implements Span { + String id; + + @Override + public Object attribute(AttributeExpression expression) { + return null; + } + + @Override + public LogEventResultSet logEvents() { + return null; + } + } +}