From 1608eb2844a16136c0236206f6905dc4a6bb46d5 Mon Sep 17 00:00:00 2001 From: Ryan Stout Date: Mon, 15 Jul 2024 14:23:41 -0700 Subject: [PATCH] Add report methods --- commit/plugin.go | 2 +- commit/plugin_functions.go | 2 +- commit/plugin_functions_test.go | 2 +- commit_rmn_ocb/outcome.go | 121 ++++++++++++++----------- commit_rmn_ocb/plugin.go | 57 +++++++++++- commit_rmn_ocb/report.go | 78 ++++++++++++++++ commit_rmn_ocb/types.go | 89 +++++++++++++++--- commit_rmn_ocb/validate_observation.go | 8 +- 8 files changed, 280 insertions(+), 79 deletions(-) create mode 100644 commit_rmn_ocb/report.go diff --git a/commit/plugin.go b/commit/plugin.go index 6816ef9af..31048c542 100644 --- a/commit/plugin.go +++ b/commit/plugin.go @@ -72,7 +72,7 @@ func NewPlugin( p.bgSyncCancelFunc = bgSyncCf p.bgSyncWG = &sync.WaitGroup{} p.bgSyncWG.Add(1) - backgroundReaderSync( + BackgroundReaderSync( bgSyncCtx, p.bgSyncWG, lggr, diff --git a/commit/plugin_functions.go b/commit/plugin_functions.go index 9ec85bd5c..24a2e0a03 100644 --- a/commit/plugin_functions.go +++ b/commit/plugin_functions.go @@ -634,7 +634,7 @@ func validateMerkleRootsState( return true, nil } -func backgroundReaderSync( +func BackgroundReaderSync( ctx context.Context, wg *sync.WaitGroup, lggr logger.Logger, diff --git a/commit/plugin_functions_test.go b/commit/plugin_functions_test.go index 2d2122de6..37860e9ae 100644 --- a/commit/plugin_functions_test.go +++ b/commit/plugin_functions_test.go @@ -1492,7 +1492,7 @@ func Test_backgroundReaderSync(t *testing.T) { wg.Add(1) // start background syncing - backgroundReaderSync(ctx, wg, lggr, reader, syncTimeout, ticker) + BackgroundReaderSync(ctx, wg, lggr, reader, syncTimeout, ticker) // send a tick to trigger the first sync that errors reader.On("Sync", mock.Anything).Return(false, fmt.Errorf("some err")).Once() diff --git a/commit_rmn_ocb/outcome.go b/commit_rmn_ocb/outcome.go index 53104d129..38a41553f 100644 --- a/commit_rmn_ocb/outcome.go +++ b/commit_rmn_ocb/outcome.go @@ -73,10 +73,9 @@ func (p *Plugin) ReportRangesOutcome( "offRampMaxSeqNum", offRampMaxSeqNum, "onRampMaxSeqNum", onRampMaxSeqNum, "chainSelector", chainSel) - continue - } else if offRampMaxSeqNum == onRampMaxSeqNum { - continue - } else { + } + + if offRampMaxSeqNum < onRampMaxSeqNum { chainRange := ChainRange{ ChainSel: chainSel, SeqNumRange: [2]cciptypes.SeqNum{offRampMaxSeqNum, onRampMaxSeqNum}, @@ -85,6 +84,7 @@ func (p *Plugin) ReportRangesOutcome( } } + // TODO: explain this sort.Slice(rangesToReport, func(i, j int) bool { return rangesToReport[i].ChainSel < rangesToReport[j].ChainSel }) outcome := CommitPluginOutcome{ @@ -95,20 +95,55 @@ func (p *Plugin) ReportRangesOutcome( return outcome.Encode() } -// TODO: doc +// TODO: doc, break apart func (p *Plugin) buildReport( query CommitQuery, consensusObservation ConsensusObservation, ) (ocr3types.Outcome, error) { - if query.SignedMerkleRoots == nil || len(query.SignedMerkleRoots) == 0 { - // TODO: metrics - return ocr3types.Outcome{}, fmt.Errorf("buildReport: query.SignedMerkleRoots is empty") + verifiedSignedRoots := p.verifySignedRoots(query.SignedMerkleRoots, consensusObservation.MerkleRoots) + + chainsToExclude := make([]cciptypes.ChainSelector, 0) + for chain := range verifiedSignedRoots { + if _, exists := consensusObservation.GasPrices[chain]; !exists { + // TODO: metrics + p.log.Warnw( + "did not find a consensus gas price for chain %d, excluding it from the report", chain) + chainsToExclude = append(chainsToExclude, chain) + } } - observedMerkleRoots := consensusObservation.MerkleRoots + for _, chainSelector := range chainsToExclude { + delete(verifiedSignedRoots, chainSelector) + } + // TODO: token prices validation + // exclude merkle roots if expected token prices don't exist + + verifiedSignedRootsArray := make([]SignedMerkleRoot, 0, len(verifiedSignedRoots)) + for _, signedRoot := range verifiedSignedRoots { + verifiedSignedRootsArray = append(verifiedSignedRootsArray, signedRoot) + } + + // TODO: explain this + sort.Slice(verifiedSignedRootsArray, func(i, j int) bool { + return verifiedSignedRootsArray[i].chain() < verifiedSignedRootsArray[j].chain() + }) + + return CommitPluginOutcome{ + OutcomeType: ReportGenerated, + SignedRootsToReport: verifiedSignedRootsArray, + GasPrices: consensusObservation.GasPricesSortedArray(), + TokenPrices: consensusObservation.TokenPricesSortedArray(), + }.Encode() +} + +// TODO: doc, break apart +func (p *Plugin) verifySignedRoots( + rmnSignedRoots []SignedMerkleRoot, + observedRoots map[cciptypes.ChainSelector]MerkleRoot, +) map[cciptypes.ChainSelector]SignedMerkleRoot { verifiedSignedRoots := make(map[cciptypes.ChainSelector]SignedMerkleRoot) - for _, signedRoot := range query.SignedMerkleRoots { + for _, signedRoot := range rmnSignedRoots { if err := p.rmn.VerifySignedMerkleRoot(signedRoot); err != nil { // TODO: metrics p.log.Warnw("failed to verify signed merkle root", @@ -117,16 +152,14 @@ func (p *Plugin) buildReport( continue } - if observedMerkleRoot, exists := observedMerkleRoots[signedRoot.chain()]; exists { - // check merkle root equality - if observedMerkleRoot != signedRoot.MerkleRoot { + if observedMerkleRoot, exists := observedRoots[signedRoot.chain()]; exists { + if observedMerkleRoot == signedRoot.MerkleRoot { + verifiedSignedRoots[signedRoot.chain()] = signedRoot + } else { // TODO: metrics p.log.Warnw("observed merkle root does not match merkle root received from RMN", "rmnSignedRoot", signedRoot, "observedMerkleRoot", observedMerkleRoot) - continue - } else { - } } else { // TODO: metrics @@ -134,13 +167,11 @@ func (p *Plugin) buildReport( "received a signed merkle root from RMN for a chain, but did not observe a merkle root for "+ "this chain", "rmnSignedRoot", signedRoot) - continue } - - verifiedSignedRoots[signedRoot.chain()] = signedRoot } - for chain, observedMerkleRoot := range observedMerkleRoots { + // TODO: explain this + for chain, observedMerkleRoot := range observedRoots { if _, exists := verifiedSignedRoots[chain]; !exists { if p.rmn.ChainThreshold(chain) == 0 { verifiedSignedRoots[chain] = SignedMerkleRoot{ @@ -156,23 +187,7 @@ func (p *Plugin) buildReport( } } - chainsToExclude := make([]cciptypes.ChainSelector, 0) - for chain, _ := range verifiedSignedRoots { - if _, exists := consensusObservation.GasPrices[chain]; !exists { - // TODO: metrics - p.log.Warnw( - "did not find a consensus gas price for chain %d, excluding it from the report", chain) - chainsToExclude = append(chainsToExclude, chain) - } - } - - for _, chainSelector := range chainsToExclude { - delete(verifiedSignedRoots, chainSelector) - } - - // TODO: token prices validation - - return nil, nil + return verifiedSignedRoots } // TODO: doc @@ -193,21 +208,21 @@ func (p *Plugin) checkForReportTransmission( if offRampUpdated { return CommitPluginOutcome{ - OutcomeType: CommitPluginOutcomeType(ReportGenerated), + OutcomeType: ReportGenerated, }.Encode() - } else { - if previousOutcome.ReportTransmissionCheckAttempts+1 >= p.cfg.MaxReportTransmissionCheckAttempts { - return CommitPluginOutcome{ - OutcomeType: CommitPluginOutcomeType(ReportNotTransmitted), - }.Encode() - } else { - return CommitPluginOutcome{ - OutcomeType: CommitPluginOutcomeType(ReportNotYetTransmitted), - OffRampMaxSeqNums: previousOutcome.OffRampMaxSeqNums, - ReportTransmissionCheckAttempts: previousOutcome.ReportTransmissionCheckAttempts + 1, - }.Encode() - } } + + if previousOutcome.ReportTransmissionCheckAttempts+1 >= p.cfg.MaxReportTransmissionCheckAttempts { + return CommitPluginOutcome{ + OutcomeType: ReportNotTransmitted, + }.Encode() + } + + return CommitPluginOutcome{ + OutcomeType: ReportNotYetTransmitted, + OffRampMaxSeqNums: previousOutcome.OffRampMaxSeqNums, + ReportTransmissionCheckAttempts: previousOutcome.ReportTransmissionCheckAttempts + 1, + }.Encode() } // getConsensusObservation TODO: doc @@ -401,11 +416,7 @@ func mostFrequentElem[T comparable](elems []T) (T, int) { func counts[T comparable](elems []T) map[T]int { m := make(map[T]int) for _, elem := range elems { - if _, exists := m[elem]; exists { - m[elem]++ - } else { - m[elem] = 1 - } + m[elem]++ } return m diff --git a/commit_rmn_ocb/plugin.go b/commit_rmn_ocb/plugin.go index f8c94d09c..9879831b9 100644 --- a/commit_rmn_ocb/plugin.go +++ b/commit_rmn_ocb/plugin.go @@ -1,7 +1,10 @@ package commitrmnocb import ( + "context" "fmt" + "sync" + "time" mapset "github.com/deckarep/golang-set/v2" "github.com/smartcontractkit/libocr/commontypes" @@ -11,6 +14,8 @@ import ( "github.com/smartcontractkit/chainlink-ccip/internal/reader" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3" + "github.com/smartcontractkit/chainlink-ccip/commit" + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) @@ -25,6 +30,8 @@ type Plugin struct { tokenPricesReader reader.TokenPrices ccipReader reader.CCIP homeChain reader.HomeChain + bgSyncCancelFunc context.CancelFunc + bgSyncWG *sync.WaitGroup } func NewPlugin( @@ -39,7 +46,7 @@ func NewPlugin( ccipReader reader.CCIP, homeChain reader.HomeChain, ) *Plugin { - return &Plugin{ + p := &Plugin{ reportingCfg: reportingCfg, nodeID: nodeID, oracleIDToP2pID: oracleIDToP2pID, @@ -51,13 +58,42 @@ func NewPlugin( ccipReader: ccipReader, homeChain: homeChain, } + + bgSyncCtx, bgSyncCf := context.WithCancel(context.Background()) + p.bgSyncCancelFunc = bgSyncCf + p.bgSyncWG = &sync.WaitGroup{} + p.bgSyncWG.Add(1) + commit.BackgroundReaderSync( + bgSyncCtx, + p.bgSyncWG, + log, + ccipReader, + syncTimeout(cfg.SyncTimeout), + time.NewTicker(syncFrequency(p.cfg.SyncFrequency)).C, + ) + + return p +} + +func (p *Plugin) Close() error { + timeout := 10 * time.Second + ctx, cf := context.WithTimeout(context.Background(), timeout) + defer cf() + + if err := p.ccipReader.Close(ctx); err != nil { + return fmt.Errorf("close ccip reader: %w", err) + } + + p.bgSyncCancelFunc() + p.bgSyncWG.Wait() + return nil } // TODO: doc // SelectingRangesForReport doesn't depend on the previous outcome, explain how this is resilient (to being unable // to parse previous outcome) func (p *Plugin) decodeOutcome(outcome ocr3types.Outcome) (CommitPluginOutcome, CommitPluginState) { - if outcome == nil || len(outcome) == 0 { + if len(outcome) == 0 { return CommitPluginOutcome{}, SelectingRangesForReport } @@ -99,3 +135,20 @@ func (p *Plugin) supportsDestChain(oracle commontypes.OracleID) (bool, error) { } return destChainConfig.SupportedNodes.Contains(p.oracleIDToP2pID[oracle]), nil } + +func syncFrequency(configuredValue time.Duration) time.Duration { + if configuredValue.Milliseconds() == 0 { + return 10 * time.Second + } + return configuredValue +} + +func syncTimeout(configuredValue time.Duration) time.Duration { + if configuredValue.Milliseconds() == 0 { + return 3 * time.Second + } + return configuredValue +} + +// Interface compatibility checks. +var _ ocr3types.ReportingPlugin[[]byte] = &Plugin{} diff --git a/commit_rmn_ocb/report.go b/commit_rmn_ocb/report.go new file mode 100644 index 000000000..1e394f919 --- /dev/null +++ b/commit_rmn_ocb/report.go @@ -0,0 +1,78 @@ +package commitrmnocb + +import ( + "context" + "fmt" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" +) + +// Reports TODO: doc, metrics +func (p *Plugin) Reports(seqNr uint64, outcomeBytes ocr3types.Outcome) ([]ocr3types.ReportWithInfo[[]byte], error) { + outcome, err := DecodeCommitPluginOutcome(outcomeBytes) + if err != nil { + // TODO: metrics + p.log.Errorw("failed to decode CommitPluginOutcome", "outcomeBytes", outcomeBytes, "err", err) + return nil, fmt.Errorf("failed to decode CommitPluginOutcome: %w", err) + } + + report := CommitPluginReport{ + SignedRoots: outcome.SignedRootsToReport, + GasPrices: outcome.GasPrices, + TokenPrices: outcome.TokenPrices, + } + + // TODO: log, metrics + + encodedReport, err := report.Encode() + if err != nil { + return nil, fmt.Errorf("encode commit plugin report: %w", err) + } + + return []ocr3types.ReportWithInfo[[]byte]{{Report: encodedReport, Info: nil}}, nil +} + +func (p *Plugin) ShouldAcceptAttestedReport( + _ context.Context, _ uint64, r ocr3types.ReportWithInfo[[]byte], +) (bool, error) { + decodedReport, err := DecodeCommitPluginReport(r.Report) + if err != nil { + // TODO: metrics + p.log.Errorw("failed to decode CommitPluginOutcome", "outcomeBytes", r.Report, "err", err) + return false, err + } + + if decodedReport.IsEmpty() { + // TODO: metrics + p.log.Warnf("found an empty report") + return false, nil + } + + return true, nil +} + +func (p *Plugin) ShouldTransmitAcceptedReport( + _ context.Context, _ uint64, r ocr3types.ReportWithInfo[[]byte], +) (bool, error) { + destChainSupported, err := p.supportsDestChain(p.nodeID) + if err != nil { + return false, fmt.Errorf("call to supportsDestChain failed: %w", err) + } + if !destChainSupported { + p.log.Debugw("oracle does not support dest chain, skipping report transmission") + return false, nil + } + + decodedReport, err := DecodeCommitPluginReport(r.Report) + if err != nil { + return false, fmt.Errorf("decode commit plugin report: %w", err) + } + + // TODO: metrics + p.log.Debugw("transmitting report", + "signedRoots", len(decodedReport.SignedRoots), + "tokenPrices", len(decodedReport.TokenPrices), + "gasPriceUpdates", len(decodedReport.GasPrices), + ) + return true, nil +} diff --git a/commit_rmn_ocb/types.go b/commit_rmn_ocb/types.go index 07386fb16..99284819e 100644 --- a/commit_rmn_ocb/types.go +++ b/commit_rmn_ocb/types.go @@ -3,6 +3,8 @@ package commitrmnocb import ( "encoding/json" "fmt" + "sort" + "time" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -32,6 +34,12 @@ type CommitPluginConfig struct { // The maximum number of times to check if the previous report has been transmitted MaxReportTransmissionCheckAttempts uint + + // SyncTimeout is the timeout for syncing the commit plugin reader. + SyncTimeout time.Duration `json:"syncTimeout"` + + // SyncFrequency is the frequency at which the commit plugin reader should sync. + SyncFrequency time.Duration `json:"syncFrequency"` } type RmnSig struct { @@ -138,47 +146,43 @@ func aggregateObservations(aos []types.AttributedObservation) AggregatedObservat // MerkleRoots for _, merkleRoot := range obs.MerkleRoots { - AppendToMap(aggObs.MerkleRoots, merkleRoot.ChainSel, merkleRoot) + aggObs.MerkleRoots[merkleRoot.ChainSel] = + append(aggObs.MerkleRoots[merkleRoot.ChainSel], merkleRoot) } // GasPrices for _, gasPriceChain := range obs.GasPrices { - AppendToMap(aggObs.GasPrices, gasPriceChain.ChainSel, gasPriceChain.GasPrice) + aggObs.GasPrices[gasPriceChain.ChainSel] = + append(aggObs.GasPrices[gasPriceChain.ChainSel], gasPriceChain.GasPrice) } // TokenPrices for _, tokenPrice := range obs.TokenPrices { - AppendToMap(aggObs.TokenPrices, tokenPrice.TokenID, tokenPrice.Price) + aggObs.TokenPrices[tokenPrice.TokenID] = + append(aggObs.TokenPrices[tokenPrice.TokenID], tokenPrice.Price) } // OnRampMaxSeqNums for _, seqNumChain := range obs.OnRampMaxSeqNums { - AppendToMap(aggObs.OnRampMaxSeqNums, seqNumChain.ChainSel, seqNumChain.SeqNum) + aggObs.OnRampMaxSeqNums[seqNumChain.ChainSel] = + append(aggObs.OnRampMaxSeqNums[seqNumChain.ChainSel], seqNumChain.SeqNum) } // OffRampMaxSeqNums for _, seqNumChain := range obs.OffRampMaxSeqNums { - AppendToMap(aggObs.OffRampMaxSeqNums, seqNumChain.ChainSel, seqNumChain.SeqNum) + aggObs.OffRampMaxSeqNums[seqNumChain.ChainSel] = + append(aggObs.OffRampMaxSeqNums[seqNumChain.ChainSel], seqNumChain.SeqNum) } // FChain for chainSel, f := range obs.FChain { - AppendToMap(aggObs.FChain, chainSel, f) + aggObs.FChain[chainSel] = append(aggObs.FChain[chainSel], f) } } return aggObs } -// AppendToMap TODO: doc -func AppendToMap[K comparable, V any](m map[K][]V, k K, v V) { - if _, exists := m[k]; exists { - m[k] = append(m[k], v) - } else { - m[k] = []V{v} - } -} - // ConsensusObservation TODO: doc type ConsensusObservation struct { // A map from chain selectors to each chain's consensus merkle root @@ -200,6 +204,36 @@ type ConsensusObservation struct { FChain map[cciptypes.ChainSelector]int } +// GasPricesSortedArray TODO: doc +func (co ConsensusObservation) GasPricesSortedArray() []cciptypes.GasPriceChain { + gasPrices := make([]cciptypes.GasPriceChain, 0, len(co.GasPrices)) + for chain, gasPrice := range co.GasPrices { + gasPrices = append(gasPrices, cciptypes.NewGasPriceChain(gasPrice.Int, chain)) + } + + // TODO: explain this + sort.Slice(gasPrices, func(i, j int) bool { + return gasPrices[i].ChainSel < gasPrices[j].ChainSel + }) + + return gasPrices +} + +// TokenPricesSortedArray TODO: doc +func (co ConsensusObservation) TokenPricesSortedArray() []cciptypes.TokenPrice { + tokenPrices := make([]cciptypes.TokenPrice, 0, len(co.TokenPrices)) + for tokenID, tokenPrice := range co.TokenPrices { + tokenPrices = append(tokenPrices, cciptypes.NewTokenPrice(tokenID, tokenPrice.Int)) + } + + // TODO: explain this + sort.Slice(tokenPrices, func(i, j int) bool { + return tokenPrices[i].TokenID < tokenPrices[j].TokenID + }) + + return tokenPrices +} + type CommitPluginOutcomeType int const ( @@ -284,3 +318,28 @@ type OnChain interface { GetOffRampMaxSeqNums() ([]plugintypes.SeqNumChain, error) GetMerkleRoots([]ChainRange) ([]MerkleRoot, error) } + +type CommitPluginReport struct { + SignedRoots []SignedMerkleRoot + TokenPrices []cciptypes.TokenPrice `json:"tokenPrices"` + GasPrices []cciptypes.GasPriceChain `json:"gasPrices"` +} + +func (r CommitPluginReport) IsEmpty() bool { + return len(r.SignedRoots) == 0 && len(r.TokenPrices) == 0 && len(r.GasPrices) == 0 +} + +func (r CommitPluginReport) Encode() ([]byte, error) { + encodedReport, err := json.Marshal(r) + if err != nil { + return nil, fmt.Errorf("failed to encode CommitPluginReport: %w", err) + } + + return encodedReport, nil +} + +func DecodeCommitPluginReport(b []byte) (CommitPluginReport, error) { + r := CommitPluginReport{} + err := json.Unmarshal(b, &r) + return r, err +} diff --git a/commit_rmn_ocb/validate_observation.go b/commit_rmn_ocb/validate_observation.go index 947c812be..c8ceae61c 100644 --- a/commit_rmn_ocb/validate_observation.go +++ b/commit_rmn_ocb/validate_observation.go @@ -50,7 +50,7 @@ func (p *Plugin) ValidateObservation(_ ocr3types.OutcomeContext, _ types.Query, // validateMerkleRoots TODO: doc // No duplicate chains, only contains chainSelector that the owner can read func (p *Plugin) validateObservedMerkleRoots(merkleRoots []MerkleRoot, observer commontypes.OracleID) error { - if merkleRoots == nil || len(merkleRoots) == 0 { + if len(merkleRoots) == 0 { return nil } @@ -81,7 +81,7 @@ func (p *Plugin) validateObservedOnRampMaxSeqNums( onRampMaxSeqNums []plugintypes.SeqNumChain, observer commontypes.OracleID, ) error { - if onRampMaxSeqNums == nil || len(onRampMaxSeqNums) == 0 { + if len(onRampMaxSeqNums) == 0 { return nil } @@ -112,7 +112,7 @@ func (p *Plugin) validateObservedOffRampMaxSeqNums( offRampMaxSeqNums []plugintypes.SeqNumChain, observer commontypes.OracleID, ) error { - if offRampMaxSeqNums == nil || len(offRampMaxSeqNums) == 0 { + if len(offRampMaxSeqNums) == 0 { return nil } @@ -139,7 +139,7 @@ func (p *Plugin) validateObservedOffRampMaxSeqNums( // validateFChains TODO: doc // FChain must not be empty func (p *Plugin) validateFChain(fchain map[cciptypes.ChainSelector]int) error { - if fchain == nil || len(fchain) == 0 { + if len(fchain) == 0 { return fmt.Errorf("fchain map is empty") }