Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add caching for PriceReader #372

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
3 changes: 3 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ packages:
interfaces:
ChainSupport:
PluginProcessor:
github.com/smartcontractkit/chainlink-ccip/internal/cache:
interfaces:
Cache:
github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn:
interfaces:
Controller:
Expand Down
162 changes: 162 additions & 0 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package cache

import (
"sync"
"time"

"github.com/patrickmn/go-cache"
)

/*
Package cache provides a generic caching implementation that wraps the go-cache library
with additional support for custom eviction policies. It allows both time-based expiration
(inherited from go-cache) and custom eviction rules through user-defined policies.

The cache is type-safe through Go generics, thread-safe through mutex locks, and supports
all basic cache operations. Keys are strings, and values can be of any type. Each cached
value stores its insertion timestamp, allowing for time-based validation in custom policies.

Example usage with contract reader:
type Event struct {
Timestamp int64
Data string
}

type ContractReader interface {
QueryEvents(ctx context.Context, filter QueryFilter) ([]Event, error)
}

reader := NewContractReader()

// Create cache with contract reader in closure
cache := NewCustomCache[Event](
5*time.Minute, // Default expiration
10*time.Minute, // Cleanup interval
func(ev Event, _ time.Time) bool {
ctx := context.Background()
filter := QueryFilter{
FromTimestamp: ev.Timestamp(),
Confidence: Finalized,
}

// Query for any events after our cache insertion time
newEvents, err := reader.QueryEvents(ctx, filter)
if err != nil {
return false // Keep cache on error
}

// Evict if new events exist after our cache time
return len(newEvents) > 0
},
)

// Cache an event
ev := Event{Timestamp: time.Now().Unix(), Data: "..."}
cache.Set("key", ev, NoExpiration)

// Later: event will be evicted if newer ones exist on chain
ev, found := cache.Get("key")

The cache ensures data freshness through:
- Automatic time-based expiration from go-cache
- Custom eviction policies with access to storage timestamps
- Thread-safe operations for concurrent access
- Type safety through Go generics
*/

const (
NoExpiration = cache.NoExpiration
)

// Cache defines the interface for cache operations
type Cache[V any] interface {
// Set adds an item to the cache with an expiration time
Set(key string, value V, expiration time.Duration)
// Get retrieves an item from the cache
Get(key string) (V, bool)
// Delete removes an item from the cache
Delete(key string)
// Items returns all items in the cache
Items() map[string]V
}

// timestampedValue wraps a value with its storage timestamp
type timestampedValue[V any] struct {
Value V
StoredAt time.Time
}

type CustomCache[V any] struct {
*cache.Cache
customPolicy func(V, time.Time) bool // Updated to include storage time
mutex sync.RWMutex
}

// NewCustomCache creates a new cache with both time-based and custom eviction policies
func NewCustomCache[V any](
defaultExpiration time.Duration,
cleanupInterval time.Duration,
customPolicy func(V, time.Time) bool,
) *CustomCache[V] {
return &CustomCache[V]{
Cache: cache.New(defaultExpiration, cleanupInterval),
customPolicy: customPolicy,
}
}

// Set adds an item to the cache with current timestamp
func (c *CustomCache[V]) Set(key string, value V, expiration time.Duration) {
wrapped := timestampedValue[V]{
Value: value,
StoredAt: time.Now(),
}
c.Cache.Set(key, wrapped, expiration)
}

// Get retrieves an item from the cache, checking both time-based and custom policies
func (c *CustomCache[V]) Get(key string) (V, bool) {
c.mutex.Lock()
defer c.mutex.Unlock()

var zero V
value, found := c.Cache.Get(key)
if !found {
return zero, false
}

// Type assertion for timestamped value
wrapped, ok := value.(timestampedValue[V])
if !ok {
return zero, false
}

// Check custom policy with timestamp
if c.customPolicy != nil && c.customPolicy(wrapped.Value, wrapped.StoredAt) {
c.Cache.Delete(key)
return zero, false
}

return wrapped.Value, true
}

// Delete removes an item from the cache
func (c *CustomCache[V]) Delete(key string) {
c.Cache.Delete(key)
}

// Items returns all items in the cache
func (c *CustomCache[V]) Items() map[string]V {
c.mutex.RLock()
defer c.mutex.RUnlock()

items := c.Cache.Items()
result := make(map[string]V)

for k, v := range items {
if wrapped, ok := v.Object.(timestampedValue[V]); ok {
result[k] = wrapped.Value
}
}

return result
}
160 changes: 160 additions & 0 deletions internal/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package cache

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestCustomCache(t *testing.T) {
t.Run("basic operations without custom policy", func(t *testing.T) {
cache := NewCustomCache[int](5*time.Minute, 10*time.Minute, nil)

// Test Set and Get
cache.Set("test1", 100, NoExpiration)
value, found := cache.Get("test1")
assert.True(t, found)
assert.Equal(t, 100, value)

// Test non-existent key
_, found = cache.Get("nonexistent")
assert.False(t, found)

// Test Delete
cache.Delete("test1")
_, found = cache.Get("test1")
assert.False(t, found)
})

t.Run("custom policy with timestamp", func(t *testing.T) {
now := time.Now()
isStale := func(v int, storedAt time.Time) bool {
return storedAt.Before(now)
}
cache := NewCustomCache[int](5*time.Minute, 10*time.Minute, isStale)

// Value stored now should not be evicted
cache.Set("fresh", 1, NoExpiration)
value, found := cache.Get("fresh")
assert.True(t, found)
assert.Equal(t, 1, value)

// Simulate old value by manipulating timestamp
oldValue := timestampedValue[int]{
Value: 2,
StoredAt: now.Add(-1 * time.Hour),
}
cache.Cache.Set("stale", oldValue, NoExpiration)

// Stale value should be evicted
_, found = cache.Get("stale")
assert.False(t, found)
})

t.Run("time based expiration", func(t *testing.T) {
cache := NewCustomCache[string](1*time.Second, 1*time.Second, nil)

cache.Set("key", "value", 100*time.Millisecond)

// Should exist initially
value, found := cache.Get("key")
assert.True(t, found)
assert.Equal(t, "value", value)

// Should expire
time.Sleep(200 * time.Millisecond)
_, found = cache.Get("key")
assert.False(t, found)
})

t.Run("items retrieval", func(t *testing.T) {
cache := NewCustomCache[int](5*time.Minute, 10*time.Minute, nil)

cache.Set("one", 1, NoExpiration)
cache.Set("two", 2, NoExpiration)

items := cache.Items()
assert.Len(t, items, 2)
assert.Equal(t, 1, items["one"])
assert.Equal(t, 2, items["two"])
})

t.Run("concurrent access with timestamps", func(t *testing.T) {
cache := NewCustomCache[int](5*time.Minute, 10*time.Minute, nil)

// Run multiple goroutines accessing the cache
done := make(chan bool)
for i := 0; i < 10; i++ {
go func(val int) {
cache.Set("key", val, NoExpiration)
_, _ = cache.Get("key")
done <- true
}(i)
}

// Wait for all goroutines
for i := 0; i < 10; i++ {
<-done
}

// Should have a value at the end
_, found := cache.Get("key")
assert.True(t, found)
})

t.Run("complex types with timestamp eviction", func(t *testing.T) {
type ComplexType struct {
ID int
Name string
Timestamp time.Time
}

threshold := time.Now()
cache := NewCustomCache[ComplexType](
5*time.Minute,
10*time.Minute,
func(v ComplexType, storedAt time.Time) bool {
return storedAt.Before(threshold)
},
)

value := ComplexType{ID: 1, Name: "test", Timestamp: time.Now()}
cache.Set("complex", value, NoExpiration)

// Fresh value should not be evicted
retrieved, found := cache.Get("complex")
assert.True(t, found)
assert.Equal(t, value, retrieved)

// Simulate old value
oldValue := timestampedValue[ComplexType]{
Value: ComplexType{ID: 2, Name: "old"},
StoredAt: threshold.Add(-1 * time.Hour),
}
cache.Cache.Set("old", oldValue, NoExpiration)

// Old value should be evicted
_, found = cache.Get("old")
assert.False(t, found)
})

t.Run("correct timestamp storage", func(t *testing.T) {
cache := NewCustomCache[string](5*time.Minute, 10*time.Minute, nil)

before := time.Now()
cache.Set("key", "value", NoExpiration)
after := time.Now()

// Get the raw timestamped value
raw, found := cache.Cache.Get("key")
assert.True(t, found)

wrapped, ok := raw.(timestampedValue[string])
assert.True(t, ok)

// StoredAt should be between before and after
assert.True(t, wrapped.StoredAt.After(before) || wrapped.StoredAt.Equal(before))
assert.True(t, wrapped.StoredAt.Before(after) || wrapped.StoredAt.Equal(after))
})
}
22 changes: 22 additions & 0 deletions internal/cache/cachekeys/keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package cachekeys

import (
"fmt"

"github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
)

// TokenDecimals creates a cache key for token decimals
func TokenDecimals(token ccipocr3.UnknownEncodedAddress, address string) string {
return fmt.Sprintf("token-decimals:%s:%s", token, address)
}

// FeeQuoterTokenUpdate creates a cache key for fee quoter token updates
func FeeQuoterTokenUpdate(token ccipocr3.UnknownEncodedAddress, chain ccipocr3.ChainSelector) string {
return fmt.Sprintf("fee-quoter-update:%d:%s", chain, token)
}

// TokenPrice creates a cache key for token USD prices
func FeedPricesUSD(token ccipocr3.UnknownEncodedAddress) string {
return fmt.Sprintf("token-price:%s", token)
}
Loading
Loading