Skip to content

Commit

Permalink
Update the item only if it exists in the cache (#4117)
Browse files Browse the repository at this point in the history
* Add item ID to the workqueue instead

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* Update the item only if it exists in the cache

Signed-off-by: Kevin Su <[email protected]>

* Update tests

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* address comment

Signed-off-by: Kevin Su <[email protected]>

* fixed tests

Signed-off-by: Kevin Su <[email protected]>

* address comment

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Oct 12, 2023
1 parent b932cf1 commit da77476
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Test_monitor(t *testing.T) {
client.OnStatusMatch(ctx, mock.Anything).Return(core2.PhaseInfoSuccess(nil), nil)

wg := sync.WaitGroup{}
wg.Add(4)
wg.Add(8)
cacheObj, err := cache.NewAutoRefreshCache(rand.String(5), func(ctx context.Context, batch cache.Batch) (updatedBatch []cache.ItemSyncResponse, err error) {
wg.Done()
t.Logf("Syncing Item [%+v]", batch[0])
Expand Down
62 changes: 50 additions & 12 deletions flytestdlib/cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
"context"
"fmt"
"sync"
"time"

"github.com/flyteorg/flyte/flytestdlib/contextutils"
Expand Down Expand Up @@ -122,6 +123,7 @@ type autoRefresh struct {
syncPeriod time.Duration
workqueue workqueue.RateLimitingInterface
parallelizm int
lock sync.RWMutex
}

func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value interface{}) {
Expand Down Expand Up @@ -173,6 +175,25 @@ func (w *autoRefresh) Start(ctx context.Context) error {
return nil
}

// Update updates the item only if it exists in the cache, return true if we updated the item.
func (w *autoRefresh) Update(id ItemID, item Item) (ok bool) {
w.lock.Lock()
defer w.lock.Unlock()
ok = w.lruMap.Contains(id)
if ok {
w.lruMap.Add(id, item)
}
return ok
}

// Delete deletes the item from the cache if it exists.
func (w *autoRefresh) Delete(key interface{}) {
w.lock.Lock()
defer w.lock.Unlock()
w.toDelete.Remove(key)
w.lruMap.Remove(key)
}

func (w *autoRefresh) Get(id ItemID) (Item, error) {
if val, ok := w.lruMap.Get(id); ok {
w.metrics.CacheHit.Inc()
Expand Down Expand Up @@ -212,8 +233,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
snapshot := make([]ItemWrapper, 0, len(keys))
for _, k := range keys {
if w.toDelete.Contains(k) {
w.lruMap.Remove(k)
w.toDelete.Remove(k)
w.Delete(k)
continue
}
// If not ok, it means evicted between the item was evicted between getting the keys and this update loop
Expand Down Expand Up @@ -273,18 +293,37 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
case <-ctx.Done():
return nil
default:
item, shutdown := w.workqueue.Get()
batch, shutdown := w.workqueue.Get()
if shutdown {
logger.Debugf(ctx, "Shutting down worker")
return nil
}

t := w.metrics.SyncLatency.Start()
updatedBatch, err := w.syncCb(ctx, *item.(*Batch))

// Since we create batches every time we sync, we will just remove the item from the queue here
// regardless of whether it succeeded the sync or not.
w.workqueue.Forget(item)
w.workqueue.Done(item)
w.workqueue.Forget(batch)
w.workqueue.Done(batch)

newBatch := make(Batch, 0, len(*batch.(*Batch)))
for _, b := range *batch.(*Batch) {
itemID := b.GetID()
item, ok := w.lruMap.Get(itemID)
if !ok {
logger.Debugf(ctx, "item with id [%v] not found in cache", itemID)
continue
}
if item.(Item).IsTerminal() {
logger.Debugf(ctx, "item with id [%v] is terminal", itemID)
continue
}
newBatch = append(newBatch, b)
}
if len(newBatch) == 0 {
continue
}

t := w.metrics.SyncLatency.Start()
updatedBatch, err := w.syncCb(ctx, newBatch)

if err != nil {
w.metrics.SyncErrors.Inc()
Expand All @@ -295,14 +334,13 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {

for _, item := range updatedBatch {
if item.Action == Update {
// Add adds the item if it has been evicted or updates an existing one.
w.lruMap.Add(item.ID, item.Item)
// Updates an existing item.
w.Update(item.ID, item.Item)
}
}

w.toDelete.Range(func(key interface{}) bool {
w.lruMap.Remove(key)
w.toDelete.Remove(key)
w.Delete(key)
return true
})

Expand Down
30 changes: 29 additions & 1 deletion flytestdlib/cache/auto_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error
panic("This should never be called")
}

func TestCacheThree(t *testing.T) {
func TestCacheFour(t *testing.T) {
testResyncPeriod := time.Millisecond
rateLimiter := workqueue.DefaultControllerRateLimiter()

Expand Down Expand Up @@ -142,6 +142,34 @@ func TestCacheThree(t *testing.T) {

cancel()
})

t.Run("Test update and delete cache", func(t *testing.T) {
cache, err := NewAutoRefreshCache("fake3", syncTerminalItem, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope())
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
assert.NoError(t, cache.Start(ctx))

itemID := "dummy_id"
_, err = cache.GetOrCreate(itemID, terminalCacheItem{
val: 0,
})
assert.NoError(t, err)

// Wait half a second for all resync periods to complete
// If the cache tries to enqueue the item, a panic will be thrown.
time.Sleep(500 * time.Millisecond)

err = cache.DeleteDelayed(itemID)
assert.NoError(t, err)

time.Sleep(500 * time.Millisecond)
item, err := cache.Get(itemID)
assert.Nil(t, item)
assert.Error(t, err)

cancel()
})
}

func TestQueueBuildUp(t *testing.T) {
Expand Down

0 comments on commit da77476

Please sign in to comment.