Skip to content

Commit

Permalink
重构超时逻辑,当没有设置超时时间时,不加入额外超时空置逻辑,请求直接放在当前goroutine执行,以提高性能。
Browse files Browse the repository at this point in the history
  • Loading branch information
bunnier committed Jul 22, 2021
1 parent f35dae3 commit 15b8e6e
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 56 deletions.
2 changes: 1 addition & 1 deletion breaker/cutbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (b *CutBreaker) allow(summary *internal.MetricSummary) (bool, string) {
return false, "half-open" // 半开状态,说明已经有一个请求正在尝试,拒绝所有其它请求。

case Openning:
// 判断是否已经达到熔断时间
// 判断是否已过休眠时间
if time.Since(summary.LastExecuteTime) < b.sleepWindow {
return false, "open"
}
Expand Down
102 changes: 59 additions & 43 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type CommandFunc func(context.Context, interface{}) (interface{}, error)
// error 为功能返回值的error。
type CommandFallbackFunc func(context.Context, interface{}, error) (interface{}, error) // 降级函数签名。

var ErrTimeout error = errors.New("command: timeout") // 服务执行超时。
var ErrFallback error = errors.New("command: unavailable") // 服务不可用(熔断器开启后返回)。
var ErrTimeout error = errors.New("command: timeout") // 服务执行超时。
var ErrUnavailable error = errors.New("command: unavailable") // 服务不可用(熔断器开启后返回)。

// 在断路器中执行的命令对象。
type Command struct {
Expand All @@ -34,7 +34,7 @@ type Command struct {
run CommandFunc // 功能函数。
fallback CommandFallbackFunc // 降级函数。

timeout time.Duration // 超时时间。
timeout *time.Duration // 超时时间。

breaker breaker.Breaker // 熔断器。
}
Expand All @@ -43,9 +43,9 @@ func NewCommand(name string, run CommandFunc, options ...CommandOptionFunc) *Com
ctx, cancel := context.WithCancel(context.Background()) // 这个context主要用于处理内部的资源释放,而非执行功能函数。

command := &Command{
cancel: cancel,
name: name,
timeout: time.Second * 10, // 默认超时10s。
cancel: cancel,
name: name,
run: run,
}

for _, option := range options {
Expand All @@ -62,13 +62,14 @@ func NewCommand(name string, run CommandFunc, options ...CommandOptionFunc) *Com
breaker.WithCutBreakerSleepWindow(5*time.Second))
}

// 对run方法包装一层超时处理。
command.run = wrapCommandFuncWithTimeout(command, run)
if command.timeout != nil {
command.run = wrapCommandFuncWithTimeout(command, command.run)

// 如果有降级函数,也打包一层超时处理。
// 执行时将通过command的默认超时时间新建一个context,不会复用功能函数的,以免累计超时时间。
if command.fallback != nil {
command.fallback = wrapCommandFallbackFuncWithTimeout(command, command.fallback)
// 如果有降级函数,也打包一层超时处理。
// 执行时将通过command的默认超时时间新建一个context,不会复用功能函数的,以免累计超时时间。
if command.fallback != nil {
command.fallback = wrapCommandFallbackFuncWithTimeout(command, command.fallback)
}
}

return command
Expand All @@ -83,30 +84,55 @@ func (command *Command) Execute(param interface{}) (interface{}, error) {
func (command *Command) ContextExecute(ctx context.Context, param interface{}) (interface{}, error) {
pass, statusMsg := command.breaker.Allow()

// 已经熔断走降级逻辑
// 已经熔断直接走降级逻辑
if !pass {
openErr := fmt.Errorf("%s: %s: %w", command.name, statusMsg, ErrFallback)
openErr := fmt.Errorf("%s: %s: %w", command.name, statusMsg, ErrUnavailable)
if command.fallback == nil { // 没有设置降级函数直接返回
return nil, openErr
}
return command.executeFallback(param, openErr) // 降级函数。
return command.contextExecuteFallback(param, openErr) // 降级函数。
}

if result, err := command.run(ctx, param); err != nil {
if panicErr, ok := err.(funcPanicError); ok { // 如果是panic错误,统计后依然panic掉。
command.breaker.Failure()
panic(panicErr.panicObj)
}

if errors.Is(err, ErrTimeout) {
command.breaker.Timeout()
} else {
command.breaker.Failure()
}

if command.fallback == nil { // 没有设置降级函数直接返回
return nil, err
}
return command.executeFallback(result, err) // 降级函数。
return command.contextExecuteFallback(result, err) // 降级函数。
} else {
command.breaker.Success()
return result, nil
}
}

// executeFallback 用于执行降级函数。
func (command *Command) executeFallback(param interface{}, err error) (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), command.timeout)
defer cancel()
return command.fallback(ctx, param, err)
// contextExecuteFallback 用于执行降级函数。
func (command *Command) contextExecuteFallback(param interface{}, err error) (interface{}, error) {
ctx := context.Background()
if command.timeout != nil {
ctxWt, cancel := context.WithTimeout(ctx, *command.timeout)
ctx = ctxWt
defer cancel()
}
res, err := command.fallback(ctx, param, err)
if err != nil {
command.breaker.FallbackFailure()
if panicErr, ok := err.(funcPanicError); ok { // 如果是panic错误,统计后依然panic掉。
panic(panicErr.panicObj)
}
return res, err
}
command.breaker.FallbackSuccess()
return res, err
}

// funcResType 将功能函数/降级函数的返回值打包成一个结构。
Expand All @@ -115,13 +141,19 @@ type funcResType struct {
err error
}

// funcPanicError 用于在goroutine中传递Panic错误。
type funcPanicError struct {
error
panicObj interface{}
}

// wrapCommandFuncWithTimeout 用于对功能函数包装超时处理。
func wrapCommandFuncWithTimeout(command *Command, run CommandFunc) CommandFunc {
return func(ctx context.Context, param interface{}) (interface{}, error) {
resCh := make(chan funcResType, 1) // 设置一个1的缓冲,以免超时后goroutine泄漏。
panicCh := make(chan interface{}, 1) // 由于放到独立的goroutine中,原本的panic保护会失效,这里做个panic转发,让其回归到原本的goroutine中。

ctx, cancel := context.WithTimeout(ctx, command.timeout) // 为context加上统一的超时时间。
ctx, cancel := context.WithTimeout(ctx, *command.timeout) // 为context加上统一的超时时间。
defer cancel()

go func() {
Expand All @@ -138,20 +170,12 @@ func wrapCommandFuncWithTimeout(command *Command, run CommandFunc) CommandFunc {
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
command.breaker.Timeout()
return nil, fmt.Errorf("%s: %w", command.name, ErrTimeout)
}
command.breaker.Failure()
return nil, fmt.Errorf("%s: %w", command.name, ctx.Err())
case err := <-panicCh:
command.breaker.Failure()
panic(err) // 接收goroutine转发过来的panic。
case panicObj := <-panicCh:
return nil, funcPanicError{errors.New("panic"), panicObj} // 接收goroutine转发过来的panic。
case res := <-resCh:
if res.err != nil {
command.breaker.Failure()
} else {
command.breaker.Success()
}
return res.res, res.err
}
}
Expand All @@ -177,20 +201,12 @@ func wrapCommandFallbackFuncWithTimeout(command *Command, run CommandFallbackFun
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
command.breaker.FallbackFailure()
return nil, fmt.Errorf("%s: %w", command.name, ErrTimeout)
}
command.breaker.FallbackFailure()
return nil, fmt.Errorf("%s: %w", command.name, ctx.Err())
case err := <-panicCh:
command.breaker.FallbackFailure()
panic(err) // 接收goroutine转发过来的panic。
case panicObj := <-panicCh:
return nil, funcPanicError{errors.New("panic"), panicObj} // 接收goroutine转发过来的panic。
case res := <-resCh:
if res.err != nil {
command.breaker.FallbackFailure()
} else {
command.breaker.FallbackSuccess()
}
return res.res, res.err
}
}
Expand All @@ -213,7 +229,7 @@ func WithCommandBreaker(breaker breaker.Breaker) CommandOptionFunc {
// WithCommandBreaker 用于为Command设置默认超时。
func WithCommandTimeout(timeout time.Duration) CommandOptionFunc {
return func(c *Command) {
c.timeout = timeout
c.timeout = &timeout
}
}

Expand Down
92 changes: 80 additions & 12 deletions command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,77 @@ func TestCommand_workflow(t *testing.T) {
return nil, fmt.Errorf("fallback: %w", e)
}
// 初始化Command。
command := NewCommand("test", run,
WithCommandFallback(fallback),
WithCommandTimeout(time.Second*3))
command := NewCommand("test", run, WithCommandFallback(fallback))
defer command.Close()

for i := 0; i < 10000; i++ {
r, err := command.Execute(i)

// 前5000正常。
if i < 5000 {
if err != nil {
t.Errorf("Command.Execute() got = %v, wantErr %v", err, nil)
}
if r.(int) != i+1 {
t.Errorf("Command.Execute() got = %v, want %v", r, i+1)
}
continue
}

// 后5000错误。
if err == nil || err.Error() != "fallback: more then 5000" {
t.Errorf("Command.Execute() got = %v, wantErr %v", err, "fallback: more then 5000")
}
}

// 再一个熔断。
if _, err := command.Execute(10001); err == nil || err.Error() != "fallback: test: open: command: unavailable" {
t.Errorf("Command.Execute() got = %v, want %v", err, "fallback: test: open: command: unavailable")
}

// 熔断中,正常的也熔断。
if _, err := command.Execute(1); err == nil || err.Error() != "fallback: test: open: command: unavailable" {
t.Errorf("Command.Execute() got = %v, want %v", err, "fallback: test: open: command: unavailable")
}

time.Sleep(5 * time.Second)
// 进入半熔断了,放入一个错误的。
if _, err := command.Execute(10001); err == nil || err.Error() != "fallback: more then 5000" {
t.Errorf("Command.Execute() got = %v, want %v", err, "fallback: more then 5000")
}
// 由于刚放了个错误的进行半熔断测试,又恢复熔断了。
if _, err := command.Execute(1); err == nil || err.Error() != "fallback: test: open: command: unavailable" {
t.Errorf("Command.Execute() got = %v, want %v", err, "fallback: test: open: command: unavailable")
}

time.Sleep(5 * time.Second)
// 进入半熔断了,放入一个正常的。
if _, err := command.Execute(1); err != nil {
t.Errorf("Command.Execute() got = %v, want %v", err, nil)
}
// 恢复了。
if _, err := command.Execute(2); err != nil {
t.Errorf("Command.Execute() got = %v, want %v", err, nil)
}
}

// TestCommand_withtimeout_workflow 由于加入超时机制后,将放入独立的goroutine中运行,执行流程与原本有区别,故独立测试一份。
func TestCommand_withtimeout_workflow(t *testing.T) {
// 功能函数。
run := func(ctx context.Context, i interface{}) (interface{}, error) {
param := i.(int)
param++
if param > 5000 {
return nil, errors.New("more then 5000")
}
return param, nil
}
// 降级函数。
fallback := func(ctx context.Context, i interface{}, e error) (interface{}, error) {
return nil, fmt.Errorf("fallback: %w", e)
}
// 初始化Command。
command := NewCommand("test", run, WithCommandFallback(fallback), WithCommandTimeout(time.Second*2))
defer command.Close()

for i := 0; i < 10000; i++ {
Expand Down Expand Up @@ -88,7 +156,7 @@ func TestCommand_timeout(t *testing.T) {

// 初始化Command。
command := NewCommand("test", run,
WithCommandTimeout(time.Second*5))
WithCommandTimeout(time.Second*2))
defer command.Close()

// 还没超时。
Expand All @@ -97,20 +165,20 @@ func TestCommand_timeout(t *testing.T) {
}

// 超过默认超时。
if _, err := command.Execute(6); !errors.Is(err, ErrTimeout) {
if _, err := command.Execute(3); !errors.Is(err, ErrTimeout) {
t.Errorf("Command.Execute() got = %v, want nil", err)
}

// 测试下传入的超时。
startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if _, err := command.ContextExecute(ctx, 5); !errors.Is(err, ErrTimeout) {
if _, err := command.ContextExecute(ctx, 2); !errors.Is(err, ErrTimeout) {
t.Errorf("Command.ContextExecute() got = %v, want %v", err, ErrTimeout)
}
// 此时应该时间过去两秒左右,允许一点时差。
if time.Since(startTime) > time.Second*2+time.Millisecond*100 {
t.Errorf("Command.ContextExecute() got = %v, want less than %v", time.Since(startTime), time.Second*2+time.Millisecond*100)
// 此时应该时间过去1秒左右,允许一点时差。
if time.Since(startTime) > time.Second+time.Millisecond*100 {
t.Errorf("Command.ContextExecute() got = %v, want less than %v", time.Since(startTime), time.Second+time.Millisecond*100)
}
}

Expand All @@ -127,7 +195,7 @@ func TestCommand_fallback_timeout(t *testing.T) {
// 初始化Command。
command := NewCommand("test", run,
WithCommandFallback(fallback),
WithCommandTimeout(time.Second*5))
WithCommandTimeout(time.Second*2))
defer command.Close()

// 还没超时。
Expand All @@ -136,7 +204,7 @@ func TestCommand_fallback_timeout(t *testing.T) {
}

// 超过默认超时。
if _, err := command.Execute(6); !errors.Is(err, ErrTimeout) {
if _, err := command.Execute(3); !errors.Is(err, ErrTimeout) {
t.Errorf("Command.Execute() got = %v, want nil", err)
}
}

0 comments on commit 15b8e6e

Please sign in to comment.