From 86e99d1e8370c80182746337722f0862d8a478ca Mon Sep 17 00:00:00 2001 From: Jonathan Marcantonio Date: Thu, 18 Jul 2024 09:34:16 -0400 Subject: [PATCH] Return error to caller Signed-off-by: Jonathan Marcantonio --- internal/server/middleware/recovery.go | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/internal/server/middleware/recovery.go b/internal/server/middleware/recovery.go index 6a454bb..6b93a1d 100644 --- a/internal/server/middleware/recovery.go +++ b/internal/server/middleware/recovery.go @@ -1,19 +1,35 @@ package middleware import ( + "context" "fmt" "runtime" "time" "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/middleware/recovery" "google.golang.org/grpc" ) -func StreamRecoveryInterceptor(logger log.Logger) grpc.StreamServerInterceptor { - return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - wrapper := &requestInterceptingWrapper{ServerStream: ss} +type Option func(*options) + +type options struct { // Duplicated from https://github.com/go-kratos/kratos/blob/main/middleware/recovery/recovery.go b/c no export + handler recovery.HandlerFunc +} + +func StreamRecoveryInterceptor(logger log.Logger, opts ...Option) grpc.StreamServerInterceptor { + op := options{ + handler: func(ctx context.Context, req, err interface{}) error { + return recovery.ErrUnknownRequest + }, + } + for _, o := range opts { + o(&op) + } + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { startTime := time.Now() + wrapper := &requestInterceptingWrapper{ServerStream: ss} ctx := ss.Context() defer func() { @@ -27,7 +43,7 @@ func StreamRecoveryInterceptor(logger log.Logger) grpc.StreamServerInterceptor { "latency", time.Since(startTime).Seconds(), "reason", fmt.Sprintf("%v: %+v\n%s\n", rerr, wrapper.req, buf), ) - // fail silently? + err = op.handler(ctx, wrapper.req, rerr) } }() return handler(srv, wrapper)