Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mercury cache fixes #11448

Merged
merged 3 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing())

mercuryPool := wsrpc.NewPool(appLggr, cache.Config{
Logger: appLggr,
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
MaxStaleAge: cfg.Mercury().Cache().MaxStaleAge(),
LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(),
Expand Down
1 change: 0 additions & 1 deletion core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,6 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
loopRegistry := plugins.NewLoopRegistry(lggr, nil)

mercuryPool := wsrpc.NewPool(lggr, cache.Config{
Logger: lggr,
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
MaxStaleAge: cfg.Mercury().Cache().MaxStaleAge(),
LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(),
Expand Down
53 changes: 26 additions & 27 deletions core/services/relay/evm/mercury/wsrpc/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ type Cache interface {
}

type Config struct {
Logger logger.Logger
// LatestReportTTL controls how "stale" we will allow a price to be e.g. if
// set to 1s, a new price will always be fetched if the last result was
// from more than 1 second ago.
Expand All @@ -84,8 +83,8 @@ type Config struct {
LatestReportDeadline time.Duration
}

func NewCache(client Client, cfg Config) Cache {
return newMemCache(client, cfg)
func NewCache(lggr logger.Logger, client Client, cfg Config) Cache {
return newMemCache(lggr, client, cfg)
}

type cacheVal struct {
Expand Down Expand Up @@ -164,24 +163,20 @@ type memCache struct {

client Client

latestPriceTTL time.Duration
maxStaleAge time.Duration
latestReportDeadline time.Duration
cfg Config

cache sync.Map

wg sync.WaitGroup
chStop services.StopChan
}

func newMemCache(client Client, cfg Config) *memCache {
func newMemCache(lggr logger.Logger, client Client, cfg Config) *memCache {
return &memCache{
services.StateMachine{},
cfg.Logger.Named("MercuryMemCache"),
lggr.Named("MemCache"),
client,
cfg.LatestReportTTL,
cfg.MaxStaleAge,
cfg.LatestReportDeadline,
cfg,
sync.Map{},
sync.WaitGroup{},
make(chan (struct{})),
Expand All @@ -197,10 +192,11 @@ func (m *memCache) LatestReport(ctx context.Context, req *pb.LatestReportRequest
if req == nil {
return nil, errors.New("req must not be nil")
}
if m.latestPriceTTL <= 0 {
feedIDHex := mercuryutils.BytesToFeedID(req.FeedId).String()
if m.cfg.LatestReportTTL <= 0 {
return m.client.RawClient().LatestReport(ctx, req)
}
vi, _ := m.cache.LoadOrStore(req, &cacheVal{
vi, loaded := m.cache.LoadOrStore(feedIDHex, &cacheVal{
sync.RWMutex{},
false,
nil,
Expand All @@ -210,44 +206,46 @@ func (m *memCache) LatestReport(ctx context.Context, req *pb.LatestReportRequest
})
v := vi.(*cacheVal)

m.lggr.Tracew("LatestReport", "feedID", feedIDHex, "loaded", loaded)

// HOT PATH
v.RLock()
if time.Now().Before(v.expiresAt) {
// CACHE HIT
promCacheHitCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc()
promCacheHitCount.WithLabelValues(m.client.ServerURL(), feedIDHex).Inc()

defer v.RUnlock()
return v.val, nil
} else if v.fetching {
// CACHE WAIT
promCacheWaitCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc()
promCacheWaitCount.WithLabelValues(m.client.ServerURL(), feedIDHex).Inc()
// if someone else is fetching then wait for the fetch to complete
ch := v.fetchCh
v.RUnlock()
return v.waitForResult(ctx, ch, m.chStop)
}
// CACHE MISS
promCacheMissCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc()
promCacheMissCount.WithLabelValues(m.client.ServerURL(), feedIDHex).Inc()
// fallthrough to cold path and fetch
v.RUnlock()

// COLD PATH
v.Lock()
if time.Now().Before(v.expiresAt) {
// CACHE HIT
promCacheHitCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc()
promCacheHitCount.WithLabelValues(m.client.ServerURL(), feedIDHex).Inc()
defer v.RUnlock()
return v.val, nil
} else if v.fetching {
// CACHE WAIT
promCacheWaitCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc()
promCacheWaitCount.WithLabelValues(m.client.ServerURL(), feedIDHex).Inc()
// if someone else is fetching then wait for the fetch to complete
ch := v.fetchCh
v.Unlock()
return v.waitForResult(ctx, ch, m.chStop)
}
// CACHE MISS
promCacheMissCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc()
promCacheMissCount.WithLabelValues(m.client.ServerURL(), feedIDHex).Inc()
// initiate the fetch and wait for result
ch := v.initiateFetch()
v.Unlock()
Expand All @@ -269,7 +267,7 @@ const minBackoffRetryInterval = 50 * time.Millisecond
// newBackoff creates a backoff for retrying
func (m *memCache) newBackoff() backoff.Backoff {
min := minBackoffRetryInterval
max := m.latestPriceTTL / 2
max := m.cfg.LatestReportTTL / 2
if min > max {
// avoid setting a min that is greater than max
min = max
Expand All @@ -293,16 +291,16 @@ func (m *memCache) fetch(req *pb.LatestReportRequest, v *cacheVal) {
var val *pb.LatestReportResponse
var err error
defer func() {
v.completeFetch(val, err, t.Add(m.latestPriceTTL))
v.completeFetch(val, err, t.Add(m.cfg.LatestReportTTL))
}()

for {
t = time.Now()

ctx := memcacheCtx
cancel := func() {}
if m.latestReportDeadline > 0 {
ctx, cancel = context.WithTimeoutCause(memcacheCtx, m.latestReportDeadline, errors.New("latest report fetch deadline exceeded"))
if m.cfg.LatestReportDeadline > 0 {
ctx, cancel = context.WithTimeoutCause(memcacheCtx, m.cfg.LatestReportDeadline, errors.New("latest report fetch deadline exceeded"))
}

// NOTE: must drop down to RawClient here otherwise we enter an
Expand Down Expand Up @@ -330,6 +328,7 @@ func (m *memCache) fetch(req *pb.LatestReportRequest, v *cacheVal) {

func (m *memCache) Start(context.Context) error {
return m.StartOnce(m.Name(), func() error {
m.lggr.Debugw("MemCache starting", "config", m.cfg)
m.wg.Add(1)
go m.runloop()
return nil
Expand All @@ -339,16 +338,16 @@ func (m *memCache) Start(context.Context) error {
func (m *memCache) runloop() {
defer m.wg.Done()

if m.maxStaleAge == 0 {
if m.cfg.MaxStaleAge == 0 {
return
}
t := time.NewTicker(utils.WithJitter(m.maxStaleAge))
t := time.NewTicker(utils.WithJitter(m.cfg.MaxStaleAge))

for {
select {
case <-t.C:
m.cleanup()
t.Reset(utils.WithJitter(m.maxStaleAge))
t.Reset(utils.WithJitter(m.cfg.MaxStaleAge))
case <-m.chStop:
return
}
Expand All @@ -372,7 +371,7 @@ func (m *memCache) cleanup() {
// skip cleanup if fetching
return true
}
if time.Now().After(v.expiresAt.Add(m.maxStaleAge)) {
if time.Now().After(v.expiresAt.Add(m.cfg.MaxStaleAge)) {
// garbage collection
m.cache.Delete(k)
}
Expand Down
23 changes: 8 additions & 15 deletions core/services/relay/evm/mercury/wsrpc/cache/cache_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sync"
"time"

"golang.org/x/exp/maps"

Expand All @@ -28,27 +27,26 @@ type cacheSet struct {
lggr logger.Logger
caches map[string]Cache

latestPriceTTL time.Duration
maxStaleAge time.Duration
cfg Config
}

func NewCacheSet(cfg Config) CacheSet {
return newCacheSet(cfg)
func NewCacheSet(lggr logger.Logger, cfg Config) CacheSet {
return newCacheSet(lggr, cfg)
}

func newCacheSet(cfg Config) *cacheSet {
func newCacheSet(lggr logger.Logger, cfg Config) *cacheSet {
return &cacheSet{
sync.RWMutex{},
services.StateMachine{},
cfg.Logger.Named("CacheSet"),
lggr.Named("CacheSet"),
make(map[string]Cache),
cfg.LatestReportTTL,
cfg.MaxStaleAge,
cfg,
}
}

// Start hands a start-up hook to StartOnce under the name "CacheSet" and
// returns whatever StartOnce returns. The hook itself only emits a debug log
// line containing the cache set's configuration and always reports success.
// The context argument is accepted but not used.
func (cs *cacheSet) Start(context.Context) error {
	// The only start-up work is announcing the effective configuration.
	logConfig := func() error {
		cs.lggr.Debugw("CacheSet starting", "config", cs.cfg)
		return nil
	}
	return cs.StartOnce("CacheSet", logConfig)
}
Expand Down Expand Up @@ -93,12 +91,7 @@ func (cs *cacheSet) get(ctx context.Context, client Client) (Fetcher, error) {
if exists {
return c, nil
}
cfg := Config{
Logger: cs.lggr.With("serverURL", sURL),
LatestReportTTL: cs.latestPriceTTL,
MaxStaleAge: cs.maxStaleAge,
}
c = newMemCache(client, cfg)
c = newMemCache(cs.lggr, client, cs.cfg)
if err := c.Start(ctx); err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func Test_CacheSet(t *testing.T) {
lggr := logger.TestLogger(t)
cs := newCacheSet(Config{Logger: lggr})
cs := newCacheSet(lggr, Config{})
ctx := testutils.Context(t)
require.NoError(t, cs.Start(ctx))
t.Cleanup(func() {
Expand Down
22 changes: 12 additions & 10 deletions core/services/relay/evm/mercury/wsrpc/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,33 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb"
)

const neverExpireTTL = 1000 * time.Hour // some massive value that will never expire during a test

func Test_Cache(t *testing.T) {
lggr := logger.TestLogger(t)
client := &mockClient{}
cfg := Config{
Logger: logger.TestLogger(t),
}
cfg := Config{}
ctx := testutils.Context(t)

req1 := &pb.LatestReportRequest{FeedId: []byte{1}}
req2 := &pb.LatestReportRequest{FeedId: []byte{2}}
req3 := &pb.LatestReportRequest{FeedId: []byte{3}}

feedID1Hex := mercuryutils.BytesToFeedID(req1.FeedId).String()

t.Run("errors with nil req", func(t *testing.T) {
c := newMemCache(client, cfg)
c := newMemCache(lggr, client, cfg)

_, err := c.LatestReport(ctx, nil)
assert.EqualError(t, err, "req must not be nil")
})

t.Run("with LatestReportTTL=0 does no caching", func(t *testing.T) {
c := newMemCache(client, cfg)
c := newMemCache(lggr, client, cfg)

req := &pb.LatestReportRequest{}
for i := 0; i < 5; i++ {
Expand All @@ -58,7 +60,7 @@ func Test_Cache(t *testing.T) {
t.Run("caches repeated calls to LatestReport, keyed by request", func(t *testing.T) {
cfg.LatestReportTTL = neverExpireTTL
client.err = nil
c := newMemCache(client, cfg)
c := newMemCache(lggr, client, cfg)

t.Run("if cache is unstarted, returns error", func(t *testing.T) {
// starting the cache is required for state management if we
Expand Down Expand Up @@ -122,8 +124,8 @@ func Test_Cache(t *testing.T) {
})

t.Run("re-queries when a cache item has expired", func(t *testing.T) {
vi, exists := c.cache.Load(req1)
assert.True(t, exists)
vi, exists := c.cache.Load(feedID1Hex)
require.True(t, exists)
v := vi.(*cacheVal)
v.expiresAt = time.Now().Add(-1 * time.Second)

Expand Down Expand Up @@ -167,7 +169,7 @@ func Test_Cache(t *testing.T) {
})

t.Run("timeouts", func(t *testing.T) {
c := newMemCache(client, cfg)
c := newMemCache(lggr, client, cfg)
// simulate fetch already executing in background
v := &cacheVal{
fetching: true,
Expand All @@ -176,7 +178,7 @@ func Test_Cache(t *testing.T) {
err: nil,
expiresAt: time.Now().Add(-1 * time.Second),
}
c.cache.Store(req1, v)
c.cache.Store(feedID1Hex, v)

canceledCtx, cancel := context.WithCancel(testutils.Context(t))
cancel()
Expand Down
4 changes: 2 additions & 2 deletions core/services/relay/evm/mercury/wsrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func Test_Client_LatestReport(t *testing.T) {

t.Run("with cache disabled", func(t *testing.T) {
req := &pb.LatestReportRequest{}
cacheSet := cache.NewCacheSet(cache.Config{LatestReportTTL: 0, Logger: lggr})
cacheSet := cache.NewCacheSet(lggr, cache.Config{LatestReportTTL: 0})
resp := &pb.LatestReportResponse{}

var calls int
Expand Down Expand Up @@ -178,7 +178,7 @@ func Test_Client_LatestReport(t *testing.T) {
t.Run("with caching", func(t *testing.T) {
req := &pb.LatestReportRequest{}
const neverExpireTTL = 1000 * time.Hour // some massive value that will never expire during a test
cacheSet := cache.NewCacheSet(cache.Config{LatestReportTTL: neverExpireTTL, Logger: lggr})
cacheSet := cache.NewCacheSet(lggr, cache.Config{LatestReportTTL: neverExpireTTL})
resp := &pb.LatestReportResponse{}

var calls int
Expand Down
5 changes: 3 additions & 2 deletions core/services/relay/evm/mercury/wsrpc/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ type pool struct {
}

func NewPool(lggr logger.Logger, cacheCfg cache.Config) Pool {
p := newPool(lggr.Named("Mercury.WSRPCPool"))
lggr = lggr.Named("Mercury.WSRPCPool")
p := newPool(lggr)
p.newClient = NewClient
p.cacheSet = cache.NewCacheSet(cacheCfg)
p.cacheSet = cache.NewCacheSet(lggr, cacheCfg)
return p
}

Expand Down
Loading