From 4ae0d6bd340bb4b4ceb84a62d92513185a372147 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Mon, 9 Dec 2024 08:57:26 +0530 Subject: [PATCH] final rebase with master --- clientconn.go | 11 +- stats/handlers.go | 5 + stats/opentelemetry/client_metrics.go | 96 +++- stats/opentelemetry/e2e_test.go | 673 ++++++++++++++++++++++++-- stats/opentelemetry/opentelemetry.go | 33 ++ stats/opentelemetry/server_metrics.go | 32 +- stats/opentelemetry/trace.go | 140 ++++++ stream.go | 39 +- 8 files changed, 943 insertions(+), 86 deletions(-) create mode 100644 stats/opentelemetry/trace.go diff --git a/clientconn.go b/clientconn.go index 4f57b55434f9..5f326647e11f 100644 --- a/clientconn.go +++ b/clientconn.go @@ -130,9 +130,10 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires // function. func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ - target: target, - conns: make(map[*addrConn]struct{}), - dopts: defaultDialOptions(), + target: target, + conns: make(map[*addrConn]struct{}), + dopts: defaultDialOptions(), + nameResolutionDelayed: false, } cc.retryThrottler.Store((*retryThrottler)(nil)) @@ -604,6 +605,10 @@ type ClientConn struct { idlenessMgr *idle.Manager metricsRecorderList *stats.MetricsRecorderList + // To track if there was a delay in name resolution, helping to track + // latency issues in gRPC connection setup. + nameResolutionDelayed bool + // The following provide their own synchronization, and therefore don't // require cc.mu to be held to access them. csMgr *connectivityStateManager diff --git a/stats/handlers.go b/stats/handlers.go index dc03731e45ef..c1dd190c1609 100644 --- a/stats/handlers.go +++ b/stats/handlers.go @@ -38,6 +38,11 @@ type RPCTagInfo struct { // FailFast indicates if this RPC is failfast. // This field is only valid on client side, it's always false on server side. FailFast bool + // NameResolutionDelay indicates whether there was a delay in name + // resolution. + // + // This field is only valid on client side, it's always false on server side. + NameResolutionDelay bool } // Handler defines the interface for the related stats handling (e.g., RPCs, connections). 
diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 265791e5a261..cb13182d7b9a 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -18,10 +18,15 @@ package opentelemetry import ( "context" + "strings" "sync/atomic" "time" + "go.opentelemetry.io/otel" + otelcodes "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" + grpccodes "google.golang.org/grpc/codes" estats "google.golang.org/grpc/experimental/stats" istats "google.golang.org/grpc/internal/stats" "google.golang.org/grpc/metadata" @@ -33,6 +38,7 @@ import ( ) type clientStatsHandler struct { + statsHandler estats.MetricsRecorder options Options clientMetrics clientMetrics @@ -68,6 +74,15 @@ func (h *clientStatsHandler) initializeMetrics() { rm.registerMetrics(metrics, meter) } +func (h *clientStatsHandler) initializeTracing() { + if isTracingDisabled(h.options.TraceOptions) { + return + } + + otel.SetTextMapPropagator(h.options.TraceOptions.TextMapPropagator) + otel.SetTracerProvider(h.options.TraceOptions.TracerProvider) +} + func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ci := &callInfo{ target: cc.CanonicalTarget(), @@ -85,8 +100,12 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string } startTime := time.Now() + var span *trace.Span + if !isTracingDisabled(h.options.TraceOptions) { + ctx, span = h.createCallTraceSpan(ctx, method) + } err := invoker(ctx, method, req, reply, cc, opts...) - h.perCallMetrics(ctx, err, startTime, ci) + h.perCallTracesAndMetrics(ctx, err, startTime, ci, span) return err } @@ -119,22 +138,50 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S } startTime := time.Now() - + var span *trace.Span + if !isTracingDisabled(h.options.TraceOptions) { + ctx, span = h.createCallTraceSpan(ctx, method) + } callback := func(err error) { - h.perCallMetrics(ctx, err, startTime, ci) + h.perCallTracesAndMetrics(ctx, err, startTime, ci, span) } opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) return streamer(ctx, desc, cc, method, opts...) } -func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) { - callLatency := float64(time.Since(startTime)) / float64(time.Second) // calculate ASAP - attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( - otelattribute.String("grpc.method", ci.method), - otelattribute.String("grpc.target", ci.target), - otelattribute.String("grpc.status", canonicalString(status.Code(err))), - )) - h.clientMetrics.callDuration.Record(ctx, callLatency, attrs) +// perCallTracesAndMetrics records per call trace spans and metrics. 
+func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts *trace.Span) { + if !isTracingDisabled(h.options.TraceOptions) && ts != nil { + s := status.Convert(err) + if s.Code() == grpccodes.OK { + (*ts).SetStatus(otelcodes.Ok, s.Message()) + } else { + (*ts).SetStatus(otelcodes.Error, s.Message()) + } + (*ts).End() + } + if !isMetricsDisabled(h.options.MetricsOptions) { + callLatency := float64(time.Since(startTime)) / float64(time.Second) + attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( + otelattribute.String("grpc.method", ci.method), + otelattribute.String("grpc.target", ci.target), + otelattribute.String("grpc.status", canonicalString(status.Code(err))), + )) + h.clientMetrics.callDuration.Record(ctx, callLatency, attrs) + } +} + +// createCallTraceSpan creates a call span to put in the provided context using +// provided TraceProvider. If TraceProvider is nil, it returns context as is. +func (h *clientStatsHandler) createCallTraceSpan(ctx context.Context, method string) (context.Context, *trace.Span) { + if h.options.TraceOptions.TracerProvider == nil { + logger.Error("TraceProvider is not provided in trace options") + return ctx, nil + } + mn := strings.Replace(removeLeadingSlash(method), "/", ".", -1) + tracer := otel.Tracer("grpc-open-telemetry") + ctx, span := tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindClient)) + return ctx, &span } // TagConn exists to satisfy stats.Handler. @@ -163,15 +210,21 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) } ctx = istats.SetLabels(ctx, labels) } - ai := &attemptInfo{ // populates information about RPC start. - startTime: time.Now(), - xdsLabels: labels.TelemetryLabels, - method: info.FullMethodName, + ai := &attemptInfo{} + startTime := time.Now() + if !isTracingDisabled(h.options.TraceOptions) { + callSpan := trace.SpanFromContext(ctx) + if info.NameResolutionDelay { + callSpan.AddEvent("Delayed name resolution complete") + } + ctx, ai = h.traceTagRPC(trace.ContextWithSpan(ctx, callSpan), info) } - ri := &rpcInfo{ + ai.startTime = startTime + ai.xdsLabels = labels.TelemetryLabels + ai.method = info.FullMethodName + return setRPCInfo(ctx, &rpcInfo{ ai: ai, - } - return setRPCInfo(ctx, ri) + }) } func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { @@ -180,7 +233,12 @@ func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present") return } - h.processRPCEvent(ctx, rs, ri.ai) + if !isMetricsDisabled(h.options.MetricsOptions) { + h.processRPCEvent(ctx, rs, ri.ai) + } + if !isTracingDisabled(h.options.TraceOptions) { + h.populateSpan(ctx, rs, ri.ai) + } } func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index e56c0fe94805..ecdc8178fe9a 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -14,16 +14,21 @@ * limitations under the License. 
*/ -package opentelemetry_test +package opentelemetry import ( "context" "fmt" - "io" "testing" "time" + otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing" + + "go.opentelemetry.io/otel" + otelcodes "go.opentelemetry.io/otel/codes" + trace2 "go.opentelemetry.io/otel/trace" + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" @@ -35,45 +40,69 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/internal/grpcsync" - "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" itestutils "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" setup "google.golang.org/grpc/internal/testutils/xds/e2e/setup" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/orca" - "google.golang.org/grpc/stats/opentelemetry" "google.golang.org/grpc/stats/opentelemetry/internal/testutils" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" ) -var defaultTestTimeout = 5 * time.Second +// traceSpanInfo is the information received about the span. This is a subset +// of information that is important to verify that gRPC has knobs over, which +// goes through a stable OpenTelemetry API with well-defined behavior. This keeps +// the robustness of assertions over time. 
+type traceSpanInfo struct { + spanKind string + name string + events []trace.Event + attributes []attribute.KeyValue +} -type s struct { - grpctest.Tester +// defaultMetricsOptions creates default metrics options +func defaultMetricsOptions(_ *testing.T, methodAttributeFilter func(string) bool) (*MetricsOptions, *metric.ManualReader) { + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + metricsOptions := &MetricsOptions{ + MeterProvider: provider, + Metrics: DefaultMetrics(), + MethodAttributeFilter: methodAttributeFilter, + } + return metricsOptions, reader } -func Test(t *testing.T) { - grpctest.RunSubTests(t, s{}) +// defaultTraceOptions function to create default trace options +func defaultTraceOptions(_ *testing.T) (*TraceOptions, *tracetest.InMemoryExporter) { + spanExporter := tracetest.NewInMemoryExporter() + spanProcessor := trace.NewSimpleSpanProcessor(spanExporter) + tracerProvider := trace.NewTracerProvider(trace.WithSpanProcessor(spanProcessor)) + textMapPropagator := propagation.NewCompositeTextMapPropagator(GRPCTraceBinPropagator{}) + traceOptions := &TraceOptions{ + TracerProvider: tracerProvider, + TextMapPropagator: textMapPropagator, + } + return traceOptions, spanExporter } // setupStubServer creates a stub server with OpenTelemetry component configured on client -// and server side. It returns a reader for metrics emitted from OpenTelemetry -// component and the server. -func setupStubServer(t *testing.T, methodAttributeFilter func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { - reader := metric.NewManualReader() - provider := metric.NewMeterProvider(metric.WithReader(reader)) +// and server side and returns the server. +func setupStubServer(t *testing.T, metricsOptions *MetricsOptions, traceOptions *TraceOptions) *stubserver.StubServer { ss := &stubserver.StubServer{ - UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + UnaryCallF: func(_ context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{Payload: &testpb.Payload{ Body: make([]byte, len(in.GetPayload().GetBody())), }}, nil @@ -88,20 +117,19 @@ func setupStubServer(t *testing.T, methodAttributeFilter func(string) bool) (*me }, } - if err := ss.Start([]grpc.ServerOption{opentelemetry.ServerOption(opentelemetry.Options{ - MetricsOptions: opentelemetry.MetricsOptions{ - MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics(), - MethodAttributeFilter: methodAttributeFilter, - }})}, opentelemetry.DialOption(opentelemetry.Options{ - MetricsOptions: opentelemetry.MetricsOptions{ - MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics(), - }, - })); err != nil { + otelOptions := Options{} + if metricsOptions != nil { + otelOptions.MetricsOptions = *metricsOptions + } + if traceOptions != nil { + otelOptions.TraceOptions = *traceOptions + } + + if err := ss.Start([]grpc.ServerOption{ServerOption(otelOptions)}, + DialOption(otelOptions)); err != nil { t.Fatalf("Error starting endpoint server: %v", err) } - return reader, ss + return ss } // TestMethodAttributeFilter tests the method attribute filter. The method @@ -112,7 +140,8 @@ func (s) TestMethodAttributeFilter(t *testing.T) { // Will allow duplex/any other type of RPC. 
return str != testgrpc.TestService_UnaryCall_FullMethodName } - reader, ss := setupStubServer(t, maf) + mo, reader := defaultMetricsOptions(t, maf) + ss := setupStubServer(t, mo, nil) defer ss.Stop() // Make a Unary and Streaming RPC. The Unary RPC should be filtered by the @@ -197,7 +226,8 @@ func (s) TestMethodAttributeFilter(t *testing.T) { // on the Client (no StaticMethodCallOption set) and Server. The method // attribute on subsequent metrics should be bucketed in "other". func (s) TestAllMetricsOneFunction(t *testing.T) { - reader, ss := setupStubServer(t, nil) + mo, reader := defaultMetricsOptions(t, nil) + ss := setupStubServer(t, mo, nil) defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -362,7 +392,7 @@ func metricsDataFromReader(ctx context.Context, reader *metric.ManualReader) map func (s) TestWRRMetrics(t *testing.T) { cmr := orca.NewServerMetricsRecorder().(orca.CallMetricsRecorder) backend1 := stubserver.StartTestService(t, &stubserver.StubServer{ - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { if r := orca.CallMetricsRecorderFromContext(ctx); r != nil { // Copy metrics from what the test set in cmr into r. sm := cmr.(orca.ServerMetricsProvider).ServerMetrics() @@ -380,7 +410,7 @@ func (s) TestWRRMetrics(t *testing.T) { cmr.SetApplicationUtilization(1.0) backend2 := stubserver.StartTestService(t, &stubserver.StubServer{ - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { if r := orca.CallMetricsRecorderFromContext(ctx); r != nil { // Copy metrics from what the test set in cmr into r. sm := cmr.(orca.ServerMetricsProvider).ServerMetrics() @@ -452,14 +482,14 @@ func (s) TestWRRMetrics(t *testing.T) { reader := metric.NewManualReader() provider := metric.NewMeterProvider(metric.WithReader(reader)) - mo := opentelemetry.MetricsOptions{ + mo := MetricsOptions{ MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics().Add("grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"), + Metrics: DefaultMetrics().Add("grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"), OptionalLabels: []string{"grpc.lb.locality"}, } target := fmt.Sprintf("xds:///%s", serviceName) - cc, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver), opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})) + cc, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver), DialOption(Options{MetricsOptions: mo})) if err != nil { t.Fatalf("Failed to dial local test server: %v", err) } @@ -582,3 +612,568 @@ func pollForWantMetrics(ctx context.Context, t *testing.T, reader *metric.Manual return fmt.Errorf("error waiting for metrics %v: %v", wantMetrics, ctx.Err()) } + +// TestServerWithMetricsAndTraceOptions tests emitted metrics and traces from +// OpenTelemetry instrumentation component. 
It then configures a system with a gRPC +// Client and gRPC server with the OpenTelemetry Dial and Server Option configured +// specifying all the metrics and traces provided by this package, and makes a Unary +// RPC and a Streaming RPC. These two RPCs should cause certain recording for each +// registered metric observed through a Manual Metrics Reader on the provided +// OpenTelemetry SDK's Meter Provider. It also verifies the traces are recorded +// correctly. +func (s) TestServerWithMetricsAndTraceOptions(t *testing.T) { + // Create default metrics options + mo, reader := defaultMetricsOptions(t, nil) + // Create default trace options + to, exporter := defaultTraceOptions(t) + + ss := setupStubServer(t, mo, to) + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create a parent span for the client call + ctx, _ = otel.Tracer("grpc-open-telemetry").Start(ctx, "test-parent-span") + md, _ := metadata.FromOutgoingContext(ctx) + otel.GetTextMapPropagator().Inject(ctx, otelinternaltracing.NewOutgoingCarrier(ctx)) + ctx = metadata.NewOutgoingContext(ctx, md) + + // Make two RPC's, a unary RPC and a streaming RPC. These should cause + // certain metrics and traces to be emitted. + if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}, grpc.UseCompressor(gzip.Name)); err != nil { // Deterministic compression. + t.Fatalf("Unexpected error from UnaryCall: %v", err) + } + stream, err := ss.Client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) + } + + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) + } + + // Verify metrics + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + + wantMetrics := testutils.MetricData(testutils.MetricDataOptions{ + Target: ss.Target, + UnaryCompressedMessageSize: float64(57), + }) + testutils.CompareMetrics(ctx, t, reader, gotMetrics, wantMetrics) + + // Verify traces + spans := exporter.GetSpans() + if got, want := len(spans), 6; got != want { + t.Fatalf("Got %d spans, want %d", got, want) + } + + // Add assertions for specific span attributes and events as needed. + // For example, to check if the server span has the correct status: + serverSpan := spans[0] + if got, want := serverSpan.Status.Code, otelcodes.Ok; got != want { + t.Errorf("Got status code %v, want %v", got, want) + } +} + +// TestSpan verifies that the gRPC Trace Binary propagator +// correctly propagates span context between a client and server using the +// grpc-trace-bin header. It sets up a stub server with OpenTelemetry tracing +// enabled, makes a unary RPC. +func (s) TestSpan(t *testing.T) { + // Using defaultTraceOptions to set up OpenTelemetry with an in-memory exporter + traceOptions, spanExporter := defaultTraceOptions(t) + // Start the server with OpenTelemetry options + ss := setupStubServer(t, nil, traceOptions) + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Make two RPC's, a unary RPC and a streaming RPC. These should cause + // certain metrics to be emitted, which should be able to be observed + // through the Metric Reader. 
+ if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}); err != nil { + t.Fatalf("Unexpected error from UnaryCall: %v", err) + } + stream, err := ss.Client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) + } + + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) + } + // Get the spans from the exporter + spans := spanExporter.GetSpans() + if got, want := len(spans), 6; got != want { + t.Fatalf("Got %d spans, want %d", got, want) + } + + wantSI := []traceSpanInfo{ + { + name: "grpc.testing.TestService.UnaryCall", + spanKind: trace2.SpanKindServer.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + }, + { + Key: "FailFast", + }, + { + Key: "previous-rpc-attempts", + }, + { + Key: "transparent-retry", + }, + }, + events: []trace.Event{ + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + }, + { + Key: "message-size", + }, + { + Key: "message-size-compressed", + }, + }, + }, + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + }, + { + Key: "message-size", + }, + { + Key: "message-size-compressed", + }, + }, + }, + }, + }, + { + name: "Attempt.grpc.testing.TestService.UnaryCall", + spanKind: trace2.SpanKindInternal.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + }, + { + Key: "FailFast", + }, + { + Key: "previous-rpc-attempts", + }, + { + Key: "transparent-retry", + }, + }, + events: []trace.Event{ + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + }, + { + Key: "message-size", + }, + { + Key: "message-size-compressed", + }, + }, + }, + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + }, + { + Key: "message-size", + }, + { + Key: "message-size-compressed", + }, + }, + }, + }, + }, + { + name: "grpc.testing.TestService.UnaryCall", + spanKind: trace2.SpanKindClient.String(), + attributes: []attribute.KeyValue{}, + events: []trace.Event{}, + }, + { + name: "grpc.testing.TestService.FullDuplexCall", + spanKind: trace2.SpanKindServer.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + }, + { + Key: "FailFast", + }, + { + Key: "previous-rpc-attempts", + }, + { + Key: "transparent-retry", + }, + }, + events: []trace.Event{}, + }, + { + name: "grpc.testing.TestService.FullDuplexCall", + spanKind: trace2.SpanKindClient.String(), + attributes: []attribute.KeyValue{}, + events: []trace.Event{}, + }, + { + name: "Attempt.grpc.testing.TestService.FullDuplexCall", + spanKind: trace2.SpanKindInternal.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + }, + { + Key: "FailFast", + }, + { + Key: "previous-rpc-attempts", + }, + { + Key: "transparent-retry", + }, + }, + events: []trace.Event{}, + }, + } + + // Check that same traceID is used in client and server. + if got, want := spans[0].SpanContext.TraceID(), spans[2].SpanContext.TraceID(); got != want { + t.Fatal("TraceID mismatch in client span and server span.") + } + // Check that the attempt span id of client matches the span id of server + // SpanContext. 
+ if got, want := spans[0].Parent.SpanID(), spans[1].SpanContext.SpanID(); got != want { + t.Fatal("SpanID mismatch in client span and server span.") + } + + // Check that same traceID is used in client and server. + if got, want := spans[3].SpanContext.TraceID(), spans[4].SpanContext.TraceID(); got != want { + t.Fatal("TraceID mismatch in client span and server span.") + } + // Check that the attempt span id of client matches the span id of server + // SpanContext. + if got, want := spans[3].Parent.SpanID(), spans[5].SpanContext.SpanID(); got != want { + t.Fatal("SpanID mismatch in client span and server span.") + } + + for index, span := range spans { + // Check that the attempt span has the correct status + if got, want := spans[index].Status.Code, otelcodes.Ok; got != want { + t.Errorf("Got status code %v, want %v", got, want) + } + // name + if got, want := span.Name, wantSI[index].name; got != want { + t.Errorf("Span name is %q, want %q", got, want) + } + // spanKind + if got, want := span.SpanKind.String(), wantSI[index].spanKind; got != want { + t.Errorf("Got span kind %q, want %q", got, want) + } + // attributes + if got, want := len(span.Attributes), len(wantSI[index].attributes); got != want { + t.Errorf("Got attributes list of size %q, want %q", got, want) + } + for idx, att := range span.Attributes { + if got, want := att.Key, wantSI[index].attributes[idx].Key; got != want { + t.Errorf("Got attribute key for span name %q as %q, want %q", span.Name, got, want) + } + } + // events + if got, want := len(span.Events), len(wantSI[index].events); got != want { + t.Errorf("Event length is %q, want %q", got, want) + } + for eventIdx, event := range span.Events { + if got, want := event.Name, wantSI[index].events[eventIdx].Name; got != want { + t.Errorf("Got event name for span name %q as %q, want %q", span.Name, got, want) + } + for idx, att := range span.Attributes { + if got, want := att.Key, wantSI[eventIdx].attributes[idx].Key; got != want { + t.Errorf("Got attribute key for span name %q with event name %q, as %q, want %q", span.Name, event.Name, got, want) + } + } + } + } +} + +// TestSpan_WithW3CContextPropagator sets up a stub server with OpenTelemetry tracing +// enabled makes a unary and a streaming RPC, and then, asserts that the correct +// number of spans are created with the expected spans. +func (s) TestSpan_WithW3CContextPropagator(t *testing.T) { + // Using defaultTraceOptions to set up OpenTelemetry with an in-memory exporter + traceOptions, spanExporter := defaultTraceOptions(t) + // Set the W3CContextPropagator as part of TracingOptions. + traceOptions.TextMapPropagator = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}) + // Start the server with OpenTelemetry options + ss := setupStubServer(t, nil, traceOptions) + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Make two RPC's, a unary RPC and a streaming RPC. These should cause + // certain metrics to be emitted, which should be able to be observed + // through the Metric Reader. 
+ if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}); err != nil { + t.Fatalf("Unexpected error from UnaryCall: %v", err) + } + stream, err := ss.Client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) + } + + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) + } + // Get the spans from the exporter + spans := spanExporter.GetSpans() + if got, want := len(spans), 6; got != want { + t.Fatalf("Got %d spans, want %d", got, want) + } + + wantSI := []traceSpanInfo{ + { + name: "grpc.testing.TestService.UnaryCall", + spanKind: trace2.SpanKindServer.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + }, + { + Key: "FailFast", + }, + { + Key: "previous-rpc-attempts", + }, + { + Key: "transparent-retry", + }, + }, + events: []trace.Event{ + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + }, + { + Key: "message-size", + }, + { + Key: "message-size-compressed", + }, + }, + }, + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + }, + { + Key: "message-size", + }, + { + Key: "message-size-compressed", + }, + }, + }, + }, + }, + { + name: "Attempt.grpc.testing.TestService.UnaryCall", + spanKind: trace2.SpanKindInternal.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + }, + { + Key: "FailFast", + }, + { + Key: "previous-rpc-attempts", + }, + { + Key: "transparent-retry", + }, + }, + events: []trace.Event{ + { + Name: "Outbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + }, + { + Key: "message-size", + }, + { + Key: "message-size-compressed", + }, + }, + }, + { + Name: "Inbound compressed message", + Attributes: []attribute.KeyValue{ + { + Key: "sequence-number", + }, + { + Key: "message-size", + }, + { + Key: "message-size-compressed", + }, + }, + }, + }, + }, + { + name: "grpc.testing.TestService.UnaryCall", + spanKind: trace2.SpanKindClient.String(), + attributes: []attribute.KeyValue{}, + events: []trace.Event{}, + }, + { + name: "grpc.testing.TestService.FullDuplexCall", + spanKind: trace2.SpanKindServer.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + }, + { + Key: "FailFast", + }, + { + Key: "previous-rpc-attempts", + }, + { + Key: "transparent-retry", + }, + }, + events: []trace.Event{}, + }, + { + name: "grpc.testing.TestService.FullDuplexCall", + spanKind: trace2.SpanKindClient.String(), + attributes: []attribute.KeyValue{}, + events: []trace.Event{}, + }, + { + name: "Attempt.grpc.testing.TestService.FullDuplexCall", + spanKind: trace2.SpanKindInternal.String(), + attributes: []attribute.KeyValue{ + { + Key: "Client", + }, + { + Key: "FailFast", + }, + { + Key: "previous-rpc-attempts", + }, + { + Key: "transparent-retry", + }, + }, + events: []trace.Event{}, + }, + } + + // Check that same traceID is used in client and server. + if got, want := spans[0].SpanContext.TraceID(), spans[2].SpanContext.TraceID(); got != want { + t.Fatal("TraceID mismatch in client span and server span.") + } + // Check that the attempt span id of client matches the span id of server + // SpanContext. 
+ if got, want := spans[0].Parent.SpanID(), spans[1].SpanContext.SpanID(); got != want { + t.Fatal("SpanID mismatch in client span and server span.") + } + + // Check that same traceID is used in client and server. + if got, want := spans[3].SpanContext.TraceID(), spans[4].SpanContext.TraceID(); got != want { + t.Fatal("TraceID mismatch in client span and server span.") + } + // Check that the attempt span id of client matches the span id of server + // SpanContext. + if got, want := spans[3].Parent.SpanID(), spans[5].SpanContext.SpanID(); got != want { + t.Fatal("SpanID mismatch in client span and server span.") + } + + for index, span := range spans { + // Check that the attempt span has the correct status + if got, want := spans[index].Status.Code, otelcodes.Ok; got != want { + t.Errorf("Got status code %v, want %v", got, want) + } + // name + if got, want := span.Name, wantSI[index].name; got != want { + t.Errorf("Span name is %q, want %q", got, want) + } + // spanKind + if got, want := span.SpanKind.String(), wantSI[index].spanKind; got != want { + t.Errorf("Got span kind %q, want %q", got, want) + } + // attributes + if got, want := len(span.Attributes), len(wantSI[index].attributes); got != want { + t.Errorf("Got attributes list of size %q, want %q", got, want) + } + for idx, att := range span.Attributes { + if got, want := att.Key, wantSI[index].attributes[idx].Key; got != want { + t.Errorf("Got attribute key for span name %q as %q, want %q", span.Name, got, want) + } + } + // events + if got, want := len(span.Events), len(wantSI[index].events); got != want { + t.Errorf("Event length is %q, want %q", got, want) + } + for eventIdx, event := range span.Events { + if got, want := event.Name, wantSI[index].events[eventIdx].Name; got != want { + t.Errorf("Got event name for span name %q as %q, want %q", span.Name, got, want) + } + for idx, att := range span.Attributes { + if got, want := att.Key, wantSI[eventIdx].attributes[idx].Key; got != want { + t.Errorf("Got attribute key for span name %q with event name %q, as %q, want %q", span.Name, event.Name, got, want) + } + } + } + } +} diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index dcc424775f14..94e01b268233 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -38,6 +38,8 @@ import ( otelattribute "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" ) func init() { @@ -56,6 +58,8 @@ var joinDialOptions = internal.JoinDialOptions.(func(...grpc.DialOption) grpc.Di type Options struct { // MetricsOptions are the metrics options for OpenTelemetry instrumentation. MetricsOptions MetricsOptions + // TraceOptions are the tracing options for OpenTelemetry instrumentation. + TraceOptions TraceOptions } // MetricsOptions are the metrics options for OpenTelemetry instrumentation. @@ -90,6 +94,16 @@ type MetricsOptions struct { pluginOption otelinternal.PluginOption } +// TraceOptions are the tracing options for OpenTelemetry instrumentation. +type TraceOptions struct { + // TracerProvider is the OpenTelemetry tracer which is required to + // record traces/trace spans for instrumentation + TracerProvider trace.TracerProvider + + // TextMapPropagator propagates span context through text map carrier. 
+	TextMapPropagator propagation.TextMapPropagator
+}
+
 // DialOption returns a dial option which enables OpenTelemetry instrumentation
 // code for a grpc.ClientConn.
 //
@@ -105,6 +119,7 @@ type MetricsOptions struct {
 func DialOption(o Options) grpc.DialOption {
 	csh := &clientStatsHandler{options: o}
 	csh.initializeMetrics()
+	csh.initializeTracing()
 	return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh))
 }
 
@@ -125,6 +140,7 @@ var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) g
 func ServerOption(o Options) grpc.ServerOption {
 	ssh := &serverStatsHandler{options: o}
 	ssh.initializeMetrics()
+	ssh.initializeTracing()
 	return joinServerOptions(grpc.ChainUnaryInterceptor(ssh.unaryInterceptor), grpc.ChainStreamInterceptor(ssh.streamInterceptor), grpc.StatsHandler(ssh))
 }
 
@@ -171,6 +187,14 @@ func removeLeadingSlash(mn string) string {
 	return strings.TrimLeft(mn, "/")
 }
 
+func isMetricsDisabled(mo MetricsOptions) bool {
+	return mo.MeterProvider == nil
+}
+
+func isTracingDisabled(to TraceOptions) bool {
+	return to.TracerProvider == nil || to.TextMapPropagator == nil
+}
+
 // attemptInfo is RPC information scoped to the RPC attempt life span client
 // side, and the RPC life span server side.
 type attemptInfo struct {
@@ -187,6 +211,15 @@ type attemptInfo struct {
 	pluginOptionLabels map[string]string // pluginOptionLabels to attach to metrics emitted
 	xdsLabels          map[string]string
+
+	// traceSpan is the span used for recording traces for this RPC attempt
+	// (client side) or call (server side).
+	traceSpan trace.Span
+	// Message counters for sent and received messages (used for generating
+	// message IDs), and the number of previous RPC attempts for the
+	// associated call.
+	countSentMsg        uint32
+	countRecvMsg        uint32
+	previousRPCAttempts uint32
 }
 
 type clientMetrics struct {
diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go
index 4765afa8ed53..f61c13571415 100644
--- a/stats/opentelemetry/server_metrics.go
+++ b/stats/opentelemetry/server_metrics.go
@@ -28,11 +28,13 @@ import (
 	"google.golang.org/grpc/stats"
 	"google.golang.org/grpc/status"
 
+	"go.opentelemetry.io/otel"
 	otelattribute "go.opentelemetry.io/otel/attribute"
 	otelmetric "go.opentelemetry.io/otel/metric"
 )
 
 type serverStatsHandler struct {
+	statsHandler estats.MetricsRecorder
 	options Options
 
 	serverMetrics serverMetrics
@@ -66,6 +68,15 @@ func (h *serverStatsHandler) initializeMetrics() {
 	rm.registerMetrics(metrics, meter)
 }
 
+func (h *serverStatsHandler) initializeTracing() {
+	if isTracingDisabled(h.options.TraceOptions) {
+		return
+	}
+
+	otel.SetTextMapPropagator(h.options.TraceOptions.TextMapPropagator)
+	otel.SetTracerProvider(h.options.TraceOptions.TracerProvider)
+}
+
 // attachLabelsTransportStream intercepts SetHeader and SendHeader calls of the
 // underlying ServerTransportStream to attach metadataExchangeLabels.
 type attachLabelsTransportStream struct {
@@ -197,14 +208,16 @@ func (h *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
 		}
 	}
 
-	ai := &attemptInfo{
-		startTime: time.Now(),
-		method:    removeLeadingSlash(method),
+	ai := &attemptInfo{}
+	startTime := time.Now()
+	if !isTracingDisabled(h.options.TraceOptions) {
+		ctx, ai = h.traceTagRPC(ctx, info)
 	}
-	ri := &rpcInfo{
+	ai.startTime = startTime
+	ai.method = removeLeadingSlash(method)
+	return setRPCInfo(ctx, &rpcInfo{
 		ai: ai,
-	}
-	return setRPCInfo(ctx, ri)
+	})
 }
 
 // HandleRPC implements per RPC tracing and stats implementation.
@@ -214,7 +227,12 @@ func (h *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present") return } - h.processRPCData(ctx, rs, ri.ai) + if !isTracingDisabled(h.options.TraceOptions) { + h.populateSpan(ctx, rs, ri.ai) + } + if !isMetricsDisabled(h.options.MetricsOptions) { + h.processRPCData(ctx, rs, ri.ai) + } } func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { diff --git a/stats/opentelemetry/trace.go b/stats/opentelemetry/trace.go new file mode 100644 index 000000000000..0101fc85f752 --- /dev/null +++ b/stats/opentelemetry/trace.go @@ -0,0 +1,140 @@ +/* + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package opentelemetry + +import ( + "context" + "strings" + "sync/atomic" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + otelcodes "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/stats" + otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing" + "google.golang.org/grpc/status" +) + +// traceTagRPC populates provided context with a new span using the +// TextMapPropagator supplied in trace options and internal itracing.carrier. +// It creates a new outgoing carrier which serializes information about this +// span into gRPC Metadata, if TextMapPropagator is provided in the trace +// options. if TextMapPropagator is not provided, it returns the context as is. +func (h *clientStatsHandler) traceTagRPC(ctx context.Context, rti *stats.RPCTagInfo) (context.Context, *attemptInfo) { + if h.options.TraceOptions.TextMapPropagator == nil { + return ctx, nil + } + + mn := "Attempt." + strings.Replace(removeLeadingSlash(rti.FullMethodName), "/", ".", -1) + tracer := otel.Tracer("grpc-open-telemetry") + ctx, span := tracer.Start(ctx, mn) + carrier := otelinternaltracing.NewOutgoingCarrier(ctx) + otel.GetTextMapPropagator().Inject(ctx, carrier) + + return carrier.Context(), &attemptInfo{ + traceSpan: span, + countSentMsg: 0, // msg events scoped to scope of context, per attempt client side + countRecvMsg: 0, + } +} + +// traceTagRPC populates context with new span data using the TextMapPropagator +// supplied in trace options and internal itracing.Carrier. It creates a new +// incoming carrier which extracts an existing span context (if present) by +// deserializing from provided context. If valid span context is extracted, it +// is set as parent of the new span otherwise new span remains the root span. +// If TextMapPropagator is not provided in the trace options, it returns context +// as is. 
+func (h *serverStatsHandler) traceTagRPC(ctx context.Context, rti *stats.RPCTagInfo) (context.Context, *attemptInfo) {
+	if h.options.TraceOptions.TextMapPropagator == nil {
+		return ctx, nil
+	}
+
+	mn := strings.Replace(removeLeadingSlash(rti.FullMethodName), "/", ".", -1)
+	var span trace.Span
+	tracer := otel.Tracer("grpc-open-telemetry")
+	ctx = otel.GetTextMapPropagator().Extract(ctx, otelinternaltracing.NewIncomingCarrier(ctx))
+	// If the context.Context provided in `ctx` to tracer.Start() contains a
+	// Span, the newly-created Span will be a child of that Span; otherwise it
+	// will be a root span.
+	ctx, span = tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindServer))
+	return ctx, &attemptInfo{
+		traceSpan:    span,
+		countSentMsg: 0,
+		countRecvMsg: 0,
+	}
+}
+
+// statsHandler holds functionality common to the client and server stats
+// handlers.
+type statsHandler struct{}
+
+// populateSpan populates span information based on stats passed in, representing
+// invariants of the RPC lifecycle. It ends the span, triggering its export.
+// This function handles attempt spans on the client-side and call spans on the
+// server-side.
+func (h *statsHandler) populateSpan(_ context.Context, rs stats.RPCStats, ai *attemptInfo) {
+	if ai == nil || ai.traceSpan == nil {
+		// Shouldn't happen: the TagRPC call that populates this information
+		// runs before this function gets called.
+		logger.Error("ctx passed into stats handler tracing event handling has no traceSpan present")
+		return
+	}
+	span := ai.traceSpan
+
+	switch rs := rs.(type) {
+	case *stats.Begin:
+		// Note: Go always added Client and FailFast attributes even though they are not
+		// defined by the OpenCensus gRPC spec. Thus, they are unimportant for
+		// correctness.
+		span.SetAttributes(
+			attribute.Bool("Client", rs.Client),
+			attribute.Bool("FailFast", rs.FailFast),
+			attribute.Int64("previous-rpc-attempts", int64(ai.previousRPCAttempts)),
+			attribute.Bool("transparent-retry", rs.IsTransparentRetryAttempt),
+		)
+		// Increment the previous RPC attempts counter, applicable to the next
+		// attempt.
+		atomic.AddUint32(&ai.previousRPCAttempts, 1)
+	case *stats.PickerUpdated:
+		span.AddEvent("Delayed LB pick complete")
+	case *stats.InPayload:
+		// message id - "must be calculated as two different counters starting
+		// from one for sent messages and one for received messages."
+		mi := atomic.AddUint32(&ai.countRecvMsg, 1)
+		span.AddEvent("Inbound compressed message", trace.WithAttributes(
+			attribute.Int64("sequence-number", int64(mi)),
+			attribute.Int64("message-size", int64(rs.Length)),
+			attribute.Int64("message-size-compressed", int64(rs.CompressedLength)),
+		))
+	case *stats.OutPayload:
+		mi := atomic.AddUint32(&ai.countSentMsg, 1)
+		span.AddEvent("Outbound compressed message", trace.WithAttributes(
+			attribute.Int64("sequence-number", int64(mi)),
+			attribute.Int64("message-size", int64(rs.Length)),
+			attribute.Int64("message-size-compressed", int64(rs.CompressedLength)),
+		))
+	case *stats.End:
+		if rs.Error != nil {
+			s := status.Convert(rs.Error)
+			span.SetStatus(otelcodes.Error, s.Message())
+		} else {
+			span.SetStatus(otelcodes.Ok, "Ok")
+		}
+		span.End()
+	}
+}
diff --git a/stream.go b/stream.go
index 17e2267b3320..7309e1eb6d07 100644
--- a/stream.go
+++ b/stream.go
@@ -213,13 +213,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 	// Provide an opportunity for the first RPC to see the first service config
 	// provided by the resolver.
if err := cc.waitForResolvedAddrs(ctx); err != nil { + cc.nameResolutionDelayed = true return nil, err } - var mc serviceconfig.MethodConfig var onCommit func() - newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) { - return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...) + var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) { + return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, cc.nameResolutionDelayed, opts...) } rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method} @@ -257,7 +257,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth return newStream(ctx, func() {}) } -func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) { +func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), nameResolutionDelayed bool, opts ...CallOption) (_ iresolver.ClientStream, err error) { c := defaultCallInfo() if mc.WaitForReady != nil { c.failFast = !*mc.WaitForReady @@ -321,19 +321,20 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client } cs := &clientStream{ - callHdr: callHdr, - ctx: ctx, - methodConfig: &mc, - opts: opts, - callInfo: c, - cc: cc, - desc: desc, - codec: c.codec, - cp: cp, - comp: comp, - cancel: cancel, - firstAttempt: true, - onCommit: onCommit, + callHdr: callHdr, + ctx: ctx, + methodConfig: &mc, + opts: opts, + callInfo: c, + cc: cc, + desc: desc, + codec: c.codec, + cp: cp, + comp: comp, + cancel: cancel, + firstAttempt: true, + onCommit: onCommit, + nameResolutionDelayed: nameResolutionDelayed, } if !cc.dopts.disableRetry { cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler) @@ -417,7 +418,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) var beginTime time.Time shs := cs.cc.dopts.copts.StatsHandlers for _, sh := range shs { - ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast}) + ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast, NameResolutionDelay: cs.nameResolutionDelayed}) beginTime = time.Now() begin := &stats.Begin{ Client: true, @@ -555,6 +556,8 @@ type clientStream struct { // synchronized. serverHeaderBinlogged bool + nameResolutionDelayed bool + mu sync.Mutex firstAttempt bool // if true, transparent retry is valid numRetries int // exclusive of transparent retry attempt(s)
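
The new TraceOptions plug into the existing DialOption/ServerOption entry points, so enabling traces next to metrics is a configuration-only change. Below is a minimal client-side sketch, assuming the in-memory SDK exporter used by the tests above, the package's GRPCTraceBinPropagator (configured the same way as in defaultTraceOptions), and a placeholder target address; a real deployment would substitute its own span exporter and target:

package main

import (
	"log"

	"go.opentelemetry.io/otel/propagation"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/stats/opentelemetry"
)

func main() {
	// In-memory exporter keeps the sketch self-contained; a real deployment
	// would register an OTLP (or other) span exporter instead.
	exporter := tracetest.NewInMemoryExporter()
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithSpanProcessor(sdktrace.NewSimpleSpanProcessor(exporter)),
	)

	// Both fields must be set: with either TracerProvider or TextMapPropagator
	// missing, isTracingDisabled reports true and no spans are recorded.
	do := opentelemetry.DialOption(opentelemetry.Options{
		TraceOptions: opentelemetry.TraceOptions{
			TracerProvider:    tp,
			TextMapPropagator: propagation.NewCompositeTextMapPropagator(opentelemetry.GRPCTraceBinPropagator{}),
		},
	})

	cc, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()), do)
	if err != nil {
		log.Fatalf("grpc.NewClient failed: %v", err)
	}
	defer cc.Close()

	// RPCs issued on cc now record a per-call client span plus a per-attempt
	// span, propagated to the server in the grpc-trace-bin metadata header.
}

The same Options value can be passed to opentelemetry.ServerOption on the server side, mirroring setupStubServer in the tests.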
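
Because the per-call span is started from the caller's context, an application span already on that context becomes the parent of the whole client-side trace, which is what TestServerWithMetricsAndTraceOptions relies on. A small sketch, assuming a client stub created on a connection configured as above (the function and span names are illustrative):

package example

import (
	"context"
	"log"

	"go.opentelemetry.io/otel"
	testgrpc "google.golang.org/grpc/interop/grpc_testing"
	testpb "google.golang.org/grpc/interop/grpc_testing"
)

// callWithParentSpan starts an application-level span and issues an RPC under
// it; the call span created by the OpenTelemetry interceptor is parented to it.
func callWithParentSpan(ctx context.Context, client testgrpc.TestServiceClient) {
	tracer := otel.Tracer("grpc-open-telemetry")
	ctx, parent := tracer.Start(ctx, "app-parent-span")
	defer parent.End()

	if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
		log.Printf("UnaryCall failed: %v", err)
	}
}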
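
On the client, the new RPCTagInfo.NameResolutionDelay bit is what lets the OpenTelemetry handler add the "Delayed name resolution complete" event to the call span; any other stats.Handler registered via grpc.WithStatsHandler can consume it the same way. A minimal sketch with a hypothetical handler type:

package example

import (
	"context"
	"log"

	"google.golang.org/grpc/stats"
)

// delayLogger is a hypothetical stats.Handler that only inspects the new
// NameResolutionDelay field on stats.RPCTagInfo.
type delayLogger struct{}

func (delayLogger) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
	if info.NameResolutionDelay {
		// The first attempt of this RPC had to wait for the resolver to return
		// addresses; OpenTelemetry uses the same signal to annotate the call span.
		log.Printf("%s: name resolution delayed this RPC", info.FullMethodName)
	}
	return ctx
}

func (delayLogger) HandleRPC(context.Context, stats.RPCStats) {}

func (delayLogger) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { return ctx }

func (delayLogger) HandleConn(context.Context, stats.ConnStats) {}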