Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats/opentelemetry: Introduce Tracing API #7852

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
// function.
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
target: target,
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
nameResolutionDelayed: false,
}

cc.retryThrottler.Store((*retryThrottler)(nil))
Expand Down Expand Up @@ -604,6 +605,10 @@ type ClientConn struct {
idlenessMgr *idle.Manager
metricsRecorderList *stats.MetricsRecorderList

// nameResolutionDelayed tracks whether there was a delay in name
// resolution, which helps diagnose latency issues in gRPC connection setup.
nameResolutionDelayed bool

// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
csMgr *connectivityStateManager
Expand Down
5 changes: 5 additions & 0 deletions stats/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type RPCTagInfo struct {
// FailFast indicates if this RPC is failfast.
// This field is only valid on client side, it's always false on server side.
FailFast bool
// NameResolutionDelay indicates whether there was a delay in name
// resolution.
//
// This field is only valid on client side, it's always false on server side.
NameResolutionDelay bool
}

// Handler defines the interface for the related stats handling (e.g., RPCs, connections).
Expand Down
98 changes: 82 additions & 16 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ package opentelemetry

import (
"context"
"strings"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
estats "google.golang.org/grpc/experimental/stats"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/metadata"
Expand All @@ -33,6 +38,7 @@ import (
)

type clientStatsHandler struct {
statsHandler
estats.MetricsRecorder
options Options
clientMetrics clientMetrics
Expand Down Expand Up @@ -68,6 +74,15 @@ func (h *clientStatsHandler) initializeMetrics() {
rm.registerMetrics(metrics, meter)
}

// initializeTracing installs the handler's configured text map propagator and
// tracer provider through the otel package-level setters. It does nothing
// when isTracingDisabled reports that tracing is disabled for the handler's
// trace options.
func (h *clientStatsHandler) initializeTracing() {
	if !isTracingDisabled(h.options.TraceOptions) {
		// Propagator first, then provider, matching the established order.
		otel.SetTextMapPropagator(h.options.TraceOptions.TextMapPropagator)
		otel.SetTracerProvider(h.options.TraceOptions.TracerProvider)
	}
}

func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ci := &callInfo{
target: cc.CanonicalTarget(),
Expand All @@ -85,8 +100,12 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
}

startTime := time.Now()
var span *trace.Span
if !isTracingDisabled(h.options.TraceOptions) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we pass the method to perCallTracesAndMetrics and create the trace span there if tracing is not disabled? We are already checking whether tracing is disabled in perCallTracesAndMetrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to create the call span here because we need to add an event for "name resolution delay", and this info is only available in RPCTagInfo. One alternative is to have a struct for nameResolutionDelay as a key in the context metadata, but I don't think that would be a good idea.

Based on offline discussion, going ahead with earlier approach.

ctx, span = h.createCallTraceSpan(ctx, method)
}
err := invoker(ctx, method, req, reply, cc, opts...)
h.perCallMetrics(ctx, err, startTime, ci)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
return err
}

Expand Down Expand Up @@ -119,22 +138,50 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S
}

startTime := time.Now()

var span *trace.Span
if !isTracingDisabled(h.options.TraceOptions) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same suggestion here

ctx, span = h.createCallTraceSpan(ctx, method)
}
callback := func(err error) {
h.perCallMetrics(ctx, err, startTime, ci)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
}
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
return streamer(ctx, desc, cc, method, opts...)
}

func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
callLatency := float64(time.Since(startTime)) / float64(time.Second) // calculate ASAP
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
// perCallTracesAndMetrics finalizes the per-call trace span and records the
// per-call duration metric.
//
// When tracing is enabled and ts is non-nil, the span's status is set from err
// (Ok when the converted status code is OK, Error otherwise, with the status
// message as the description) and the span is ended. When metrics are enabled,
// the elapsed time since startTime is recorded, in seconds, on the call
// duration histogram with the method, target and status attributes.
func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts *trace.Span) {
	// Sample the latency before any tracing work so that span finalization
	// time is not attributed to the call duration metric.
	callLatency := float64(time.Since(startTime)) / float64(time.Second) // calculate ASAP

	if !isTracingDisabled(h.options.TraceOptions) && ts != nil {
		span := *ts
		s := status.Convert(err)
		code := otelcodes.Error
		if s.Code() == grpccodes.OK {
			code = otelcodes.Ok
		}
		span.SetStatus(code, s.Message())
		span.End()
	}

	if isMetricsDisabled(h.options.MetricsOptions) {
		return
	}
	attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
		otelattribute.String("grpc.method", ci.method),
		otelattribute.String("grpc.target", ci.target),
		otelattribute.String("grpc.status", canonicalString(status.Code(err))),
	))
	h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
}

// createCallTraceSpan starts a client-kind call span for method using the
// TracerProvider configured in the handler's trace options, and returns a
// context carrying that span together with a pointer to it. The span name is
// method passed through removeLeadingSlash with every "/" replaced by ".".
//
// If no TracerProvider is configured, an error is logged and ctx is returned
// unchanged with a nil span, so callers must nil-check the returned pointer.
func (h *clientStatsHandler) createCallTraceSpan(ctx context.Context, method string) (context.Context, *trace.Span) {
	tp := h.options.TraceOptions.TracerProvider
	if tp == nil {
		logger.Error("TraceProvider is not provided in trace options")
		return ctx, nil
	}
	mn := strings.ReplaceAll(removeLeadingSlash(method), "/", ".")
	// Build the tracer from the provider that was just nil-checked rather than
	// from the process-wide one, so the span is always created by the provider
	// configured on this handler.
	tracer := tp.Tracer("grpc-open-telemetry")
	ctx, span := tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindClient))
	return ctx, &span
}

// TagConn exists to satisfy stats.Handler.
Expand Down Expand Up @@ -163,15 +210,29 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
}
ctx = istats.SetLabels(ctx, labels)
}
ai := &attemptInfo{ // populates information about RPC start.
ai := &attemptInfo{
startTime: time.Now(),
xdsLabels: labels.TelemetryLabels,
method: info.FullMethodName,
}
ri := &rpcInfo{
ai: ai,
if !isTracingDisabled(h.options.TraceOptions) {
callSpan := trace.SpanFromContext(ctx)
if info.NameResolutionDelay {
callSpan.AddEvent("Delayed name resolution complete")
}
var newAI *attemptInfo
ctx, newAI = h.traceTagRPC(trace.ContextWithSpan(ctx, callSpan), info)
// Update the ai with values from updated attempt info.
newAI.startTime = ai.startTime
newAI.xdsLabels = ai.xdsLabels
newAI.method = ai.method

ai = newAI
}
return setRPCInfo(ctx, ri)

return setRPCInfo(ctx, &rpcInfo{
ai: ai,
})
}

func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
Expand All @@ -180,7 +241,12 @@ func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
return
}
h.processRPCEvent(ctx, rs, ri.ai)
if !isMetricsDisabled(h.options.MetricsOptions) {
h.processRPCEvent(ctx, rs, ri.ai)
}
if !isTracingDisabled(h.options.TraceOptions) {
h.populateSpan(ctx, rs, ri.ai)
}
}

func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
Expand Down
Loading
Loading