Skip to content

Commit

Permalink
Adds streaming tracing middleware variant
Browse files Browse the repository at this point in the history
Also chains streaming middlewares
  • Loading branch information
wscalf committed Jul 19, 2024
1 parent faa3704 commit d637e3d
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 3 deletions.
8 changes: 5 additions & 3 deletions internal/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/middleware/validate"
"github.com/go-kratos/kratos/v2/transport/grpc"
googlegrpc "google.golang.org/grpc"
)

// NewGRPCServer new a gRPC server.
Expand All @@ -22,9 +23,10 @@ func NewGRPCServer(c *conf.Server, relations *service.RelationshipsService, heal
validate.Validator(),
logging.Server(logger),
),
grpc.StreamInterceptor(middleware.StreamLogInterceptor(logger)),
grpc.StreamInterceptor(middleware.StreamValidationInterceptor()),
grpc.StreamInterceptor(middleware.StreamRecoveryInterceptor(logger)),
grpc.Options(googlegrpc.ChainStreamInterceptor(middleware.StreamLogInterceptor(logger),
middleware.StreamValidationInterceptor(),
middleware.StreamRecoveryInterceptor(logger),
middleware.StreamTracingInterceptor())),
}
if c.Grpc.Network != "" {
opts = append(opts, grpc.Network(c.Grpc.Network))
Expand Down
156 changes: 156 additions & 0 deletions internal/server/middleware/span.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package middleware

// Taken from: middleware/tracing/span.go

import (
"context"
"net"
"net/url"
"strings"

"github.com/go-kratos/kratos/v2/metadata"
"github.com/go-kratos/kratos/v2/transport"
"github.com/go-kratos/kratos/v2/transport/http"

"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/peer"
"google.golang.org/protobuf/proto"
)

const serviceHeader = "x-md-service-name" //Taken from Kratos: middleware/tracing/metadata.go

func setClientSpan(ctx context.Context, span trace.Span, m interface{}) {

Check failure on line 24 in internal/server/middleware/span.go

View workflow job for this annotation

GitHub Actions / lint

func `setClientSpan` is unused (unused)
var (
attrs []attribute.KeyValue
remote string
operation string
rpcKind string
)
tr, ok := transport.FromClientContext(ctx)
if ok {
operation = tr.Operation()
rpcKind = tr.Kind().String()
switch tr.Kind() {
case transport.KindHTTP:
if ht, ok := tr.(http.Transporter); ok {
method := ht.Request().Method
route := ht.PathTemplate()
path := ht.Request().URL.Path
attrs = append(attrs, semconv.HTTPMethodKey.String(method))
attrs = append(attrs, semconv.HTTPRouteKey.String(route))
attrs = append(attrs, semconv.HTTPTargetKey.String(path))
remote = ht.Request().Host
}
case transport.KindGRPC:
remote, _ = parseTarget(tr.Endpoint())
}
}
attrs = append(attrs, semconv.RPCSystemKey.String(rpcKind))
_, mAttrs := parseFullMethod(operation)
attrs = append(attrs, mAttrs...)
if remote != "" {
attrs = append(attrs, peerAttr(remote)...)
}
if p, ok := m.(proto.Message); ok {
attrs = append(attrs, attribute.Key("send_msg.size").Int(proto.Size(p)))
}

span.SetAttributes(attrs...)
}

func setServerSpan(ctx context.Context, span trace.Span, m interface{}) {
var (
attrs []attribute.KeyValue
remote string
operation string
rpcKind string
)
tr, ok := transport.FromServerContext(ctx)
if ok {
operation = tr.Operation()
rpcKind = tr.Kind().String()
switch tr.Kind() {
case transport.KindHTTP:
if ht, ok := tr.(http.Transporter); ok {
method := ht.Request().Method
route := ht.PathTemplate()
path := ht.Request().URL.Path
attrs = append(attrs, semconv.HTTPMethodKey.String(method))
attrs = append(attrs, semconv.HTTPRouteKey.String(route))
attrs = append(attrs, semconv.HTTPTargetKey.String(path))
remote = ht.Request().RemoteAddr
}
case transport.KindGRPC:
if p, ok := peer.FromContext(ctx); ok {
remote = p.Addr.String()
}
}
}
attrs = append(attrs, semconv.RPCSystemKey.String(rpcKind))
_, mAttrs := parseFullMethod(operation)
attrs = append(attrs, mAttrs...)
attrs = append(attrs, peerAttr(remote)...)
if p, ok := m.(proto.Message); ok {
attrs = append(attrs, attribute.Key("recv_msg.size").Int(proto.Size(p)))
}
if md, ok := metadata.FromServerContext(ctx); ok {
attrs = append(attrs, semconv.PeerServiceKey.String(md.Get(serviceHeader)))
}

span.SetAttributes(attrs...)
}

// parseFullMethod returns a span name following the OpenTelemetry semantic
// conventions as well as all applicable span attribute.KeyValue attributes based
// on a gRPC's FullMethod.
func parseFullMethod(fullMethod string) (string, []attribute.KeyValue) {
name := strings.TrimLeft(fullMethod, "/")
parts := strings.SplitN(name, "/", 2)
if len(parts) != 2 { //nolint:gomnd
// Invalid format, does not follow `/package.service/method`.
return name, []attribute.KeyValue{attribute.Key("rpc.operation").String(fullMethod)}
}

var attrs []attribute.KeyValue
if service := parts[0]; service != "" {
attrs = append(attrs, semconv.RPCServiceKey.String(service))
}
if method := parts[1]; method != "" {
attrs = append(attrs, semconv.RPCMethodKey.String(method))
}
return name, attrs
}

// peerAttr returns attributes about the peer address.
func peerAttr(addr string) []attribute.KeyValue {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return []attribute.KeyValue(nil)
}

if host == "" {
host = "127.0.0.1"
}

return []attribute.KeyValue{
semconv.NetPeerIPKey.String(host),
semconv.NetPeerPortKey.String(port),
}
}

func parseTarget(endpoint string) (address string, err error) {

Check failure on line 143 in internal/server/middleware/span.go

View workflow job for this annotation

GitHub Actions / lint

func `parseTarget` is unused (unused)
var u *url.URL
u, err = url.Parse(endpoint)
if err != nil {
if u, err = url.Parse("http://" + endpoint); err != nil {
return "", err
}
return u.Host, nil
}
if len(u.Path) > 1 {
return u.Path[1:], nil
}
return endpoint, nil
}
27 changes: 27 additions & 0 deletions internal/server/middleware/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package middleware

import (
"github.com/go-kratos/kratos/v2/middleware/tracing"
"google.golang.org/grpc"

"go.opentelemetry.io/otel/trace"

"github.com/go-kratos/kratos/v2/transport"
)

// StreamTracingInterceptor returns a new server middleware for OpenTelemetry.
func StreamTracingInterceptor(opts ...tracing.Option) grpc.StreamServerInterceptor {
tracer := tracing.NewTracer(trace.SpanKindServer, opts...)
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
ctx := ss.Context()
if tr, ok := transport.FromServerContext(ctx); ok {
var span trace.Span
ctx, span = tracer.Start(ctx, tr.Operation(), tr.RequestHeader())
//Using nil requests and responses skips recording request and response size
//Alternatively could sum request/response sizes for the stream
setServerSpan(ctx, span, nil)
defer func() { tracer.End(ctx, span, nil, err) }()
}
return handler(srv, ss)
}
}

0 comments on commit d637e3d

Please sign in to comment.