Skip to content

Commit

Permalink
feat: updating filter result to add decorations (#236)
Browse files Browse the repository at this point in the history
* updating filter result

* updating instrumentations to use header injections

* sorting imports

* fix lint

---------

Co-authored-by: test <[email protected]>
  • Loading branch information
varkey98 and test authored Jul 25, 2024
1 parent 849c958 commit e8225ec
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 27 deletions.
10 changes: 10 additions & 0 deletions sdk/filter/result/filter_result.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
package result

type KeyValueString struct {
Key string
Value string
}

type Decorations struct {
RequestHeaderInjections []KeyValueString
}

type FilterResult struct {
Block bool
ResponseStatusCode int32
ResponseMessage string
Decorations *Decorations
}
15 changes: 10 additions & 5 deletions sdk/instrumentation/google.golang.org/grpc/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@ import (
"google.golang.org/grpc/test/bufconn"
)

var _ helloworld.GreeterServer = server{}
// type assertion
var _ helloworld.GreeterServer = (*server)(nil)

type server struct {
err error
replyHeader metadata.MD
replyTrailer metadata.MD
err error
requestHeader metadata.MD
replyHeader metadata.MD
replyTrailer metadata.MD
*helloworld.UnimplementedGreeterServer
}

func (s server) SayHello(ctx context.Context, req *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
func (s *server) SayHello(ctx context.Context, req *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
var reply *helloworld.HelloReply
if md, ok := metadata.FromIncomingContext(ctx); ok {
s.requestHeader = md
}
if s.err == nil {
reply = &helloworld.HelloReply{Message: fmt.Sprintf("Hello %s", req.GetName())}
}
Expand Down
9 changes: 9 additions & 0 deletions sdk/instrumentation/google.golang.org/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
internalconfig "github.com/hypertrace/goagent/sdk/internal/config"
"github.com/hypertrace/goagent/sdk/internal/container"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -109,6 +110,14 @@ func wrapHandler(
filterResult := filter.Evaluate(span)
if filterResult.Block {
return nil, status.Error(StatusCode(int(filterResult.ResponseStatusCode)), StatusText(int(filterResult.ResponseStatusCode)))
} else if filterResult.Decorations != nil {
if md, ok := metadata.FromIncomingContext(ctx); ok {
for _, header := range filterResult.Decorations.RequestHeaderInjections {
md.Append(header.Key, header.Value)
span.SetAttribute("rpc.request.metadata."+header.Key, header.Value)
}
ctx = metadata.NewIncomingContext(ctx, md)
}
}

res, err := delegateHandler(ctx, req)
Expand Down
124 changes: 124 additions & 0 deletions sdk/instrumentation/google.golang.org/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,130 @@ func TestServerInterceptorFilterWithMaxProcessingBodyLen(t *testing.T) {
assert.NoError(t, err)
}

func TestServerInterceptorFilterDecorations(t *testing.T) {
var spans []*mock.Span
mockInterceptor := makeMockUnaryServerInterceptor(&spans)
// wrap interceptor with filter
s := grpc.NewServer(
grpc.UnaryInterceptor(
WrapUnaryServerInterceptor(mockInterceptor, mock.SpanFromContext, &Options{Filter: mock.Filter{
Evaluator: func(span sdk.Span) result.FilterResult {
return result.FilterResult{Block: false, Decorations: &result.Decorations{
RequestHeaderInjections: []result.KeyValueString{
{
Key: "injected-header",
Value: "injected-value",
},
},
}}
},
}}, nil),
),
)
defer s.Stop()

mockServer := &server{}
helloworld.RegisterGreeterServer(s, mockServer)

dialer := createDialer(s)

ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
t.Fatalf("failed to dial bufnet: %v", err)
}
defer conn.Close()

client := helloworld.NewGreeterClient(conn)

ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("test_key", "test_value"))
_, err = client.SayHello(ctx, &helloworld.HelloRequest{
Name: "Pupo",
})
assert.NoError(t, err)

md := mockServer.requestHeader
// assert original header
val, found := md["test_key"]
assert.True(t, found)
assert.Equal(t, "test_value", val[0])

// assert injected header
val, found = md["injected-header"]
assert.True(t, found)
assert.Equal(t, "injected-value", val[0])
assert.NoError(t, err)

assert.Equal(t, 1, len(spans))
span := spans[0]
spanAttributePresent := false
span.GetAttributes().Iterate(func(key string, value interface{}) bool {
if key == "rpc.request.metadata.injected-header" {
assert.Equal(t, "injected-value", value.(string))
spanAttributePresent = true
return false
}
return true
})
assert.True(t, spanAttributePresent)
}

func TestServerInterceptorFilterEmptyDecorations(t *testing.T) {
spans := []*mock.Span{}
mockInterceptor := makeMockUnaryServerInterceptor(&spans)
// wrap interceptor with filter
s := grpc.NewServer(
grpc.UnaryInterceptor(
WrapUnaryServerInterceptor(mockInterceptor, mock.SpanFromContext, &Options{Filter: mock.Filter{
Evaluator: func(span sdk.Span) result.FilterResult {
return result.FilterResult{Block: false, Decorations: &result.Decorations{}}
},
}}, nil),
),
)
defer s.Stop()

mockServer := &server{}
helloworld.RegisterGreeterServer(s, mockServer)

dialer := createDialer(s)

ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
t.Fatalf("failed to dial bufnet: %v", err)
}
defer conn.Close()

client := helloworld.NewGreeterClient(conn)

ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("test_key", "test_value"))
_, err = client.SayHello(ctx, &helloworld.HelloRequest{
Name: "Pupo",
})
assert.NoError(t, err)

md := mockServer.requestHeader
// assert original header
val, found := md["test_key"]
assert.True(t, found)
assert.Equal(t, "test_value", val[0])

assert.Equal(t, 1, len(spans))
}

func TestServerHandlerHelloWorldSuccess(t *testing.T) {
defer internalconfig.ResetConfig()

Expand Down
4 changes: 2 additions & 2 deletions sdk/instrumentation/net/http/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestSetScalarAttributeSuccess(t *testing.T) {
h := http.Header{}
h.Set("key_1", "value_1")
span := mock.NewSpan()
SetAttributesFromHeaders("request", headerMapAccessor{h}, span)
SetAttributesFromHeaders("request", &headerMapAccessor{h}, span)
assert.Equal(t, "value_1", span.ReadAttribute("http.request.header.key_1").(string))

_ = span.ReadAttribute("container_id") // needed in containarized envs
Expand All @@ -25,7 +25,7 @@ func TestSetMultivalueAttributeSuccess(t *testing.T) {
h.Add("key_1", "value_2")

span := mock.NewSpan()
SetAttributesFromHeaders("response", headerMapAccessor{h}, span)
SetAttributesFromHeaders("response", &headerMapAccessor{h}, span)

assert.Equal(t, "value_1", span.ReadAttribute("http.response.header.key_1[0]").(string))
assert.Equal(t, "value_2", span.ReadAttribute("http.response.header.key_1[1]").(string))
Expand Down
10 changes: 5 additions & 5 deletions sdk/instrumentation/net/http/contenttype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestRecordingDecisionReturnsFalseOnNoContentType(t *testing.T) {
assert.Equal(t, false, ShouldRecordBodyOfContentType(headerMapAccessor{http.Header{"A": []string{"B"}}}))
assert.Equal(t, false, ShouldRecordBodyOfContentType(&headerMapAccessor{http.Header{"A": []string{"B"}}}))
}

func TestRecordingDecisionSuccessOnHeaderSet(t *testing.T) {
Expand All @@ -32,7 +32,7 @@ func TestRecordingDecisionSuccessOnHeaderSet(t *testing.T) {
for _, tCase := range tCases {
h := http.Header{}
h.Set("Content-Type", tCase.contentType)
assert.Equal(t, tCase.shouldRecord, ShouldRecordBodyOfContentType(headerMapAccessor{h}))
assert.Equal(t, tCase.shouldRecord, ShouldRecordBodyOfContentType(&headerMapAccessor{h}))
}
}

Expand All @@ -56,7 +56,7 @@ func TestRecordingDecisionSuccessOnHeaderAdd(t *testing.T) {
for _, header := range tCase.contentTypes {
h.Add("Content-Type", header)
}
assert.Equal(t, tCase.shouldRecord, ShouldRecordBodyOfContentType(headerMapAccessor{h}))
assert.Equal(t, tCase.shouldRecord, ShouldRecordBodyOfContentType(&headerMapAccessor{h}))
}
}

Expand All @@ -80,7 +80,7 @@ func TestXMLRecordingDecisionSuccessOnHeaderAdd(t *testing.T) {
for _, header := range tCase.contentTypes {
h.Add("Content-Type", header)
}
assert.Equal(t, tCase.shouldRecord, ShouldRecordBodyOfContentType(headerMapAccessor{h}))
assert.Equal(t, tCase.shouldRecord, ShouldRecordBodyOfContentType(&headerMapAccessor{h}))
}
internalconfig.ResetConfig()
}
Expand All @@ -99,6 +99,6 @@ func TestHasMultiPartFormDataContentTypeHeader(t *testing.T) {
for _, tCase := range tCases {
h := http.Header{}
h.Set("Content-Type", tCase.contentType)
assert.Equal(t, tCase.isMultiPartFormData, HasMultiPartFormDataContentTypeHeader(headerMapAccessor{h}))
assert.Equal(t, tCase.isMultiPartFormData, HasMultiPartFormDataContentTypeHeader(&headerMapAccessor{h}))
}
}
19 changes: 13 additions & 6 deletions sdk/instrumentation/net/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func WrapHandler(delegate http.Handler, spanFromContext sdk.SpanFromContext, opt
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
span := h.spanFromContextRetriever(ctx)
headersAccessor := NewHeaderMapAccessor(r.Header)

h.mh.AddToRequestCount(1, r)

Expand Down Expand Up @@ -77,19 +78,19 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// Sets an attribute per each request header.
if h.dataCaptureConfig.HttpHeaders.Request.Value {
SetAttributesFromHeaders("request", NewHeaderMapAccessor(r.Header), span)
SetAttributesFromHeaders("request", headersAccessor, span)
}

// nil check for body is important as this block turns the body into another
// object that isn't nil and that will leverage the "Observer effect".
if r.Body != nil && h.dataCaptureConfig.HttpBody.Request.Value && ShouldRecordBodyOfContentType(headerMapAccessor{r.Header}) {
if r.Body != nil && h.dataCaptureConfig.HttpBody.Request.Value && ShouldRecordBodyOfContentType(headersAccessor) {
body, err := io.ReadAll(r.Body)
if err != nil {
return
}
defer r.Body.Close()

isMultipartFormDataBody := HasMultiPartFormDataContentTypeHeader(headerMapAccessor{r.Header})
isMultipartFormDataBody := HasMultiPartFormDataContentTypeHeader(headersAccessor)

// Only records the body if it is not empty and the content type
// header is not streamable
Expand All @@ -106,23 +107,29 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if filterResult.Block {
w.WriteHeader(int(filterResult.ResponseStatusCode))
return
} else if filterResult.Decorations != nil {
for _, header := range filterResult.Decorations.RequestHeaderInjections {
headersAccessor.AddHeader(header.Key, header.Value)
span.SetAttribute("http.request.header."+header.Key, header.Value)
}
}

// create http.ResponseWriter interceptor for tracking status code
wi := &rwInterceptor{w: w, statusCode: 200}

// tag found status code on exit
defer func() {
responseHeadersAccessor := NewHeaderMapAccessor(wi.Header())
if h.dataCaptureConfig.HttpBody.Response.Value &&
len(wi.body) > 0 &&
ShouldRecordBodyOfContentType(headerMapAccessor{wi.Header()}) {
ShouldRecordBodyOfContentType(responseHeadersAccessor) {
setTruncatedBodyAttribute("response", wi.body, int(h.dataCaptureConfig.BodyMaxSizeBytes.Value), span,
HasMultiPartFormDataContentTypeHeader(headerMapAccessor{wi.Header()}))
HasMultiPartFormDataContentTypeHeader(responseHeadersAccessor))
}

if h.dataCaptureConfig.HttpHeaders.Response.Value {
// Sets an attribute per each response header.
SetAttributesFromHeaders("response", headerMapAccessor{wi.Header()}, span)
SetAttributesFromHeaders("response", responseHeadersAccessor, span)
}
}()

Expand Down
Loading

0 comments on commit e8225ec

Please sign in to comment.