-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FUN-973] s4 snapshot caching #12275
Changes from 4 commits
29a1558
c676b85
3339fac
d17f14e
2d09526
f2b3e96
0c5752f
72d0822
5fd736c
dd93e2a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package s4 | ||
|
||
import ( | ||
"fmt" | ||
"math/big" | ||
"strings" | ||
"time" | ||
|
||
"github.com/patrickmn/go-cache" | ||
|
||
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/pg" | ||
) | ||
|
||
const ( | ||
// defaultExpiration is how long a cached entry stays valid. It is passed to cache.New as the default TTL and to cache.Set for every stored snapshot. | ||
defaultExpiration = 10 * time.Minute | ||
// cleanupInterval is the second argument given to cache.New: the period at which expired items are purged from the cache. | ||
cleanupInterval = 5 * time.Minute | ||
) | ||
|
||
// CachedORM is a cached orm wrapper that implements the ORM interface. | ||
// It adds a cache layer in order to remove unnecessary pressure on the underlying implementation. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: spelling of |
||
type CachedORM struct { | ||
// underlayingORM is the wrapped ORM that calls are delegated to. | ||
underlayingORM ORM | ||
// cache stores GetSnapshot results, keyed by the requested address range. | ||
cache *cache.Cache | ||
// lggr reports cache misses and malformed cache keys. | ||
lggr logger.Logger | ||
} | ||
|
||
// Compile-time assertion that *CachedORM implements the ORM interface. | ||
var _ ORM = (*CachedORM)(nil) | ||
|
||
func NewCachedORMWrapper(orm ORM, lggr logger.Logger) *CachedORM { | ||
return &CachedORM{ | ||
underlayingORM: orm, | ||
cache: cache.New(defaultExpiration, cleanupInterval), | ||
lggr: lggr, | ||
} | ||
} | ||
|
||
func (c CachedORM) Get(address *ubig.Big, slotId uint, qopts ...pg.QOpt) (*Row, error) { | ||
return c.underlayingORM.Get(address, slotId, qopts...) | ||
} | ||
|
||
func (c CachedORM) Update(row *Row, qopts ...pg.QOpt) error { | ||
c.deleteRowFromCache(row) | ||
|
||
return c.underlayingORM.Update(row, qopts...) | ||
} | ||
|
||
func (c CachedORM) DeleteExpired(limit uint, utcNow time.Time, qopts ...pg.QOpt) (int64, error) { | ||
deletedRows, err := c.underlayingORM.DeleteExpired(limit, utcNow, qopts...) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
if deletedRows > 0 { | ||
c.cache.Flush() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need to flush the entire cache every time a single row is expired? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. more ctx here but basically its because we know some rows where deleted, but we don't know which ones, so the best effort on keeping the cache sync is to flush it |
||
} | ||
|
||
return deletedRows, nil | ||
} | ||
|
||
func (c CachedORM) GetSnapshot(addressRange *AddressRange, qopts ...pg.QOpt) ([]*SnapshotRow, error) { | ||
key := fmt.Sprintf("GetSnapshot_%s_%s", addressRange.MinAddress.String(), addressRange.MaxAddress.String()) | ||
|
||
cached, found := c.cache.Get(key) | ||
if found { | ||
return cached.([]*SnapshotRow), nil | ||
} | ||
|
||
c.lggr.Info("Snapshot not found in cache, fetching it from underlaying implementation") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: this could probably be debug level |
||
data, err := c.underlayingORM.GetSnapshot(addressRange, qopts...) | ||
if err != nil { | ||
return nil, err | ||
} | ||
c.cache.Set(key, data, defaultExpiration) | ||
|
||
return data, nil | ||
} | ||
|
||
func (c CachedORM) GetUnconfirmedRows(limit uint, qopts ...pg.QOpt) ([]*Row, error) { | ||
return c.underlayingORM.GetUnconfirmedRows(limit, qopts...) | ||
} | ||
|
||
func (c CachedORM) deleteRowFromCache(row *Row) { | ||
for key := range c.cache.Items() { | ||
keyParts := strings.Split(key, "_") | ||
if len(keyParts) != 3 { | ||
c.lggr.Errorf("invalid cache key: %s", key) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still delete the key on every error? We don't want any invalid keys in the cache anyway. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. got it, I've addressed it on this commit. I changed it slightly to delete only if the key is a GetSnapshot key and it fails to parse the addresses. |
||
continue | ||
} | ||
|
||
minAddress, ok := new(big.Int).SetString(keyParts[1], 10) | ||
if !ok { | ||
c.lggr.Errorf("error while converting minAddress string: %s to big.Int ", keyParts[1]) | ||
continue | ||
} | ||
|
||
maxAddress, ok := new(big.Int).SetString(keyParts[2], 10) | ||
if !ok { | ||
c.lggr.Errorf("error while converting minAddress string: %s to big.Int ", keyParts[2]) | ||
continue | ||
} | ||
|
||
if row.Address.ToInt().Cmp(minAddress) >= 0 && row.Address.ToInt().Cmp(maxAddress) <= 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am trying to understand a case where this would not be true... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. S4 supports snapshot sharding. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gotcha. I re-read the CLIP and this now makes sense. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, we want to only delete the cached snapshots that its addressRange contain a specific row. Just to avoid deleting snapshots that are not affected |
||
c.cache.Delete(key) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,272 @@ | ||
package s4_test | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" | ||
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" | ||
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest" | ||
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/s4" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/s4/mocks" | ||
) | ||
|
||
func TestGetSnapshotEmpty(t *testing.T) { | ||
t.Run("OK-no_rows", func(t *testing.T) { | ||
psqlORM := setupORM(t, "test") | ||
lggr := logger.TestLogger(t) | ||
orm := s4.NewCachedORMWrapper(psqlORM, lggr) | ||
|
||
rows, err := orm.GetSnapshot(s4.NewFullAddressRange()) | ||
assert.NoError(t, err) | ||
assert.Empty(t, rows) | ||
}) | ||
} | ||
|
||
func TestGetSnapshotCacheFilled(t *testing.T) { | ||
t.Run("OK_with_rows_already_cached", func(t *testing.T) { | ||
rows := generateTestSnapshotRows(t, 100) | ||
|
||
fullAddressRange := s4.NewFullAddressRange() | ||
|
||
lggr := logger.TestLogger(t) | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("GetSnapshot", fullAddressRange).Return(rows, nil).Once() | ||
|
||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
// first call will go to the underlaying orm implementation to fill the cache | ||
first_snapshot, err := orm.GetSnapshot(fullAddressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(first_snapshot)) | ||
|
||
// on the second call, the results will come from the cache, if not the mock will return an error because of .Once() | ||
cache_snapshot, err := orm.GetSnapshot(fullAddressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(cache_snapshot)) | ||
|
||
snapshotRowMap := make(map[string]*s4.SnapshotRow) | ||
for i, sr := range cache_snapshot { | ||
// assuming unique addresses | ||
snapshotRowMap[sr.Address.String()] = cache_snapshot[i] | ||
} | ||
|
||
for _, sr := range rows { | ||
snapshotRow, ok := snapshotRowMap[sr.Address.String()] | ||
assert.True(t, ok) | ||
assert.NotNil(t, snapshotRow) | ||
assert.Equal(t, snapshotRow.Address, sr.Address) | ||
assert.Equal(t, snapshotRow.SlotId, sr.SlotId) | ||
assert.Equal(t, snapshotRow.Version, sr.Version) | ||
assert.Equal(t, snapshotRow.Expiration, sr.Expiration) | ||
assert.Equal(t, snapshotRow.Confirmed, sr.Confirmed) | ||
assert.Equal(t, snapshotRow.PayloadSize, sr.PayloadSize) | ||
} | ||
}) | ||
} | ||
|
||
func TestUpdateInvalidatesSnapshotCache(t *testing.T) { | ||
t.Run("OK-GetSnapshot_cache_invalidated_after_update", func(t *testing.T) { | ||
rows := generateTestSnapshotRows(t, 100) | ||
|
||
fullAddressRange := s4.NewFullAddressRange() | ||
|
||
lggr := logger.TestLogger(t) | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("GetSnapshot", fullAddressRange).Return(rows, nil).Once() | ||
|
||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
// first call will go to the underlaying orm implementation to fill the cache | ||
first_snapshot, err := orm.GetSnapshot(fullAddressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(first_snapshot)) | ||
|
||
// on the second call, the results will come from the cache, if not the mock will return an error because of .Once() | ||
cache_snapshot, err := orm.GetSnapshot(fullAddressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(cache_snapshot)) | ||
|
||
// this update call will invalidate the cache | ||
row := &s4.Row{ | ||
Address: big.New(common.HexToAddress("0x0000000000000000000000000000000000000000000000000000000000000005").Big()), | ||
SlotId: 1, | ||
Payload: cltest.MustRandomBytes(t, 32), | ||
Version: 1, | ||
Expiration: time.Now().Add(time.Hour).UnixMilli(), | ||
Confirmed: true, | ||
Signature: cltest.MustRandomBytes(t, 32), | ||
} | ||
underlayingORM.On("Update", row).Return(nil).Once() | ||
err = orm.Update(row) | ||
assert.NoError(t, err) | ||
|
||
// given the cache was invalidated this request will reach the underlaying orm implementation | ||
underlayingORM.On("GetSnapshot", fullAddressRange).Return(rows, nil).Once() | ||
third_snapshot, err := orm.GetSnapshot(fullAddressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(third_snapshot)) | ||
}) | ||
|
||
t.Run("OK-GetSnapshot_cache_not_invalidated_after_update", func(t *testing.T) { | ||
rows := generateTestSnapshotRows(t, 5) | ||
|
||
addressRange := &s4.AddressRange{ | ||
MinAddress: ubig.New(common.BytesToAddress(bytes.Repeat([]byte{0x00}, common.AddressLength)).Big()), | ||
MaxAddress: ubig.New(common.BytesToAddress(append(bytes.Repeat([]byte{0x00}, common.AddressLength-1), 3)).Big()), | ||
} | ||
|
||
lggr := logger.TestLogger(t) | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("GetSnapshot", addressRange).Return(rows, nil).Once() | ||
|
||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
// first call will go to the underlaying orm implementation to fill the cache | ||
first_snapshot, err := orm.GetSnapshot(addressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(first_snapshot)) | ||
|
||
// on the second call, the results will come from the cache, if not the mock will return an error because of .Once() | ||
cache_snapshot, err := orm.GetSnapshot(addressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(cache_snapshot)) | ||
|
||
// this update call wont invalidate the cache because the address is out of the cache address range | ||
outOfCachedRangeAddress := ubig.New(common.BytesToAddress(append(bytes.Repeat([]byte{0x00}, common.AddressLength-1), 5)).Big()) | ||
row := &s4.Row{ | ||
Address: outOfCachedRangeAddress, | ||
SlotId: 1, | ||
Payload: cltest.MustRandomBytes(t, 32), | ||
Version: 1, | ||
Expiration: time.Now().Add(time.Hour).UnixMilli(), | ||
Confirmed: true, | ||
Signature: cltest.MustRandomBytes(t, 32), | ||
} | ||
underlayingORM.On("Update", row).Return(nil).Once() | ||
err = orm.Update(row) | ||
assert.NoError(t, err) | ||
|
||
// given the cache was not invalidated this request wont reach the underlaying orm implementation | ||
third_snapshot, err := orm.GetSnapshot(addressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(third_snapshot)) | ||
}) | ||
} | ||
|
||
func TestGet(t *testing.T) { | ||
address := big.New(testutils.NewAddress().Big()) | ||
var slotID uint = 1 | ||
|
||
lggr := logger.TestLogger(t) | ||
|
||
t.Run("OK-Get_underlaying_ORM_returns_a_row", func(t *testing.T) { | ||
underlayingORM := mocks.NewORM(t) | ||
expectedRow := &s4.Row{ | ||
Address: address, | ||
SlotId: slotID, | ||
} | ||
underlayingORM.On("Get", address, slotID).Return(expectedRow, nil).Once() | ||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
row, err := orm.Get(address, slotID) | ||
require.NoError(t, err) | ||
require.Equal(t, expectedRow, row) | ||
}) | ||
t.Run("NOK-Get_underlaying_ORM_returns_an_error", func(t *testing.T) { | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("Get", address, slotID).Return(nil, fmt.Errorf("some_error")).Once() | ||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
row, err := orm.Get(address, slotID) | ||
require.Nil(t, row) | ||
require.EqualError(t, err, "some_error") | ||
}) | ||
} | ||
|
||
func TestDeletedExpired(t *testing.T) { | ||
var limit uint = 1 | ||
now := time.Now() | ||
|
||
lggr := logger.TestLogger(t) | ||
|
||
t.Run("OK-DeletedExpired_underlaying_ORM_returns_a_row", func(t *testing.T) { | ||
var expectedDeleted int64 = 10 | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("DeleteExpired", limit, now).Return(expectedDeleted, nil).Once() | ||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
actualDeleted, err := orm.DeleteExpired(limit, now) | ||
require.NoError(t, err) | ||
require.Equal(t, expectedDeleted, actualDeleted) | ||
}) | ||
t.Run("NOK-DeletedExpired_underlaying_ORM_returns_an_error", func(t *testing.T) { | ||
var expectedDeleted int64 | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("DeleteExpired", limit, now).Return(expectedDeleted, fmt.Errorf("some_error")).Once() | ||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
actualDeleted, err := orm.DeleteExpired(limit, now) | ||
require.EqualError(t, err, "some_error") | ||
require.Equal(t, expectedDeleted, actualDeleted) | ||
}) | ||
} | ||
|
||
// GetUnconfirmedRows(limit uint, qopts ...pg.QOpt) ([]*Row, error) | ||
func TestGetUnconfirmedRows(t *testing.T) { | ||
var limit uint = 1 | ||
lggr := logger.TestLogger(t) | ||
|
||
t.Run("OK-GetUnconfirmedRows_underlaying_ORM_returns_a_row", func(t *testing.T) { | ||
address := big.New(testutils.NewAddress().Big()) | ||
var slotID uint = 1 | ||
|
||
expectedRow := []*s4.Row{{ | ||
Address: address, | ||
SlotId: slotID, | ||
}} | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("GetUnconfirmedRows", limit).Return(expectedRow, nil).Once() | ||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
actualRow, err := orm.GetUnconfirmedRows(limit) | ||
require.NoError(t, err) | ||
require.Equal(t, expectedRow, actualRow) | ||
}) | ||
t.Run("NOK-GetUnconfirmedRows_underlaying_ORM_returns_an_error", func(t *testing.T) { | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("GetUnconfirmedRows", limit).Return(nil, fmt.Errorf("some_error")).Once() | ||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
actualRow, err := orm.GetUnconfirmedRows(limit) | ||
require.Nil(t, actualRow) | ||
require.EqualError(t, err, "some_error") | ||
}) | ||
} | ||
|
||
func generateTestSnapshotRows(t *testing.T, n int) []*s4.SnapshotRow { | ||
t.Helper() | ||
|
||
rows := make([]*s4.SnapshotRow, n) | ||
for i := 0; i < n; i++ { | ||
row := &s4.SnapshotRow{ | ||
Address: big.New(testutils.NewAddress().Big()), | ||
SlotId: 1, | ||
PayloadSize: 32, | ||
Version: 1 + uint64(i), | ||
Expiration: time.Now().Add(time.Hour).UnixMilli(), | ||
Confirmed: i%2 == 0, | ||
} | ||
rows[i] = row | ||
} | ||
|
||
return rows | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This dependency must already exist I suppose since I don't see any changes to the go.mod file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes! the evmregistry is already using it