Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FUN-973] s4 snapshot caching #12275

Merged
merged 10 commits into from
Mar 11, 2024
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/functions/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const (
// Create all OCR2 plugin Oracles and all extra services needed to run a Functions job.
func NewFunctionsServices(ctx context.Context, functionsOracleArgs, thresholdOracleArgs, s4OracleArgs *libocr2.OCR2OracleArgs, conf *FunctionsServicesConfig) ([]job.ServiceCtx, error) {
pluginORM := functions.NewORM(conf.DB, conf.Logger, conf.QConfig, common.HexToAddress(conf.ContractID))
s4ORM := s4.NewPostgresORM(conf.DB, conf.Logger, conf.QConfig, s4.SharedTableName, FunctionsS4Namespace)
s4ORM := s4.NewCachedORMWrapper(s4.NewPostgresORM(conf.DB, conf.Logger, conf.QConfig, s4.SharedTableName, FunctionsS4Namespace), conf.Logger)

var pluginConfig config.PluginConfig
if err := json.Unmarshal(conf.Job.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig); err != nil {
Expand Down
103 changes: 103 additions & 0 deletions core/services/s4/cached_orm_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package s4

import (
"fmt"
"math/big"
"strings"
"time"

"github.com/patrickmn/go-cache"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This dependency must already exist I suppose since I don't see any changes to the go.mod file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes! the evmregistry is already using it


ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

// Timing parameters handed to the in-memory cache when it is constructed.
const (
	// cleanupInterval is the period between purges of expired cache items.
	cleanupInterval = 5 * time.Minute
	// defaultExpiration is how long a cached item remains valid.
	defaultExpiration = 10 * time.Minute
)

// CachedORM is a cached ORM wrapper that implements the ORM interface.
// It adds a cache layer in order to remove unnecessary pressure from the underlying implementation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: spelling of pressure

type CachedORM struct {
	// underlayingORM is the wrapped ORM that every call is ultimately delegated to.
	underlayingORM ORM
	// cache holds GetSnapshot results, keyed by the requested address range.
	cache *cache.Cache
	// lggr reports cache misses and cache-invalidation problems.
	lggr logger.Logger
}

// Compile-time check that *CachedORM satisfies the ORM interface.
var _ ORM = (*CachedORM)(nil)

// NewCachedORMWrapper wraps orm in a CachedORM backed by a fresh in-memory
// cache configured with defaultExpiration and cleanupInterval. lggr is used
// for the wrapper's own diagnostics.
func NewCachedORMWrapper(orm ORM, lggr logger.Logger) *CachedORM {
	wrapper := new(CachedORM)
	wrapper.underlayingORM = orm
	wrapper.cache = cache.New(defaultExpiration, cleanupInterval)
	wrapper.lggr = lggr
	return wrapper
}

// Get is not cached: it forwards the lookup for (address, slotId) straight to
// the wrapped ORM and returns whatever that returns.
func (c CachedORM) Get(address *ubig.Big, slotId uint, qopts ...pg.QOpt) (*Row, error) {
	row, err := c.underlayingORM.Get(address, slotId, qopts...)
	return row, err
}

// Update evicts the cached snapshots affected by row and then forwards the
// update to the wrapped ORM, returning the wrapped ORM's error.
//
// Cache invalidation is best effort: a failure to clear the cache is logged
// and does not prevent the update from being attempted.
func (c CachedORM) Update(row *Row, qopts ...pg.QOpt) error {
	if err := c.clearCache(row); err != nil {
		// Errorw takes a message plus key/value pairs; the previous call passed
		// a printf-style format (with the fmt.Errorf-only %w verb) to the
		// non-formatting Error method, which produced a garbled log line.
		c.lggr.Errorw("failed to clear cache", "err", err)
	}

	return c.underlayingORM.Update(row, qopts...)
}

func (c CachedORM) DeleteExpired(limit uint, utcNow time.Time, qopts ...pg.QOpt) (int64, error) {
return c.underlayingORM.DeleteExpired(limit, utcNow, qopts...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should clear the cache here if DeleteExpired removed any entries.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 good idea, I'll flush the cache to start fresh given there is no way of knowing what was deleted

}

func (c CachedORM) GetSnapshot(addressRange *AddressRange, qopts ...pg.QOpt) ([]*SnapshotRow, error) {
key := fmt.Sprintf("GetSnapshot_%s_%s", addressRange.MinAddress.String(), addressRange.MaxAddress.String())

cached, found := c.cache.Get(key)
if found {
return cached.([]*SnapshotRow), nil
}

c.lggr.Info("Snapshot not found in cache, fetching it from underlaying implementation")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this could probably be debug level

data, err := c.underlayingORM.GetSnapshot(addressRange, qopts...)
if err != nil {
return nil, err
}
c.cache.Set(key, data, defaultExpiration)

return data, nil
}

// GetUnconfirmedRows is not cached: the request for up to limit rows is
// handed to the wrapped ORM and its result is returned untouched.
func (c CachedORM) GetUnconfirmedRows(limit uint, qopts ...pg.QOpt) ([]*Row, error) {
	rows, err := c.underlayingORM.GetUnconfirmedRows(limit, qopts...)
	return rows, err
}

func (c CachedORM) clearCache(row *Row) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if returning an error from this method has any value. If any of the three error cases below happen, we should simply delete the key and continue iterating in the loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense! added the changes 👍🏼

for key := range c.cache.Items() {
keyParts := strings.Split(key, "_")
if len(keyParts) != 3 {
return fmt.Errorf("invalid cache key")
}

minAddress, ok := new(big.Int).SetString(keyParts[1], 10)
if !ok {
return fmt.Errorf("error while converting minAddress string: %s to big.Int ", keyParts[1])
}

maxAddress, ok := new(big.Int).SetString(keyParts[2], 10)
if !ok {
return fmt.Errorf("error while converting minAddress string: %s to big.Int ", keyParts[2])
}

if row.Address.ToInt().Cmp(minAddress) >= 0 && row.Address.ToInt().Cmp(maxAddress) <= 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to understand a case where this would not be true...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S4 supports snapshot sharding.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. I re-read the CLIP and this now makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we want to only delete the cached snapshots that its addressRange contain a specific row. Just to avoid deleting snapshots that are not affected

c.cache.Delete(key)
}
}

return nil
}
272 changes: 272 additions & 0 deletions core/services/s4/cached_orm_wrapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
package s4_test

import (
"bytes"
"fmt"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/s4"
"github.com/smartcontractkit/chainlink/v2/core/services/s4/mocks"
)

func TestGetSnapshotEmpty(t *testing.T) {
t.Run("OK-no_rows", func(t *testing.T) {
psqlORM := setupORM(t, "test")
lggr := logger.TestLogger(t)
orm := s4.NewCachedORMWrapper(psqlORM, lggr)

rows, err := orm.GetSnapshot(s4.NewFullAddressRange())
assert.NoError(t, err)
assert.Empty(t, rows)
})
}

// TestGetSnapshotCacheFilled verifies that a second GetSnapshot call for the
// same address range is answered from the cache and yields the rows that the
// underlying ORM returned on the first call.
func TestGetSnapshotCacheFilled(t *testing.T) {
	t.Run("OK_with_rows_already_cached", func(t *testing.T) {
		rows := generateTestSnapshotRows(t, 100)

		fullAddressRange := s4.NewFullAddressRange()

		lggr := logger.TestLogger(t)
		underlayingORM := mocks.NewORM(t)
		underlayingORM.On("GetSnapshot", fullAddressRange).Return(rows, nil).Once()

		orm := s4.NewCachedORMWrapper(underlayingORM, lggr)

		// The first call goes to the underlying ORM and fills the cache.
		firstSnapshot, err := orm.GetSnapshot(fullAddressRange)
		require.NoError(t, err)
		require.Len(t, firstSnapshot, len(rows))

		// The second call must be served from the cache: the expectation above
		// is registered with .Once(), so another call to the mock would fail.
		cachedSnapshot, err := orm.GetSnapshot(fullAddressRange)
		require.NoError(t, err)
		require.Len(t, cachedSnapshot, len(rows))

		// Index the cached rows by address (addresses are assumed unique).
		cachedByAddress := make(map[string]*s4.SnapshotRow, len(cachedSnapshot))
		for _, sr := range cachedSnapshot {
			cachedByAddress[sr.Address.String()] = sr
		}

		for _, want := range rows {
			got, ok := cachedByAddress[want.Address.String()]
			// require, not assert: the field comparisons below dereference got
			// and would panic on a missing row instead of failing cleanly.
			require.True(t, ok)
			require.NotNil(t, got)
			assert.Equal(t, want.Address, got.Address)
			assert.Equal(t, want.SlotId, got.SlotId)
			assert.Equal(t, want.Version, got.Version)
			assert.Equal(t, want.Expiration, got.Expiration)
			assert.Equal(t, want.Confirmed, got.Confirmed)
			assert.Equal(t, want.PayloadSize, got.PayloadSize)
		}
	})
}

// TestUpdateInvalidatesSnapshotCache covers both outcomes of Update on the
// snapshot cache: an update inside a cached address range evicts that
// snapshot, while an update outside the range leaves it in place.
func TestUpdateInvalidatesSnapshotCache(t *testing.T) {
	t.Run("OK-GetSnapshot_cache_invalidated_after_update", func(t *testing.T) {
		rows := generateTestSnapshotRows(t, 100)
		fullAddressRange := s4.NewFullAddressRange()
		lggr := logger.TestLogger(t)

		underlayingORM := mocks.NewORM(t)
		underlayingORM.On("GetSnapshot", fullAddressRange).Return(rows, nil).Once()
		orm := s4.NewCachedORMWrapper(underlayingORM, lggr)

		// Call 1 reaches the underlying ORM and populates the cache.
		firstSnapshot, err := orm.GetSnapshot(fullAddressRange)
		assert.NoError(t, err)
		assert.Len(t, firstSnapshot, len(rows))

		// Call 2 has to be a cache hit; the .Once() expectation is already spent.
		cachedSnapshot, err := orm.GetSnapshot(fullAddressRange)
		assert.NoError(t, err)
		assert.Len(t, cachedSnapshot, len(rows))

		// Updating a row whose address lies in the cached range evicts the snapshot.
		updatedAddress := big.New(common.HexToAddress("0x0000000000000000000000000000000000000000000000000000000000000005").Big())
		row := &s4.Row{
			Address:    updatedAddress,
			SlotId:     1,
			Payload:    cltest.MustRandomBytes(t, 32),
			Version:    1,
			Expiration: time.Now().Add(time.Hour).UnixMilli(),
			Confirmed:  true,
			Signature:  cltest.MustRandomBytes(t, 32),
		}
		underlayingORM.On("Update", row).Return(nil).Once()
		assert.NoError(t, orm.Update(row))

		// With the entry gone, call 3 must reach the underlying ORM again.
		underlayingORM.On("GetSnapshot", fullAddressRange).Return(rows, nil).Once()
		refetchedSnapshot, err := orm.GetSnapshot(fullAddressRange)
		assert.NoError(t, err)
		assert.Len(t, refetchedSnapshot, len(rows))
	})

	t.Run("OK-GetSnapshot_cache_not_invalidated_after_update", func(t *testing.T) {
		rows := generateTestSnapshotRows(t, 5)

		// Cached range covers addresses 0 through 3.
		lowerBound := common.BytesToAddress(bytes.Repeat([]byte{0x00}, common.AddressLength))
		upperBound := common.BytesToAddress(append(bytes.Repeat([]byte{0x00}, common.AddressLength-1), 3))
		addressRange := &s4.AddressRange{
			MinAddress: ubig.New(lowerBound.Big()),
			MaxAddress: ubig.New(upperBound.Big()),
		}

		lggr := logger.TestLogger(t)
		underlayingORM := mocks.NewORM(t)
		underlayingORM.On("GetSnapshot", addressRange).Return(rows, nil).Once()
		orm := s4.NewCachedORMWrapper(underlayingORM, lggr)

		// Call 1 reaches the underlying ORM and populates the cache.
		firstSnapshot, err := orm.GetSnapshot(addressRange)
		assert.NoError(t, err)
		assert.Len(t, firstSnapshot, len(rows))

		// Call 2 has to be a cache hit; the .Once() expectation is already spent.
		cachedSnapshot, err := orm.GetSnapshot(addressRange)
		assert.NoError(t, err)
		assert.Len(t, cachedSnapshot, len(rows))

		// Address 5 is outside the cached range, so this update must not evict it.
		outOfCachedRangeAddress := ubig.New(common.BytesToAddress(append(bytes.Repeat([]byte{0x00}, common.AddressLength-1), 5)).Big())
		row := &s4.Row{
			Address:    outOfCachedRangeAddress,
			SlotId:     1,
			Payload:    cltest.MustRandomBytes(t, 32),
			Version:    1,
			Expiration: time.Now().Add(time.Hour).UnixMilli(),
			Confirmed:  true,
			Signature:  cltest.MustRandomBytes(t, 32),
		}
		underlayingORM.On("Update", row).Return(nil).Once()
		assert.NoError(t, orm.Update(row))

		// No new GetSnapshot expectation is registered: call 3 must be a cache hit.
		stillCachedSnapshot, err := orm.GetSnapshot(addressRange)
		assert.NoError(t, err)
		assert.Len(t, stillCachedSnapshot, len(rows))
	})
}

// TestGet checks that Get passes both a successful row and an error from the
// underlying ORM through the cached wrapper unchanged.
func TestGet(t *testing.T) {
	var slotID uint = 1
	address := big.New(testutils.NewAddress().Big())
	lggr := logger.TestLogger(t)

	t.Run("OK-Get_underlaying_ORM_returns_a_row", func(t *testing.T) {
		want := &s4.Row{
			Address: address,
			SlotId:  slotID,
		}
		underlayingORM := mocks.NewORM(t)
		underlayingORM.On("Get", address, slotID).Return(want, nil).Once()

		got, err := s4.NewCachedORMWrapper(underlayingORM, lggr).Get(address, slotID)

		require.NoError(t, err)
		require.Equal(t, want, got)
	})

	t.Run("NOK-Get_underlaying_ORM_returns_an_error", func(t *testing.T) {
		underlayingORM := mocks.NewORM(t)
		underlayingORM.On("Get", address, slotID).Return(nil, fmt.Errorf("some_error")).Once()

		got, err := s4.NewCachedORMWrapper(underlayingORM, lggr).Get(address, slotID)

		require.Nil(t, got)
		require.EqualError(t, err, "some_error")
	})
}

// TestDeletedExpired checks that DeleteExpired returns the underlying ORM's
// deleted-row count and error through the cached wrapper.
func TestDeletedExpired(t *testing.T) {
	now := time.Now()
	var limit uint = 1
	lggr := logger.TestLogger(t)

	t.Run("OK-DeletedExpired_underlaying_ORM_returns_a_row", func(t *testing.T) {
		var want int64 = 10
		underlayingORM := mocks.NewORM(t)
		underlayingORM.On("DeleteExpired", limit, now).Return(want, nil).Once()

		got, err := s4.NewCachedORMWrapper(underlayingORM, lggr).DeleteExpired(limit, now)

		require.NoError(t, err)
		require.Equal(t, want, got)
	})

	t.Run("NOK-DeletedExpired_underlaying_ORM_returns_an_error", func(t *testing.T) {
		var want int64
		underlayingORM := mocks.NewORM(t)
		underlayingORM.On("DeleteExpired", limit, now).Return(want, fmt.Errorf("some_error")).Once()

		got, err := s4.NewCachedORMWrapper(underlayingORM, lggr).DeleteExpired(limit, now)

		require.EqualError(t, err, "some_error")
		require.Equal(t, want, got)
	})
}

// TestGetUnconfirmedRows checks that GetUnconfirmedRows passes both a
// successful result and an error from the underlying ORM through the cached
// wrapper unchanged.
func TestGetUnconfirmedRows(t *testing.T) {
	lggr := logger.TestLogger(t)
	var limit uint = 1

	t.Run("OK-GetUnconfirmedRows_underlaying_ORM_returns_a_row", func(t *testing.T) {
		var slotID uint = 1
		address := big.New(testutils.NewAddress().Big())
		want := []*s4.Row{{
			Address: address,
			SlotId:  slotID,
		}}

		underlayingORM := mocks.NewORM(t)
		underlayingORM.On("GetUnconfirmedRows", limit).Return(want, nil).Once()

		got, err := s4.NewCachedORMWrapper(underlayingORM, lggr).GetUnconfirmedRows(limit)

		require.NoError(t, err)
		require.Equal(t, want, got)
	})

	t.Run("NOK-GetUnconfirmedRows_underlaying_ORM_returns_an_error", func(t *testing.T) {
		underlayingORM := mocks.NewORM(t)
		underlayingORM.On("GetUnconfirmedRows", limit).Return(nil, fmt.Errorf("some_error")).Once()

		got, err := s4.NewCachedORMWrapper(underlayingORM, lggr).GetUnconfirmedRows(limit)

		require.Nil(t, got)
		require.EqualError(t, err, "some_error")
	})
}

// generateTestSnapshotRows builds n snapshot rows for use in tests. Each row
// gets a new test address, slot 1, payload size 32, an expiration one hour
// ahead, versions counting up from 1, and Confirmed set on even indices.
func generateTestSnapshotRows(t *testing.T, n int) []*s4.SnapshotRow {
	t.Helper()

	generated := make([]*s4.SnapshotRow, 0, n)
	for idx := 0; idx < n; idx++ {
		generated = append(generated, &s4.SnapshotRow{
			Address:     big.New(testutils.NewAddress().Big()),
			SlotId:      1,
			PayloadSize: 32,
			Version:     uint64(idx) + 1,
			Expiration:  time.Now().Add(time.Hour).UnixMilli(),
			Confirmed:   idx%2 == 0,
		})
	}

	return generated
}
Loading