Use pluggable clock for auto refresh cache and make unit tests fast (#5767)

* Use pluggable clock and make unit tests fast

Signed-off-by: Jason Parraga <[email protected]>

* lint fixes

Signed-off-by: Jason Parraga <[email protected]>

---------

Signed-off-by: Jason Parraga <[email protected]>
Sovietaced authored Sep 24, 2024
1 parent c0fc6d4 commit c01a059
Showing 3 changed files with 78 additions and 37 deletions.
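
For orientation before the diff: the change replaces wait.Until plus real sleeps with a clock owned by the cache, so tests can advance time instantly. Below is a minimal, self-contained sketch of that pattern using k8s.io/utils/clock; the refresher type and its fields are illustrative stand-ins, not the actual flytestdlib API.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/utils/clock"
	testingclock "k8s.io/utils/clock/testing"
)

// refresher is a toy analogue of autoRefresh: it owns a clock.Clock so tests
// can substitute a fake clock instead of sleeping in real time.
type refresher struct {
	syncPeriod time.Duration
	clock      clock.Clock
	synced     chan struct{}
}

// loop mirrors the enqueueLoop pattern: block on a clock-owned timer, do the
// periodic work, then reset the timer for the next period.
func (r *refresher) loop(stop <-chan struct{}) {
	timer := r.clock.NewTimer(r.syncPeriod)
	defer timer.Stop()
	for {
		select {
		case <-stop:
			return
		case <-timer.C():
			r.synced <- struct{}{} // stand-in for enqueueBatches
			timer.Reset(r.syncPeriod)
		}
	}
}

func main() {
	fakeClock := testingclock.NewFakeClock(time.Now())
	r := &refresher{
		syncPeriod: 5 * time.Second,
		clock:      fakeClock,
		synced:     make(chan struct{}),
	}

	stop := make(chan struct{})
	defer close(stop)
	go r.loop(stop)

	// Wait until the goroutine is parked on the fake timer, then advance the
	// clock by one full period; no real five-second wait is needed.
	for !fakeClock.HasWaiters() {
		time.Sleep(time.Millisecond)
	}
	fakeClock.Step(5 * time.Second)
	<-r.synced
	fmt.Println("periodic sync fired without any real waiting")
}
```

In the real change, enqueueLoop plays the role of loop above, and the updated tests step a shared fake clock inside assert.EventuallyWithT, so a 5-second resync period costs no wall-clock time.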
1 change: 0 additions & 1 deletion flytestdlib/.golangci.yml
@@ -26,7 +26,6 @@ linters:
- structcheck
- typecheck
- unconvert
- unparam
- unused
- varcheck

51 changes: 39 additions & 12 deletions flytestdlib/cache/auto_refresh.go
@@ -9,8 +9,8 @@ import (

lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"

"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/errors"
@@ -125,6 +125,7 @@ type autoRefresh struct {
workqueue workqueue.RateLimitingInterface
parallelizm int
lock sync.RWMutex
clock clock.Clock
}

func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value interface{}) {
@@ -165,17 +166,29 @@ func (w *autoRefresh) Start(ctx context.Context) error {
}

enqueueCtx := contextutils.WithGoroutineLabel(ctx, fmt.Sprintf("%v-enqueue", w.name))

go wait.Until(func() {
err := w.enqueueBatches(enqueueCtx)
if err != nil {
logger.Errorf(enqueueCtx, "Failed to enqueue. Error: %v", err)
}
}, w.syncPeriod, enqueueCtx.Done())
go w.enqueueLoop(enqueueCtx)

return nil
}

func (w *autoRefresh) enqueueLoop(ctx context.Context) {
timer := w.clock.NewTimer(w.syncPeriod)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return
case <-timer.C():
err := w.enqueueBatches(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to enqueue. Error: %v", err)
}
timer.Reset(w.syncPeriod)
}
}
}

// Update updates the item only if it exists in the cache, return true if we updated the item.
func (w *autoRefresh) Update(id ItemID, item Item) (ok bool) {
w.lock.Lock()
@@ -221,7 +234,7 @@ func (w *autoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) {
batch := make([]ItemWrapper, 0, 1)
batch = append(batch, itemWrapper{id: id, item: item})
w.workqueue.AddRateLimited(&batch)
w.processing.Store(id, time.Now())
w.processing.Store(id, w.clock.Now())
return item, nil
}

@@ -265,7 +278,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
b := batch
w.workqueue.AddRateLimited(&b)
for i := 1; i < len(b); i++ {
w.processing.Store(b[i].GetID(), time.Now())
w.processing.Store(b[i].GetID(), w.clock.Now())
}
}

@@ -365,7 +378,7 @@ func (w *autoRefresh) inProcessing(key interface{}) bool {
item, found := w.processing.Load(key)
if found {
// handle potential race conditions where the item is in processing but not in the workqueue
if timeItem, ok := item.(time.Time); ok && time.Since(timeItem) > (w.syncPeriod*5) {
if timeItem, ok := item.(time.Time); ok && w.clock.Since(timeItem) > (w.syncPeriod*5) {
w.processing.Delete(key)
return false
}
@@ -377,6 +390,11 @@ func (w *autoRefresh) inProcessing(key interface{}) bool {
// Instantiates a new AutoRefresh Cache that syncs items in batches.
func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter,
resyncPeriod time.Duration, parallelizm, size int, scope promutils.Scope) (AutoRefresh, error) {
return newAutoRefreshBatchedCacheWithClock(name, createBatches, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope, clock.RealClock{})
}

func newAutoRefreshBatchedCacheWithClock(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter,
resyncPeriod time.Duration, parallelizm, size int, scope promutils.Scope, clock clock.WithTicker) (AutoRefresh, error) {

metrics := newMetrics(scope)
lruCache, err := lru.NewWithEvict(size, getEvictionFunction(metrics.Evictions))
@@ -394,7 +412,11 @@ func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, sy
processing: &sync.Map{},
toDelete: newSyncSet(),
syncPeriod: resyncPeriod,
workqueue: workqueue.NewNamedRateLimitingQueue(syncRateLimiter, scope.CurrentScope()),
workqueue: workqueue.NewRateLimitingQueueWithConfig(syncRateLimiter, workqueue.RateLimitingQueueConfig{
Name: scope.CurrentScope(),
Clock: clock,
}),
clock: clock,
}

return cache, nil
@@ -406,3 +428,8 @@ func NewAutoRefreshCache(name string, syncCb SyncFunc, syncRateLimiter workqueue

return NewAutoRefreshBatchedCache(name, SingleItemBatches, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope)
}

func newAutoRefreshCacheWithClock(name string, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration,
parallelizm, size int, scope promutils.Scope, clock clock.WithTicker) (AutoRefresh, error) {
return newAutoRefreshBatchedCacheWithClock(name, SingleItemBatches, syncCb, syncRateLimiter, resyncPeriod, parallelizm, size, scope, clock)
}
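
The constructor above also threads the injected clock into the rate-limiting work queue via workqueue.RateLimitingQueueConfig. A hedged, standalone sketch of that wiring follows; the queue name and item value are made up, and the step-and-poll loop mirrors the approach the updated tests use rather than any flytestdlib helper.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
	testingclock "k8s.io/utils/clock/testing"
)

func main() {
	// One fake clock can drive both the cache's timer loop and the work
	// queue's internal delaying logic, so rate-limit delays stay virtual.
	fakeClock := testingclock.NewFakeClock(time.Now())

	q := workqueue.NewRateLimitingQueueWithConfig(
		workqueue.DefaultControllerRateLimiter(),
		workqueue.RateLimitingQueueConfig{
			Name:  "example-cache", // illustrative name, not a real metrics scope
			Clock: fakeClock,
		},
	)
	defer q.ShutDown()

	// DefaultControllerRateLimiter schedules the first add a few milliseconds
	// out; stepping the fake clock until the item surfaces avoids any real
	// sleeping, much like the tests in this commit do.
	q.AddRateLimited("item-1")
	for q.Len() == 0 {
		fakeClock.Step(10 * time.Millisecond)
		time.Sleep(time.Millisecond) // let the queue's goroutine observe the step
	}

	item, _ := q.Get()
	fmt.Println(item) // item-1
	q.Done(item)
}
```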
63 changes: 39 additions & 24 deletions flytestdlib/cache/auto_refresh_test.go
@@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/assert"
"k8s.io/client-go/util/workqueue"
testingclock "k8s.io/utils/clock/testing"

"github.com/flyteorg/flyte/flytestdlib/atomic"
"github.com/flyteorg/flyte/flytestdlib/errors"
@@ -74,12 +75,13 @@ func (p *panickingSyncer) sync(_ context.Context, _ Batch) ([]ItemSyncResponse,
}

func TestCacheFour(t *testing.T) {
testResyncPeriod := 10 * time.Millisecond
testResyncPeriod := 5 * time.Second
rateLimiter := workqueue.DefaultControllerRateLimiter()
fakeClock := testingclock.NewFakeClock(time.Now())

t.Run("normal operation", func(t *testing.T) {
// the size of the cache is at least as large as the number of items we're storing
cache, err := NewAutoRefreshCache("fake1", syncFakeItem, rateLimiter, testResyncPeriod, 10, 10, promutils.NewTestScope())
cache, err := newAutoRefreshCacheWithClock("fake1", syncFakeItem, rateLimiter, testResyncPeriod, 10, 10, promutils.NewTestScope(), fakeClock)
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
@@ -94,18 +96,21 @@ func TestCacheFour(t *testing.T) {
}

assert.EventuallyWithT(t, func(c *assert.CollectT) {
// trigger periodic sync
fakeClock.Step(testResyncPeriod)

for i := 1; i <= 10; i++ {
item, err := cache.Get(fmt.Sprintf("%d", i))
assert.NoError(c, err)
assert.Equal(c, 10, item.(fakeCacheItem).val)
}
}, 3*time.Second, 100*time.Millisecond)
}, 3*time.Second, time.Millisecond)
cancel()
})

t.Run("Not Found", func(t *testing.T) {
// the size of the cache is at least as large as the number of items we're storing
cache, err := NewAutoRefreshCache("fake2", syncFakeItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope())
cache, err := newAutoRefreshCacheWithClock("fake2", syncFakeItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope(), fakeClock)
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
@@ -133,7 +138,7 @@ func TestCacheFour(t *testing.T) {
})

t.Run("Enqueue nothing", func(t *testing.T) {
cache, err := NewAutoRefreshCache("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope())
cache, err := newAutoRefreshCacheWithClock("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope(), fakeClock)
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
@@ -147,15 +152,16 @@ func TestCacheFour(t *testing.T) {
assert.NoError(t, err)
}

// Wait half a second for all resync periods to complete
// Enqueue first batch
fakeClock.Step(testResyncPeriod)
// If the cache tries to enqueue the item, a panic will be thrown.
time.Sleep(500 * time.Millisecond)
fakeClock.Step(testResyncPeriod)

cancel()
})

t.Run("Test update and delete cache", func(t *testing.T) {
cache, err := NewAutoRefreshCache("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope())
cache, err := newAutoRefreshCacheWithClock("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope(), fakeClock)
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
@@ -167,24 +173,27 @@ func TestCacheFour(t *testing.T) {
})
assert.NoError(t, err)

// Wait half a second for all resync periods to complete
// If the cache tries to enqueue the item, a panic will be thrown.
time.Sleep(500 * time.Millisecond)
fakeClock.Step(testResyncPeriod)

err = cache.DeleteDelayed(itemID)
assert.NoError(t, err)

time.Sleep(500 * time.Millisecond)
item, err := cache.Get(itemID)
assert.Nil(t, item)
assert.Error(t, err)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
// trigger a sync
fakeClock.Step(testResyncPeriod)

item, err := cache.Get(itemID)
assert.Nil(c, item)
assert.Error(c, err)
}, 3*time.Second, time.Millisecond)

cancel()
})

t.Run("Test panic on sync and shutdown", func(t *testing.T) {
syncer := &panickingSyncer{}
cache, err := NewAutoRefreshCache("fake3", syncer.sync, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope())
cache, err := newAutoRefreshCacheWithClock("fake3", syncer.sync, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope(), fakeClock)
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
@@ -198,22 +207,20 @@ func TestCacheFour(t *testing.T) {

// wait for all workers to run
assert.Eventually(t, func() bool {
// trigger a sync
fakeClock.Step(testResyncPeriod)

return syncer.callCount.Load() == int32(10)
}, 5*time.Second, time.Millisecond)

// wait some more time
time.Sleep(500 * time.Millisecond)

// all workers should have shut down.
assert.Equal(t, int32(10), syncer.callCount.Load())

cancel()
})
}

func TestQueueBuildUp(t *testing.T) {
testResyncPeriod := time.Hour
rateLimiter := workqueue.DefaultControllerRateLimiter()
fakeClock := testingclock.NewFakeClock(time.Now())

syncCount := atomic.NewInt32(0)
m := sync.Map{}
@@ -231,7 +238,7 @@ func TestQueueBuildUp(t *testing.T) {
}

size := 100
cache, err := NewAutoRefreshCache("fake2", alwaysFailing, rateLimiter, testResyncPeriod, 10, size, promutils.NewTestScope())
cache, err := newAutoRefreshCacheWithClock("fake2", alwaysFailing, rateLimiter, testResyncPeriod, 10, size, promutils.NewTestScope(), fakeClock)
assert.NoError(t, err)

ctx := context.Background()
@@ -244,16 +251,24 @@ func TestQueueBuildUp(t *testing.T) {
}

assert.NoError(t, cache.Start(ctx))
time.Sleep(5 * time.Second)
assert.Equal(t, int32(size), syncCount.Load())

// wait for all workers to run
assert.Eventually(t, func() bool {
// trigger a sync and unlock the work queue
fakeClock.Step(time.Millisecond)

return syncCount.Load() == int32(size)
}, 5*time.Second, time.Millisecond)
}

func TestInProcessing(t *testing.T) {

syncPeriod := time.Millisecond
fakeClock := testingclock.NewFakeClock(time.Now())
cache := &autoRefresh{
processing: &sync.Map{},
syncPeriod: syncPeriod,
clock: fakeClock,
}

assert.False(t, cache.inProcessing("test"))
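
The truncated TestInProcessing above exercises the staleness check in inProcessing, where an entry older than five sync periods is treated as no longer processing. A small illustrative test in the same spirit (the package name, test name, and map are stand-ins, not the real cache internals):

```go
package cache_test

import (
	"sync"
	"testing"
	"time"

	testingclock "k8s.io/utils/clock/testing"
)

// Illustrative only: ages a "processing" timestamp past the 5 * syncPeriod
// staleness threshold by stepping a fake clock instead of sleeping.
func TestStaleProcessingEntry(t *testing.T) {
	syncPeriod := time.Millisecond
	fakeClock := testingclock.NewFakeClock(time.Now())

	processing := &sync.Map{}
	processing.Store("item", fakeClock.Now())

	// Jump the clock well beyond 5 * syncPeriod.
	fakeClock.Step(10 * syncPeriod)

	v, ok := processing.Load("item")
	if !ok {
		t.Fatal("expected the entry to still be stored")
	}
	stored := v.(time.Time)
	if fakeClock.Since(stored) <= 5*syncPeriod {
		t.Fatal("expected the entry to look stale after stepping the fake clock")
	}
}
```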
