From be78227799272be77ebe4dc22ab20b17e7a46357 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Tue, 5 Nov 2024 17:06:08 +0530 Subject: [PATCH 1/2] Add TraceOptions to api --- stats/handlers.go | 8 + .../internal/tracing/custom_map_carrier.go | 109 +++++++++++ .../tracing/custom_map_carrier_test.go | 183 ++++++++++++++++++ stats/opentelemetry/opentelemetry.go | 27 +++ stats/opentelemetry/trace.go | 125 ++++++++++++ .../tracing/grpc_trace_bin_propagator.go | 140 ++++++++++++++ .../tracing/grpc_trace_bin_propagator_test.go | 150 ++++++++++++++ 7 files changed, 742 insertions(+) create mode 100644 stats/opentelemetry/internal/tracing/custom_map_carrier.go create mode 100644 stats/opentelemetry/internal/tracing/custom_map_carrier_test.go create mode 100644 stats/opentelemetry/trace.go create mode 100644 stats/opentelemetry/tracing/grpc_trace_bin_propagator.go create mode 100644 stats/opentelemetry/tracing/grpc_trace_bin_propagator_test.go diff --git a/stats/handlers.go b/stats/handlers.go index dc03731e45ef..154d81c9d47b 100644 --- a/stats/handlers.go +++ b/stats/handlers.go @@ -38,6 +38,14 @@ type RPCTagInfo struct { // FailFast indicates if this RPC is failfast. // This field is only valid on client side, it's always false on server side. FailFast bool + // NameResolutionDelay indicates whether there was a delay in name + // resolution. + // + // This field is only valid on client side, it's always false on server side. + NameResolutionDelay bool + // IsTransparentRetry indicates whether the stream is undergoing a + // transparent retry. + IsTransparentRetry bool } // Handler defines the interface for the related stats handling (e.g., RPCs, connections). diff --git a/stats/opentelemetry/internal/tracing/custom_map_carrier.go b/stats/opentelemetry/internal/tracing/custom_map_carrier.go new file mode 100644 index 000000000000..65a4a0762875 --- /dev/null +++ b/stats/opentelemetry/internal/tracing/custom_map_carrier.go @@ -0,0 +1,109 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package tracing implements the OpenTelemetry carrier for context propagation +// in gRPC tracing. +package tracing + +import ( + "context" + + otelpropagation "go.opentelemetry.io/otel/propagation" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats" +) + +// GRPCTraceBinHeaderKey is the gRPC metadata header key `grpc-trace-bin` used +// to propagate trace context in binary format. +const GRPCTraceBinHeaderKey = "grpc-trace-bin" + +// CustomCarrier is a TextMapCarrier that uses gRPC context to store and +// retrieve any propagated key-value pairs in text format along with binary +// format for `grpc-trace-bin` header +type CustomCarrier struct { + otelpropagation.TextMapCarrier + + ctx context.Context +} + +// NewCustomCarrier creates a new CustomMapCarrier with +// the given context. +func NewCustomCarrier(ctx context.Context) *CustomCarrier { + return &CustomCarrier{ + ctx: ctx, + } +} + +// Get returns the string value associated with the passed key from the gRPC +// context. It returns an empty string if the key is not present in the +// context. +func (c *CustomCarrier) Get(key string) string { + md, ok := metadata.FromIncomingContext(c.ctx) + if !ok { + return "" + } + values := md.Get(key) + if len(values) == 0 { + return "" + } + return values[0] +} + +// Set stores the key-value pair in string format in the gRPC context. +// If the key already exists, its value will be overwritten. +func (c *CustomCarrier) Set(key, value string) { + md, ok := metadata.FromOutgoingContext(c.ctx) + if !ok { + md = metadata.MD{} + } + md.Set(key, value) + c.ctx = metadata.NewOutgoingContext(c.ctx, md) +} + +// GetBinary returns the binary value from the gRPC context in the incoming RPC, +// associated with the header `grpc-trace-bin`. +func (c CustomCarrier) GetBinary() []byte { + values := stats.Trace(c.ctx) + if len(values) == 0 { + return nil + } + + return values +} + +// SetBinary sets the binary value to the gRPC context, which will be sent in +// the outgoing RPC with the header `grpc-trace-bin`. +func (c *CustomCarrier) SetBinary(value []byte) { + c.ctx = stats.SetTrace(c.ctx, value) +} + +// Keys returns the keys stored in the gRPC context for the outgoing RPC. +func (c *CustomCarrier) Keys() []string { + md, _ := metadata.FromOutgoingContext(c.ctx) + keys := make([]string, 0, len(md)) + for k := range md { + keys = append(keys, k) + } + return keys +} + +// Context returns the underlying *context.Context associated with the +// CustomCarrier. +func (c *CustomCarrier) Context() context.Context { + return c.ctx +} diff --git a/stats/opentelemetry/internal/tracing/custom_map_carrier_test.go b/stats/opentelemetry/internal/tracing/custom_map_carrier_test.go new file mode 100644 index 000000000000..11cd1400abd2 --- /dev/null +++ b/stats/opentelemetry/internal/tracing/custom_map_carrier_test.go @@ -0,0 +1,183 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * htestp://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package tracing + +import ( + "context" + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +func (s) TestGet(t *testing.T) { + tests := []struct { + name string + md metadata.MD + key string + want string + }{ + { + name: "existing key", + md: metadata.Pairs("key1", "value1"), + key: "key1", + want: "value1", + }, + { + name: "non-existing key", + md: metadata.Pairs("key1", "value1"), + key: "key2", + want: "", + }, + { + name: "empty key", + md: metadata.MD{}, + key: "key1", + want: "", + }, + } + + for _, test := range tests { + ctx, cancel := context.WithCancel(context.Background()) + t.Run(test.name, func(t *testing.T) { + c := NewCustomCarrier(metadata.NewIncomingContext(ctx, test.md)) + got := c.Get(test.key) + if got != test.want { + t.Fatalf("got %s, want %s", got, test.want) + } + cancel() + }) + } +} + +func (s) TestSet(t *testing.T) { + tests := []struct { + name string + initialMD metadata.MD // Metadata to initialize the context with + setKey string // Key to set using c.Set() + setValue string // Value to set using c.Set() + wantKeys []string // Expected keys returned by c.Keys() + }{ + { + name: "set new key", + initialMD: metadata.MD{}, + setKey: "key1", + setValue: "value1", + wantKeys: []string{"key1"}, + }, + { + name: "override existing key", + initialMD: metadata.MD{"key1": []string{"oldvalue"}}, + setKey: "key1", + setValue: "newvalue", + wantKeys: []string{"key1"}, + }, + { + name: "set key with existing unrelated key", + initialMD: metadata.MD{"key2": []string{"value2"}}, + setKey: "key1", + setValue: "value1", + wantKeys: []string{"key2", "key1"}, // Order matesters here! + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + c := NewCustomCarrier(metadata.NewOutgoingContext(ctx, test.initialMD)) + + c.Set(test.setKey, test.setValue) + + gotKeys := c.Keys() + equalKeys := cmp.Equal(test.wantKeys, gotKeys, cmpopts.SortSlices(func(a, b string) bool { + return a < b + })) + if !equalKeys { + t.Fatalf("got keys %v, want %v", gotKeys, test.wantKeys) + } + gotMD, _ := metadata.FromOutgoingContext(c.Context()) + if gotMD.Get(test.setKey)[0] != test.setValue { + t.Fatalf("got value %s, want %s, for key %s", gotMD.Get(test.setKey)[0], test.setValue, test.setKey) + } + cancel() + }) + } +} + +func (s) TestGetBinary(t *testing.T) { + t.Run("get grpc-trace-bin header", func(t *testing.T) { + want := []byte{0x01, 0x02, 0x03} + ctx, cancel := context.WithCancel(context.Background()) + c := NewCustomCarrier(stats.SetIncomingTrace(ctx, want)) + got := c.GetBinary() + if got == nil { + t.Fatalf("got nil, want %v", got) + } + if string(got) != string(want) { + t.Fatalf("got %s, want %s", got, want) + } + cancel() + }) + + t.Run("get non grpc-trace-bin header", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + c := NewCustomCarrier(metadata.NewIncomingContext(ctx, metadata.Pairs("non-trace-bin", "\x01\x02\x03"))) + got := c.GetBinary() + if got != nil { + t.Fatalf("got %v, want nil", got) + } + cancel() + }) +} + +func (s) TestSetBinary(t *testing.T) { + t.Run("set grpc-trace-bin header", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + want := []byte{0x01, 0x02, 0x03} + c := NewCustomCarrier(stats.SetIncomingTrace(ctx, want)) + c.SetBinary(want) + got := stats.OutgoingTrace(c.Context()) + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %v, want %v", got, want) + } + cancel() + }) + + t.Run("set non grpc-trace-bin header", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + c := NewCustomCarrier(metadata.NewOutgoingContext(ctx, metadata.MD{"non-trace-bin": []string{"value"}})) + got := stats.OutgoingTrace(c.Context()) + if got != nil { + t.Fatalf("got %v, want nil", got) + } + cancel() + }) +} diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index cc5ad387fb4c..8e29b9f33ae1 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -33,6 +33,8 @@ import ( otelattribute "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" ) func init() { @@ -51,6 +53,8 @@ var joinDialOptions = internal.JoinDialOptions.(func(...grpc.DialOption) grpc.Di type Options struct { // MetricsOptions are the metrics options for OpenTelemetry instrumentation. MetricsOptions MetricsOptions + // TraceOptions are the tracing options for OpenTelemetry instrumentation. + TraceOptions TraceOptions } // MetricsOptions are the metrics options for OpenTelemetry instrumentation. @@ -84,6 +88,16 @@ type MetricsOptions struct { pluginOption otelinternal.PluginOption } +// TraceOptions are the tracing options for OpenTelemetry instrumentation. +type TraceOptions struct { + // TracerProvider provides Tracers that are used by instrumentation code to + // trace computational workflows. + TracerProvider trace.TracerProvider + + // TextMapPropagator propagates span context through text map carrier. + TextMapPropagator propagation.TextMapPropagator +} + // DialOption returns a dial option which enables OpenTelemetry instrumentation // code for a grpc.ClientConn. // @@ -99,6 +113,7 @@ type MetricsOptions struct { func DialOption(o Options) grpc.DialOption { csh := &clientStatsHandler{options: o} csh.initializeMetrics() + csh.initializeTracing() return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh)) } @@ -119,6 +134,7 @@ var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) g func ServerOption(o Options) grpc.ServerOption { ssh := &serverStatsHandler{options: o} ssh.initializeMetrics() + ssh.initializeTracing() return joinServerOptions(grpc.ChainUnaryInterceptor(ssh.unaryInterceptor), grpc.ChainStreamInterceptor(ssh.streamInterceptor), grpc.StatsHandler(ssh)) } @@ -165,6 +181,14 @@ func removeLeadingSlash(mn string) string { return strings.TrimLeft(mn, "/") } +func isMetricsDisabled(mo MetricsOptions) bool { + return mo.MeterProvider == nil +} + +func isTracingDisabled(to TraceOptions) bool { + return to.TracerProvider == nil || to.TextMapPropagator == nil +} + // attemptInfo is RPC information scoped to the RPC attempt life span client // side, and the RPC life span server side. type attemptInfo struct { @@ -181,6 +205,9 @@ type attemptInfo struct { pluginOptionLabels map[string]string // pluginOptionLabels to attach to metrics emitted xdsLabels map[string]string + + // ti holds the trace span information for this RPC attempt + ti *attemptTraceSpan } type clientMetrics struct { diff --git a/stats/opentelemetry/trace.go b/stats/opentelemetry/trace.go new file mode 100644 index 000000000000..29a3a0f5868a --- /dev/null +++ b/stats/opentelemetry/trace.go @@ -0,0 +1,125 @@ +package opentelemetry + +import ( + "context" + "strings" + "sync/atomic" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + otelcodes "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/stats" + otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing" + "google.golang.org/grpc/status" +) + +// attemptTraceSpan is data used for recording traces. It holds a reference to the +// current span, message counters for sent and received messages (used for +// generating message IDs), and the number of previous RPC attempts for the +// associated call. +type attemptTraceSpan struct { + span trace.Span + countSentMsg uint32 + countRecvMsg uint32 + previousRpcAttempts uint32 +} + +// traceTagRPC populates context with a new span, and serializes information +// about this span into gRPC Metadata. +func (csh *clientStatsHandler) traceTagRPC(ctx context.Context, rti *stats.RPCTagInfo) (context.Context, *attemptTraceSpan) { + if csh.options.TraceOptions.TextMapPropagator == nil { + return ctx, nil + } + + mn := "Attempt." + strings.Replace(removeLeadingSlash(rti.FullMethodName), "/", ".", -1) + tracer := otel.Tracer("grpc-open-telemetry") + ctx, span := tracer.Start(ctx, mn) + + carrier := otelinternaltracing.NewCustomCarrier(ctx) // Use internal custom carrier to inject + otel.GetTextMapPropagator().Inject(ctx, carrier) + + return carrier.Context(), &attemptTraceSpan{ + span: span, + countSentMsg: 0, // msg events scoped to scope of context, per attempt client side + countRecvMsg: 0, + } +} + +// traceTagRPC populates context with new span data, with a parent based on the +// spanContext deserialized from context passed in (wire data in gRPC metadata) +// if present. +func (ssh *serverStatsHandler) traceTagRPC(ctx context.Context, rti *stats.RPCTagInfo) (context.Context, *attemptTraceSpan) { + if ssh.options.TraceOptions.TextMapPropagator == nil { + return ctx, nil + } + + mn := strings.Replace(removeLeadingSlash(rti.FullMethodName), "/", ".", -1) + var span trace.Span + tracer := otel.Tracer("grpc-open-telemetry") + ctx = otel.GetTextMapPropagator().Extract(ctx, otelinternaltracing.NewCustomCarrier(ctx)) + // If the context.Context provided in `ctx` to tracer.Start(), contains a + // Span then the newly-created Span will be a child of that span, + // otherwise it will be a root span. + ctx, span = tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindServer)) + return ctx, &attemptTraceSpan{ + span: span, + countSentMsg: 0, + countRecvMsg: 0, + } +} + +// populateSpan populates span information based on stats passed in, representing +// invariants of the RPC lifecycle. It ends the span, triggering its export. +// This function handles attempt spans on the client-side and call spans on the +// server-side. +func populateSpan(_ context.Context, rs stats.RPCStats, ti *attemptTraceSpan) { + if ti == nil || ti.span == nil { + // Shouldn't happen, tagRPC call comes before this function gets called + // which populates this information. + logger.Error("ctx passed into stats handler tracing event handling has no span present") + return + } + span := ti.span + + switch rs := rs.(type) { + case *stats.Begin: + // Note: Go always added Client and FailFast attributes even though they are not + // defined by the OpenCensus gRPC spec. Thus, they are unimportant for + // correctness. + span.SetAttributes( + attribute.Bool("Client", rs.Client), + attribute.Bool("FailFast", rs.Client), + attribute.Int64("previous-rpc-attempts", int64(ti.previousRpcAttempts)), + attribute.Bool("transparent-retry", rs.IsTransparentRetryAttempt), + ) + // increment previous rpc attempts applicable for next attempt + atomic.AddUint32(&ti.previousRpcAttempts, 1) + case *stats.PickerUpdated: + span.AddEvent("Delayed LB pick complete") + case *stats.InPayload: + // message id - "must be calculated as two different counters starting + // from one for sent messages and one for received messages." + mi := atomic.AddUint32(&ti.countRecvMsg, 1) + span.AddEvent("Inbound compressed message", trace.WithAttributes( + attribute.Int64("sequence-number", int64(mi)), + attribute.Int64("message-size", int64(rs.Length)), + attribute.Int64("message-size-compressed", int64(rs.CompressedLength)), + )) + case *stats.OutPayload: + mi := atomic.AddUint32(&ti.countSentMsg, 1) + span.AddEvent("Outbound compressed message", trace.WithAttributes( + attribute.Int64("sequence-number", int64(mi)), + attribute.Int64("message-size", int64(rs.Length)), + attribute.Int64("message-size-compressed", int64(rs.CompressedLength)), + )) + case *stats.End: + if rs.Error != nil { + s := status.Convert(rs.Error) + span.SetStatus(otelcodes.Error, s.Message()) + } else { + span.SetStatus(otelcodes.Ok, "Ok") + } + span.End() + } +} diff --git a/stats/opentelemetry/tracing/grpc_trace_bin_propagator.go b/stats/opentelemetry/tracing/grpc_trace_bin_propagator.go new file mode 100644 index 000000000000..5bcce7a839d0 --- /dev/null +++ b/stats/opentelemetry/tracing/grpc_trace_bin_propagator.go @@ -0,0 +1,140 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package tracing + +import ( + "context" + "encoding/base64" + + otelpropagation "go.opentelemetry.io/otel/propagation" + oteltrace "go.opentelemetry.io/otel/trace" + itracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing" +) + +// TODO: Move out of internal as part of open telemetry API + +// GRPCTraceBinPropagator is an OpenTelemetry TextMapPropagator which is used +// to extract and inject trace context data from and into messages exchanged by +// gRPC applications. It propagates trace data in binary format using the +// 'grpc-trace-bin' header. +type GRPCTraceBinPropagator struct{} + +// Inject sets OpenTelemetry trace context information from the Context into +// the carrier. +// +// If the carrier is a CustomCarrier, trace data is directly injected in a +// binary format using the 'grpc-trace-bin' header (fast path). Otherwise, +// the trace data is base64 encoded and injected using the same header in +// text format (slow path). +func (p GRPCTraceBinPropagator) Inject(ctx context.Context, carrier otelpropagation.TextMapCarrier) { + span := oteltrace.SpanFromContext(ctx) + if !span.SpanContext().IsValid() { + return + } + + bd := Binary(span.SpanContext()) + if bd == nil { + return + } + + if cc, ok := carrier.(*itracing.CustomCarrier); ok { + cc.SetBinary(bd) + return + } + carrier.Set(itracing.GRPCTraceBinHeaderKey, base64.StdEncoding.EncodeToString(bd)) +} + +// Extract reads OpenTelemetry trace context information from the carrier into a +// Context. +// +// If the carrier is a CustomCarrier, trace data is read directly in a binary +// format from the 'grpc-trace-bin' header (fast path). Otherwise, the trace +// data is base64 decoded from the same header in text format (slow path). +func (p GRPCTraceBinPropagator) Extract(ctx context.Context, carrier otelpropagation.TextMapCarrier) context.Context { + var bd []byte + + if cc, ok := carrier.(*itracing.CustomCarrier); ok { + bd = cc.GetBinary() + } else { + bd, _ = base64.StdEncoding.DecodeString(carrier.Get(itracing.GRPCTraceBinHeaderKey)) + } + if bd == nil { + return ctx + } + + sc, ok := FromBinary([]byte(bd)) + if !ok { + return ctx + } + + return oteltrace.ContextWithRemoteSpanContext(ctx, sc) +} + +// Fields returns the keys whose values are set with Inject. +// +// GRPCTraceBinPropagator always returns a slice containing only +// `grpc-trace-bin` key because it only sets the 'grpc-trace-bin' header for +// propagating trace context. +func (p GRPCTraceBinPropagator) Fields() []string { + return []string{itracing.GRPCTraceBinHeaderKey} +} + +// Binary returns the binary format representation of a SpanContext. +// +// If sc is the zero value, returns nil. +func Binary(sc oteltrace.SpanContext) []byte { + if sc.Equal(oteltrace.SpanContext{}) { + return nil + } + var b [29]byte + traceID := oteltrace.TraceID(sc.TraceID()) + copy(b[2:18], traceID[:]) + b[18] = 1 + spanID := oteltrace.SpanID(sc.SpanID()) + copy(b[19:27], spanID[:]) + b[27] = 2 + b[28] = uint8(oteltrace.TraceFlags(sc.TraceFlags())) + return b[:] +} + +// FromBinary returns the SpanContext represented by b. +// +// If b has an unsupported version ID or contains no TraceID, FromBinary +// returns with zero value SpanContext and false. +func FromBinary(b []byte) (oteltrace.SpanContext, bool) { + if len(b) == 0 || b[0] != 0 { + return oteltrace.SpanContext{}, false + } + b = b[1:] + if len(b) < 17 || b[0] != 0 { + return oteltrace.SpanContext{}, false + } + + sc := oteltrace.SpanContext{} + sc = sc.WithTraceID(oteltrace.TraceID(b[1:17])) + b = b[17:] + if len(b) >= 9 && b[0] == 1 { + sc = sc.WithSpanID(oteltrace.SpanID(b[1:9])) + b = b[9:] + } + if len(b) >= 2 && b[0] == 2 { + sc = sc.WithTraceFlags(oteltrace.TraceFlags(b[1])) + } + return sc, true +} diff --git a/stats/opentelemetry/tracing/grpc_trace_bin_propagator_test.go b/stats/opentelemetry/tracing/grpc_trace_bin_propagator_test.go new file mode 100644 index 000000000000..3b5521531f25 --- /dev/null +++ b/stats/opentelemetry/tracing/grpc_trace_bin_propagator_test.go @@ -0,0 +1,150 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package tracing + +import ( + "context" + "encoding/base64" + "testing" + + otelpropagation "go.opentelemetry.io/otel/propagation" + oteltrace "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats" + itracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing" +) + +// TODO: Move out of internal as part of open telemetry API + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// TestInject_FastPath verifies that the GRPCTraceBinPropagator correctly +// injects OpenTelemetry trace context data using the CustomCarrier. +// +// It is called the fast path because it injects the trace context directly in +// binary format. +func (s) TestInject_FastPath(t *testing.T) { + p := GRPCTraceBinPropagator{} + sc := oteltrace.NewSpanContext(oteltrace.SpanContextConfig{ + TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + SpanID: [8]byte{17, 18, 19, 20, 21, 22, 23, 24}, + TraceFlags: oteltrace.FlagsSampled, + }) + tCtx, tCancel := context.WithCancel(context.Background()) + tCtx = oteltrace.ContextWithSpanContext(tCtx, sc) + defer tCancel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := itracing.NewCustomCarrier(metadata.NewOutgoingContext(ctx, metadata.MD{})) + p.Inject(tCtx, c) + + got := stats.OutgoingTrace(c.Context()) + want := Binary(sc) + if string(got) != string(want) { + t.Fatalf("got = %v, want %v", got, want) + } +} + +// TestInject_SlowPath verifies that the GRPCTraceBinPropagator correctly +// injects OpenTelemetry trace context data using any other text based carrier. +// +// It is called the slow path because it base64 encodes the binary trace +// context before injecting it. +func (s) TestInject_SlowPath(t *testing.T) { + p := GRPCTraceBinPropagator{} + sc := oteltrace.NewSpanContext(oteltrace.SpanContextConfig{ + TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + SpanID: [8]byte{17, 18, 19, 20, 21, 22, 23, 24}, + TraceFlags: oteltrace.FlagsSampled, + }) + tCtx, tCancel := context.WithCancel(context.Background()) + tCtx = oteltrace.ContextWithSpanContext(tCtx, sc) + defer tCancel() + + c := otelpropagation.MapCarrier{} + p.Inject(tCtx, c) + + got := c.Get(itracing.GRPCTraceBinHeaderKey) + want := base64.StdEncoding.EncodeToString(Binary(sc)) + if got != want { + t.Fatalf("got = %v, want %v", got, want) + } +} + +// TestExtract_FastPath verifies that the GRPCTraceBinPropagator correctly +// extracts OpenTelemetry trace context data using the CustomCarrier. +// +// It is called the fast path because it extracts the trace context directly +// in the binary format. +func (s) TestExtract_FastPath(t *testing.T) { + p := GRPCTraceBinPropagator{} + sc := oteltrace.NewSpanContext(oteltrace.SpanContextConfig{ + TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + SpanID: [8]byte{17, 18, 19, 20, 21, 22, 23, 24}, + TraceFlags: oteltrace.FlagsSampled, + Remote: true, + }) + bd := Binary(sc) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := itracing.NewCustomCarrier(stats.SetIncomingTrace(ctx, bd)) + tCtx := p.Extract(ctx, c) + got := oteltrace.SpanContextFromContext(tCtx) + + if !got.Equal(sc) { + t.Fatalf("got = %v, want %v", got, sc) + } +} + +// TestExtract_SlowPath verifies that the GRPCTraceBinPropagator correctly +// extracts OpenTelemetry trace context data using any other text based carrier. +// +// It is called the slow path because it base64 decodes the binary trace +// context before extracting it. +func (s) TestExtract_SlowPath(t *testing.T) { + p := GRPCTraceBinPropagator{} + sc := oteltrace.NewSpanContext(oteltrace.SpanContextConfig{ + TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + SpanID: [8]byte{17, 18, 19, 20, 21, 22, 23, 24}, + TraceFlags: oteltrace.FlagsSampled, + Remote: true, + }) + bd := Binary(sc) + + c := otelpropagation.MapCarrier{ + itracing.GRPCTraceBinHeaderKey: base64.StdEncoding.EncodeToString(bd), + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tCtx := p.Extract(ctx, c) + got := oteltrace.SpanContextFromContext(tCtx) + + if !got.Equal(sc) { + t.Fatalf("got = %v, want %v", got, sc) + } +} From 4f9d7b5bc028c05b9539a32f61c534c85bf750c2 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Tue, 5 Nov 2024 17:43:23 +0530 Subject: [PATCH 2/2] Add initializeTracing --- stats/opentelemetry/client_metrics.go | 10 ++++++++++ stats/opentelemetry/server_metrics.go | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 4af7f933c8ba..3701b3240c34 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -18,6 +18,7 @@ package opentelemetry import ( "context" + "go.opentelemetry.io/otel" "sync/atomic" "time" @@ -68,6 +69,15 @@ func (h *clientStatsHandler) initializeMetrics() { rm.registerMetrics(metrics, meter) } +func (h *clientStatsHandler) initializeTracing() { + if h.options.TraceOptions.TracerProvider == nil || h.options.TraceOptions.TextMapPropagator == nil { + return + } + + otel.SetTextMapPropagator(h.options.TraceOptions.TextMapPropagator) + otel.SetTracerProvider(h.options.TraceOptions.TracerProvider) +} + func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ci := &callInfo{ target: cc.CanonicalTarget(), diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go index eaea559b2c10..87f172ebd40b 100644 --- a/stats/opentelemetry/server_metrics.go +++ b/stats/opentelemetry/server_metrics.go @@ -18,6 +18,7 @@ package opentelemetry import ( "context" + "go.opentelemetry.io/otel" "sync/atomic" "time" @@ -66,6 +67,15 @@ func (h *serverStatsHandler) initializeMetrics() { rm.registerMetrics(metrics, meter) } +func (h *serverStatsHandler) initializeTracing() { + if !isTracingDisabled(h.options.TraceOptions) { + return + } + + otel.SetTextMapPropagator(h.options.TraceOptions.TextMapPropagator) + otel.SetTracerProvider(h.options.TraceOptions.TracerProvider) +} + // attachLabelsTransportStream intercepts SetHeader and SendHeader calls of the // underlying ServerTransportStream to attach metadataExchangeLabels. type attachLabelsTransportStream struct {