-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FUN-973] s4 snapshot caching #12275
Changes from 1 commit
29a1558
c676b85
3339fac
d17f14e
2d09526
f2b3e96
0c5752f
72d0822
5fd736c
dd93e2a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package s4 | ||
|
||
import ( | ||
"fmt" | ||
"math/big" | ||
"strings" | ||
"time" | ||
|
||
"github.com/patrickmn/go-cache" | ||
|
||
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/pg" | ||
) | ||
|
||
const ( | ||
// defaultExpiration decides how long info will be valid for. | ||
defaultExpiration = 10 * time.Minute | ||
// cleanupInterval decides when the expired items in cache will be deleted. | ||
cleanupInterval = 5 * time.Minute | ||
) | ||
|
||
// CachedORM is a cached orm wrapper that implements the ORM interface. | ||
// It adds a cache layer in order to remove unnecessary preassure to the underlaying implementation | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: spelling of |
||
type CachedORM struct { | ||
underlayingORM ORM | ||
cache *cache.Cache | ||
lggr logger.Logger | ||
} | ||
|
||
var _ ORM = (*CachedORM)(nil) | ||
|
||
func NewCachedORMWrapper(orm ORM, lggr logger.Logger) *CachedORM { | ||
return &CachedORM{ | ||
underlayingORM: orm, | ||
cache: cache.New(defaultExpiration, cleanupInterval), | ||
lggr: lggr, | ||
} | ||
} | ||
|
||
func (c CachedORM) Get(address *ubig.Big, slotId uint, qopts ...pg.QOpt) (*Row, error) { | ||
return c.underlayingORM.Get(address, slotId, qopts...) | ||
} | ||
|
||
func (c CachedORM) Update(row *Row, qopts ...pg.QOpt) error { | ||
err := c.clearCache(row) | ||
if err != nil { | ||
c.lggr.Error("failed to clear cache: %w", err) | ||
} | ||
|
||
return c.underlayingORM.Update(row, qopts...) | ||
} | ||
|
||
func (c CachedORM) DeleteExpired(limit uint, utcNow time.Time, qopts ...pg.QOpt) (int64, error) { | ||
return c.underlayingORM.DeleteExpired(limit, utcNow, qopts...) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we should clear the cache here if DeleteExpired removed any entries. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 good idea, I'll flush the cache to start fresh given there is no way of knowing what was deleted |
||
} | ||
|
||
func (c CachedORM) GetSnapshot(addressRange *AddressRange, qopts ...pg.QOpt) ([]*SnapshotRow, error) { | ||
key := fmt.Sprintf("GetSnapshot_%s_%s", addressRange.MinAddress.String(), addressRange.MaxAddress.String()) | ||
|
||
cached, found := c.cache.Get(key) | ||
if found { | ||
return cached.([]*SnapshotRow), nil | ||
} | ||
|
||
c.lggr.Info("Snapshot not found in cache, fetching it from underlaying implementation") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: this could probably be debug level |
||
data, err := c.underlayingORM.GetSnapshot(addressRange, qopts...) | ||
if err != nil { | ||
return nil, err | ||
} | ||
c.cache.Set(key, data, defaultExpiration) | ||
|
||
return data, nil | ||
} | ||
|
||
func (c CachedORM) GetUnconfirmedRows(limit uint, qopts ...pg.QOpt) ([]*Row, error) { | ||
return c.underlayingORM.GetUnconfirmedRows(limit, qopts...) | ||
} | ||
|
||
func (c CachedORM) clearCache(row *Row) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if returning an error from this method has any value. If any of the three error cases below happen, we should simply delete the key and continue iterating in the loop. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. makes sense! added the changes 👍🏼 |
||
for key := range c.cache.Items() { | ||
keyParts := strings.Split(key, "_") | ||
if len(keyParts) != 3 { | ||
return fmt.Errorf("invalid cache key") | ||
} | ||
|
||
minAddress, ok := new(big.Int).SetString(keyParts[1], 10) | ||
if !ok { | ||
return fmt.Errorf("error while converting minAddress string: %s to big.Int ", keyParts[1]) | ||
} | ||
|
||
maxAddress, ok := new(big.Int).SetString(keyParts[2], 10) | ||
if !ok { | ||
return fmt.Errorf("error while converting minAddress string: %s to big.Int ", keyParts[2]) | ||
} | ||
|
||
if row.Address.ToInt().Cmp(minAddress) >= 0 && row.Address.ToInt().Cmp(maxAddress) <= 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am trying to understand a case where this would not be true... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. S4 supports snapshot sharding. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gotcha. I re-read the CLIP and this now makes sense. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, we want to only delete the cached snapshots that its addressRange contain a specific row. Just to avoid deleting snapshots that are not affected |
||
c.cache.Delete(key) | ||
} | ||
} | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,272 @@ | ||
package s4_test | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" | ||
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" | ||
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest" | ||
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/s4" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/s4/mocks" | ||
) | ||
|
||
func TestGetSnapshotEmpty(t *testing.T) { | ||
t.Run("OK-no_rows", func(t *testing.T) { | ||
psqlORM := setupORM(t, "test") | ||
lggr := logger.TestLogger(t) | ||
orm := s4.NewCachedORMWrapper(psqlORM, lggr) | ||
|
||
rows, err := orm.GetSnapshot(s4.NewFullAddressRange()) | ||
assert.NoError(t, err) | ||
assert.Empty(t, rows) | ||
}) | ||
} | ||
|
||
func TestGetSnapshotCacheFilled(t *testing.T) { | ||
t.Run("OK_with_rows_already_cached", func(t *testing.T) { | ||
rows := generateTestSnapshotRows(t, 100) | ||
|
||
fullAddressRange := s4.NewFullAddressRange() | ||
|
||
lggr := logger.TestLogger(t) | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("GetSnapshot", fullAddressRange).Return(rows, nil).Once() | ||
|
||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
// first call will go to the underlaying orm implementation to fill the cache | ||
first_snapshot, err := orm.GetSnapshot(fullAddressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(first_snapshot)) | ||
|
||
// on the second call, the results will come from the cache, if not the mock will return an error because of .Once() | ||
cache_snapshot, err := orm.GetSnapshot(fullAddressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(cache_snapshot)) | ||
|
||
snapshotRowMap := make(map[string]*s4.SnapshotRow) | ||
for i, sr := range cache_snapshot { | ||
// assuming unique addresses | ||
snapshotRowMap[sr.Address.String()] = cache_snapshot[i] | ||
} | ||
|
||
for _, sr := range rows { | ||
snapshotRow, ok := snapshotRowMap[sr.Address.String()] | ||
assert.True(t, ok) | ||
assert.NotNil(t, snapshotRow) | ||
assert.Equal(t, snapshotRow.Address, sr.Address) | ||
assert.Equal(t, snapshotRow.SlotId, sr.SlotId) | ||
assert.Equal(t, snapshotRow.Version, sr.Version) | ||
assert.Equal(t, snapshotRow.Expiration, sr.Expiration) | ||
assert.Equal(t, snapshotRow.Confirmed, sr.Confirmed) | ||
assert.Equal(t, snapshotRow.PayloadSize, sr.PayloadSize) | ||
} | ||
}) | ||
} | ||
|
||
func TestUpdateInvalidatesSnapshotCache(t *testing.T) { | ||
t.Run("OK-GetSnapshot_cache_invalidated_after_update", func(t *testing.T) { | ||
rows := generateTestSnapshotRows(t, 100) | ||
|
||
fullAddressRange := s4.NewFullAddressRange() | ||
|
||
lggr := logger.TestLogger(t) | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("GetSnapshot", fullAddressRange).Return(rows, nil).Once() | ||
|
||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
// first call will go to the underlaying orm implementation to fill the cache | ||
first_snapshot, err := orm.GetSnapshot(fullAddressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(first_snapshot)) | ||
|
||
// on the second call, the results will come from the cache, if not the mock will return an error because of .Once() | ||
cache_snapshot, err := orm.GetSnapshot(fullAddressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(cache_snapshot)) | ||
|
||
// this update call will invalidate the cache | ||
row := &s4.Row{ | ||
Address: big.New(common.HexToAddress("0x0000000000000000000000000000000000000000000000000000000000000005").Big()), | ||
SlotId: 1, | ||
Payload: cltest.MustRandomBytes(t, 32), | ||
Version: 1, | ||
Expiration: time.Now().Add(time.Hour).UnixMilli(), | ||
Confirmed: true, | ||
Signature: cltest.MustRandomBytes(t, 32), | ||
} | ||
underlayingORM.On("Update", row).Return(nil).Once() | ||
err = orm.Update(row) | ||
assert.NoError(t, err) | ||
|
||
// given the cache was invalidated this request will reach the underlaying orm implementation | ||
underlayingORM.On("GetSnapshot", fullAddressRange).Return(rows, nil).Once() | ||
third_snapshot, err := orm.GetSnapshot(fullAddressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(third_snapshot)) | ||
}) | ||
|
||
t.Run("OK-GetSnapshot_cache_not_invalidated_after_update", func(t *testing.T) { | ||
rows := generateTestSnapshotRows(t, 5) | ||
|
||
addressRange := &s4.AddressRange{ | ||
MinAddress: ubig.New(common.BytesToAddress(bytes.Repeat([]byte{0x00}, common.AddressLength)).Big()), | ||
MaxAddress: ubig.New(common.BytesToAddress(append(bytes.Repeat([]byte{0x00}, common.AddressLength-1), 3)).Big()), | ||
} | ||
|
||
lggr := logger.TestLogger(t) | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("GetSnapshot", addressRange).Return(rows, nil).Once() | ||
|
||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
// first call will go to the underlaying orm implementation to fill the cache | ||
first_snapshot, err := orm.GetSnapshot(addressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(first_snapshot)) | ||
|
||
// on the second call, the results will come from the cache, if not the mock will return an error because of .Once() | ||
cache_snapshot, err := orm.GetSnapshot(addressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(cache_snapshot)) | ||
|
||
// this update call wont invalidate the cache because the address is out of the cache address range | ||
outOfCachedRangeAddress := ubig.New(common.BytesToAddress(append(bytes.Repeat([]byte{0x00}, common.AddressLength-1), 5)).Big()) | ||
row := &s4.Row{ | ||
Address: outOfCachedRangeAddress, | ||
SlotId: 1, | ||
Payload: cltest.MustRandomBytes(t, 32), | ||
Version: 1, | ||
Expiration: time.Now().Add(time.Hour).UnixMilli(), | ||
Confirmed: true, | ||
Signature: cltest.MustRandomBytes(t, 32), | ||
} | ||
underlayingORM.On("Update", row).Return(nil).Once() | ||
err = orm.Update(row) | ||
assert.NoError(t, err) | ||
|
||
// given the cache was not invalidated this request wont reach the underlaying orm implementation | ||
third_snapshot, err := orm.GetSnapshot(addressRange) | ||
assert.NoError(t, err) | ||
assert.Equal(t, len(rows), len(third_snapshot)) | ||
}) | ||
} | ||
|
||
func TestGet(t *testing.T) { | ||
address := big.New(testutils.NewAddress().Big()) | ||
var slotID uint = 1 | ||
|
||
lggr := logger.TestLogger(t) | ||
|
||
t.Run("OK-Get_underlaying_ORM_returns_a_row", func(t *testing.T) { | ||
underlayingORM := mocks.NewORM(t) | ||
expectedRow := &s4.Row{ | ||
Address: address, | ||
SlotId: slotID, | ||
} | ||
underlayingORM.On("Get", address, slotID).Return(expectedRow, nil).Once() | ||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
row, err := orm.Get(address, slotID) | ||
require.NoError(t, err) | ||
require.Equal(t, expectedRow, row) | ||
}) | ||
t.Run("NOK-Get_underlaying_ORM_returns_an_error", func(t *testing.T) { | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("Get", address, slotID).Return(nil, fmt.Errorf("some_error")).Once() | ||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
row, err := orm.Get(address, slotID) | ||
require.Nil(t, row) | ||
require.EqualError(t, err, "some_error") | ||
}) | ||
} | ||
|
||
func TestDeletedExpired(t *testing.T) { | ||
var limit uint = 1 | ||
now := time.Now() | ||
|
||
lggr := logger.TestLogger(t) | ||
|
||
t.Run("OK-DeletedExpired_underlaying_ORM_returns_a_row", func(t *testing.T) { | ||
var expectedDeleted int64 = 10 | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("DeleteExpired", limit, now).Return(expectedDeleted, nil).Once() | ||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
actualDeleted, err := orm.DeleteExpired(limit, now) | ||
require.NoError(t, err) | ||
require.Equal(t, expectedDeleted, actualDeleted) | ||
}) | ||
t.Run("NOK-DeletedExpired_underlaying_ORM_returns_an_error", func(t *testing.T) { | ||
var expectedDeleted int64 | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("DeleteExpired", limit, now).Return(expectedDeleted, fmt.Errorf("some_error")).Once() | ||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
actualDeleted, err := orm.DeleteExpired(limit, now) | ||
require.EqualError(t, err, "some_error") | ||
require.Equal(t, expectedDeleted, actualDeleted) | ||
}) | ||
} | ||
|
||
// GetUnconfirmedRows(limit uint, qopts ...pg.QOpt) ([]*Row, error) | ||
func TestGetUnconfirmedRows(t *testing.T) { | ||
var limit uint = 1 | ||
lggr := logger.TestLogger(t) | ||
|
||
t.Run("OK-GetUnconfirmedRows_underlaying_ORM_returns_a_row", func(t *testing.T) { | ||
address := big.New(testutils.NewAddress().Big()) | ||
var slotID uint = 1 | ||
|
||
expectedRow := []*s4.Row{{ | ||
Address: address, | ||
SlotId: slotID, | ||
}} | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("GetUnconfirmedRows", limit).Return(expectedRow, nil).Once() | ||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
actualRow, err := orm.GetUnconfirmedRows(limit) | ||
require.NoError(t, err) | ||
require.Equal(t, expectedRow, actualRow) | ||
}) | ||
t.Run("NOK-GetUnconfirmedRows_underlaying_ORM_returns_an_error", func(t *testing.T) { | ||
underlayingORM := mocks.NewORM(t) | ||
underlayingORM.On("GetUnconfirmedRows", limit).Return(nil, fmt.Errorf("some_error")).Once() | ||
orm := s4.NewCachedORMWrapper(underlayingORM, lggr) | ||
|
||
actualRow, err := orm.GetUnconfirmedRows(limit) | ||
require.Nil(t, actualRow) | ||
require.EqualError(t, err, "some_error") | ||
}) | ||
} | ||
|
||
func generateTestSnapshotRows(t *testing.T, n int) []*s4.SnapshotRow { | ||
t.Helper() | ||
|
||
rows := make([]*s4.SnapshotRow, n) | ||
for i := 0; i < n; i++ { | ||
row := &s4.SnapshotRow{ | ||
Address: big.New(testutils.NewAddress().Big()), | ||
SlotId: 1, | ||
PayloadSize: 32, | ||
Version: 1 + uint64(i), | ||
Expiration: time.Now().Add(time.Hour).UnixMilli(), | ||
Confirmed: i%2 == 0, | ||
} | ||
rows[i] = row | ||
} | ||
|
||
return rows | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This dependency must already exist I suppose since I don't see any changes to the go.mod file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes! the evmregistry is already using it