From 15b8e6e9a693f27f8188fd887c42f8e2a3145c6a Mon Sep 17 00:00:00 2001 From: zhongrui Date: Thu, 22 Jul 2021 11:15:18 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E8=B6=85=E6=97=B6=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E5=BD=93=E6=B2=A1=E6=9C=89=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4=E6=97=B6=EF=BC=8C=E4=B8=8D?= =?UTF-8?q?=E5=8A=A0=E5=85=A5=E9=A2=9D=E5=A4=96=E8=B6=85=E6=97=B6=E7=A9=BA?= =?UTF-8?q?=E7=BD=AE=E9=80=BB=E8=BE=91=EF=BC=8C=E8=AF=B7=E6=B1=82=E7=9B=B4?= =?UTF-8?q?=E6=8E=A5=E6=94=BE=E5=9C=A8=E5=BD=93=E5=89=8Dgoroutine=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=EF=BC=8C=E4=BB=A5=E6=8F=90=E9=AB=98=E6=80=A7=E8=83=BD?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- breaker/cutbreaker.go | 2 +- command.go | 102 ++++++++++++++++++++++++------------------ command_test.go | 92 ++++++++++++++++++++++++++++++++----- 3 files changed, 140 insertions(+), 56 deletions(-) diff --git a/breaker/cutbreaker.go b/breaker/cutbreaker.go index b469002..d34ba8c 100644 --- a/breaker/cutbreaker.go +++ b/breaker/cutbreaker.go @@ -77,7 +77,7 @@ func (b *CutBreaker) allow(summary *internal.MetricSummary) (bool, string) { return false, "half-open" // 半开状态,说明已经有一个请求正在尝试,拒绝所有其它请求。 case Openning: - // 判断是否已经达到熔断时间。 + // 判断是否已过休眠时间。 if time.Since(summary.LastExecuteTime) < b.sleepWindow { return false, "open" } diff --git a/command.go b/command.go index a05e13f..e9ea7d5 100644 --- a/command.go +++ b/command.go @@ -22,8 +22,8 @@ type CommandFunc func(context.Context, interface{}) (interface{}, error) // error 为功能返回值的error。 type CommandFallbackFunc func(context.Context, interface{}, error) (interface{}, error) // 降级函数签名。 -var ErrTimeout error = errors.New("command: timeout") // 服务执行超时。 -var ErrFallback error = errors.New("command: unavailable") // 服务不可用(熔断器开启后返回)。 +var ErrTimeout error = errors.New("command: timeout") // 服务执行超时。 +var ErrUnavailable error = errors.New("command: unavailable") // 服务不可用(熔断器开启后返回)。 // 在断路器中执行的命令对象。 type Command struct { @@ -34,7 +34,7 @@ type Command struct { run CommandFunc // 功能函数。 fallback CommandFallbackFunc // 降级函数。 - timeout time.Duration // 超时时间。 + timeout *time.Duration // 超时时间。 breaker breaker.Breaker // 熔断器。 } @@ -43,9 +43,9 @@ func NewCommand(name string, run CommandFunc, options ...CommandOptionFunc) *Com ctx, cancel := context.WithCancel(context.Background()) // 这个context主要用于处理内部的资源释放,而非执行功能函数。 command := &Command{ - cancel: cancel, - name: name, - timeout: time.Second * 10, // 默认超时10s。 + cancel: cancel, + name: name, + run: run, } for _, option := range options { @@ -62,13 +62,14 @@ func NewCommand(name string, run CommandFunc, options ...CommandOptionFunc) *Com breaker.WithCutBreakerSleepWindow(5*time.Second)) } - // 对run方法包装一层超时处理。 - command.run = wrapCommandFuncWithTimeout(command, run) + if command.timeout != nil { + command.run = wrapCommandFuncWithTimeout(command, command.run) - // 如果有降级函数,也打包一层超时处理。 - // 执行时将通过command的默认超时时间新建一个context,不会复用功能函数的,以免累计超时时间。 - if command.fallback != nil { - command.fallback = wrapCommandFallbackFuncWithTimeout(command, command.fallback) + // 如果有降级函数,也打包一层超时处理。 + // 执行时将通过command的默认超时时间新建一个context,不会复用功能函数的,以免累计超时时间。 + if command.fallback != nil { + command.fallback = wrapCommandFallbackFuncWithTimeout(command, command.fallback) + } } return command @@ -83,30 +84,55 @@ func (command *Command) Execute(param interface{}) (interface{}, error) { func (command *Command) ContextExecute(ctx context.Context, param interface{}) (interface{}, error) { pass, statusMsg := command.breaker.Allow() - // 已经熔断走降级逻辑。 + // 已经熔断直接走降级逻辑。 if !pass { - openErr := fmt.Errorf("%s: %s: %w", command.name, statusMsg, ErrFallback) + openErr := fmt.Errorf("%s: %s: %w", command.name, statusMsg, ErrUnavailable) if command.fallback == nil { // 没有设置降级函数直接返回 return nil, openErr } - return command.executeFallback(param, openErr) // 降级函数。 + return command.contextExecuteFallback(param, openErr) // 降级函数。 } if result, err := command.run(ctx, param); err != nil { + if panicErr, ok := err.(funcPanicError); ok { // 如果是panic错误,统计后依然panic掉。 + command.breaker.Failure() + panic(panicErr.panicObj) + } + + if errors.Is(err, ErrTimeout) { + command.breaker.Timeout() + } else { + command.breaker.Failure() + } + if command.fallback == nil { // 没有设置降级函数直接返回 return nil, err } - return command.executeFallback(result, err) // 降级函数。 + return command.contextExecuteFallback(result, err) // 降级函数。 } else { + command.breaker.Success() return result, nil } } -// executeFallback 用于执行降级函数。 -func (command *Command) executeFallback(param interface{}, err error) (interface{}, error) { - ctx, cancel := context.WithTimeout(context.Background(), command.timeout) - defer cancel() - return command.fallback(ctx, param, err) +// contextExecuteFallback 用于执行降级函数。 +func (command *Command) contextExecuteFallback(param interface{}, err error) (interface{}, error) { + ctx := context.Background() + if command.timeout != nil { + ctxWt, cancel := context.WithTimeout(ctx, *command.timeout) + ctx = ctxWt + defer cancel() + } + res, err := command.fallback(ctx, param, err) + if err != nil { + command.breaker.FallbackFailure() + if panicErr, ok := err.(funcPanicError); ok { // 如果是panic错误,统计后依然panic掉。 + panic(panicErr.panicObj) + } + return res, err + } + command.breaker.FallbackSuccess() + return res, err } // funcResType 将功能函数/降级函数的返回值打包成一个结构。 @@ -115,13 +141,19 @@ type funcResType struct { err error } +// funcPanicError 用于在goroutine中传递Panic错误。 +type funcPanicError struct { + error + panicObj interface{} +} + // wrapCommandFuncWithTimeout 用于对功能函数包装超时处理。 func wrapCommandFuncWithTimeout(command *Command, run CommandFunc) CommandFunc { return func(ctx context.Context, param interface{}) (interface{}, error) { resCh := make(chan funcResType, 1) // 设置一个1的缓冲,以免超时后goroutine泄漏。 panicCh := make(chan interface{}, 1) // 由于放到独立的goroutine中,原本的panic保护会失效,这里做个panic转发,让其回归到原本的goroutine中。 - ctx, cancel := context.WithTimeout(ctx, command.timeout) // 为context加上统一的超时时间。 + ctx, cancel := context.WithTimeout(ctx, *command.timeout) // 为context加上统一的超时时间。 defer cancel() go func() { @@ -138,20 +170,12 @@ func wrapCommandFuncWithTimeout(command *Command, run CommandFunc) CommandFunc { select { case <-ctx.Done(): if errors.Is(ctx.Err(), context.DeadlineExceeded) { - command.breaker.Timeout() return nil, fmt.Errorf("%s: %w", command.name, ErrTimeout) } - command.breaker.Failure() return nil, fmt.Errorf("%s: %w", command.name, ctx.Err()) - case err := <-panicCh: - command.breaker.Failure() - panic(err) // 接收goroutine转发过来的panic。 + case panicObj := <-panicCh: + return nil, funcPanicError{errors.New("panic"), panicObj} // 接收goroutine转发过来的panic。 case res := <-resCh: - if res.err != nil { - command.breaker.Failure() - } else { - command.breaker.Success() - } return res.res, res.err } } @@ -177,20 +201,12 @@ func wrapCommandFallbackFuncWithTimeout(command *Command, run CommandFallbackFun select { case <-ctx.Done(): if errors.Is(ctx.Err(), context.DeadlineExceeded) { - command.breaker.FallbackFailure() return nil, fmt.Errorf("%s: %w", command.name, ErrTimeout) } - command.breaker.FallbackFailure() return nil, fmt.Errorf("%s: %w", command.name, ctx.Err()) - case err := <-panicCh: - command.breaker.FallbackFailure() - panic(err) // 接收goroutine转发过来的panic。 + case panicObj := <-panicCh: + return nil, funcPanicError{errors.New("panic"), panicObj} // 接收goroutine转发过来的panic。 case res := <-resCh: - if res.err != nil { - command.breaker.FallbackFailure() - } else { - command.breaker.FallbackSuccess() - } return res.res, res.err } } @@ -213,7 +229,7 @@ func WithCommandBreaker(breaker breaker.Breaker) CommandOptionFunc { // WithCommandBreaker 用于为Command设置默认超时。 func WithCommandTimeout(timeout time.Duration) CommandOptionFunc { return func(c *Command) { - c.timeout = timeout + c.timeout = &timeout } } diff --git a/command_test.go b/command_test.go index 3ec7e7e..7be1534 100644 --- a/command_test.go +++ b/command_test.go @@ -23,9 +23,77 @@ func TestCommand_workflow(t *testing.T) { return nil, fmt.Errorf("fallback: %w", e) } // 初始化Command。 - command := NewCommand("test", run, - WithCommandFallback(fallback), - WithCommandTimeout(time.Second*3)) + command := NewCommand("test", run, WithCommandFallback(fallback)) + defer command.Close() + + for i := 0; i < 10000; i++ { + r, err := command.Execute(i) + + // 前5000正常。 + if i < 5000 { + if err != nil { + t.Errorf("Command.Execute() got = %v, wantErr %v", err, nil) + } + if r.(int) != i+1 { + t.Errorf("Command.Execute() got = %v, want %v", r, i+1) + } + continue + } + + // 后5000错误。 + if err == nil || err.Error() != "fallback: more then 5000" { + t.Errorf("Command.Execute() got = %v, wantErr %v", err, "fallback: more then 5000") + } + } + + // 再一个熔断。 + if _, err := command.Execute(10001); err == nil || err.Error() != "fallback: test: open: command: unavailable" { + t.Errorf("Command.Execute() got = %v, want %v", err, "fallback: test: open: command: unavailable") + } + + // 熔断中,正常的也熔断。 + if _, err := command.Execute(1); err == nil || err.Error() != "fallback: test: open: command: unavailable" { + t.Errorf("Command.Execute() got = %v, want %v", err, "fallback: test: open: command: unavailable") + } + + time.Sleep(5 * time.Second) + // 进入半熔断了,放入一个错误的。 + if _, err := command.Execute(10001); err == nil || err.Error() != "fallback: more then 5000" { + t.Errorf("Command.Execute() got = %v, want %v", err, "fallback: more then 5000") + } + // 由于刚放了个错误的进行半熔断测试,又恢复熔断了。 + if _, err := command.Execute(1); err == nil || err.Error() != "fallback: test: open: command: unavailable" { + t.Errorf("Command.Execute() got = %v, want %v", err, "fallback: test: open: command: unavailable") + } + + time.Sleep(5 * time.Second) + // 进入半熔断了,放入一个正常的。 + if _, err := command.Execute(1); err != nil { + t.Errorf("Command.Execute() got = %v, want %v", err, nil) + } + // 恢复了。 + if _, err := command.Execute(2); err != nil { + t.Errorf("Command.Execute() got = %v, want %v", err, nil) + } +} + +// TestCommand_withtimeout_workflow 由于加入超时机制后,将放入独立的goroutine中运行,执行流程与原本有区别,故独立测试一份。 +func TestCommand_withtimeout_workflow(t *testing.T) { + // 功能函数。 + run := func(ctx context.Context, i interface{}) (interface{}, error) { + param := i.(int) + param++ + if param > 5000 { + return nil, errors.New("more then 5000") + } + return param, nil + } + // 降级函数。 + fallback := func(ctx context.Context, i interface{}, e error) (interface{}, error) { + return nil, fmt.Errorf("fallback: %w", e) + } + // 初始化Command。 + command := NewCommand("test", run, WithCommandFallback(fallback), WithCommandTimeout(time.Second*2)) defer command.Close() for i := 0; i < 10000; i++ { @@ -88,7 +156,7 @@ func TestCommand_timeout(t *testing.T) { // 初始化Command。 command := NewCommand("test", run, - WithCommandTimeout(time.Second*5)) + WithCommandTimeout(time.Second*2)) defer command.Close() // 还没超时。 @@ -97,20 +165,20 @@ func TestCommand_timeout(t *testing.T) { } // 超过默认超时。 - if _, err := command.Execute(6); !errors.Is(err, ErrTimeout) { + if _, err := command.Execute(3); !errors.Is(err, ErrTimeout) { t.Errorf("Command.Execute() got = %v, want nil", err) } // 测试下传入的超时。 startTime := time.Now() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - if _, err := command.ContextExecute(ctx, 5); !errors.Is(err, ErrTimeout) { + if _, err := command.ContextExecute(ctx, 2); !errors.Is(err, ErrTimeout) { t.Errorf("Command.ContextExecute() got = %v, want %v", err, ErrTimeout) } - // 此时应该时间过去两秒左右,允许一点时差。 - if time.Since(startTime) > time.Second*2+time.Millisecond*100 { - t.Errorf("Command.ContextExecute() got = %v, want less than %v", time.Since(startTime), time.Second*2+time.Millisecond*100) + // 此时应该时间过去1秒左右,允许一点时差。 + if time.Since(startTime) > time.Second+time.Millisecond*100 { + t.Errorf("Command.ContextExecute() got = %v, want less than %v", time.Since(startTime), time.Second+time.Millisecond*100) } } @@ -127,7 +195,7 @@ func TestCommand_fallback_timeout(t *testing.T) { // 初始化Command。 command := NewCommand("test", run, WithCommandFallback(fallback), - WithCommandTimeout(time.Second*5)) + WithCommandTimeout(time.Second*2)) defer command.Close() // 还没超时。 @@ -136,7 +204,7 @@ func TestCommand_fallback_timeout(t *testing.T) { } // 超过默认超时。 - if _, err := command.Execute(6); !errors.Is(err, ErrTimeout) { + if _, err := command.Execute(3); !errors.Is(err, ErrTimeout) { t.Errorf("Command.Execute() got = %v, want nil", err) } }