Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Nov 22, 2024
1 parent 7cf02a3 commit 5eed4bb
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 71 deletions.
98 changes: 59 additions & 39 deletions core/services/llo/onchain_channel_definition_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"maps"
"math/big"
"net/http"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -22,13 +23,18 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/llo-feeds/generated/channel_config_store"
"github.com/smartcontractkit/chainlink/v2/core/utils"
clhttp "github.com/smartcontractkit/chainlink/v2/core/utils/http"
)

// TODO: MERC-3524
// Move to evm/llo?

const (
// MaxChannelDefinitionsFileSize is a sanity limit to avoid OOM for a
// maliciously large file. It should be much larger than any real expected
Expand All @@ -43,10 +49,8 @@ const (
)

var (
channelConfigStoreABI abi.ABI
topicNewChannelDefinition = (channel_config_store.ChannelConfigStoreNewChannelDefinition{}).Topic()

allTopics = []common.Hash{topicNewChannelDefinition}
channelConfigStoreABI abi.ABI
NewChannelDefinition = (channel_config_store.ChannelConfigStoreNewChannelDefinition{}).Topic()
)

func init() {
Expand All @@ -67,7 +71,7 @@ var _ llotypes.ChannelDefinitionCache = &channelDefinitionCache{}

type LogPoller interface {
LatestBlock(ctx context.Context) (logpoller.LogPollerBlock, error)
LogsWithSigs(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error)
FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]logpoller.Log, error)
RegisterFilter(ctx context.Context, filter logpoller.Filter) error
UnregisterFilter(ctx context.Context, filterName string) error
}
Expand All @@ -92,6 +96,8 @@ type channelDefinitionCache struct {
logPollInterval time.Duration
addr common.Address
donID uint32
donIDTopic common.Hash
filterExprs []query.Expression
lggr logger.SugaredLogger
initialBlockNum int64

Expand Down Expand Up @@ -121,6 +127,17 @@ func filterName(addr common.Address, donID uint32) string {

func NewChannelDefinitionCache(lggr logger.Logger, orm ChannelDefinitionCacheORM, client HTTPClient, lp logpoller.LogPoller, addr common.Address, donID uint32, fromBlock int64, options ...Option) llotypes.ChannelDefinitionCache {
filterName := logpoller.FilterName("OCR3 LLO ChannelDefinitionCachePoller", addr.String(), donID)
donIDTopic := common.BigToHash(big.NewInt(int64(donID)))

exprs := []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(NewChannelDefinition),
logpoller.NewEventByTopicFilter(1, []logpoller.HashedValueComparator{
{Value: donIDTopic, Operator: primitives.Eq},
}),
query.Confidence(primitives.Unconfirmed), // TODO: Ought it to be primitives.Finalized? MERC-3524
}

cdc := &channelDefinitionCache{
orm: orm,
client: client,
Expand All @@ -130,6 +147,8 @@ func NewChannelDefinitionCache(lggr logger.Logger, orm ChannelDefinitionCacheORM
logPollInterval: defaultLogPollInterval,
addr: addr,
donID: donID,
donIDTopic: donIDTopic,
filterExprs: exprs,
lggr: logger.Sugared(lggr).Named("ChannelDefinitionCache").With("addr", addr, "fromBlock", fromBlock),
newLogCh: make(chan *channel_config_store.ChannelConfigStoreNewChannelDefinition, 1),
initialBlockNum: fromBlock,
Expand All @@ -144,8 +163,7 @@ func NewChannelDefinitionCache(lggr logger.Logger, orm ChannelDefinitionCacheORM
func (c *channelDefinitionCache) Start(ctx context.Context) error {
// Initial load from DB, then async poll from chain thereafter
return c.StartOnce("ChannelDefinitionCache", func() (err error) {
donIDTopic := common.BigToHash(big.NewInt(int64(c.donID)))
err = c.lp.RegisterFilter(ctx, logpoller.Filter{Name: c.filterName, EventSigs: allTopics, Topic2: []common.Hash{donIDTopic}, Addresses: []common.Address{c.addr}})
err = c.lp.RegisterFilter(ctx, logpoller.Filter{Name: c.filterName, EventSigs: []common.Hash{NewChannelDefinition}, Topic2: []common.Hash{c.donIDTopic}, Addresses: []common.Address{c.addr}})
if err != nil {
return err
}
Expand Down Expand Up @@ -217,47 +235,49 @@ func (c *channelDefinitionCache) readLogs(ctx context.Context) (err error) {
}

// NOTE: We assume that log poller returns logs in order of block_num, log_index ASC
// TODO: Could improve performance a little bit here by adding a don ID topic filter
// MERC-3524
logs, err := c.lp.LogsWithSigs(ctx, fromBlock, toBlock, allTopics, c.addr)
exprs := append(c.filterExprs,
query.Block(strconv.FormatInt(fromBlock, 10), primitives.Gte),
query.Block(strconv.FormatInt(toBlock, 10), primitives.Lte),
)

// FIXME: Is the sorting correct?
logs, err := c.lp.FilteredLogs(ctx, exprs, query.LimitAndSort{}, "ChannelDefinitionCachePoller - NewChannelDefinition")
if err != nil {
return err
}

for _, log := range logs {
switch log.EventSig {
case topicNewChannelDefinition:
unpacked := new(channel_config_store.ChannelConfigStoreNewChannelDefinition)

err := channelConfigStoreABI.UnpackIntoInterface(unpacked, newChannelDefinitionEventName, log.Data)
if err != nil {
return fmt.Errorf("failed to unpack log data: %w", err)
}
if len(log.Topics) < 2 {
// should never happen but must guard against unexpected panics
c.lggr.Warnw("Log missing expected topics", "log", log)
continue
}
unpacked.DonId = new(big.Int).SetBytes(log.Topics[1])

if unpacked.DonId.Cmp(big.NewInt(int64(c.donID))) != 0 {
// skip logs for other donIDs
continue
}
if log.EventSig != NewChannelDefinition {
// ignore unrecognized logs
continue
}
unpacked := new(channel_config_store.ChannelConfigStoreNewChannelDefinition)

c.newLogMu.Lock()
if c.newLog == nil || unpacked.Version > c.newLog.Version {
// assume that donID is correct due to log poller filtering
c.lggr.Infow("Got new channel definitions from chain", "version", unpacked.Version, "blockNumber", log.BlockNumber, "sha", fmt.Sprintf("%x", unpacked.Sha), "url", unpacked.Url)
c.newLog = unpacked
c.newLogCh <- unpacked
}
c.newLogMu.Unlock()
err := channelConfigStoreABI.UnpackIntoInterface(unpacked, newChannelDefinitionEventName, log.Data)
if err != nil {
return fmt.Errorf("failed to unpack log data: %w", err)
}
if len(log.Topics) < 2 {
// should never happen but must guard against unexpected panics
c.lggr.Warnw("Log missing expected topics", "log", log)
continue
}
unpacked.DonId = new(big.Int).SetBytes(log.Topics[1])

default:
// ignore unrecognized logs
if unpacked.DonId.Cmp(big.NewInt(int64(c.donID))) != 0 {
// skip logs for other donIDs, shouldn't happen given the
// FilterLogs call, but belts and braces
continue
}

c.newLogMu.Lock()
if c.newLog == nil || unpacked.Version > c.newLog.Version {
c.lggr.Infow("Got new channel definitions from chain", "version", unpacked.Version, "blockNumber", log.BlockNumber, "sha", fmt.Sprintf("%x", unpacked.Sha), "url", unpacked.Url)
c.newLog = unpacked
c.newLogCh <- unpacked
}
c.newLogMu.Unlock()

}

return nil
Expand Down
2 changes: 0 additions & 2 deletions core/services/ocr2/plugins/llo/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ func (p PluginConfig) Validate() (merr error) {
if err := json.Unmarshal([]byte(p.ChannelDefinitions), &cd); err != nil {
merr = errors.Join(merr, fmt.Errorf("channelDefinitions is invalid JSON: %w", err))
}
// TODO: Verify Opts format here?
// MERC-3524
} else {
if p.ChannelDefinitionsContractAddress == (common.Address{}) {
merr = errors.Join(merr, errors.New("llo: ChannelDefinitionsContractAddress is required if ChannelDefinitions is not specified"))
Expand Down
47 changes: 34 additions & 13 deletions core/services/relay/evm/llo/config_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
"fmt"
"math"
"math/big"
"strconv"

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/llo-feeds/generated/configurator"
Expand All @@ -32,9 +35,9 @@ type ConfigPollerService interface {
}

type LogPoller interface {
IndexedLogsByBlockRange(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash) ([]logpoller.Log, error)
// IndexedLogsByBlockRange(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash) ([]logpoller.Log, error)
LatestBlock(ctx context.Context) (logpoller.LogPollerBlock, error)
LogsWithSigs(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error)
FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]logpoller.Log, error)
}

// ConfigCache is most likely the global RetirementReportCache. Every config
Expand All @@ -47,11 +50,12 @@ type configPoller struct {
services.Service
eng *services.Engine

lp LogPoller
cc ConfigCache
addr common.Address
donID uint32
donIDHash [32]byte
lp LogPoller
cc ConfigCache
addr common.Address
donID uint32
donIDTopic [32]byte
filterExprs []query.Expression

fromBlock uint64

Expand All @@ -70,12 +74,25 @@ func NewConfigPoller(lggr logger.Logger, lp LogPoller, cc ConfigCache, addr comm
}

func newConfigPoller(lggr logger.Logger, lp LogPoller, cc ConfigCache, addr common.Address, donID uint32, instanceType InstanceType, fromBlock uint64) *configPoller {
donIDTopic := DonIDToBytes32(donID)
exprs := []query.Expression{
logpoller.NewAddressFilter(addr),
query.Or(
logpoller.NewEventSigFilter(ProductionConfigSet),
logpoller.NewEventSigFilter(StagingConfigSet),
),
logpoller.NewEventByTopicFilter(1, []logpoller.HashedValueComparator{
{Value: donIDTopic, Operator: primitives.Eq},
}),
query.Confidence(primitives.Unconfirmed), // TODO: Ought it to be primitives.Finalized? MERC-3524
}
cp := &configPoller{
lp: lp,
cc: cc,
addr: addr,
donID: donID,
donIDHash: DonIDToBytes32(donID),
donIDTopic: DonIDToBytes32(donID),
filterExprs: exprs,
instanceType: instanceType,
fromBlock: fromBlock,
}
Expand All @@ -102,16 +119,20 @@ func (cp *configPoller) LatestConfigDetails(ctx context.Context) (changedInBlock
func (cp *configPoller) latestConfig(ctx context.Context, fromBlock, toBlock int64) (latestConfig FullConfigFromLog, latestLog logpoller.Log, err error) {
// Get all config set logs run through them forwards
// TODO: This could probably be optimized with a 'latestBlockNumber' cache or something to avoid reading from `fromBlock` on every call
// TODO: Actually we only care about the latest of each type here
// TODO: Actually we only care about the latest of each type here - two queries?
// MERC-3524
logs, err := cp.lp.LogsWithSigs(ctx, fromBlock, toBlock, []common.Hash{ProductionConfigSet, StagingConfigSet}, cp.addr)
exprs := append(cp.filterExprs,
query.Block(strconv.FormatInt(fromBlock, 10), primitives.Gte),
query.Block(strconv.FormatInt(toBlock, 10), primitives.Lte),
)
logs, err := cp.lp.FilteredLogs(ctx, exprs, query.LimitAndSort{}, "LLOConfigPoller - latestConfig")
if err != nil {
return latestConfig, latestLog, fmt.Errorf("failed to get logs: %w", err)
}
for _, log := range logs {
// TODO: This can be optimized probably by adding donIDHash to the logpoller lookup
// MERC-3524
if !bytes.Equal(log.Topics[1], cp.donIDHash[:]) {
if !bytes.Equal(log.Topics[1], cp.donIDTopic[:]) {
// skip logs for other donIDs, shouldn't happen given the
// FilterLogs call, but belts and braces
continue
}
switch log.EventSig {
Expand Down
55 changes: 40 additions & 15 deletions core/services/relay/evm/llo/should_retire_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package llo
import (
"bytes"
"context"
"math"
"strconv"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
)

type ShouldRetireCacheService interface {
Expand All @@ -25,10 +29,11 @@ type shouldRetireCache struct {
services.Service
eng *services.Engine

lp LogPoller
addr common.Address
donID uint32
donIDHash common.Hash
lp LogPoller
addr common.Address
donID uint32
donIDTopic common.Hash
filterExprs []query.Expression

pollPeriod time.Duration

Expand All @@ -42,13 +47,23 @@ func NewShouldRetireCache(lggr logger.Logger, lp LogPoller, addr common.Address,
}

func newShouldRetireCache(lggr logger.Logger, lp LogPoller, addr common.Address, donID uint32) *shouldRetireCache {
donIDTopic := DonIDToBytes32(donID)
exprs := []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(PromoteStagingConfig),
logpoller.NewEventByTopicFilter(1, []logpoller.HashedValueComparator{
{Value: donIDTopic, Operator: primitives.Eq},
}),
query.Confidence(primitives.Unconfirmed), // TODO: Ought it to be primitives.Finalized? MERC-3524
}
s := &shouldRetireCache{
lp: lp,
addr: addr,
donID: donID,
donIDHash: DonIDToBytes32(donID),
m: make(map[ocrtypes.ConfigDigest]struct{}),
pollPeriod: 1 * time.Second,
lp: lp,
addr: addr,
donID: donID,
donIDTopic: donIDTopic,
filterExprs: exprs,
m: make(map[ocrtypes.ConfigDigest]struct{}),
pollPeriod: 1 * time.Second,
}
s.Service, s.eng = services.Config{
Name: "LLOShouldRetireCache",
Expand Down Expand Up @@ -79,16 +94,26 @@ func (s *shouldRetireCache) start(ctx context.Context) error {

func (s *shouldRetireCache) checkShouldRetire(ctx context.Context) {
fromBlock := s.latestBlockNum + 1
logs, err := s.lp.LogsWithSigs(ctx, fromBlock, math.MaxInt64, []common.Hash{PromoteStagingConfig}, s.addr)

exprs := append(s.filterExprs,
query.Block(strconv.FormatInt(fromBlock, 10), primitives.Gte),
)

logs, err := s.lp.FilteredLogs(ctx, exprs, query.LimitAndSort{}, "ShouldRetireCache - PromoteStagingConfig")
if err != nil {
s.eng.SugaredLogger.Errorw("checkShouldRetire: IndexedLogs", "err", err)
return
}

for _, log := range logs {
// TODO: This can probably be optimized
// MERC-3524
if !bytes.Equal(log.Topics[1], s.donIDHash[:]) {
if log.EventSig != PromoteStagingConfig {
// ignore unrecognized logs
continue
}

if !bytes.Equal(log.Topics[1], s.donIDTopic[:]) {
// skip logs for other donIDs, shouldn't happen given the
// FilterLogs call, but belts and braces
continue
}
digestBytes := log.Topics[2]
Expand Down
2 changes: 0 additions & 2 deletions core/services/relay/evm/llo_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,6 @@ func (w *mercuryConfigPollerWrapper) close() error {
func newLLOConfigPollers(ctx context.Context, lggr logger.Logger, cc llo.ConfigCache, lp logpoller.LogPoller, chainID *big.Int, configuratorAddress common.Address, relayConfig types.RelayConfig) (cps []llo.ConfigPollerService, configDigester ocrtypes.OffchainConfigDigester, err error) {
donID := relayConfig.LLODONID
donIDHash := llo.DonIDToBytes32(donID)
// TODO: Can we auto-detect or verify based on if the contract implements `setConfig` or `setProductionConfig` interfaces?
// MERC-3524
switch relayConfig.LLOConfigMode {
case types.LLOConfigModeMercury:
// NOTE: This uses the old config digest prefix for compatibility with legacy contracts
Expand Down

0 comments on commit 5eed4bb

Please sign in to comment.