Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

offchain - commit store updates fetching optimization #195

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions core/services/ocr2/plugins/ccip/commit_price_updates_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package ccip

import (
"context"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
)

type tokenPriceUpdatesCache struct {
mem map[common.Address]update
mu *sync.RWMutex
expiry time.Duration
}

func newTokenPriceUpdatesCache(ctx context.Context, expiry time.Duration) *tokenPriceUpdatesCache {
c := &tokenPriceUpdatesCache{
mem: make(map[common.Address]update),
mu: &sync.RWMutex{},
expiry: expiry,
}
go c.expirationWorker(ctx)
return c
dimkouv marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *tokenPriceUpdatesCache) expirationWorker(ctx context.Context) {
tick := time.NewTicker(c.expiry)

for {
select {
case <-ctx.Done():
return
case <-tick.C:
c.mu.Lock()
c.mem = make(map[common.Address]update)
c.mu.Unlock()
}
}
}

func (c *tokenPriceUpdatesCache) mostRecentTs() time.Time {
c.mu.RLock()
defer c.mu.RUnlock()

ts := time.Time{}
for _, upd := range c.mem {
if upd.timestamp.After(ts) {
ts = upd.timestamp
}
}
return ts
}

func (c *tokenPriceUpdatesCache) updateIfMoreRecent(ts time.Time, tk common.Address, val *big.Int) bool {
c.mu.RLock()
v, exists := c.mem[tk]
c.mu.RUnlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we keep the lock over the entire method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we keep the lock over the entire method we will have a deadlock.

If we use only a Lock() over the entire method, instead of both RLock() for read and Lock() for updates then it's slightly less efficient.


if !exists || v.timestamp.Before(ts) {
c.mu.Lock()
c.mem[tk] = update{timestamp: ts, value: val}
c.mu.Unlock()
return true
}

return false
}

func (c *tokenPriceUpdatesCache) get() map[common.Address]update {
c.mu.RLock()
defer c.mu.RUnlock()
cp := make(map[common.Address]update, len(c.mem))
for k, v := range c.mem {
cp[k] = v
}
return cp
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package ccip

import (
"context"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
)

func Test_tokenPriceUpdatesCache(t *testing.T) {
ctx := context.Background()

tk := common.HexToAddress("1")
ts := time.Now().Truncate(time.Second)

t.Run("base", func(t *testing.T) {
c := newTokenPriceUpdatesCache(ctx, time.Minute)
dimkouv marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, time.Time{}, c.mostRecentTs())

c.updateIfMoreRecent(ts, tk, big.NewInt(100))
assert.Equal(t, ts, c.mostRecentTs())
v := c.get()
assert.Equal(t, big.NewInt(100), v[tk].value)

// should not get updated if ts is older
c.updateIfMoreRecent(ts.Add(-1*time.Minute), tk, big.NewInt(101))
v = c.get()
assert.Equal(t, big.NewInt(100), v[tk].value)
})

t.Run("test expiration", func(t *testing.T) {
c := newTokenPriceUpdatesCache(ctx, 200*time.Nanosecond) // every 1ns cache expires
dimkouv marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, time.Time{}, c.mostRecentTs())
c.updateIfMoreRecent(ts, tk, big.NewInt(100))
time.Sleep(5 * time.Millisecond)
dimkouv marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, time.Time{}, c.mostRecentTs()) // should have expired
assert.Len(t, c.get(), 0)
})

t.Run("test expiration worker cancellation", func(t *testing.T) {
ctx, cf := context.WithCancel(context.Background())
c := newTokenPriceUpdatesCache(ctx, time.Nanosecond) // every 1ns cache expires
cf() // stop the cancellation worker
c.updateIfMoreRecent(ts, tk, big.NewInt(100))
time.Sleep(10 * time.Nanosecond)
assert.Equal(t, ts, c.mostRecentTs()) // should not have expired, since worker stopped
assert.Len(t, c.get(), 1)
})
}
38 changes: 21 additions & 17 deletions core/services/ocr2/plugins/ccip/commit_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ type CommitReportingPlugin struct {
// Offchain
priceGetter pricegetter.PriceGetter
// State
inflightReports *inflightCommitReportsContainer
inflightReports *inflightCommitReportsContainer
tokenPriceUpdatesCache *tokenPriceUpdatesCache
}

type CommitReportingPluginFactory struct {
Expand Down Expand Up @@ -159,7 +160,8 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
rf.config.destClient,
int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth),
),
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
tokenPriceUpdatesCache: newTokenPriceUpdatesCache(context.Background(), time.Hour),
},
types.ReportingPluginInfo{
Name: "CCIPCommit",
Expand Down Expand Up @@ -360,27 +362,29 @@ func calculateUsdPer1e18TokenAmount(price *big.Int, decimals uint8) *big.Int {
// Gets the latest token price updates based on logs within the heartbeat
// The updates returned by this function are guaranteed to not contain nil values.
func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, now time.Time, checkInflight bool) (map[common.Address]update, error) {
tokenPriceUpdates, err := r.destPriceRegistryReader.GetTokenPriceUpdatesCreatedAfter(
ctx,
now.Add(-r.offchainConfig.TokenPriceHeartBeat),
0,
)
ts := now.Add(-r.offchainConfig.TokenPriceHeartBeat)
if mostRecentCachedTs := r.tokenPriceUpdatesCache.mostRecentTs(); mostRecentCachedTs.After(ts) {
ts = mostRecentCachedTs
}

newTokenPriceUpdates, err := r.destPriceRegistryReader.GetTokenPriceUpdatesCreatedAfter(ctx, ts, 0)
if err != nil {
return nil, err
}

latestUpdates := make(map[common.Address]update)
for _, tokenUpdate := range tokenPriceUpdates {
priceUpdate := tokenUpdate.Data
// Ordered by ascending timestamps
timestamp := time.Unix(priceUpdate.Timestamp.Int64(), 0)
if priceUpdate.Value != nil && !timestamp.Before(latestUpdates[priceUpdate.Token].timestamp) {
latestUpdates[priceUpdate.Token] = update{
timestamp: timestamp,
value: priceUpdate.Value,
}
for _, upd := range newTokenPriceUpdates {
if upd.Data.Value == nil {
continue
}

r.tokenPriceUpdatesCache.updateIfMoreRecent(
time.Unix(upd.Data.Timestamp.Int64(), 0),
upd.Data.Token,
upd.Data.Value,
)
}

latestUpdates := r.tokenPriceUpdatesCache.get()
if !checkInflight {
return latestUpdates, nil
}
Expand Down
68 changes: 68 additions & 0 deletions core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) {
p.gasPriceEstimator = gasPriceEstimator
p.offchainConfig.GasPriceHeartBeat = gasPriceHeartBeat.Duration()
p.commitStoreReader = commitStoreReader
p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache(context.Background(), time.Hour)
p.F = tc.f

aos := make([]types.AttributedObservation, 0, len(tc.observations))
Expand Down Expand Up @@ -1506,6 +1507,7 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) {
//_, priceRegAddr := testhelpers.NewFakePriceRegistry(t)
priceReg := ccipdata.NewMockPriceRegistryReader(t)
p.destPriceRegistryReader = priceReg
p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache(context.Background(), time.Hour)

//destReader := ccipdata.NewMockReader(t)
var events []ccipdata.Event[ccipdata.TokenPriceUpdate]
Expand Down Expand Up @@ -1545,6 +1547,72 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) {

}

func TestCommitReportingPlugin_getLatestTokenPriceUpdates_cache(t *testing.T) {
priceReg := ccipdata.NewMockPriceRegistryReader(t)
p := &CommitReportingPlugin{
tokenPriceUpdatesCache: newTokenPriceUpdatesCache(context.Background(), time.Hour),
destPriceRegistryReader: priceReg,
offchainConfig: ccipdata.CommitOffchainConfig{
TokenPriceHeartBeat: 12 * time.Hour,
},
}

twoHoursAgo := time.Now().Add(-2 * time.Hour)
threeHoursAgo := time.Now().Add(-3 * time.Hour)
fourHoursAgo := time.Now().Add(-4 * time.Hour)

tk1 := utils.RandomAddress()
now := time.Now()

onChainUpdates := []ccipdata.Event[ccipdata.TokenPriceUpdate]{
{
Data: ccipdata.TokenPriceUpdate{
TokenPrice: ccipdata.TokenPrice{Token: tk1, Value: big.NewInt(100)},
Timestamp: big.NewInt(0).SetInt64(fourHoursAgo.Unix()),
},
},
{
Data: ccipdata.TokenPriceUpdate{
TokenPrice: ccipdata.TokenPrice{Token: tk1, Value: big.NewInt(102)},
Timestamp: big.NewInt(0).SetInt64(twoHoursAgo.Unix()),
},
},
{
Data: ccipdata.TokenPriceUpdate{
TokenPrice: ccipdata.TokenPrice{Token: tk1, Value: big.NewInt(101)},
Timestamp: big.NewInt(0).SetInt64(threeHoursAgo.Unix()),
},
},
}
rand.Shuffle(len(onChainUpdates), func(i, j int) { onChainUpdates[i], onChainUpdates[j] = onChainUpdates[j], onChainUpdates[i] })
priceReg.On(
"GetTokenPriceUpdatesCreatedAfter",
mock.Anything,
now.Add(-p.offchainConfig.TokenPriceHeartBeat), // first call should pass the token price heart beat duration
0,
).Return(onChainUpdates, nil).Once()

priceUpdates, err := p.getLatestTokenPriceUpdates(context.Background(), now, false)
dimkouv marked this conversation as resolved.
Show resolved Hide resolved
assert.NoError(t, err)
// we expect to get only one update, since all three updates above are for the same token
assert.Len(t, priceUpdates, 1)
// and we expect to get the latest price update
assert.Equal(t, big.NewInt(102), priceUpdates[tk1].value)
assert.Equal(t, twoHoursAgo.Unix(), priceUpdates[tk1].timestamp.Unix())

priceReg.On(
"GetTokenPriceUpdatesCreatedAfter",
mock.Anything,
twoHoursAgo.Truncate(time.Second), // now we expect to ask for price updates after the most recent price update
0,
).Return([]ccipdata.Event[ccipdata.TokenPriceUpdate]{}, nil).Once()
priceUpdates, err = p.getLatestTokenPriceUpdates(context.Background(), now, false)
dimkouv marked this conversation as resolved.
Show resolved Hide resolved
assert.NoError(t, err)
// and we expect to get the exact same price update since there wasn't anything new recorded onchain
assert.Equal(t, big.NewInt(102), priceUpdates[tk1].value)
assert.Equal(t, twoHoursAgo.Unix(), priceUpdates[tk1].timestamp.Unix())
}

func Test_commitReportSize(t *testing.T) {
testParams := gopter.DefaultTestParameters()
testParams.MinSuccessfulTests = 100
Expand Down
Loading