-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add SpanJoiner based on spanId (#176)
- Loading branch information
1 parent
3548c83
commit 4df2934
Showing
7 changed files
with
422 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
180 changes: 180 additions & 0 deletions
180
...chema/src/main/java/org/hypertrace/core/graphql/span/joiner/DefaultSpanJoinerBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
package org.hypertrace.core.graphql.span.joiner; | ||
|
||
import static com.google.common.collect.ImmutableList.copyOf; | ||
import static com.google.common.collect.Iterables.concat; | ||
import static org.hypertrace.core.graphql.atttributes.scopes.HypertraceCoreAttributeScopeString.SPAN; | ||
import static org.hypertrace.core.graphql.span.joiner.SpanJoin.SPAN_KEY; | ||
|
||
import graphql.schema.DataFetchingFieldSelectionSet; | ||
import graphql.schema.SelectedField; | ||
import io.reactivex.rxjava3.core.Observable; | ||
import io.reactivex.rxjava3.core.Single; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Map.Entry; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
import javax.inject.Inject; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Value; | ||
import lombok.experimental.Accessors; | ||
import org.hypertrace.core.graphql.common.request.AttributeAssociation; | ||
import org.hypertrace.core.graphql.common.request.AttributeRequest; | ||
import org.hypertrace.core.graphql.common.request.FilterRequestBuilder; | ||
import org.hypertrace.core.graphql.common.request.ResultSetRequest; | ||
import org.hypertrace.core.graphql.common.request.ResultSetRequestBuilder; | ||
import org.hypertrace.core.graphql.common.schema.arguments.TimeRangeArgument; | ||
import org.hypertrace.core.graphql.common.schema.attributes.AttributeScope; | ||
import org.hypertrace.core.graphql.common.schema.attributes.arguments.AttributeExpression; | ||
import org.hypertrace.core.graphql.common.schema.id.Identifiable; | ||
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument; | ||
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterOperatorType; | ||
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterType; | ||
import org.hypertrace.core.graphql.common.schema.results.arguments.order.OrderArgument; | ||
import org.hypertrace.core.graphql.context.GraphQlRequestContext; | ||
import org.hypertrace.core.graphql.span.dao.SpanDao; | ||
import org.hypertrace.core.graphql.span.request.SpanRequest; | ||
import org.hypertrace.core.graphql.span.schema.Span; | ||
import org.hypertrace.core.graphql.span.schema.SpanResultSet; | ||
import org.hypertrace.core.graphql.utils.schema.GraphQlSelectionFinder; | ||
import org.hypertrace.core.graphql.utils.schema.SelectionQuery; | ||
|
||
public class DefaultSpanJoinerBuilder implements SpanJoinerBuilder { | ||
|
||
private static final int ZERO_OFFSET = 0; | ||
|
||
private final SpanDao spanDao; | ||
private final GraphQlSelectionFinder selectionFinder; | ||
private final ResultSetRequestBuilder resultSetRequestBuilder; | ||
private final FilterRequestBuilder filterRequestBuilder; | ||
|
||
@Inject | ||
DefaultSpanJoinerBuilder( | ||
SpanDao spanDao, | ||
GraphQlSelectionFinder selectionFinder, | ||
ResultSetRequestBuilder resultSetRequestBuilder, | ||
FilterRequestBuilder filterRequestBuilder) { | ||
this.spanDao = spanDao; | ||
this.selectionFinder = selectionFinder; | ||
this.resultSetRequestBuilder = resultSetRequestBuilder; | ||
this.filterRequestBuilder = filterRequestBuilder; | ||
} | ||
|
||
@Override | ||
public Single<SpanJoiner> build( | ||
GraphQlRequestContext context, | ||
TimeRangeArgument timeRange, | ||
DataFetchingFieldSelectionSet selectionSet, | ||
List<String> pathToSpanJoin) { | ||
return Single.just( | ||
new DefaultSpanJoiner( | ||
context, timeRange, this.getSelections(selectionSet, pathToSpanJoin))); | ||
} | ||
|
||
private List<SelectedField> getSelections( | ||
DataFetchingFieldSelectionSet selectionSet, List<String> pathToSpanJoin) { | ||
List<String> fullPath = copyOf(concat(pathToSpanJoin, List.of(SPAN_KEY))); | ||
return selectionFinder | ||
.findSelections(selectionSet, SelectionQuery.builder().selectionPath(fullPath).build()) | ||
.collect(Collectors.toUnmodifiableList()); | ||
} | ||
|
||
@AllArgsConstructor | ||
private class DefaultSpanJoiner implements SpanJoiner { | ||
|
||
private final GraphQlRequestContext context; | ||
private final TimeRangeArgument timeRange; | ||
private final List<SelectedField> selectedFields; | ||
|
||
@Override | ||
public <T> Single<Map<T, Span>> joinSpans( | ||
Collection<T> joinSources, SpanIdGetter<T> spanIdGetter) { | ||
return this.buildSourceToIdMap(joinSources, spanIdGetter).flatMap(this::joinSpans); | ||
} | ||
|
||
private <T> Single<Map<T, Span>> joinSpans(Map<T, String> sourceToSpanIdMap) { | ||
return this.buildSpanRequest(sourceToSpanIdMap) | ||
.flatMap(spanDao::getSpans) | ||
.map(this::buildSpanIdToSpanMap) | ||
.map(spanIdToSpanMap -> buildSourceToSpanMap(sourceToSpanIdMap, spanIdToSpanMap)); | ||
} | ||
|
||
private <T> Map<T, Span> buildSourceToSpanMap( | ||
Map<T, String> sourceToSpanIdMap, Map<String, Span> spanIdToSpanMap) { | ||
return sourceToSpanIdMap.entrySet().stream() | ||
.filter(entry -> spanIdToSpanMap.containsKey(entry.getValue())) | ||
.collect( | ||
Collectors.toUnmodifiableMap( | ||
Entry::getKey, entry -> spanIdToSpanMap.get(entry.getValue()))); | ||
} | ||
|
||
private Map<String, Span> buildSpanIdToSpanMap(SpanResultSet resultSet) { | ||
return resultSet.results().stream() | ||
.collect(Collectors.toUnmodifiableMap(Identifiable::id, Function.identity())); | ||
} | ||
|
||
private <T> Single<SpanRequest> buildSpanRequest(Map<T, String> sourceToSpanIdMap) { | ||
Collection<String> spanIds = sourceToSpanIdMap.values(); | ||
return buildSpanIdsFilter(spanIds) | ||
.flatMap(filterArguments -> buildSpanRequest(spanIds.size(), filterArguments)); | ||
} | ||
|
||
private Single<SpanRequest> buildSpanRequest( | ||
int size, List<AttributeAssociation<FilterArgument>> filterArguments) { | ||
return resultSetRequestBuilder | ||
.build( | ||
context, | ||
SPAN, | ||
size, | ||
ZERO_OFFSET, | ||
timeRange, | ||
Collections.emptyList(), | ||
filterArguments, | ||
selectedFields.stream(), | ||
Optional.empty()) | ||
.map(spanEventsRequest -> new SpanJoinRequest(context, spanEventsRequest)); | ||
} | ||
|
||
private Single<List<AttributeAssociation<FilterArgument>>> buildSpanIdsFilter( | ||
Collection<String> spanIds) { | ||
return filterRequestBuilder.build(context, SPAN, Set.of(new SpanIdFilter(spanIds))); | ||
} | ||
|
||
private <T> Single<Map<T, String>> buildSourceToIdMap( | ||
Collection<T> joinSources, SpanIdGetter<T> spanIdGetter) { | ||
return Observable.fromIterable(joinSources) | ||
.flatMapSingle(source -> this.maybeBuildMapEntry(source, spanIdGetter)) | ||
.collect(Collectors.toMap(Entry::getKey, Entry::getValue)); | ||
} | ||
|
||
private <T> Single<Entry<T, String>> maybeBuildMapEntry( | ||
T source, SpanIdGetter<T> spanIdGetter) { | ||
return spanIdGetter.getSpanId(source).map(id -> Map.entry(source, id)); | ||
} | ||
} | ||
|
||
@Value | ||
@Accessors(fluent = true) | ||
private static class SpanIdFilter implements FilterArgument { | ||
FilterType type = FilterType.ID; | ||
String key = null; | ||
AttributeExpression keyExpression = null; | ||
FilterOperatorType operator = FilterOperatorType.IN; | ||
Collection<String> value; | ||
AttributeScope idType = null; | ||
String idScope = SPAN; | ||
} | ||
|
||
@Value | ||
@Accessors(fluent = true) | ||
private static class SpanJoinRequest implements SpanRequest { | ||
GraphQlRequestContext context; | ||
ResultSetRequest<OrderArgument> spanEventsRequest; | ||
Collection<AttributeRequest> logEventAttributes = Collections.emptyList(); | ||
boolean fetchTotal = false; | ||
} | ||
} |
13 changes: 13 additions & 0 deletions
13
...e-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoin.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package org.hypertrace.core.graphql.span.joiner; | ||
|
||
import graphql.annotations.annotationTypes.GraphQLField; | ||
import graphql.annotations.annotationTypes.GraphQLName; | ||
import org.hypertrace.core.graphql.span.schema.Span; | ||
|
||
public interface SpanJoin { | ||
String SPAN_KEY = "span"; | ||
|
||
@GraphQLField | ||
@GraphQLName(SPAN_KEY) | ||
Span span(); | ||
} |
27 changes: 27 additions & 0 deletions
27
...graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoiner.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package org.hypertrace.core.graphql.span.joiner; | ||
|
||
import io.reactivex.rxjava3.core.Single; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.Map; | ||
import org.hypertrace.core.graphql.span.schema.Span; | ||
|
||
public interface SpanJoiner { | ||
|
||
/** A NOOP joiner */ | ||
SpanJoiner NO_OP_JOINER = | ||
new SpanJoiner() { | ||
@Override | ||
public <T> Single<Map<T, Span>> joinSpans( | ||
Collection<T> joinSources, SpanIdGetter<T> spanIdGetter) { | ||
return Single.just(Collections.emptyMap()); | ||
} | ||
}; | ||
|
||
<T> Single<Map<T, Span>> joinSpans(Collection<T> joinSources, SpanIdGetter<T> spanIdGetter); | ||
|
||
@FunctionalInterface | ||
interface SpanIdGetter<T> { | ||
Single<String> getSpanId(T source); | ||
} | ||
} |
15 changes: 15 additions & 0 deletions
15
...-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package org.hypertrace.core.graphql.span.joiner; | ||
|
||
import graphql.schema.DataFetchingFieldSelectionSet; | ||
import io.reactivex.rxjava3.core.Single; | ||
import java.util.List; | ||
import org.hypertrace.core.graphql.common.schema.arguments.TimeRangeArgument; | ||
import org.hypertrace.core.graphql.context.GraphQlRequestContext; | ||
|
||
public interface SpanJoinerBuilder { | ||
Single<SpanJoiner> build( | ||
GraphQlRequestContext context, | ||
TimeRangeArgument timeRange, | ||
DataFetchingFieldSelectionSet selectionSet, | ||
List<String> pathToSpanJoin); | ||
} |
20 changes: 20 additions & 0 deletions
20
...l-span-schema/src/main/java/org/hypertrace/core/graphql/span/joiner/SpanJoinerModule.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package org.hypertrace.core.graphql.span.joiner; | ||
|
||
import com.google.inject.AbstractModule; | ||
import org.hypertrace.core.graphql.common.request.FilterRequestBuilder; | ||
import org.hypertrace.core.graphql.common.request.ResultSetRequestBuilder; | ||
import org.hypertrace.core.graphql.span.dao.SpanDao; | ||
import org.hypertrace.core.graphql.utils.schema.GraphQlSelectionFinder; | ||
|
||
public class SpanJoinerModule extends AbstractModule { | ||
|
||
@Override | ||
protected void configure() { | ||
bind(SpanJoinerBuilder.class).to(DefaultSpanJoinerBuilder.class); | ||
|
||
requireBinding(SpanDao.class); | ||
requireBinding(GraphQlSelectionFinder.class); | ||
requireBinding(ResultSetRequestBuilder.class); | ||
requireBinding(FilterRequestBuilder.class); | ||
} | ||
} |
Oops, something went wrong.