Skip to content

Commit

Permalink
Optimize log poller calls
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Nov 25, 2024
1 parent 20308b8 commit c88664c
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 84 deletions.
98 changes: 59 additions & 39 deletions core/services/llo/onchain_channel_definition_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"maps"
"math/big"
"net/http"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -22,13 +23,18 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/llo-feeds/generated/channel_config_store"
"github.com/smartcontractkit/chainlink/v2/core/utils"
clhttp "github.com/smartcontractkit/chainlink/v2/core/utils/http"
)

// TODO: MERC-3524
// Move to evm/llo?

const (
// MaxChannelDefinitionsFileSize is a sanity limit to avoid OOM for a
// maliciously large file. It should be much larger than any real expected
Expand All @@ -43,10 +49,10 @@ const (
)

var (
channelConfigStoreABI abi.ABI
topicNewChannelDefinition = (channel_config_store.ChannelConfigStoreNewChannelDefinition{}).Topic()
channelConfigStoreABI abi.ABI
NewChannelDefinition = (channel_config_store.ChannelConfigStoreNewChannelDefinition{}).Topic()

allTopics = []common.Hash{topicNewChannelDefinition}
NoLimitSortAsc = query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Asc))
)

func init() {
Expand All @@ -67,7 +73,7 @@ var _ llotypes.ChannelDefinitionCache = &channelDefinitionCache{}

type LogPoller interface {
LatestBlock(ctx context.Context) (logpoller.LogPollerBlock, error)
LogsWithSigs(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error)
FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]logpoller.Log, error)
RegisterFilter(ctx context.Context, filter logpoller.Filter) error
UnregisterFilter(ctx context.Context, filterName string) error
}
Expand All @@ -92,6 +98,8 @@ type channelDefinitionCache struct {
logPollInterval time.Duration
addr common.Address
donID uint32
donIDTopic common.Hash
filterExprs []query.Expression
lggr logger.SugaredLogger
initialBlockNum int64

Expand Down Expand Up @@ -121,6 +129,17 @@ func filterName(addr common.Address, donID uint32) string {

func NewChannelDefinitionCache(lggr logger.Logger, orm ChannelDefinitionCacheORM, client HTTPClient, lp logpoller.LogPoller, addr common.Address, donID uint32, fromBlock int64, options ...Option) llotypes.ChannelDefinitionCache {
filterName := logpoller.FilterName("OCR3 LLO ChannelDefinitionCachePoller", addr.String(), donID)
donIDTopic := common.BigToHash(big.NewInt(int64(donID)))

exprs := []query.Expression{
logpoller.NewAddressFilter(addr),
logpoller.NewEventSigFilter(NewChannelDefinition),
logpoller.NewEventByTopicFilter(1, []logpoller.HashedValueComparator{
{Value: donIDTopic, Operator: primitives.Eq},
}),
query.Confidence(primitives.Unconfirmed), // TODO: Ought it to be primitives.Finalized? MERC-3524
}

cdc := &channelDefinitionCache{
orm: orm,
client: client,
Expand All @@ -130,6 +149,8 @@ func NewChannelDefinitionCache(lggr logger.Logger, orm ChannelDefinitionCacheORM
logPollInterval: defaultLogPollInterval,
addr: addr,
donID: donID,
donIDTopic: donIDTopic,
filterExprs: exprs,
lggr: logger.Sugared(lggr).Named("ChannelDefinitionCache").With("addr", addr, "fromBlock", fromBlock),
newLogCh: make(chan *channel_config_store.ChannelConfigStoreNewChannelDefinition, 1),
initialBlockNum: fromBlock,
Expand All @@ -144,8 +165,7 @@ func NewChannelDefinitionCache(lggr logger.Logger, orm ChannelDefinitionCacheORM
func (c *channelDefinitionCache) Start(ctx context.Context) error {
// Initial load from DB, then async poll from chain thereafter
return c.StartOnce("ChannelDefinitionCache", func() (err error) {
donIDTopic := common.BigToHash(big.NewInt(int64(c.donID)))
err = c.lp.RegisterFilter(ctx, logpoller.Filter{Name: c.filterName, EventSigs: allTopics, Topic2: []common.Hash{donIDTopic}, Addresses: []common.Address{c.addr}})
err = c.lp.RegisterFilter(ctx, logpoller.Filter{Name: c.filterName, EventSigs: []common.Hash{NewChannelDefinition}, Topic2: []common.Hash{c.donIDTopic}, Addresses: []common.Address{c.addr}})
if err != nil {
return err
}
Expand Down Expand Up @@ -216,48 +236,48 @@ func (c *channelDefinitionCache) readLogs(ctx context.Context) (err error) {
return nil
}

// NOTE: We assume that log poller returns logs in order of block_num, log_index ASC
// TODO: Could improve performance a little bit here by adding a don ID topic filter
// MERC-3524
logs, err := c.lp.LogsWithSigs(ctx, fromBlock, toBlock, allTopics, c.addr)
exprs := append(c.filterExprs,
query.Block(strconv.FormatInt(fromBlock, 10), primitives.Gte),
query.Block(strconv.FormatInt(toBlock, 10), primitives.Lte),
)

logs, err := c.lp.FilteredLogs(ctx, exprs, NoLimitSortAsc, "ChannelDefinitionCachePoller - NewChannelDefinition")
if err != nil {
return err
}

for _, log := range logs {
switch log.EventSig {
case topicNewChannelDefinition:
unpacked := new(channel_config_store.ChannelConfigStoreNewChannelDefinition)

err := channelConfigStoreABI.UnpackIntoInterface(unpacked, newChannelDefinitionEventName, log.Data)
if err != nil {
return fmt.Errorf("failed to unpack log data: %w", err)
}
if len(log.Topics) < 2 {
// should never happen but must guard against unexpected panics
c.lggr.Warnw("Log missing expected topics", "log", log)
continue
}
unpacked.DonId = new(big.Int).SetBytes(log.Topics[1])

if unpacked.DonId.Cmp(big.NewInt(int64(c.donID))) != 0 {
// skip logs for other donIDs
continue
}
if log.EventSig != NewChannelDefinition {
// ignore unrecognized logs
continue
}
unpacked := new(channel_config_store.ChannelConfigStoreNewChannelDefinition)

c.newLogMu.Lock()
if c.newLog == nil || unpacked.Version > c.newLog.Version {
// assume that donID is correct due to log poller filtering
c.lggr.Infow("Got new channel definitions from chain", "version", unpacked.Version, "blockNumber", log.BlockNumber, "sha", fmt.Sprintf("%x", unpacked.Sha), "url", unpacked.Url)
c.newLog = unpacked
c.newLogCh <- unpacked
}
c.newLogMu.Unlock()
err := channelConfigStoreABI.UnpackIntoInterface(unpacked, newChannelDefinitionEventName, log.Data)
if err != nil {
return fmt.Errorf("failed to unpack log data: %w", err)
}
if len(log.Topics) < 2 {
// should never happen but must guard against unexpected panics
c.lggr.Warnw("Log missing expected topics", "log", log)
continue
}
unpacked.DonId = new(big.Int).SetBytes(log.Topics[1])

default:
// ignore unrecognized logs
if unpacked.DonId.Cmp(big.NewInt(int64(c.donID))) != 0 {
// skip logs for other donIDs, shouldn't happen given the
// FilterLogs call, but belts and braces
continue
}

c.newLogMu.Lock()
if c.newLog == nil || unpacked.Version > c.newLog.Version {
c.lggr.Infow("Got new channel definitions from chain", "version", unpacked.Version, "blockNumber", log.BlockNumber, "sha", fmt.Sprintf("%x", unpacked.Sha), "url", unpacked.Url)
c.newLog = unpacked
c.newLogCh <- unpacked
}
c.newLogMu.Unlock()

}

return nil
Expand Down
7 changes: 2 additions & 5 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,10 +883,7 @@ func (d *Delegate) newServicesMercury(
return nil, errors.New("could not coerce PluginProvider to MercuryProvider")
}

// HACK: We need fast config switchovers because they create downtime. This
// won't be properly resolved until we implement blue-green deploys:
// https://smartcontract-it.atlassian.net/browse/MERC-3386
lc.ContractConfigTrackerPollInterval = 1 * time.Second // Mercury requires a fast poll interval, this is the fastest that libocr supports. See: https://github.com/smartcontractkit/offchain-reporting/pull/520
lc.ContractConfigTrackerPollInterval = 1 * time.Second // This is the fastest that libocr supports. See: https://github.com/smartcontractkit/offchain-reporting/pull/520

ocrLogger := ocrcommon.NewOCRWrapper(lggr, d.cfg.OCR2().TraceLogging(), func(ctx context.Context, msg string) {
lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error")
Expand Down Expand Up @@ -1005,7 +1002,7 @@ func (d *Delegate) newServicesLLO(

// Use the default key bundle if not specified
// NOTE: Only JSON and EVMPremiumLegacy supported for now
// https://smartcontract-it.atlassian.net/browse/MERC-3722
// TODO: MERC-3594
//
// Also re-use EVM keys for signing the retirement report. This isn't
// required, just seems easiest since it's the only key type available for
Expand Down
2 changes: 0 additions & 2 deletions core/services/ocr2/plugins/llo/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ func (p PluginConfig) Validate() (merr error) {
if err := json.Unmarshal([]byte(p.ChannelDefinitions), &cd); err != nil {
merr = errors.Join(merr, fmt.Errorf("channelDefinitions is invalid JSON: %w", err))
}
// TODO: Verify Opts format here?
// MERC-3524
} else {
if p.ChannelDefinitionsContractAddress == (common.Address{}) {
merr = errors.Join(merr, errors.New("llo: ChannelDefinitionsContractAddress is required if ChannelDefinitions is not specified"))
Expand Down
5 changes: 0 additions & 5 deletions core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,6 @@ func (r *Relayer) NewMercuryProvider(ctx context.Context, rargs commontypes.Rela
}
}

// FIXME: We actually know the version here since it's in the feed ID, can
// we use generics to avoid passing three of this?
// https://smartcontract-it.atlassian.net/browse/MERC-1414
reportCodecV1 := reportcodecv1.NewReportCodec(*relayConfig.FeedID, lggr.Named("ReportCodecV1"))
reportCodecV2 := reportcodecv2.NewReportCodec(*relayConfig.FeedID, lggr.Named("ReportCodecV2"))
reportCodecV3 := reportcodecv3.NewReportCodec(*relayConfig.FeedID, lggr.Named("ReportCodecV3"))
Expand Down Expand Up @@ -542,8 +539,6 @@ func (r *Relayer) NewLLOProvider(ctx context.Context, rargs commontypes.RelayArg
return nil, pkgerrors.Wrap(err, "failed to get CSA key for mercury connection")
}

// FIXME: Remove after benchmarking is done
// https://smartcontract-it.atlassian.net/browse/MERC-3487
var transmitter LLOTransmitter
if lloCfg.BenchmarkMode {
r.lggr.Info("Benchmark mode enabled, using dummy transmitter. NOTE: THIS WILL NOT TRANSMIT ANYTHING")
Expand Down
55 changes: 39 additions & 16 deletions core/services/relay/evm/llo/config_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
"fmt"
"math"
"math/big"
"strconv"

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/llo-feeds/generated/configurator"
Expand All @@ -26,15 +29,18 @@ const (
InstanceTypeGreen InstanceType = InstanceType("Green")
)

var (
NoLimitSortAsc = query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Asc))
)

type ConfigPollerService interface {
services.Service
ocrtypes.ContractConfigTracker
}

type LogPoller interface {
IndexedLogsByBlockRange(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash) ([]logpoller.Log, error)
LatestBlock(ctx context.Context) (logpoller.LogPollerBlock, error)
LogsWithSigs(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error)
FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]logpoller.Log, error)
}

// ConfigCache is most likely the global RetirementReportCache. Every config
Expand All @@ -47,11 +53,12 @@ type configPoller struct {
services.Service
eng *services.Engine

lp LogPoller
cc ConfigCache
addr common.Address
donID uint32
donIDHash [32]byte
lp LogPoller
cc ConfigCache
addr common.Address
donID uint32
donIDTopic [32]byte
filterExprs []query.Expression

fromBlock uint64

Expand All @@ -70,12 +77,25 @@ func NewConfigPoller(lggr logger.Logger, lp LogPoller, cc ConfigCache, addr comm
}

func newConfigPoller(lggr logger.Logger, lp LogPoller, cc ConfigCache, addr common.Address, donID uint32, instanceType InstanceType, fromBlock uint64) *configPoller {
donIDTopic := DonIDToBytes32(donID)
exprs := []query.Expression{
logpoller.NewAddressFilter(addr),
query.Or(
logpoller.NewEventSigFilter(ProductionConfigSet),
logpoller.NewEventSigFilter(StagingConfigSet),
),
logpoller.NewEventByTopicFilter(1, []logpoller.HashedValueComparator{
{Value: donIDTopic, Operator: primitives.Eq},
}),
query.Confidence(primitives.Unconfirmed), // TODO: Ought it to be primitives.Finalized? MERC-3524
}
cp := &configPoller{
lp: lp,
cc: cc,
addr: addr,
donID: donID,
donIDHash: DonIDToBytes32(donID),
donIDTopic: DonIDToBytes32(donID),
filterExprs: exprs,
instanceType: instanceType,
fromBlock: fromBlock,
}
Expand All @@ -100,18 +120,21 @@ func (cp *configPoller) LatestConfigDetails(ctx context.Context) (changedInBlock
}

func (cp *configPoller) latestConfig(ctx context.Context, fromBlock, toBlock int64) (latestConfig FullConfigFromLog, latestLog logpoller.Log, err error) {
// Get all config set logs run through them forwards
// TODO: This could probably be optimized with a 'latestBlockNumber' cache or something to avoid reading from `fromBlock` on every call
// TODO: Actually we only care about the latest of each type here
// MERC-3524
logs, err := cp.lp.LogsWithSigs(ctx, fromBlock, toBlock, []common.Hash{ProductionConfigSet, StagingConfigSet}, cp.addr)
// Get all configset logs and run through them forwards
// NOTE: It's useful to get _all_ logs rather than just the latest since
// they are stored in the ConfigCache
exprs := append(cp.filterExprs,
query.Block(strconv.FormatInt(fromBlock, 10), primitives.Gte),
query.Block(strconv.FormatInt(toBlock, 10), primitives.Lte),
)
logs, err := cp.lp.FilteredLogs(ctx, exprs, NoLimitSortAsc, "LLOConfigPoller - latestConfig")
if err != nil {
return latestConfig, latestLog, fmt.Errorf("failed to get logs: %w", err)
}
for _, log := range logs {
// TODO: This can be optimized probably by adding donIDHash to the logpoller lookup
// MERC-3524
if !bytes.Equal(log.Topics[1], cp.donIDHash[:]) {
if !bytes.Equal(log.Topics[1], cp.donIDTopic[:]) {
// skip logs for other donIDs, shouldn't happen given the
// FilterLogs call, but belts and braces
continue
}
switch log.EventSig {
Expand Down
Loading

0 comments on commit c88664c

Please sign in to comment.