From ed87fa17b356a7ddac3c2b7180c7e800fbf8ad90 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Mon, 18 Nov 2024 13:35:32 -0800 Subject: [PATCH] Add DedupingBucketRateLimiter (#6025) Our use of a `BucketRateLimiter` with `DelayingQueue` is problematic when repeatedly inserting the same items. The former continually moves out the rate limit token time with each new item (no deduping), while the latter dedupes insertions. The net effect is that under load newly inserted items end up with a token time way in the future while the queue happily drains to empty with items processed at a much lower than expected rate. This PR adds a new `DedupingBucketRateLimiter` that maintains historic reservations per item and skips taking out a new reservation when a duplicate item is found with an outstanding reservation. Reservations are dropped from the map when `Forget` is called, which happens after the item is fully processed. This is similar to how the exponential backoff rate limiter works. See [here](https://github.com/kubernetes/client-go/blob/master/util/workqueue/default_rate_limiters.go#L82-L149). - [x] Run new rate limiter under high execution load and verify that queue is kept full but not overflown. Signed-off-by: Andrew Dye --- .../pkg/controller/interfaces/rate_limiter.go | 36 + .../pkg/controller/mocks/limiter.go | 637 ++++++++++++++++++ .../pkg/controller/mocks/reservation.go | 237 +++++++ flytepropeller/pkg/controller/rate_limiter.go | 116 ++++ .../pkg/controller/rate_limiter_test.go | 98 +++ flytepropeller/pkg/controller/workqueue.go | 10 +- 6 files changed, 1127 insertions(+), 7 deletions(-) create mode 100644 flytepropeller/pkg/controller/interfaces/rate_limiter.go create mode 100644 flytepropeller/pkg/controller/mocks/limiter.go create mode 100644 flytepropeller/pkg/controller/mocks/reservation.go create mode 100644 flytepropeller/pkg/controller/rate_limiter.go create mode 100644 flytepropeller/pkg/controller/rate_limiter_test.go diff --git a/flytepropeller/pkg/controller/interfaces/rate_limiter.go b/flytepropeller/pkg/controller/interfaces/rate_limiter.go new file mode 100644 index 0000000000..576d9736c4 --- /dev/null +++ b/flytepropeller/pkg/controller/interfaces/rate_limiter.go @@ -0,0 +1,36 @@ +package interfaces + +import ( + "context" + "time" + + "golang.org/x/time/rate" +) + +//go:generate mockery-v2 --name Limiter --output ../mocks --case=snake --with-expecter +//go:generate mockery-v2 --name Reservation --output ../mocks --case=snake --with-expecter + +type Limiter interface { + Allow() bool + AllowN(t time.Time, n int) bool + Burst() int + Limit() rate.Limit + Reserve() Reservation + ReserveN(t time.Time, n int) Reservation + SetBurst(newBurst int) + SetBurstAt(t time.Time, newBurst int) + SetLimit(newLimit rate.Limit) + SetLimitAt(t time.Time, newLimit rate.Limit) + Tokens() float64 + TokensAt(t time.Time) float64 + Wait(ctx context.Context) (err error) + WaitN(ctx context.Context, n int) (err error) +} + +type Reservation interface { + Cancel() + CancelAt(t time.Time) + Delay() time.Duration + DelayFrom(t time.Time) time.Duration + OK() bool +} diff --git a/flytepropeller/pkg/controller/mocks/limiter.go b/flytepropeller/pkg/controller/mocks/limiter.go new file mode 100644 index 0000000000..709cdd4d65 --- /dev/null +++ b/flytepropeller/pkg/controller/mocks/limiter.go @@ -0,0 +1,637 @@ +// Code generated by mockery v2.40.3. DO NOT EDIT. + +package mocks + +import ( + context "context" + + interfaces "github.com/flyteorg/flyte/flytepropeller/pkg/controller/interfaces" + mock "github.com/stretchr/testify/mock" + + rate "golang.org/x/time/rate" + + time "time" +) + +// Limiter is an autogenerated mock type for the Limiter type +type Limiter struct { + mock.Mock +} + +type Limiter_Expecter struct { + mock *mock.Mock +} + +func (_m *Limiter) EXPECT() *Limiter_Expecter { + return &Limiter_Expecter{mock: &_m.Mock} +} + +// Allow provides a mock function with given fields: +func (_m *Limiter) Allow() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Allow") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Limiter_Allow_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Allow' +type Limiter_Allow_Call struct { + *mock.Call +} + +// Allow is a helper method to define mock.On call +func (_e *Limiter_Expecter) Allow() *Limiter_Allow_Call { + return &Limiter_Allow_Call{Call: _e.mock.On("Allow")} +} + +func (_c *Limiter_Allow_Call) Run(run func()) *Limiter_Allow_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Limiter_Allow_Call) Return(_a0 bool) *Limiter_Allow_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_Allow_Call) RunAndReturn(run func() bool) *Limiter_Allow_Call { + _c.Call.Return(run) + return _c +} + +// AllowN provides a mock function with given fields: t, n +func (_m *Limiter) AllowN(t time.Time, n int) bool { + ret := _m.Called(t, n) + + if len(ret) == 0 { + panic("no return value specified for AllowN") + } + + var r0 bool + if rf, ok := ret.Get(0).(func(time.Time, int) bool); ok { + r0 = rf(t, n) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Limiter_AllowN_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllowN' +type Limiter_AllowN_Call struct { + *mock.Call +} + +// AllowN is a helper method to define mock.On call +// - t time.Time +// - n int +func (_e *Limiter_Expecter) AllowN(t interface{}, n interface{}) *Limiter_AllowN_Call { + return &Limiter_AllowN_Call{Call: _e.mock.On("AllowN", t, n)} +} + +func (_c *Limiter_AllowN_Call) Run(run func(t time.Time, n int)) *Limiter_AllowN_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time), args[1].(int)) + }) + return _c +} + +func (_c *Limiter_AllowN_Call) Return(_a0 bool) *Limiter_AllowN_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_AllowN_Call) RunAndReturn(run func(time.Time, int) bool) *Limiter_AllowN_Call { + _c.Call.Return(run) + return _c +} + +// Burst provides a mock function with given fields: +func (_m *Limiter) Burst() int { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Burst") + } + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + +// Limiter_Burst_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Burst' +type Limiter_Burst_Call struct { + *mock.Call +} + +// Burst is a helper method to define mock.On call +func (_e *Limiter_Expecter) Burst() *Limiter_Burst_Call { + return &Limiter_Burst_Call{Call: _e.mock.On("Burst")} +} + +func (_c *Limiter_Burst_Call) Run(run func()) *Limiter_Burst_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Limiter_Burst_Call) Return(_a0 int) *Limiter_Burst_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_Burst_Call) RunAndReturn(run func() int) *Limiter_Burst_Call { + _c.Call.Return(run) + return _c +} + +// Limit provides a mock function with given fields: +func (_m *Limiter) Limit() rate.Limit { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Limit") + } + + var r0 rate.Limit + if rf, ok := ret.Get(0).(func() rate.Limit); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(rate.Limit) + } + + return r0 +} + +// Limiter_Limit_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Limit' +type Limiter_Limit_Call struct { + *mock.Call +} + +// Limit is a helper method to define mock.On call +func (_e *Limiter_Expecter) Limit() *Limiter_Limit_Call { + return &Limiter_Limit_Call{Call: _e.mock.On("Limit")} +} + +func (_c *Limiter_Limit_Call) Run(run func()) *Limiter_Limit_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Limiter_Limit_Call) Return(_a0 rate.Limit) *Limiter_Limit_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_Limit_Call) RunAndReturn(run func() rate.Limit) *Limiter_Limit_Call { + _c.Call.Return(run) + return _c +} + +// Reserve provides a mock function with given fields: +func (_m *Limiter) Reserve() interfaces.Reservation { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Reserve") + } + + var r0 interfaces.Reservation + if rf, ok := ret.Get(0).(func() interfaces.Reservation); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(interfaces.Reservation) + } + } + + return r0 +} + +// Limiter_Reserve_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reserve' +type Limiter_Reserve_Call struct { + *mock.Call +} + +// Reserve is a helper method to define mock.On call +func (_e *Limiter_Expecter) Reserve() *Limiter_Reserve_Call { + return &Limiter_Reserve_Call{Call: _e.mock.On("Reserve")} +} + +func (_c *Limiter_Reserve_Call) Run(run func()) *Limiter_Reserve_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Limiter_Reserve_Call) Return(_a0 interfaces.Reservation) *Limiter_Reserve_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_Reserve_Call) RunAndReturn(run func() interfaces.Reservation) *Limiter_Reserve_Call { + _c.Call.Return(run) + return _c +} + +// ReserveN provides a mock function with given fields: t, n +func (_m *Limiter) ReserveN(t time.Time, n int) interfaces.Reservation { + ret := _m.Called(t, n) + + if len(ret) == 0 { + panic("no return value specified for ReserveN") + } + + var r0 interfaces.Reservation + if rf, ok := ret.Get(0).(func(time.Time, int) interfaces.Reservation); ok { + r0 = rf(t, n) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(interfaces.Reservation) + } + } + + return r0 +} + +// Limiter_ReserveN_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReserveN' +type Limiter_ReserveN_Call struct { + *mock.Call +} + +// ReserveN is a helper method to define mock.On call +// - t time.Time +// - n int +func (_e *Limiter_Expecter) ReserveN(t interface{}, n interface{}) *Limiter_ReserveN_Call { + return &Limiter_ReserveN_Call{Call: _e.mock.On("ReserveN", t, n)} +} + +func (_c *Limiter_ReserveN_Call) Run(run func(t time.Time, n int)) *Limiter_ReserveN_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time), args[1].(int)) + }) + return _c +} + +func (_c *Limiter_ReserveN_Call) Return(_a0 interfaces.Reservation) *Limiter_ReserveN_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_ReserveN_Call) RunAndReturn(run func(time.Time, int) interfaces.Reservation) *Limiter_ReserveN_Call { + _c.Call.Return(run) + return _c +} + +// SetBurst provides a mock function with given fields: newBurst +func (_m *Limiter) SetBurst(newBurst int) { + _m.Called(newBurst) +} + +// Limiter_SetBurst_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetBurst' +type Limiter_SetBurst_Call struct { + *mock.Call +} + +// SetBurst is a helper method to define mock.On call +// - newBurst int +func (_e *Limiter_Expecter) SetBurst(newBurst interface{}) *Limiter_SetBurst_Call { + return &Limiter_SetBurst_Call{Call: _e.mock.On("SetBurst", newBurst)} +} + +func (_c *Limiter_SetBurst_Call) Run(run func(newBurst int)) *Limiter_SetBurst_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int)) + }) + return _c +} + +func (_c *Limiter_SetBurst_Call) Return() *Limiter_SetBurst_Call { + _c.Call.Return() + return _c +} + +func (_c *Limiter_SetBurst_Call) RunAndReturn(run func(int)) *Limiter_SetBurst_Call { + _c.Call.Return(run) + return _c +} + +// SetBurstAt provides a mock function with given fields: t, newBurst +func (_m *Limiter) SetBurstAt(t time.Time, newBurst int) { + _m.Called(t, newBurst) +} + +// Limiter_SetBurstAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetBurstAt' +type Limiter_SetBurstAt_Call struct { + *mock.Call +} + +// SetBurstAt is a helper method to define mock.On call +// - t time.Time +// - newBurst int +func (_e *Limiter_Expecter) SetBurstAt(t interface{}, newBurst interface{}) *Limiter_SetBurstAt_Call { + return &Limiter_SetBurstAt_Call{Call: _e.mock.On("SetBurstAt", t, newBurst)} +} + +func (_c *Limiter_SetBurstAt_Call) Run(run func(t time.Time, newBurst int)) *Limiter_SetBurstAt_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time), args[1].(int)) + }) + return _c +} + +func (_c *Limiter_SetBurstAt_Call) Return() *Limiter_SetBurstAt_Call { + _c.Call.Return() + return _c +} + +func (_c *Limiter_SetBurstAt_Call) RunAndReturn(run func(time.Time, int)) *Limiter_SetBurstAt_Call { + _c.Call.Return(run) + return _c +} + +// SetLimit provides a mock function with given fields: newLimit +func (_m *Limiter) SetLimit(newLimit rate.Limit) { + _m.Called(newLimit) +} + +// Limiter_SetLimit_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetLimit' +type Limiter_SetLimit_Call struct { + *mock.Call +} + +// SetLimit is a helper method to define mock.On call +// - newLimit rate.Limit +func (_e *Limiter_Expecter) SetLimit(newLimit interface{}) *Limiter_SetLimit_Call { + return &Limiter_SetLimit_Call{Call: _e.mock.On("SetLimit", newLimit)} +} + +func (_c *Limiter_SetLimit_Call) Run(run func(newLimit rate.Limit)) *Limiter_SetLimit_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(rate.Limit)) + }) + return _c +} + +func (_c *Limiter_SetLimit_Call) Return() *Limiter_SetLimit_Call { + _c.Call.Return() + return _c +} + +func (_c *Limiter_SetLimit_Call) RunAndReturn(run func(rate.Limit)) *Limiter_SetLimit_Call { + _c.Call.Return(run) + return _c +} + +// SetLimitAt provides a mock function with given fields: t, newLimit +func (_m *Limiter) SetLimitAt(t time.Time, newLimit rate.Limit) { + _m.Called(t, newLimit) +} + +// Limiter_SetLimitAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetLimitAt' +type Limiter_SetLimitAt_Call struct { + *mock.Call +} + +// SetLimitAt is a helper method to define mock.On call +// - t time.Time +// - newLimit rate.Limit +func (_e *Limiter_Expecter) SetLimitAt(t interface{}, newLimit interface{}) *Limiter_SetLimitAt_Call { + return &Limiter_SetLimitAt_Call{Call: _e.mock.On("SetLimitAt", t, newLimit)} +} + +func (_c *Limiter_SetLimitAt_Call) Run(run func(t time.Time, newLimit rate.Limit)) *Limiter_SetLimitAt_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time), args[1].(rate.Limit)) + }) + return _c +} + +func (_c *Limiter_SetLimitAt_Call) Return() *Limiter_SetLimitAt_Call { + _c.Call.Return() + return _c +} + +func (_c *Limiter_SetLimitAt_Call) RunAndReturn(run func(time.Time, rate.Limit)) *Limiter_SetLimitAt_Call { + _c.Call.Return(run) + return _c +} + +// Tokens provides a mock function with given fields: +func (_m *Limiter) Tokens() float64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Tokens") + } + + var r0 float64 + if rf, ok := ret.Get(0).(func() float64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(float64) + } + + return r0 +} + +// Limiter_Tokens_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Tokens' +type Limiter_Tokens_Call struct { + *mock.Call +} + +// Tokens is a helper method to define mock.On call +func (_e *Limiter_Expecter) Tokens() *Limiter_Tokens_Call { + return &Limiter_Tokens_Call{Call: _e.mock.On("Tokens")} +} + +func (_c *Limiter_Tokens_Call) Run(run func()) *Limiter_Tokens_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Limiter_Tokens_Call) Return(_a0 float64) *Limiter_Tokens_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_Tokens_Call) RunAndReturn(run func() float64) *Limiter_Tokens_Call { + _c.Call.Return(run) + return _c +} + +// TokensAt provides a mock function with given fields: t +func (_m *Limiter) TokensAt(t time.Time) float64 { + ret := _m.Called(t) + + if len(ret) == 0 { + panic("no return value specified for TokensAt") + } + + var r0 float64 + if rf, ok := ret.Get(0).(func(time.Time) float64); ok { + r0 = rf(t) + } else { + r0 = ret.Get(0).(float64) + } + + return r0 +} + +// Limiter_TokensAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TokensAt' +type Limiter_TokensAt_Call struct { + *mock.Call +} + +// TokensAt is a helper method to define mock.On call +// - t time.Time +func (_e *Limiter_Expecter) TokensAt(t interface{}) *Limiter_TokensAt_Call { + return &Limiter_TokensAt_Call{Call: _e.mock.On("TokensAt", t)} +} + +func (_c *Limiter_TokensAt_Call) Run(run func(t time.Time)) *Limiter_TokensAt_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time)) + }) + return _c +} + +func (_c *Limiter_TokensAt_Call) Return(_a0 float64) *Limiter_TokensAt_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_TokensAt_Call) RunAndReturn(run func(time.Time) float64) *Limiter_TokensAt_Call { + _c.Call.Return(run) + return _c +} + +// Wait provides a mock function with given fields: ctx +func (_m *Limiter) Wait(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Wait") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Limiter_Wait_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Wait' +type Limiter_Wait_Call struct { + *mock.Call +} + +// Wait is a helper method to define mock.On call +// - ctx context.Context +func (_e *Limiter_Expecter) Wait(ctx interface{}) *Limiter_Wait_Call { + return &Limiter_Wait_Call{Call: _e.mock.On("Wait", ctx)} +} + +func (_c *Limiter_Wait_Call) Run(run func(ctx context.Context)) *Limiter_Wait_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *Limiter_Wait_Call) Return(err error) *Limiter_Wait_Call { + _c.Call.Return(err) + return _c +} + +func (_c *Limiter_Wait_Call) RunAndReturn(run func(context.Context) error) *Limiter_Wait_Call { + _c.Call.Return(run) + return _c +} + +// WaitN provides a mock function with given fields: ctx, n +func (_m *Limiter) WaitN(ctx context.Context, n int) error { + ret := _m.Called(ctx, n) + + if len(ret) == 0 { + panic("no return value specified for WaitN") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int) error); ok { + r0 = rf(ctx, n) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Limiter_WaitN_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WaitN' +type Limiter_WaitN_Call struct { + *mock.Call +} + +// WaitN is a helper method to define mock.On call +// - ctx context.Context +// - n int +func (_e *Limiter_Expecter) WaitN(ctx interface{}, n interface{}) *Limiter_WaitN_Call { + return &Limiter_WaitN_Call{Call: _e.mock.On("WaitN", ctx, n)} +} + +func (_c *Limiter_WaitN_Call) Run(run func(ctx context.Context, n int)) *Limiter_WaitN_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int)) + }) + return _c +} + +func (_c *Limiter_WaitN_Call) Return(err error) *Limiter_WaitN_Call { + _c.Call.Return(err) + return _c +} + +func (_c *Limiter_WaitN_Call) RunAndReturn(run func(context.Context, int) error) *Limiter_WaitN_Call { + _c.Call.Return(run) + return _c +} + +// NewLimiter creates a new instance of Limiter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewLimiter(t interface { + mock.TestingT + Cleanup(func()) +}) *Limiter { + mock := &Limiter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/flytepropeller/pkg/controller/mocks/reservation.go b/flytepropeller/pkg/controller/mocks/reservation.go new file mode 100644 index 0000000000..d609c0b034 --- /dev/null +++ b/flytepropeller/pkg/controller/mocks/reservation.go @@ -0,0 +1,237 @@ +// Code generated by mockery v2.40.3. DO NOT EDIT. + +package mocks + +import ( + time "time" + + mock "github.com/stretchr/testify/mock" +) + +// Reservation is an autogenerated mock type for the Reservation type +type Reservation struct { + mock.Mock +} + +type Reservation_Expecter struct { + mock *mock.Mock +} + +func (_m *Reservation) EXPECT() *Reservation_Expecter { + return &Reservation_Expecter{mock: &_m.Mock} +} + +// Cancel provides a mock function with given fields: +func (_m *Reservation) Cancel() { + _m.Called() +} + +// Reservation_Cancel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Cancel' +type Reservation_Cancel_Call struct { + *mock.Call +} + +// Cancel is a helper method to define mock.On call +func (_e *Reservation_Expecter) Cancel() *Reservation_Cancel_Call { + return &Reservation_Cancel_Call{Call: _e.mock.On("Cancel")} +} + +func (_c *Reservation_Cancel_Call) Run(run func()) *Reservation_Cancel_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Reservation_Cancel_Call) Return() *Reservation_Cancel_Call { + _c.Call.Return() + return _c +} + +func (_c *Reservation_Cancel_Call) RunAndReturn(run func()) *Reservation_Cancel_Call { + _c.Call.Return(run) + return _c +} + +// CancelAt provides a mock function with given fields: t +func (_m *Reservation) CancelAt(t time.Time) { + _m.Called(t) +} + +// Reservation_CancelAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CancelAt' +type Reservation_CancelAt_Call struct { + *mock.Call +} + +// CancelAt is a helper method to define mock.On call +// - t time.Time +func (_e *Reservation_Expecter) CancelAt(t interface{}) *Reservation_CancelAt_Call { + return &Reservation_CancelAt_Call{Call: _e.mock.On("CancelAt", t)} +} + +func (_c *Reservation_CancelAt_Call) Run(run func(t time.Time)) *Reservation_CancelAt_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time)) + }) + return _c +} + +func (_c *Reservation_CancelAt_Call) Return() *Reservation_CancelAt_Call { + _c.Call.Return() + return _c +} + +func (_c *Reservation_CancelAt_Call) RunAndReturn(run func(time.Time)) *Reservation_CancelAt_Call { + _c.Call.Return(run) + return _c +} + +// Delay provides a mock function with given fields: +func (_m *Reservation) Delay() time.Duration { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Delay") + } + + var r0 time.Duration + if rf, ok := ret.Get(0).(func() time.Duration); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + +// Reservation_Delay_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Delay' +type Reservation_Delay_Call struct { + *mock.Call +} + +// Delay is a helper method to define mock.On call +func (_e *Reservation_Expecter) Delay() *Reservation_Delay_Call { + return &Reservation_Delay_Call{Call: _e.mock.On("Delay")} +} + +func (_c *Reservation_Delay_Call) Run(run func()) *Reservation_Delay_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Reservation_Delay_Call) Return(_a0 time.Duration) *Reservation_Delay_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Reservation_Delay_Call) RunAndReturn(run func() time.Duration) *Reservation_Delay_Call { + _c.Call.Return(run) + return _c +} + +// DelayFrom provides a mock function with given fields: t +func (_m *Reservation) DelayFrom(t time.Time) time.Duration { + ret := _m.Called(t) + + if len(ret) == 0 { + panic("no return value specified for DelayFrom") + } + + var r0 time.Duration + if rf, ok := ret.Get(0).(func(time.Time) time.Duration); ok { + r0 = rf(t) + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + +// Reservation_DelayFrom_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DelayFrom' +type Reservation_DelayFrom_Call struct { + *mock.Call +} + +// DelayFrom is a helper method to define mock.On call +// - t time.Time +func (_e *Reservation_Expecter) DelayFrom(t interface{}) *Reservation_DelayFrom_Call { + return &Reservation_DelayFrom_Call{Call: _e.mock.On("DelayFrom", t)} +} + +func (_c *Reservation_DelayFrom_Call) Run(run func(t time.Time)) *Reservation_DelayFrom_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time)) + }) + return _c +} + +func (_c *Reservation_DelayFrom_Call) Return(_a0 time.Duration) *Reservation_DelayFrom_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Reservation_DelayFrom_Call) RunAndReturn(run func(time.Time) time.Duration) *Reservation_DelayFrom_Call { + _c.Call.Return(run) + return _c +} + +// OK provides a mock function with given fields: +func (_m *Reservation) OK() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for OK") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Reservation_OK_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OK' +type Reservation_OK_Call struct { + *mock.Call +} + +// OK is a helper method to define mock.On call +func (_e *Reservation_Expecter) OK() *Reservation_OK_Call { + return &Reservation_OK_Call{Call: _e.mock.On("OK")} +} + +func (_c *Reservation_OK_Call) Run(run func()) *Reservation_OK_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Reservation_OK_Call) Return(_a0 bool) *Reservation_OK_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Reservation_OK_Call) RunAndReturn(run func() bool) *Reservation_OK_Call { + _c.Call.Return(run) + return _c +} + +// NewReservation creates a new instance of Reservation. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewReservation(t interface { + mock.TestingT + Cleanup(func()) +}) *Reservation { + mock := &Reservation{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/flytepropeller/pkg/controller/rate_limiter.go b/flytepropeller/pkg/controller/rate_limiter.go new file mode 100644 index 0000000000..100d6aa82c --- /dev/null +++ b/flytepropeller/pkg/controller/rate_limiter.go @@ -0,0 +1,116 @@ +package controller + +import ( + "context" + "sync" + "time" + + "golang.org/x/time/rate" + "k8s.io/client-go/util/workqueue" + + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/interfaces" +) + +// limiterAdapter adapts rate.NewLimiter to use the Reservation interface so that it can be used in unittests. +type limiterAdapter struct { + limiter *rate.Limiter +} + +func NewLimiter(r rate.Limit, b int) interfaces.Limiter { + return &limiterAdapter{rate.NewLimiter(r, b)} +} + +func (l *limiterAdapter) Allow() bool { + return l.limiter.Allow() +} + +func (l *limiterAdapter) AllowN(t time.Time, n int) bool { + return l.limiter.AllowN(t, n) +} + +func (l *limiterAdapter) Burst() int { + return l.limiter.Burst() +} + +func (l *limiterAdapter) Limit() rate.Limit { + return l.limiter.Limit() +} + +func (l *limiterAdapter) Reserve() interfaces.Reservation { + return l.limiter.Reserve() +} + +func (l *limiterAdapter) ReserveN(t time.Time, n int) interfaces.Reservation { + return l.limiter.ReserveN(t, n) +} +func (l *limiterAdapter) SetBurst(newBurst int) { + l.limiter.SetBurst(newBurst) +} + +func (l *limiterAdapter) SetBurstAt(t time.Time, newBurst int) { + l.limiter.SetBurstAt(t, newBurst) +} + +func (l *limiterAdapter) SetLimit(newLimit rate.Limit) { + l.limiter.SetLimit(newLimit) +} + +func (l *limiterAdapter) SetLimitAt(t time.Time, newLimit rate.Limit) { + l.limiter.SetLimitAt(t, newLimit) +} + +func (l *limiterAdapter) Tokens() float64 { + return l.limiter.Tokens() +} + +func (l *limiterAdapter) TokensAt(t time.Time) float64 { + return l.limiter.TokensAt(t) +} + +func (l *limiterAdapter) Wait(ctx context.Context) (err error) { + return l.limiter.Wait(ctx) +} + +func (l *limiterAdapter) WaitN(ctx context.Context, n int) (err error) { + return l.limiter.WaitN(ctx, n) +} + +// Similar to the standard BucketRateLimiter but dedupes items in order to avoid reserving token slots for the +// same item multiple times. Intened to be used with a DelayingQueue, which dedupes items on insertion. +type dedupingBucketRateLimiter struct { + Limiter interfaces.Limiter + mu sync.Mutex + reservations map[interface{}]interfaces.Reservation +} + +func NewDedupingBucketRateLimiter(limiter interfaces.Limiter) workqueue.RateLimiter { + return &dedupingBucketRateLimiter{ + Limiter: limiter, + reservations: make(map[interface{}]interfaces.Reservation), + } +} + +var _ workqueue.RateLimiter = &dedupingBucketRateLimiter{} + +func (r *dedupingBucketRateLimiter) When(item interface{}) time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + // Check if this item has an outstanding reservation. If so, use it to avoid a duplicate reservation. + if res, ok := r.reservations[item]; ok && res.Delay() > 0 { + return res.Delay() + } + r.reservations[item] = r.Limiter.Reserve() + return r.reservations[item].Delay() +} + +func (r *dedupingBucketRateLimiter) NumRequeues(item interface{}) int { + return 0 +} + +func (r *dedupingBucketRateLimiter) Forget(item interface{}) { + r.mu.Lock() + defer r.mu.Unlock() + if res, ok := r.reservations[item]; ok && res.Delay() <= 0 { + delete(r.reservations, item) + } +} diff --git a/flytepropeller/pkg/controller/rate_limiter_test.go b/flytepropeller/pkg/controller/rate_limiter_test.go new file mode 100644 index 0000000000..16e5bae417 --- /dev/null +++ b/flytepropeller/pkg/controller/rate_limiter_test.go @@ -0,0 +1,98 @@ +package controller + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/mocks" +) + +type rateLimiterTests struct { + suite.Suite + limiter *mocks.Limiter + deduping *dedupingBucketRateLimiter +} + +func TestDedupingBucketRateLimiter(t *testing.T) { + suite.Run(t, &rateLimiterTests{}) +} + +func (s *rateLimiterTests) SetupTest() { + s.limiter = mocks.NewLimiter(s.T()) + s.deduping = NewDedupingBucketRateLimiter(s.limiter).(*dedupingBucketRateLimiter) +} + +func (s *rateLimiterTests) TearDownTest() { + s.limiter.AssertExpectations(s.T()) +} + +func (s *rateLimiterTests) Test_When_NotFound() { + newReservation := mocks.NewReservation(s.T()) + defer newReservation.AssertExpectations(s.T()) + newReservation.EXPECT().Delay().Return(time.Minute).Once() + s.limiter.EXPECT().Reserve().Return(newReservation).Once() + + d := s.deduping.When("item1") + + assert.Equal(s.T(), newReservation, s.deduping.reservations["item1"]) + assert.Equal(s.T(), time.Minute, d) +} + +func (s *rateLimiterTests) Test_When_FoundPast() { + pastReservation := mocks.NewReservation(s.T()) + defer pastReservation.AssertExpectations(s.T()) + pastReservation.EXPECT().Delay().Return(-time.Minute).Once() + s.deduping.reservations["item1"] = pastReservation + newReservation := mocks.NewReservation(s.T()) + defer newReservation.AssertExpectations(s.T()) + newReservation.EXPECT().Delay().Return(time.Minute).Once() + s.limiter.EXPECT().Reserve().Return(newReservation).Once() + + d := s.deduping.When("item1") + + assert.Equal(s.T(), newReservation, s.deduping.reservations["item1"]) + assert.Equal(s.T(), time.Minute, d) +} + +func (s *rateLimiterTests) Test_When_FoundFuture() { + futureReservation := mocks.NewReservation(s.T()) + defer futureReservation.AssertExpectations(s.T()) + futureReservation.EXPECT().Delay().Return(time.Minute).Twice() + s.deduping.reservations["item1"] = futureReservation + + d := s.deduping.When("item1") + + assert.Equal(s.T(), futureReservation, s.deduping.reservations["item1"]) + assert.Equal(s.T(), time.Minute, d) +} + +func (s *rateLimiterTests) Test_Forget_NotFound() { + s.deduping.Forget("item1") + + assert.NotContains(s.T(), s.deduping.reservations, "item1") +} + +func (s *rateLimiterTests) Test_Forget_PastReservation() { + pastReservation := mocks.NewReservation(s.T()) + defer pastReservation.AssertExpectations(s.T()) + pastReservation.EXPECT().Delay().Return(-time.Minute).Once() + s.deduping.reservations["item1"] = pastReservation + + s.deduping.Forget("item1") + + assert.NotContains(s.T(), s.deduping.reservations, "item1") +} + +func (s *rateLimiterTests) Test_Forget_FutureReservation() { + futureReservation := mocks.NewReservation(s.T()) + defer futureReservation.AssertExpectations(s.T()) + futureReservation.EXPECT().Delay().Return(time.Minute).Once() + s.deduping.reservations["item1"] = futureReservation + + s.deduping.Forget("item1") + + assert.Equal(s.T(), futureReservation, s.deduping.reservations["item1"]) +} diff --git a/flytepropeller/pkg/controller/workqueue.go b/flytepropeller/pkg/controller/workqueue.go index 1d10cb5f2a..fcf97c3e1c 100644 --- a/flytepropeller/pkg/controller/workqueue.go +++ b/flytepropeller/pkg/controller/workqueue.go @@ -18,10 +18,8 @@ func NewWorkQueue(ctx context.Context, cfg config.WorkqueueConfig, name string) case config.WorkqueueTypeBucketRateLimiter: logger.Infof(ctx, "Using Bucket Ratelimited Workqueue, Rate [%v] Capacity [%v]", cfg.Rate, cfg.Capacity) return workqueue.NewNamedRateLimitingQueue( - // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) - &workqueue.BucketRateLimiter{ - Limiter: rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Capacity), - }, name), nil + NewDedupingBucketRateLimiter(NewLimiter(rate.Limit(cfg.Rate), cfg.Capacity)), + name), nil case config.WorkqueueTypeExponentialFailureRateLimiter: logger.Infof(ctx, "Using Exponential failure backoff Ratelimited Workqueue, Base Delay [%v], max Delay [%v]", cfg.BaseDelay, cfg.MaxDelay) return workqueue.NewNamedRateLimitingQueue( @@ -31,9 +29,7 @@ func NewWorkQueue(ctx context.Context, cfg config.WorkqueueConfig, name string) logger.Infof(ctx, "Using Max-of Ratelimited Workqueue, Bucket {Rate [%v] Capacity [%v]} | FailureBackoff {Base Delay [%v], max Delay [%v]}", cfg.Rate, cfg.Capacity, cfg.BaseDelay, cfg.MaxDelay) return workqueue.NewNamedRateLimitingQueue( workqueue.NewMaxOfRateLimiter( - &workqueue.BucketRateLimiter{ - Limiter: rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Capacity), - }, + NewDedupingBucketRateLimiter(NewLimiter(rate.Limit(cfg.Rate), cfg.Capacity)), workqueue.NewItemExponentialFailureRateLimiter(cfg.BaseDelay.Duration, cfg.MaxDelay.Duration), ), name), nil