Skip to content

Commit

Permalink
Rebase with Purnesh's PR
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Nov 17, 2024
1 parent 7b855de commit 18b1e12
Show file tree
Hide file tree
Showing 7 changed files with 453 additions and 328 deletions.
2 changes: 1 addition & 1 deletion examples/features/opentelemetry/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func main() {
defer cc.Close()
c := echo.NewEchoClient(cc)

// Make an RPC every second. This should trigger telemetry on prometheous
// Make an RPC every second. This should trigger telemetry on prometheus
// server along with traces in the in memory exporter to be emitted from
// the client and the server.
for {
Expand Down
35 changes: 24 additions & 11 deletions stats/opentelemetry/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package opentelemetry
import (
"context"
"fmt"
otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
"io"
"testing"
"time"

otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"

"go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes"
trace2 "go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -606,9 +607,6 @@ func pollForWantMetrics(ctx context.Context, t *testing.T, reader *metric.Manual
// goes through a stable OpenTelemetry API with well-defined behavior. This keeps
// the robustness of assertions over time.
type spanInformation struct {
// SpanContext either gets pulled off the wire in certain cases server side
// or created.
sc trace2.SpanContext
spanKind string
name string
events []trace.Event
Expand All @@ -629,10 +627,13 @@ func (s) TestClientCallSpanEvents(t *testing.T) {
ss := setupStubServer(t, nil, traceOptions)
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Create a parent span for the client call
ctx, _ := otel.Tracer("grpc-open-telemetry").Start(context.Background(), "test-parent-span")
ctx, _ = otel.Tracer("grpc-open-telemetry").Start(ctx, "test-parent-span")
md, _ := metadata.FromOutgoingContext(ctx)
otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewCustomCarrier(ctx))
otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewIncomingCarrier(ctx))
ctx = metadata.NewOutgoingContext(ctx, md)

// Make a unary RPC
Expand Down Expand Up @@ -682,10 +683,14 @@ func (s) TestServerWithMetricsAndTraceOptions(t *testing.T) {

ss := setupStubServer(t, mo, to)
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Create a parent span for the client call
ctx, _ := otel.Tracer("grpc-open-telemetry").Start(context.Background(), "test-parent-span")
ctx, _ = otel.Tracer("grpc-open-telemetry").Start(ctx, "test-parent-span")
md, _ := metadata.FromOutgoingContext(ctx)
otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewCustomCarrier(ctx))
otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewIncomingCarrier(ctx))
ctx = metadata.NewOutgoingContext(ctx, md)

// Make two RPC's, a unary RPC and a streaming RPC. These should cause
Expand Down Expand Up @@ -748,10 +753,13 @@ func (s) TestGrpcTraceBinPropagator(t *testing.T) {
ss := setupStubServer(t, nil, traceOptions)
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Create a parent span for the client call
ctx, _ := otel.Tracer("grpc-open-telemetry").Start(context.Background(), "test-parent-span")
ctx, _ = otel.Tracer("grpc-open-telemetry").Start(ctx, "test-parent-span")
md, _ := metadata.FromOutgoingContext(ctx)
otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewCustomCarrier(ctx))
otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewIncomingCarrier(ctx))
ctx = metadata.NewOutgoingContext(ctx, md)
// Make a unary RPC
if _, err := ss.Client.UnaryCall(
Expand Down Expand Up @@ -923,7 +931,12 @@ func (s) TestW3CContextPropagator(t *testing.T) {
// Start the server with OpenTelemetry options
ss := setupStubServer(t, nil, traceOptions)
defer ss.Stop()
ctx, _ := otel.Tracer("grpc-open-telemetry").Start(context.Background(), "test-parent-span")

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Create a parent span for the client call
ctx, _ = otel.Tracer("grpc-open-telemetry").Start(ctx, "test-parent-span")
md, _ := metadata.FromOutgoingContext(ctx)
ctx = metadata.NewOutgoingContext(ctx, md)
// Make two RPC's, a unary RPC and a streaming RPC. These should cause
Expand Down
111 changes: 45 additions & 66 deletions stats/opentelemetry/grpc_trace_bin_propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,85 +20,70 @@ package opentelemetry

import (
"context"
"encoding/base64"

otelpropagation "go.opentelemetry.io/otel/propagation"
oteltrace "go.opentelemetry.io/otel/trace"
itracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
)

// TODO: Move out of internal as part of open telemetry API
// gRPCTraceBinHeaderKey is the gRPC metadata header key `grpc-trace-bin` used
// to propagate trace context in binary format.
const grpcTraceBinHeaderKey = "grpc-trace-bin"

// GRPCTraceBinPropagator is an OpenTelemetry TextMapPropagator which is used
// to extract and inject trace context data from and into messages exchanged by
// to extract and inject trace context data from and into headers exchanged by
// gRPC applications. It propagates trace data in binary format using the
// 'grpc-trace-bin' header.
// `grpc-trace-bin` header.
type GRPCTraceBinPropagator struct{}

// Inject sets OpenTelemetry trace context information from the Context into
// the carrier.
// Inject sets OpenTelemetry span context from the Context into the carrier as
// a `grpc-trace-bin` header if span context is valid.
//
// If the carrier is a CustomCarrier, trace data is directly injected in a
// binary format using the 'grpc-trace-bin' header (fast path). Otherwise,
// the trace data is base64 encoded and injected using the same header in
// text format (slow path).
func (p GRPCTraceBinPropagator) Inject(ctx context.Context, carrier otelpropagation.TextMapCarrier) {
span := oteltrace.SpanFromContext(ctx)
if !span.SpanContext().IsValid() {
// If span context is not valid, it ruturns without setting `grpc-trace-bin`
// header.
func (GRPCTraceBinPropagator) Inject(ctx context.Context, carrier otelpropagation.TextMapCarrier) {
sc := oteltrace.SpanFromContext(ctx)
if !sc.SpanContext().IsValid() {
return
}

bd := Binary(span.SpanContext())
if bd == nil {
return
}

if cc, ok := carrier.(*itracing.CustomCarrier); ok {
cc.SetBinary(bd)
return
}
carrier.Set(itracing.GRPCTraceBinHeaderKey, base64.StdEncoding.EncodeToString(bd))
bd := toBinary(sc.SpanContext())
carrier.Set(grpcTraceBinHeaderKey, string(bd))
}

// Extract reads OpenTelemetry trace context information from the carrier into a
// Context.
// Extract reads OpenTelemetry span context from the `grpc-trace-bin` header of
// carrier into the provided context, if present.
//
// If the carrier is a CustomCarrier, trace data is read directly in a binary
// format from the 'grpc-trace-bin' header (fast path). Otherwise, the trace
// data is base64 decoded from the same header in text format (slow path).
func (p GRPCTraceBinPropagator) Extract(ctx context.Context, carrier otelpropagation.TextMapCarrier) context.Context {
var bd []byte

if cc, ok := carrier.(*itracing.CustomCarrier); ok {
bd = cc.GetBinary()
} else {
bd, _ = base64.StdEncoding.DecodeString(carrier.Get(itracing.GRPCTraceBinHeaderKey))
}
if bd == nil {
// If a valid span context is retrieved from `grpc-trace-bin`, it returns a new
// context containing the extracted OpenTelemetry span context marked as
// remote.
//
// If `grpc-trace-bin` header is not present, it returns the context as is.
func (GRPCTraceBinPropagator) Extract(ctx context.Context, carrier otelpropagation.TextMapCarrier) context.Context {
h := carrier.Get(grpcTraceBinHeaderKey)
if h == "" {
return ctx
}

sc, ok := FromBinary([]byte(bd))
sc, ok := fromBinary([]byte(h))
if !ok {
return ctx
}

return oteltrace.ContextWithRemoteSpanContext(ctx, sc)
}

// Fields returns the keys whose values are set with Inject.
//
// GRPCTraceBinPropagator always returns a slice containing only
// `grpc-trace-bin` key because it only sets the 'grpc-trace-bin' header for
// `grpc-trace-bin` key because it only sets the `grpc-trace-bin` header for
// propagating trace context.
func (p GRPCTraceBinPropagator) Fields() []string {
return []string{itracing.GRPCTraceBinHeaderKey}
func (GRPCTraceBinPropagator) Fields() []string {
return []string{grpcTraceBinHeaderKey}
}

// Binary returns the binary format representation of a SpanContext.
// toBinary returns the binary format representation of a SpanContext.
//
// If sc is the zero value, returns nil.
func Binary(sc oteltrace.SpanContext) []byte {
func toBinary(sc oteltrace.SpanContext) []byte {
if sc.Equal(oteltrace.SpanContext{}) {
return nil
}
Expand All @@ -109,32 +94,26 @@ func Binary(sc oteltrace.SpanContext) []byte {
spanID := oteltrace.SpanID(sc.SpanID())
copy(b[19:27], spanID[:])
b[27] = 2
b[28] = uint8(oteltrace.TraceFlags(sc.TraceFlags()))
b[28] = byte(oteltrace.TraceFlags(sc.TraceFlags()))
return b[:]
}

// FromBinary returns the SpanContext represented by b.
// fromBinary returns the SpanContext represented by b with Remote set to true.
//
// If b has an unsupported version ID or contains no TraceID, FromBinary
// returns with zero value SpanContext and false.
func FromBinary(b []byte) (oteltrace.SpanContext, bool) {
if len(b) == 0 || b[0] != 0 {
return oteltrace.SpanContext{}, false
}
b = b[1:]
if len(b) < 17 || b[0] != 0 {
// It returns with zero value SpanContext and false, if any of the
// below condition is not satisfied:
// - Valid header: len(b) = 29
// - Valid version: b[0] = 0
// - Valid traceID prefixed with 0: b[1] = 0
// - Valid spanID prefixed with 1: b[18] = 1
// - Valid traceFlags prefixed with 2: b[27] = 2
func fromBinary(b []byte) (oteltrace.SpanContext, bool) {
if len(b) != 29 || b[0] != 0 || b[1] != 0 || b[18] != 1 || b[27] != 2 {
return oteltrace.SpanContext{}, false
}

sc := oteltrace.SpanContext{}
sc = sc.WithTraceID(oteltrace.TraceID(b[1:17]))
b = b[17:]
if len(b) >= 9 && b[0] == 1 {
sc = sc.WithSpanID(oteltrace.SpanID(b[1:9]))
b = b[9:]
}
if len(b) >= 2 && b[0] == 2 {
sc = sc.WithTraceFlags(oteltrace.TraceFlags(b[1]))
}
return sc, true
return oteltrace.SpanContext{}.WithTraceID(
oteltrace.TraceID(b[2:18])).WithSpanID(
oteltrace.SpanID(b[19:27])).WithTraceFlags(
oteltrace.TraceFlags(b[28])).WithRemote(true), true
}
Loading

0 comments on commit 18b1e12

Please sign in to comment.