Skip to content

Commit

Permalink
Added streaming-compatible gRPC log middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
wscalf committed Jun 26, 2024
1 parent 845939c commit a20f5a5
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 0 deletions.
1 change: 1 addition & 0 deletions internal/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func NewGRPCServer(c *conf.Server, relations *service.RelationshipsService, heal
validate.Validator(),
logging.Server(logger),
),
grpc.StreamInterceptor(StreamLogInterceptor(logger)),
}
if c.Grpc.Network != "" {
opts = append(opts, grpc.Network(c.Grpc.Network))
Expand Down
84 changes: 84 additions & 0 deletions internal/server/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package server

import (
"fmt"
"time"

"github.com/go-kratos/kratos/v2/errors"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/logging"
"google.golang.org/grpc"
)

func StreamLogInterceptor(logger log.Logger) grpc.StreamServerInterceptor {
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
var (
code int32
reason string
kind string
operation string
)
ctx := ss.Context()
startTime := time.Now()
operation = info.FullMethod
kind = "server"
wrapper := &requestInterceptingWrapper{ServerStream: ss}
err := handler(srv, wrapper)

if se := errors.FromError(err); se != nil {
code = se.Code
reason = se.Reason
}
level, stack := extractError(err)

log.NewHelper(log.WithContext(ctx, logger)).Log(level,
"kind", kind,
"component", kind,
"operation", operation,
"args", extractArgs(wrapper.req),
"code", code,
"reason", reason,
"stack", stack,
"latency", time.Since(startTime).Seconds())

return err
}
}

type requestInterceptingWrapper struct {
req any
grpc.ServerStream
}

func (w *requestInterceptingWrapper) RecvMsg(m interface{}) error {
err := w.ServerStream.RecvMsg(m) //Includes deserializing m, all fields are empty before this point
if w.req == nil {
w.req = m
}

return err
}

func (w *requestInterceptingWrapper) SendMsg(m interface{}) error {
return w.ServerStream.SendMsg(m)
}

// Taken from Kratos logging middleware
// extractArgs returns the string of the req
func extractArgs(req interface{}) string {
if redacter, ok := req.(logging.Redacter); ok {
return redacter.Redact()
}
if stringer, ok := req.(fmt.Stringer); ok {
return stringer.String()
}
return fmt.Sprintf("%+v", req)
}

// extractError returns the string of the error
func extractError(err error) (log.Level, string) {
if err != nil {
return log.LevelError, fmt.Sprintf("%+v", err)
}
return log.LevelInfo, ""
}

0 comments on commit a20f5a5

Please sign in to comment.