diff --git a/internal/server/grpc.go b/internal/server/grpc.go index 13c6361..9a67424 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -12,6 +12,7 @@ import ( "github.com/go-kratos/kratos/v2/middleware/recovery" "github.com/go-kratos/kratos/v2/middleware/validate" "github.com/go-kratos/kratos/v2/transport/grpc" + googlegrpc "google.golang.org/grpc" ) // NewGRPCServer new a gRPC server. @@ -22,9 +23,10 @@ func NewGRPCServer(c *conf.Server, relations *service.RelationshipsService, heal validate.Validator(), logging.Server(logger), ), - grpc.StreamInterceptor(middleware.StreamLogInterceptor(logger)), - grpc.StreamInterceptor(middleware.StreamValidationInterceptor()), - grpc.StreamInterceptor(middleware.StreamRecoveryInterceptor(logger)), + grpc.Options(googlegrpc.ChainStreamInterceptor(middleware.StreamLogInterceptor(logger), + middleware.StreamValidationInterceptor(), + middleware.StreamRecoveryInterceptor(logger), + middleware.StreamTracingInterceptor())), } if c.Grpc.Network != "" { opts = append(opts, grpc.Network(c.Grpc.Network)) diff --git a/internal/server/middleware/span.go b/internal/server/middleware/span.go new file mode 100644 index 0000000..a68e4d2 --- /dev/null +++ b/internal/server/middleware/span.go @@ -0,0 +1,156 @@ +package middleware + +// Taken from: middleware/tracing/span.go + +import ( + "context" + "net" + "net/url" + "strings" + + "github.com/go-kratos/kratos/v2/metadata" + "github.com/go-kratos/kratos/v2/transport" + "github.com/go-kratos/kratos/v2/transport/http" + + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/peer" + "google.golang.org/protobuf/proto" +) + +const serviceHeader = "x-md-service-name" //Taken from Kratos: middleware/tracing/metadata.go + +func setClientSpan(ctx context.Context, span trace.Span, m interface{}) { + var ( + attrs []attribute.KeyValue + remote string + operation string + rpcKind string + ) + tr, ok := transport.FromClientContext(ctx) + if ok { + operation = tr.Operation() + rpcKind = tr.Kind().String() + switch tr.Kind() { + case transport.KindHTTP: + if ht, ok := tr.(http.Transporter); ok { + method := ht.Request().Method + route := ht.PathTemplate() + path := ht.Request().URL.Path + attrs = append(attrs, semconv.HTTPMethodKey.String(method)) + attrs = append(attrs, semconv.HTTPRouteKey.String(route)) + attrs = append(attrs, semconv.HTTPTargetKey.String(path)) + remote = ht.Request().Host + } + case transport.KindGRPC: + remote, _ = parseTarget(tr.Endpoint()) + } + } + attrs = append(attrs, semconv.RPCSystemKey.String(rpcKind)) + _, mAttrs := parseFullMethod(operation) + attrs = append(attrs, mAttrs...) + if remote != "" { + attrs = append(attrs, peerAttr(remote)...) + } + if p, ok := m.(proto.Message); ok { + attrs = append(attrs, attribute.Key("send_msg.size").Int(proto.Size(p))) + } + + span.SetAttributes(attrs...) +} + +func setServerSpan(ctx context.Context, span trace.Span, m interface{}) { + var ( + attrs []attribute.KeyValue + remote string + operation string + rpcKind string + ) + tr, ok := transport.FromServerContext(ctx) + if ok { + operation = tr.Operation() + rpcKind = tr.Kind().String() + switch tr.Kind() { + case transport.KindHTTP: + if ht, ok := tr.(http.Transporter); ok { + method := ht.Request().Method + route := ht.PathTemplate() + path := ht.Request().URL.Path + attrs = append(attrs, semconv.HTTPMethodKey.String(method)) + attrs = append(attrs, semconv.HTTPRouteKey.String(route)) + attrs = append(attrs, semconv.HTTPTargetKey.String(path)) + remote = ht.Request().RemoteAddr + } + case transport.KindGRPC: + if p, ok := peer.FromContext(ctx); ok { + remote = p.Addr.String() + } + } + } + attrs = append(attrs, semconv.RPCSystemKey.String(rpcKind)) + _, mAttrs := parseFullMethod(operation) + attrs = append(attrs, mAttrs...) + attrs = append(attrs, peerAttr(remote)...) + if p, ok := m.(proto.Message); ok { + attrs = append(attrs, attribute.Key("recv_msg.size").Int(proto.Size(p))) + } + if md, ok := metadata.FromServerContext(ctx); ok { + attrs = append(attrs, semconv.PeerServiceKey.String(md.Get(serviceHeader))) + } + + span.SetAttributes(attrs...) +} + +// parseFullMethod returns a span name following the OpenTelemetry semantic +// conventions as well as all applicable span attribute.KeyValue attributes based +// on a gRPC's FullMethod. +func parseFullMethod(fullMethod string) (string, []attribute.KeyValue) { + name := strings.TrimLeft(fullMethod, "/") + parts := strings.SplitN(name, "/", 2) + if len(parts) != 2 { //nolint:gomnd + // Invalid format, does not follow `/package.service/method`. + return name, []attribute.KeyValue{attribute.Key("rpc.operation").String(fullMethod)} + } + + var attrs []attribute.KeyValue + if service := parts[0]; service != "" { + attrs = append(attrs, semconv.RPCServiceKey.String(service)) + } + if method := parts[1]; method != "" { + attrs = append(attrs, semconv.RPCMethodKey.String(method)) + } + return name, attrs +} + +// peerAttr returns attributes about the peer address. +func peerAttr(addr string) []attribute.KeyValue { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return []attribute.KeyValue(nil) + } + + if host == "" { + host = "127.0.0.1" + } + + return []attribute.KeyValue{ + semconv.NetPeerIPKey.String(host), + semconv.NetPeerPortKey.String(port), + } +} + +func parseTarget(endpoint string) (address string, err error) { + var u *url.URL + u, err = url.Parse(endpoint) + if err != nil { + if u, err = url.Parse("http://" + endpoint); err != nil { + return "", err + } + return u.Host, nil + } + if len(u.Path) > 1 { + return u.Path[1:], nil + } + return endpoint, nil +} diff --git a/internal/server/middleware/tracing.go b/internal/server/middleware/tracing.go new file mode 100644 index 0000000..3b41bae --- /dev/null +++ b/internal/server/middleware/tracing.go @@ -0,0 +1,27 @@ +package middleware + +import ( + "github.com/go-kratos/kratos/v2/middleware/tracing" + "google.golang.org/grpc" + + "go.opentelemetry.io/otel/trace" + + "github.com/go-kratos/kratos/v2/transport" +) + +// StreamTracingInterceptor returns a new server middleware for OpenTelemetry. +func StreamTracingInterceptor(opts ...tracing.Option) grpc.StreamServerInterceptor { + tracer := tracing.NewTracer(trace.SpanKindServer, opts...) + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { + ctx := ss.Context() + if tr, ok := transport.FromServerContext(ctx); ok { + var span trace.Span + ctx, span = tracer.Start(ctx, tr.Operation(), tr.RequestHeader()) + //Using nil requests and responses skips recording request and response size + //Alternatively could sum request/response sizes for the stream + setServerSpan(ctx, span, nil) + defer func() { tracer.End(ctx, span, nil, err) }() + } + return handler(srv, ss) + } +}