From b4a777d25becf8ae12481b804a45dd52142e7e14 Mon Sep 17 00:00:00 2001 From: Phu Ngo <12547020+NgoKimPhu@users.noreply.github.com> Date: Wed, 20 Dec 2023 09:59:59 +0700 Subject: [PATCH 1/2] feat: add batcher to batch tasks --- batcher.go | 218 ++++++++++++++++++++++++++++++++++++++++++++++++ batcher_test.go | 143 +++++++++++++++++++++++++++++++ go.mod | 22 +++-- go.sum | 41 +++++++-- mocks/mocks.go | 127 ++++++++++++++++++++++++++++ 5 files changed, 535 insertions(+), 16 deletions(-) create mode 100644 batcher.go create mode 100644 batcher_test.go create mode 100644 mocks/mocks.go diff --git a/batcher.go b/batcher.go new file mode 100644 index 0000000..481c6f9 --- /dev/null +++ b/batcher.go @@ -0,0 +1,218 @@ +package kutils + +import ( + "context" + "math" + "runtime" + "runtime/debug" + "sync/atomic" + "time" + + "github.com/KyberNetwork/logger" + "github.com/pkg/errors" +) + +//go:generate mockgen -source=batcher.go -destination mocks/mocks.go -package mocks + +var ( + ErrBatcherClosed = errors.New("batcher closed") +) + +// BatchableTask represents a batchable task +type BatchableTask[R any] interface { + Ctx() context.Context // The context of this task + Done() <-chan struct{} // Signals if this task was already resolved + IsDone() bool // Checks (non-blocking) if this task was already resolved + Result() (R, error) // Blocks until this task is resolved and returns result and error + Resolve(ret R, err error) // Resolves this task with return value and error +} + +// ChanTask uses a done channel to signal resolution of return value and error +type ChanTask[R any] struct { + ctx context.Context + done chan struct{} + Ret R + Err error +} + +func NewChanTask[R any](ctx context.Context) *ChanTask[R] { + if ctx == nil { + ctx = context.Background() + } + return &ChanTask[R]{ + ctx: ctx, + done: make(chan struct{}), + } +} + +func (c *ChanTask[R]) Ctx() context.Context { + return c.ctx +} + +func (c *ChanTask[R]) Done() <-chan struct{} { + return c.done +} + +func (c *ChanTask[R]) IsDone() bool { + select { + case <-c.done: + return true + default: + return false + } +} + +func (c *ChanTask[R]) Result() (R, error) { + if c.IsDone() { + return c.Ret, c.Err + } + select { + case <-c.done: + return c.Ret, c.Err + case <-c.ctx.Done(): + return *new(R), c.ctx.Err() + } +} + +func (c *ChanTask[R]) Resolve(ret R, err error) { + select { + case <-c.done: + logger.Errorf("ChanTask.Resolve|called twice, ignored|c.Ret=%v,c.Err=%v|Ret=%v,Err=%v", c.Ret, c.Err, ret, err) + default: + c.Ret, c.Err = ret, err + close(c.done) + } +} + +// Batcher batches together n BatchableTask's together and executes a logic for a batch of BatchableTask's. +// It skips BatchableTask's with cancelled Ctx and resolve those tasks with the context's error. +// Batch logic execution should signal each BatchableTask as done by using its Resolve method. +type Batcher[T BatchableTask[R], R any] interface { + // Batch submits a BatchableTask to the batcher. + Batch(task T) + // Close should stop Batch from being called and clean up any background resources. + Close() +} + +// BatchCfg provides batchRate and batchCnt configs for a ChanBatcher. ChanBatcher will trigger a batch processing +// either if no more task is queued after batchRate, or batchCnt BatchableTask's are already queued. +type BatchCfg func() (batchRate time.Duration, batchCnt int) + +// BatchFn is called for a batch of tasks collected and triggered by a ChanBatcher per its batchCfg. +type BatchFn[T any] func([]T) + +// ChanBatcher implements Batcher using golang channel. +type ChanBatcher[T BatchableTask[R], R any] struct { + batchCfg BatchCfg + batchFn BatchFn[T] + taskCh chan T + closed atomic.Bool +} + +func NewChanBatcher[T BatchableTask[R], R any](batchCfg BatchCfg, batchFn BatchFn[T]) *ChanBatcher[T, R] { + _, batchCnt := batchCfg() + chanBatcher := &ChanBatcher[T, R]{ + batchCfg: batchCfg, + batchFn: batchFn, + taskCh: make(chan T, 16*batchCnt), + } + go chanBatcher.worker() + return chanBatcher +} + +// Batch submits a BatchableTask to the channel if this chanBatcher hasn't been closed. +func (b *ChanBatcher[T, R]) Batch(task T) { + if !b.closed.Load() { + b.taskCh <- task + } else { + task.Resolve(*new(R), ErrBatcherClosed) + } +} + +// Close closes this chanBatcher to prevents Batch-ing new BatchableTask's and tell the worker goroutine to finish up. +func (b *ChanBatcher[_, _]) Close() { + if !b.closed.Swap(true) { + close(b.taskCh) + } +} + +// goBatchFn +func (b *ChanBatcher[T, R]) batchFnWithRecover(tasks []T) { + defer func() { + p := recover() + if p == nil { + return + } + logger.Errorf("ChanBatcher.goBatchFn|recovered from panic: %v\n%s", p, string(debug.Stack())) + var ret R + for _, task := range tasks { + if task.IsDone() { + continue + } + if err, ok := p.(error); ok { + task.Resolve(ret, errors.Wrap(err, "batchFn panicked")) + } else { + task.Resolve(ret, errors.Errorf("batchFn panicked: %v", p)) + } + } + }() + b.batchFn(tasks) +} + +// worker batches up BatchableTask's in taskCh per batchCfg (per at most batchRate ns and at most batchCnt BatchableTask's) +// and triggers batchFn with each batch. +func (b *ChanBatcher[T, R]) worker() { + defer func() { + if p := recover(); p != nil { + logger.Errorf("ChanBatcher.worker|recovered from panic: %v\n%s", p, string(debug.Stack())) + } + }() + var tasks []T + batchTimer := time.NewTimer(time.Duration(math.MaxInt64)) + for { + runtime.Gosched() // in case GOMAXPROCS is 1, we need to cooperatively yield + select { + case <-batchTimer.C: + if len(tasks) == 0 { + break + } + logger.Debugf("ChanBatcher.worker|timer|%d tasks", len(tasks)) + go b.batchFnWithRecover(tasks) + tasks = tasks[:0:0] + case task, ok := <-b.taskCh: + if !ok { + logger.Debugf("ChanBatcher.worker|closed|%d tasks", len(tasks)) + if len(tasks) > 0 { + go b.batchFnWithRecover(tasks) + } + return + } + if !task.IsDone() { + select { + case <-task.Ctx().Done(): + logger.Infof("ChanBatcher.worker|skip|task=%v", task) + task.Resolve(*new(R), task.Ctx().Err()) + continue + default: + } + } + duration, batchCount := b.batchCfg() + if len(tasks) == 0 { + logger.Debugf("ChanBatcher.worker|timer start|duration=%s", duration) + if !batchTimer.Stop() { + select { + case <-batchTimer.C: + default: + } + } + batchTimer.Reset(duration) + } + tasks = append(tasks, task) + if len(tasks) >= batchCount { + logger.Debugf("ChanBatcher.worker|max|%d tasks", len(tasks)) + go b.batchFnWithRecover(tasks) + tasks = tasks[:0:0] + } + } + } +} diff --git a/batcher_test.go b/batcher_test.go new file mode 100644 index 0000000..19bb166 --- /dev/null +++ b/batcher_test.go @@ -0,0 +1,143 @@ +package kutils + +import ( + "context" + "runtime" + "sync/atomic" + "testing" + "time" + + "github.com/KyberNetwork/logger" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +func TestChanBatcher(t *testing.T) { + ctx := context.Background() + batchRate := 10 * time.Millisecond + batchFn := func(_ []*ChanTask[time.Duration]) {} + batcher := NewChanBatcher[*ChanTask[time.Duration], time.Duration](func() (time.Duration, int) { + return batchRate, 2 + }, func(tasks []*ChanTask[time.Duration]) { batchFn(tasks) }) + var cnt atomic.Uint32 + start := time.Now() + batchFn = func(tasks []*ChanTask[time.Duration]) { + cnt.Add(1) + for _, task := range tasks { + task.Resolve(time.Since(start), nil) + } + } + task0 := NewChanTask[time.Duration](ctx) + task1 := NewChanTask[time.Duration](ctx) + task2 := NewChanTask[time.Duration](ctx) + + t.Run("happy", func(t *testing.T) { + batcher.Batch(task0) + batcher.Batch(task1) + _, _ = task0.Result() + assert.EqualValues(t, 1, cnt.Load()) + assert.NoError(t, task0.Err) + assert.Less(t, task0.Ret, batchRate) + ret, err := task1.Result() + assert.NoError(t, err) + assert.Less(t, ret, batchRate) + time.Sleep(batchRate * 11 / 10) + runtime.Gosched() + + batcher.Batch(task2) + assert.False(t, task2.IsDone()) + ret, err = task2.Result() + assert.True(t, task2.IsDone()) + assert.EqualValues(t, 2, cnt.Load()) + assert.Equal(t, task2.Err, err) + assert.NoError(t, task2.Err) + assert.Equal(t, task2.Ret, ret) + assert.Greater(t, ret, batchRate) + }) + + t.Run("spam", func(t *testing.T) { + batcher := NewChanBatcher[*ChanTask[int], int](func() (time.Duration, int) { return 0, 0 }, + func(tasks []*ChanTask[int]) { + for _, task := range tasks { + task.Resolve(0, nil) + } + }) + const taskCnt = 1000 + tasks := make([]*ChanTask[int], taskCnt) + start := time.Now() + for i := 0; i < taskCnt; i++ { + tasks[i] = NewChanTask[int](ctx) + batcher.Batch(tasks[i]) + } + // 1k: 2.561804ms; 1M: 2.62s - average overhead per task = 2.6µs + logger.Warnf("done %d tasks in %v", taskCnt, time.Since(start)) + for i := 0; i < taskCnt; i++ { + ret, err := tasks[i].Result() + assert.NoError(t, err) + assert.EqualValues(t, 0, ret) + } + batcher.Close() + }) + + t.Run("resolve twice", func(t *testing.T) { + task0.Resolve(batchRate, nil) + assert.NoError(t, task0.Err) + assert.Less(t, task0.Ret, batchRate) + }) + + t.Run("recover from panic", func(t *testing.T) { + oldBatchFn := batchFn + batchFn = func(tasks []*ChanTask[time.Duration]) { + panic("test panic") + } + task0 = NewChanTask[time.Duration](ctx) + task1 = NewChanTask[time.Duration](ctx) + task0.Resolve(0, nil) + batcher.Batch(task0) + batcher.Batch(task1) + <-task1.Done() + assert.ErrorContains(t, task1.Err, "test panic") + + panicErr := errors.New("test panic error") + batchFn = func(tasks []*ChanTask[time.Duration]) { + panic(panicErr) + } + task0 = NewChanTask[time.Duration](ctx) + task1 = NewChanTask[time.Duration](ctx) + batcher.Batch(task0) + batcher.Batch(task1) + <-task1.Done() + assert.ErrorIs(t, task0.Err, panicErr) + assert.ErrorIs(t, task1.Err, panicErr) + + batchFn = oldBatchFn + task2 = NewChanTask[time.Duration](nil) // nolint:staticcheck + batcher.Batch(task2) + batcher.Batch(task2) + ret, err := task2.Result() + assert.NoError(t, err) + assert.Greater(t, ret, batchRate) + }) + + t.Run("cancelled task", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + task0 = NewChanTask[time.Duration](ctx) + batcher.Batch(task0) + cancel() + _, err := task0.Result() + assert.ErrorIs(t, err, context.Canceled) + }) + + t.Run("close", func(t *testing.T) { + batcher.Batch(task2) + batcher.Close() + task3 := NewChanTask[time.Duration](ctx) + batcher.Batch(task3) + assert.ErrorIs(t, task3.Err, ErrBatcherClosed) + }) + + t.Run("invalid task", func(t *testing.T) { + NewChanBatcher[*ChanTask[int], int](func() (time.Duration, int) { return 0, 0 }, + nil).Batch(&ChanTask[int]{}) + }) +} diff --git a/go.mod b/go.mod index 7eda449..2e89ee8 100644 --- a/go.mod +++ b/go.mod @@ -3,26 +3,34 @@ module github.com/KyberNetwork/kutils go 1.20 require ( + github.com/KyberNetwork/logger v0.1.0 github.com/bytedance/sonic v1.10.2 github.com/go-resty/resty/v2 v2.10.0 github.com/goccy/go-json v0.10.2 - github.com/hashicorp/go-retryablehttp v0.7.4 + github.com/hashicorp/go-retryablehttp v0.7.5 github.com/json-iterator/go v1.1.12 + github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.4 - golang.org/x/exp v0.0.0-20231005195138-3e424a577f31 + go.uber.org/mock v0.3.0 + golang.org/x/exp v0.0.0-20231219160207-73b9e39aefca ) require ( github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect - github.com/chenzhuoyu/iasm v0.9.0 // indirect + github.com/chenzhuoyu/iasm v0.9.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect - github.com/klauspost/cpuid/v2 v2.0.9 // indirect - github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/klauspost/cpuid/v2 v2.2.6 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect - golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect - golang.org/x/net v0.17.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.26.0 // indirect + golang.org/x/arch v0.6.0 // indirect + golang.org/x/net v0.19.0 // indirect + golang.org/x/sys v0.15.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index cc322e4..4d9a7ba 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/KyberNetwork/logger v0.1.0 h1:Iibu9Ls+tipjR+C0iXhzUYM1VtRgmmR1HHWGufPYcbs= +github.com/KyberNetwork/logger v0.1.0/go.mod h1:zBqHbtJ3nJn6HQnp6UW8pbQkR+U6tSRFd5CzfiKL3Kw= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM= github.com/bytedance/sonic v1.10.2 h1:GQebETVBxYB7JGWJtLBi07OVzWwt+8dWA00gEVW2ZFE= @@ -6,8 +8,9 @@ github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d h1:77cEq6EriyTZ0g/qfRdp61a3Uu/AWrgIq2s0ClJV1g0= github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d/go.mod h1:8EPpVsBuRksnlj1mLy4AWzRNQYxauNi62uWcE3to6eA= -github.com/chenzhuoyu/iasm v0.9.0 h1:9fhXjVzq5hUy2gkhhgHl95zG2cEAhw9OSGs8toWWAwo= github.com/chenzhuoyu/iasm v0.9.0/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= +github.com/chenzhuoyu/iasm v0.9.1 h1:tUHQJXo3NhBqw6s33wkGn9SP3bvrWLdlVIJ3hQBL7P0= +github.com/chenzhuoyu/iasm v0.9.1/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -20,19 +23,25 @@ github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9n github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= -github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA= -github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= +github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M= +github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= +github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -47,13 +56,21 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo= +go.uber.org/mock v0.3.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/arch v0.6.0 h1:S0JTfE48HbRj80+4tbvZDYsJ3tGv6BUU3XxyZ7CirAc= +golang.org/x/arch v0.6.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/exp v0.0.0-20231005195138-3e424a577f31 h1:9k5exFQKQglLo+RoP+4zMjOFE14P6+vyR0baDAi0Rcs= -golang.org/x/exp v0.0.0-20231005195138-3e424a577f31/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= +golang.org/x/exp v0.0.0-20231219160207-73b9e39aefca h1:+xQfFu/HO/82Wwg4zuJ5xiLp0yaOLJjBGnuafXp85YQ= +golang.org/x/exp v0.0.0-20231219160207-73b9e39aefca/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -61,8 +78,9 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -70,10 +88,13 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -94,6 +115,8 @@ golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/mocks/mocks.go b/mocks/mocks.go new file mode 100644 index 0000000..4f1eb10 --- /dev/null +++ b/mocks/mocks.go @@ -0,0 +1,127 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: batcher.go +// +// Generated by this command: +// +// mockgen -source=batcher.go -destination mocks/mocks.go -package mocks +// +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + kutils "github.com/KyberNetwork/kutils" + gomock "go.uber.org/mock/gomock" +) + +// MockBatchableTask is a mock of BatchableTask interface. +type MockBatchableTask[R any] struct { + ctrl *gomock.Controller + recorder *MockBatchableTaskMockRecorder[R] +} + +// MockBatchableTaskMockRecorder is the mock recorder for MockBatchableTask. +type MockBatchableTaskMockRecorder[R any] struct { + mock *MockBatchableTask[R] +} + +// NewMockBatchableTask creates a new mock instance. +func NewMockBatchableTask[R any](ctrl *gomock.Controller) *MockBatchableTask[R] { + mock := &MockBatchableTask[R]{ctrl: ctrl} + mock.recorder = &MockBatchableTaskMockRecorder[R]{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBatchableTask[R]) EXPECT() *MockBatchableTaskMockRecorder[R] { + return m.recorder +} + +// Ctx mocks base method. +func (m *MockBatchableTask[R]) Ctx() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Ctx") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Ctx indicates an expected call of Ctx. +func (mr *MockBatchableTaskMockRecorder[R]) Ctx() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ctx", reflect.TypeOf((*MockBatchableTask[R])(nil).Ctx)) +} + +// Done mocks base method. +func (m *MockBatchableTask[R]) Done() <-chan struct{} { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Done") + ret0, _ := ret[0].(<-chan struct{}) + return ret0 +} + +// Done indicates an expected call of Done. +func (mr *MockBatchableTaskMockRecorder[R]) Done() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockBatchableTask[R])(nil).Done)) +} + +// Resolve mocks base method. +func (m *MockBatchableTask[R]) Resolve(ret R, err error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Resolve", ret, err) +} + +// Resolve indicates an expected call of Resolve. +func (mr *MockBatchableTaskMockRecorder[R]) Resolve(ret, err any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Resolve", reflect.TypeOf((*MockBatchableTask[R])(nil).Resolve), ret, err) +} + +// MockBatcher is a mock of Batcher interface. +type MockBatcher[T kutils.BatchableTask[R], R any] struct { + ctrl *gomock.Controller + recorder *MockBatcherMockRecorder[T, R] +} + +// MockBatcherMockRecorder is the mock recorder for MockBatcher. +type MockBatcherMockRecorder[T kutils.BatchableTask[R], R any] struct { + mock *MockBatcher[T, R] +} + +// NewMockBatcher creates a new mock instance. +func NewMockBatcher[T kutils.BatchableTask[R], R any](ctrl *gomock.Controller) *MockBatcher[T, R] { + mock := &MockBatcher[T, R]{ctrl: ctrl} + mock.recorder = &MockBatcherMockRecorder[T, R]{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBatcher[T, R]) EXPECT() *MockBatcherMockRecorder[T, R] { + return m.recorder +} + +// Batch mocks base method. +func (m *MockBatcher[T, R]) Batch(task T) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Batch", task) +} + +// Batch indicates an expected call of Batch. +func (mr *MockBatcherMockRecorder[T, R]) Batch(task any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Batch", reflect.TypeOf((*MockBatcher[T, R])(nil).Batch), task) +} + +// Close mocks base method. +func (m *MockBatcher[T, R]) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close. +func (mr *MockBatcherMockRecorder[T, R]) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockBatcher[T, R])(nil).Close)) +} From 7f85b5c8d5f78dff90ec1bac01b1e6d917f7704f Mon Sep 17 00:00:00 2001 From: Phu Ngo <12547020+NgoKimPhu@users.noreply.github.com> Date: Fri, 29 Dec 2023 10:00:43 +0700 Subject: [PATCH 2/2] refactor: get resolve err before resolving in loop --- batcher.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/batcher.go b/batcher.go index 481c6f9..c47d892 100644 --- a/batcher.go +++ b/batcher.go @@ -145,15 +145,17 @@ func (b *ChanBatcher[T, R]) batchFnWithRecover(tasks []T) { } logger.Errorf("ChanBatcher.goBatchFn|recovered from panic: %v\n%s", p, string(debug.Stack())) var ret R + err, ok := p.(error) + if ok { + err = errors.Wrap(err, "batchFn panicked") + } else { + err = errors.Errorf("batchFn panicked: %v", p) + } for _, task := range tasks { if task.IsDone() { continue } - if err, ok := p.(error); ok { - task.Resolve(ret, errors.Wrap(err, "batchFn panicked")) - } else { - task.Resolve(ret, errors.Errorf("batchFn panicked: %v", p)) - } + task.Resolve(ret, err) } }() b.batchFn(tasks)