Skip to content

Commit

Permalink
final rebase with master
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Dec 9, 2024
1 parent d7286fb commit 4ae0d6b
Show file tree
Hide file tree
Showing 8 changed files with 943 additions and 86 deletions.
11 changes: 8 additions & 3 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
// function.
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
target: target,
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
nameResolutionDelayed: false,
}

cc.retryThrottler.Store((*retryThrottler)(nil))
Expand Down Expand Up @@ -604,6 +605,10 @@ type ClientConn struct {
idlenessMgr *idle.Manager
metricsRecorderList *stats.MetricsRecorderList

// nameResolutionDelayed records whether name resolution was delayed, which
// helps diagnose latency issues in gRPC connection setup.
nameResolutionDelayed bool

// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
csMgr *connectivityStateManager
Expand Down
5 changes: 5 additions & 0 deletions stats/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type RPCTagInfo struct {
// FailFast indicates if this RPC is failfast.
// This field is only valid on client side, it's always false on server side.
FailFast bool
// NameResolutionDelay indicates whether there was a delay in name
// resolution.
//
// This field is only valid on client side, it's always false on server side.
NameResolutionDelay bool
}

// Handler defines the interface for the related stats handling (e.g., RPCs, connections).
Expand Down
96 changes: 77 additions & 19 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ package opentelemetry

import (
"context"
"strings"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
estats "google.golang.org/grpc/experimental/stats"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/metadata"
Expand All @@ -33,6 +38,7 @@ import (
)

type clientStatsHandler struct {
statsHandler
estats.MetricsRecorder
options Options
clientMetrics clientMetrics
Expand Down Expand Up @@ -68,6 +74,15 @@ func (h *clientStatsHandler) initializeMetrics() {
rm.registerMetrics(metrics, meter)
}

// initializeTracing installs the text map propagator and tracer provider from
// the handler's trace options as the process-wide OpenTelemetry defaults. It
// does nothing when isTracingDisabled reports that tracing is off.
func (h *clientStatsHandler) initializeTracing() {
	traceOpts := h.options.TraceOptions
	if !isTracingDisabled(traceOpts) {
		otel.SetTextMapPropagator(traceOpts.TextMapPropagator)
		otel.SetTracerProvider(traceOpts.TracerProvider)
	}
}

func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ci := &callInfo{
target: cc.CanonicalTarget(),
Expand All @@ -85,8 +100,12 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
}

startTime := time.Now()
var span *trace.Span
if !isTracingDisabled(h.options.TraceOptions) {
ctx, span = h.createCallTraceSpan(ctx, method)
}
err := invoker(ctx, method, req, reply, cc, opts...)
h.perCallMetrics(ctx, err, startTime, ci)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
return err
}

Expand Down Expand Up @@ -119,22 +138,50 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S
}

startTime := time.Now()

var span *trace.Span
if !isTracingDisabled(h.options.TraceOptions) {
ctx, span = h.createCallTraceSpan(ctx, method)
}
callback := func(err error) {
h.perCallMetrics(ctx, err, startTime, ci)
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
}
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
return streamer(ctx, desc, cc, method, opts...)
}

func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
callLatency := float64(time.Since(startTime)) / float64(time.Second) // calculate ASAP
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target),
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
// perCallTracesAndMetrics finalizes the per-call trace span and records the
// per-call duration metric once a call has finished.
//
// err is the final error of the call (nil on success), startTime is the time
// the call began, ci supplies the method and target used as metric
// attributes, and ts is the call span to finish. A nil ts skips the tracing
// step.
func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts *trace.Span) {
	// Measure latency before any span bookkeeping so that tracing work does
	// not inflate the recorded call duration.
	callLatency := float64(time.Since(startTime)) / float64(time.Second)

	if !isTracingDisabled(h.options.TraceOptions) && ts != nil {
		st := status.Convert(err)
		spanCode := otelcodes.Error
		if st.Code() == grpccodes.OK {
			spanCode = otelcodes.Ok
		}
		(*ts).SetStatus(spanCode, st.Message())
		(*ts).End()
	}

	if isMetricsDisabled(h.options.MetricsOptions) {
		return
	}
	attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
		otelattribute.String("grpc.method", ci.method),
		otelattribute.String("grpc.target", ci.target),
		otelattribute.String("grpc.status", canonicalString(status.Code(err))),
	))
	h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
}

// createCallTraceSpan starts a client-kind call span using the TracerProvider
// configured in the handler's trace options and returns a context carrying
// that span together with a pointer to it.
//
// The span name is the method with its leading slash removed and the
// remaining "/" separators replaced by ".". If no TracerProvider is
// configured, an error is logged and ctx is returned unchanged with a nil
// span.
func (h *clientStatsHandler) createCallTraceSpan(ctx context.Context, method string) (context.Context, *trace.Span) {
	provider := h.options.TraceOptions.TracerProvider
	if provider == nil {
		logger.Error("TraceProvider is not provided in trace options")
		return ctx, nil
	}
	spanName := strings.ReplaceAll(removeLeadingSlash(method), "/", ".")
	// Use the configured provider directly instead of the process-global one,
	// so the span is created by the provider that was just nil-checked even if
	// other code has replaced the global tracer provider.
	tracer := provider.Tracer("grpc-open-telemetry")
	ctx, span := tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindClient))
	return ctx, &span
}

// TagConn exists to satisfy stats.Handler.
Expand Down Expand Up @@ -163,15 +210,21 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
}
ctx = istats.SetLabels(ctx, labels)
}
ai := &attemptInfo{ // populates information about RPC start.
startTime: time.Now(),
xdsLabels: labels.TelemetryLabels,
method: info.FullMethodName,
ai := &attemptInfo{}
startTime := time.Now()
if !isTracingDisabled(h.options.TraceOptions) {
callSpan := trace.SpanFromContext(ctx)
if info.NameResolutionDelay {
callSpan.AddEvent("Delayed name resolution complete")
}
ctx, ai = h.traceTagRPC(trace.ContextWithSpan(ctx, callSpan), info)
}
ri := &rpcInfo{
ai.startTime = startTime
ai.xdsLabels = labels.TelemetryLabels
ai.method = info.FullMethodName
return setRPCInfo(ctx, &rpcInfo{
ai: ai,
}
return setRPCInfo(ctx, ri)
})
}

func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
Expand All @@ -180,7 +233,12 @@ func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
return
}
h.processRPCEvent(ctx, rs, ri.ai)
if !isMetricsDisabled(h.options.MetricsOptions) {
h.processRPCEvent(ctx, rs, ri.ai)
}
if !isTracingDisabled(h.options.TraceOptions) {
h.populateSpan(ctx, rs, ri.ai)
}
}

func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
Expand Down
Loading

0 comments on commit 4ae0d6b

Please sign in to comment.