Skip to content

Commit

Permalink
feat: adding resource support (#16)
Browse files Browse the repository at this point in the history
* feat: adding resource support

* feat: support for building structured trace with resources

* docs: add comment on resource index
  • Loading branch information
aaron-steinfeld authored Mar 17, 2021
1 parent ea09b26 commit 655fee2
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 10 deletions.
3 changes: 3 additions & 0 deletions data-model/src/main/avro/Event.avdl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ protocol EventProtocol {
// Event may be associated with multiple entities
array<string> entity_id_list = [];

// The index of the associated resource in the structured trace resource list, or -1 if none
int resource_index = -1;

// Raw attributes corresponding to this event that came from the original tracer.
union { null, Attributes } attributes = null;

Expand Down
3 changes: 3 additions & 0 deletions data-model/src/main/avro/RawSpan.avdl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ protocol RawSpanProtocol {
import idl "Attributes.avdl";
import idl "Entity.avdl";
import idl "Event.avdl";
import idl "Resource.avdl";

record RawSpan {
string customer_id;
Expand All @@ -12,6 +13,8 @@ protocol RawSpanProtocol {

array<Entity> entity_list = [];

union { null, Resource } resource = null;

Event event;

long received_time_millis = 0;
Expand Down
8 changes: 8 additions & 0 deletions data-model/src/main/avro/Resource.avdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
@namespace("org.hypertrace.core.datamodel")
protocol ResourceProtocol {
import idl "Attributes.avdl";

record Resource {
Attributes attributes;
}
}
3 changes: 3 additions & 0 deletions data-model/src/main/avro/StructuredTrace.avdl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ protocol StructuredTraceProtocol {
import idl "Entity.avdl";
import idl "Event.avdl";
import idl "Timestamps.avdl";
import idl "Resource.avdl";

enum EdgeType {
EVENT_EVENT,
Expand Down Expand Up @@ -54,6 +55,8 @@ protocol StructuredTraceProtocol {
// For e.g. Service (Entity) generates Span (Event)
array<Entity> entity_list;

array<Resource> resource_list = [];

// Each event_node represents a Span
array<Event> event_list;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package org.hypertrace.core.datamodel.shared.trace;

import static java.util.Objects.nonNull;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -23,6 +26,7 @@
import org.hypertrace.core.datamodel.MetricValue;
import org.hypertrace.core.datamodel.Metrics;
import org.hypertrace.core.datamodel.RawSpan;
import org.hypertrace.core.datamodel.Resource;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.datamodel.StructuredTrace.Builder;
import org.hypertrace.core.datamodel.Timestamps;
Expand All @@ -42,6 +46,7 @@ public class StructuredTraceBuilder {

private final Map<String, Entity> entityMap;
private final Map<ByteBuffer, Event> eventMap;
private final List<Resource> resourceList;
private ArrayList<ByteBuffer> orderedEventNodes;
private ArrayList<String> orderedEntityNodes;

Expand All @@ -59,16 +64,11 @@ public StructuredTraceBuilder(
List<Event> eventList,
Map<String, Entity> entityMap,
String customerId,
ByteBuffer traceId) {
this(eventList, entityMap, customerId, traceId, null);
}

public StructuredTraceBuilder(
List<Event> eventList,
Map<String, Entity> entityMap,
String customerId,
ByteBuffer traceId, Timestamps timestamps) {
ByteBuffer traceId,
Timestamps timestamps,
List<Resource> resourceList) {
this.eventList = eventList;
this.resourceList = resourceList;
this.customerId = customerId;
this.traceId = traceId;
this.entityMap = Map.copyOf(entityMap);
Expand Down Expand Up @@ -191,6 +191,7 @@ private StructuredTrace build() {
builder.setEventEdgeList(new ArrayList<>());
builder.setEntityEdgeList(new ArrayList<>());
builder.setEntityEventEdgeList(new ArrayList<>());
builder.setResourceList(this.resourceList);

//Node Builders
//Initialize EVENT NODE
Expand Down Expand Up @@ -420,10 +421,21 @@ public static StructuredTrace buildStructuredTraceFromRawSpans(List<RawSpan> raw
String customerId,
Timestamps timestamps) {
Map<String, Entity> entityMap = new HashMap<>();
// Relying on insertion ordered keyset
LinkedHashMap<Resource, Integer> resourceIndexMap = new LinkedHashMap<>();
List<Event> eventList = new ArrayList<>();
for (RawSpan rawSpan : rawSpanList) {
eventList.add(rawSpan.getEvent());
List<Entity> entitiesList = rawSpan.getEntityList();

if (nonNull(rawSpan.getResource())) {
// If a resource exists on span, record the resource and its index
rawSpan
.getEvent()
.setResourceIndex(
resourceIndexMap.computeIfAbsent(
rawSpan.getResource(), unused -> resourceIndexMap.size()));
}
for (Entity entity : entitiesList) {
if (!entityMap.containsKey(entity.getEntityId())) {
entityMap.put(entity.getEntityId(), entity);
Expand All @@ -440,7 +452,8 @@ public static StructuredTrace buildStructuredTraceFromRawSpans(List<RawSpan> raw
entityMap,
customerId,
traceId,
timestamps);
timestamps,
List.copyOf(resourceIndexMap.keySet()));

return structuredTraceBuilder.buildStructuredTrace();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.hypertrace.core.datamodel.shared.trace;

import static java.util.Objects.nonNull;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.hypertrace.core.datamodel.Attributes;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.RawSpan;
import org.hypertrace.core.datamodel.Resource;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.junit.jupiter.api.Test;

class StructuredTraceBuilderTest {
private static final String CUSTOMER_ID = "customer-id";
private static final AtomicInteger CREATION_COUNTER = new AtomicInteger(0);
private static final ByteBuffer TRACE_ID = ByteBuffer.wrap("trace-id".getBytes());

@Test
void correctlyDematerializesResourcesFromRawSpans() {
Resource firstHostResource = buildResource(Map.of("host", "first-host"));
Resource duplicateFirstHostResource = buildResource(Map.of("host", "first-host"));
Resource secondHostResource = buildResource(Map.of("host", "second-host"));
Resource firstHostExtendedResource =
buildResource(Map.of("host", "first-host", "other-key", "other-value"));

List<RawSpan> rawSpanList =
List.of(
this.buildRawSpanHoldingResource("first", firstHostResource),
this.buildRawSpanHoldingResource("second", secondHostResource),
this.buildRawSpanHoldingResource("third", duplicateFirstHostResource),
this.buildRawSpanHoldingResource("fourth", null),
this.buildRawSpanHoldingResource("fifth", firstHostExtendedResource));

StructuredTrace trace =
StructuredTraceBuilder.buildStructuredTraceFromRawSpans(rawSpanList, TRACE_ID, CUSTOMER_ID);

assertEquals(
List.of(firstHostResource, secondHostResource, firstHostExtendedResource),
trace.getResourceList());

assertEventMatches(trace.getEventList().get(0), "first", 0);
assertEventMatches(trace.getEventList().get(1), "second", 1);
assertEventMatches(trace.getEventList().get(2), "third", 0);
assertEventMatches(trace.getEventList().get(3), "fourth", -1);
assertEventMatches(trace.getEventList().get(4), "fifth", 2);
}

private RawSpan buildRawSpanHoldingResource(String id, @Nullable Resource resource) {
RawSpan.Builder builder =
RawSpan.newBuilder()
.setCustomerId(CUSTOMER_ID)
.setTraceId(TRACE_ID)
.setEvent(
Event.newBuilder()
.setEventId(ByteBuffer.wrap(id.getBytes()))
.setStartTimeMillis(
CREATION_COUNTER.getAndIncrement()) // Used for predictable ordering
.setCustomerId(CUSTOMER_ID)
.build());

if (nonNull(resource)) {
builder.setResource(resource);
}
return builder.build();
}

private void assertEventMatches(Event event, String id, int resourceIndex) {
assertEquals(id, Charset.defaultCharset().decode(event.getEventId()).toString());
assertEquals(resourceIndex, event.getResourceIndex());
}

private Resource buildResource(Map<String, String> resourceMap) {
return resourceMap.entrySet().stream()
.collect(
Collectors.collectingAndThen(
Collectors.toMap(
Entry::getKey, entry -> AttributeValueCreator.create(entry.getValue())),
map ->
Resource.newBuilder()
.setAttributes(Attributes.newBuilder().setAttributeMap(map).build())
.build()));
}
}

0 comments on commit 655fee2

Please sign in to comment.