Skip to content

Commit

Permalink
Add TraceOptions to api
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Nov 5, 2024
1 parent 2de6df9 commit be78227
Show file tree
Hide file tree
Showing 7 changed files with 742 additions and 0 deletions.
8 changes: 8 additions & 0 deletions stats/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ type RPCTagInfo struct {
// FailFast indicates if this RPC is failfast.
// This field is only valid on client side, it's always false on server side.
FailFast bool
// NameResolutionDelay indicates whether there was a delay in name
// resolution.
//
// This field is only valid on client side, it's always false on server side.
NameResolutionDelay bool
// IsTransparentRetry indicates whether the stream is undergoing a
// transparent retry.
IsTransparentRetry bool
}

// Handler defines the interface for the related stats handling (e.g., RPCs, connections).
Expand Down
109 changes: 109 additions & 0 deletions stats/opentelemetry/internal/tracing/custom_map_carrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package tracing implements the OpenTelemetry carrier for context propagation
// in gRPC tracing.
package tracing

import (
"context"

otelpropagation "go.opentelemetry.io/otel/propagation"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)

// GRPCTraceBinHeaderKey is the gRPC metadata header key `grpc-trace-bin` used
// to propagate trace context in binary format.
const GRPCTraceBinHeaderKey = "grpc-trace-bin"

// CustomCarrier is a TextMapCarrier that uses gRPC context to store and
// retrieve any propagated key-value pairs in text format along with binary
// format for `grpc-trace-bin` header
type CustomCarrier struct {
otelpropagation.TextMapCarrier

ctx context.Context
}

// NewCustomCarrier creates a new CustomMapCarrier with
// the given context.
func NewCustomCarrier(ctx context.Context) *CustomCarrier {
return &CustomCarrier{
ctx: ctx,
}
}

// Get returns the string value associated with the passed key from the gRPC
// context. It returns an empty string if the key is not present in the
// context.
func (c *CustomCarrier) Get(key string) string {
md, ok := metadata.FromIncomingContext(c.ctx)
if !ok {
return ""
}
values := md.Get(key)
if len(values) == 0 {
return ""
}
return values[0]
}

// Set stores the key-value pair in string format in the gRPC context.
// If the key already exists, its value will be overwritten.
func (c *CustomCarrier) Set(key, value string) {
md, ok := metadata.FromOutgoingContext(c.ctx)
if !ok {
md = metadata.MD{}
}
md.Set(key, value)
c.ctx = metadata.NewOutgoingContext(c.ctx, md)
}

// GetBinary returns the binary value from the gRPC context in the incoming RPC,
// associated with the header `grpc-trace-bin`.
func (c CustomCarrier) GetBinary() []byte {
values := stats.Trace(c.ctx)
if len(values) == 0 {
return nil
}

return values
}

// SetBinary sets the binary value to the gRPC context, which will be sent in
// the outgoing RPC with the header `grpc-trace-bin`.
func (c *CustomCarrier) SetBinary(value []byte) {
c.ctx = stats.SetTrace(c.ctx, value)
}

// Keys returns the keys stored in the gRPC context for the outgoing RPC.
func (c *CustomCarrier) Keys() []string {
md, _ := metadata.FromOutgoingContext(c.ctx)
keys := make([]string, 0, len(md))
for k := range md {
keys = append(keys, k)
}
return keys
}

// Context returns the underlying *context.Context associated with the
// CustomCarrier.
func (c *CustomCarrier) Context() context.Context {
return c.ctx
}
183 changes: 183 additions & 0 deletions stats/opentelemetry/internal/tracing/custom_map_carrier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* htestp://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package tracing

import (
"context"
"reflect"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

func (s) TestGet(t *testing.T) {
tests := []struct {
name string
md metadata.MD
key string
want string
}{
{
name: "existing key",
md: metadata.Pairs("key1", "value1"),
key: "key1",
want: "value1",
},
{
name: "non-existing key",
md: metadata.Pairs("key1", "value1"),
key: "key2",
want: "",
},
{
name: "empty key",
md: metadata.MD{},
key: "key1",
want: "",
},
}

for _, test := range tests {
ctx, cancel := context.WithCancel(context.Background())
t.Run(test.name, func(t *testing.T) {
c := NewCustomCarrier(metadata.NewIncomingContext(ctx, test.md))
got := c.Get(test.key)
if got != test.want {
t.Fatalf("got %s, want %s", got, test.want)
}
cancel()
})
}
}

func (s) TestSet(t *testing.T) {
tests := []struct {
name string
initialMD metadata.MD // Metadata to initialize the context with
setKey string // Key to set using c.Set()
setValue string // Value to set using c.Set()
wantKeys []string // Expected keys returned by c.Keys()
}{
{
name: "set new key",
initialMD: metadata.MD{},
setKey: "key1",
setValue: "value1",
wantKeys: []string{"key1"},
},
{
name: "override existing key",
initialMD: metadata.MD{"key1": []string{"oldvalue"}},
setKey: "key1",
setValue: "newvalue",
wantKeys: []string{"key1"},
},
{
name: "set key with existing unrelated key",
initialMD: metadata.MD{"key2": []string{"value2"}},
setKey: "key1",
setValue: "value1",
wantKeys: []string{"key2", "key1"}, // Order matesters here!
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
c := NewCustomCarrier(metadata.NewOutgoingContext(ctx, test.initialMD))

c.Set(test.setKey, test.setValue)

gotKeys := c.Keys()
equalKeys := cmp.Equal(test.wantKeys, gotKeys, cmpopts.SortSlices(func(a, b string) bool {
return a < b
}))
if !equalKeys {
t.Fatalf("got keys %v, want %v", gotKeys, test.wantKeys)
}
gotMD, _ := metadata.FromOutgoingContext(c.Context())
if gotMD.Get(test.setKey)[0] != test.setValue {
t.Fatalf("got value %s, want %s, for key %s", gotMD.Get(test.setKey)[0], test.setValue, test.setKey)
}
cancel()
})
}
}

func (s) TestGetBinary(t *testing.T) {
t.Run("get grpc-trace-bin header", func(t *testing.T) {
want := []byte{0x01, 0x02, 0x03}
ctx, cancel := context.WithCancel(context.Background())
c := NewCustomCarrier(stats.SetIncomingTrace(ctx, want))
got := c.GetBinary()
if got == nil {
t.Fatalf("got nil, want %v", got)
}
if string(got) != string(want) {
t.Fatalf("got %s, want %s", got, want)
}
cancel()
})

t.Run("get non grpc-trace-bin header", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
c := NewCustomCarrier(metadata.NewIncomingContext(ctx, metadata.Pairs("non-trace-bin", "\x01\x02\x03")))
got := c.GetBinary()
if got != nil {
t.Fatalf("got %v, want nil", got)
}
cancel()
})
}

func (s) TestSetBinary(t *testing.T) {
t.Run("set grpc-trace-bin header", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
want := []byte{0x01, 0x02, 0x03}
c := NewCustomCarrier(stats.SetIncomingTrace(ctx, want))
c.SetBinary(want)
got := stats.OutgoingTrace(c.Context())
if !reflect.DeepEqual(got, want) {
t.Fatalf("got %v, want %v", got, want)
}
cancel()
})

t.Run("set non grpc-trace-bin header", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
c := NewCustomCarrier(metadata.NewOutgoingContext(ctx, metadata.MD{"non-trace-bin": []string{"value"}}))
got := stats.OutgoingTrace(c.Context())
if got != nil {
t.Fatalf("got %v, want nil", got)
}
cancel()
})
}
27 changes: 27 additions & 0 deletions stats/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
otelattribute "go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

func init() {
Expand All @@ -51,6 +53,8 @@ var joinDialOptions = internal.JoinDialOptions.(func(...grpc.DialOption) grpc.Di
type Options struct {
// MetricsOptions are the metrics options for OpenTelemetry instrumentation.
MetricsOptions MetricsOptions
// TraceOptions are the tracing options for OpenTelemetry instrumentation.
TraceOptions TraceOptions
}

// MetricsOptions are the metrics options for OpenTelemetry instrumentation.
Expand Down Expand Up @@ -84,6 +88,16 @@ type MetricsOptions struct {
pluginOption otelinternal.PluginOption
}

// TraceOptions are the tracing options for OpenTelemetry instrumentation.
type TraceOptions struct {
// TracerProvider provides Tracers that are used by instrumentation code to
// trace computational workflows.
TracerProvider trace.TracerProvider

// TextMapPropagator propagates span context through text map carrier.
TextMapPropagator propagation.TextMapPropagator
}

// DialOption returns a dial option which enables OpenTelemetry instrumentation
// code for a grpc.ClientConn.
//
Expand All @@ -99,6 +113,7 @@ type MetricsOptions struct {
func DialOption(o Options) grpc.DialOption {
csh := &clientStatsHandler{options: o}
csh.initializeMetrics()
csh.initializeTracing()

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / upload

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23)

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, -race)

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, 386)

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, arm64)

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.22)

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / upload

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, -race, GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST=true)

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23)

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, -race)

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, 386)

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, arm64)

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.22)

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)

Check failure on line 116 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, -race, GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST=true)

csh.initializeTracing undefined (type *clientStatsHandler has no field or method initializeTracing)
return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh))
}

Expand All @@ -119,6 +134,7 @@ var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) g
func ServerOption(o Options) grpc.ServerOption {
ssh := &serverStatsHandler{options: o}
ssh.initializeMetrics()
ssh.initializeTracing()

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / upload

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23)

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, -race)

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, 386)

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, arm64)

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.22)

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / upload

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, -race, GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST=true)

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23)

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, -race)

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, 386)

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, arm64)

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.22)

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)

Check failure on line 137 in stats/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / tests (tests, 1.23, -race, GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST=true)

ssh.initializeTracing undefined (type *serverStatsHandler has no field or method initializeTracing)
return joinServerOptions(grpc.ChainUnaryInterceptor(ssh.unaryInterceptor), grpc.ChainStreamInterceptor(ssh.streamInterceptor), grpc.StatsHandler(ssh))
}

Expand Down Expand Up @@ -165,6 +181,14 @@ func removeLeadingSlash(mn string) string {
return strings.TrimLeft(mn, "/")
}

func isMetricsDisabled(mo MetricsOptions) bool {
return mo.MeterProvider == nil
}

func isTracingDisabled(to TraceOptions) bool {
return to.TracerProvider == nil || to.TextMapPropagator == nil
}

// attemptInfo is RPC information scoped to the RPC attempt life span client
// side, and the RPC life span server side.
type attemptInfo struct {
Expand All @@ -181,6 +205,9 @@ type attemptInfo struct {

pluginOptionLabels map[string]string // pluginOptionLabels to attach to metrics emitted
xdsLabels map[string]string

// ti holds the trace span information for this RPC attempt
ti *attemptTraceSpan
}

type clientMetrics struct {
Expand Down
Loading

0 comments on commit be78227

Please sign in to comment.