Skip to content

Commit

Permalink
Add report methods
Browse files Browse the repository at this point in the history
  • Loading branch information
rstout committed Jul 16, 2024
1 parent d925d47 commit 1608eb2
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 79 deletions.
2 changes: 1 addition & 1 deletion commit/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewPlugin(
p.bgSyncCancelFunc = bgSyncCf
p.bgSyncWG = &sync.WaitGroup{}
p.bgSyncWG.Add(1)
backgroundReaderSync(
BackgroundReaderSync(
bgSyncCtx,
p.bgSyncWG,
lggr,
Expand Down
2 changes: 1 addition & 1 deletion commit/plugin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ func validateMerkleRootsState(
return true, nil
}

func backgroundReaderSync(
func BackgroundReaderSync(
ctx context.Context,
wg *sync.WaitGroup,
lggr logger.Logger,
Expand Down
2 changes: 1 addition & 1 deletion commit/plugin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,7 @@ func Test_backgroundReaderSync(t *testing.T) {
wg.Add(1)

// start background syncing
backgroundReaderSync(ctx, wg, lggr, reader, syncTimeout, ticker)
BackgroundReaderSync(ctx, wg, lggr, reader, syncTimeout, ticker)

// send a tick to trigger the first sync that errors
reader.On("Sync", mock.Anything).Return(false, fmt.Errorf("some err")).Once()
Expand Down
121 changes: 66 additions & 55 deletions commit_rmn_ocb/outcome.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,9 @@ func (p *Plugin) ReportRangesOutcome(
"offRampMaxSeqNum", offRampMaxSeqNum,
"onRampMaxSeqNum", onRampMaxSeqNum,
"chainSelector", chainSel)
continue
} else if offRampMaxSeqNum == onRampMaxSeqNum {
continue
} else {
}

if offRampMaxSeqNum < onRampMaxSeqNum {
chainRange := ChainRange{
ChainSel: chainSel,
SeqNumRange: [2]cciptypes.SeqNum{offRampMaxSeqNum, onRampMaxSeqNum},
Expand All @@ -85,6 +84,7 @@ func (p *Plugin) ReportRangesOutcome(
}
}

// TODO: explain this
sort.Slice(rangesToReport, func(i, j int) bool { return rangesToReport[i].ChainSel < rangesToReport[j].ChainSel })

outcome := CommitPluginOutcome{
Expand All @@ -95,20 +95,55 @@ func (p *Plugin) ReportRangesOutcome(
return outcome.Encode()
}

// TODO: doc
// TODO: doc, break apart
func (p *Plugin) buildReport(
query CommitQuery,
consensusObservation ConsensusObservation,
) (ocr3types.Outcome, error) {
if query.SignedMerkleRoots == nil || len(query.SignedMerkleRoots) == 0 {
// TODO: metrics
return ocr3types.Outcome{}, fmt.Errorf("buildReport: query.SignedMerkleRoots is empty")
verifiedSignedRoots := p.verifySignedRoots(query.SignedMerkleRoots, consensusObservation.MerkleRoots)

chainsToExclude := make([]cciptypes.ChainSelector, 0)
for chain := range verifiedSignedRoots {
if _, exists := consensusObservation.GasPrices[chain]; !exists {
// TODO: metrics
p.log.Warnw(
"did not find a consensus gas price for chain %d, excluding it from the report", chain)
chainsToExclude = append(chainsToExclude, chain)
}
}

observedMerkleRoots := consensusObservation.MerkleRoots
for _, chainSelector := range chainsToExclude {
delete(verifiedSignedRoots, chainSelector)
}

// TODO: token prices validation
// exclude merkle roots if expected token prices don't exist

verifiedSignedRootsArray := make([]SignedMerkleRoot, 0, len(verifiedSignedRoots))
for _, signedRoot := range verifiedSignedRoots {
verifiedSignedRootsArray = append(verifiedSignedRootsArray, signedRoot)
}

// TODO: explain this
sort.Slice(verifiedSignedRootsArray, func(i, j int) bool {
return verifiedSignedRootsArray[i].chain() < verifiedSignedRootsArray[j].chain()
})

return CommitPluginOutcome{
OutcomeType: ReportGenerated,
SignedRootsToReport: verifiedSignedRootsArray,
GasPrices: consensusObservation.GasPricesSortedArray(),
TokenPrices: consensusObservation.TokenPricesSortedArray(),
}.Encode()
}

// TODO: doc, break apart
func (p *Plugin) verifySignedRoots(
rmnSignedRoots []SignedMerkleRoot,
observedRoots map[cciptypes.ChainSelector]MerkleRoot,
) map[cciptypes.ChainSelector]SignedMerkleRoot {
verifiedSignedRoots := make(map[cciptypes.ChainSelector]SignedMerkleRoot)
for _, signedRoot := range query.SignedMerkleRoots {
for _, signedRoot := range rmnSignedRoots {
if err := p.rmn.VerifySignedMerkleRoot(signedRoot); err != nil {
// TODO: metrics
p.log.Warnw("failed to verify signed merkle root",
Expand All @@ -117,30 +152,26 @@ func (p *Plugin) buildReport(
continue
}

if observedMerkleRoot, exists := observedMerkleRoots[signedRoot.chain()]; exists {
// check merkle root equality
if observedMerkleRoot != signedRoot.MerkleRoot {
if observedMerkleRoot, exists := observedRoots[signedRoot.chain()]; exists {
if observedMerkleRoot == signedRoot.MerkleRoot {
verifiedSignedRoots[signedRoot.chain()] = signedRoot
} else {
// TODO: metrics
p.log.Warnw("observed merkle root does not match merkle root received from RMN",
"rmnSignedRoot", signedRoot,
"observedMerkleRoot", observedMerkleRoot)
continue
} else {

}
} else {
// TODO: metrics
p.log.Warnw(
"received a signed merkle root from RMN for a chain, but did not observe a merkle root for "+
"this chain",
"rmnSignedRoot", signedRoot)
continue
}

verifiedSignedRoots[signedRoot.chain()] = signedRoot
}

for chain, observedMerkleRoot := range observedMerkleRoots {
// TODO: explain this
for chain, observedMerkleRoot := range observedRoots {
if _, exists := verifiedSignedRoots[chain]; !exists {
if p.rmn.ChainThreshold(chain) == 0 {
verifiedSignedRoots[chain] = SignedMerkleRoot{
Expand All @@ -156,23 +187,7 @@ func (p *Plugin) buildReport(
}
}

chainsToExclude := make([]cciptypes.ChainSelector, 0)
for chain, _ := range verifiedSignedRoots {
if _, exists := consensusObservation.GasPrices[chain]; !exists {
// TODO: metrics
p.log.Warnw(
"did not find a consensus gas price for chain %d, excluding it from the report", chain)
chainsToExclude = append(chainsToExclude, chain)
}
}

for _, chainSelector := range chainsToExclude {
delete(verifiedSignedRoots, chainSelector)
}

// TODO: token prices validation

return nil, nil
return verifiedSignedRoots
}

// TODO: doc
Expand All @@ -193,21 +208,21 @@ func (p *Plugin) checkForReportTransmission(

if offRampUpdated {
return CommitPluginOutcome{
OutcomeType: CommitPluginOutcomeType(ReportGenerated),
OutcomeType: ReportGenerated,
}.Encode()
} else {
if previousOutcome.ReportTransmissionCheckAttempts+1 >= p.cfg.MaxReportTransmissionCheckAttempts {
return CommitPluginOutcome{
OutcomeType: CommitPluginOutcomeType(ReportNotTransmitted),
}.Encode()
} else {
return CommitPluginOutcome{
OutcomeType: CommitPluginOutcomeType(ReportNotYetTransmitted),
OffRampMaxSeqNums: previousOutcome.OffRampMaxSeqNums,
ReportTransmissionCheckAttempts: previousOutcome.ReportTransmissionCheckAttempts + 1,
}.Encode()
}
}

if previousOutcome.ReportTransmissionCheckAttempts+1 >= p.cfg.MaxReportTransmissionCheckAttempts {
return CommitPluginOutcome{
OutcomeType: ReportNotTransmitted,
}.Encode()
}

return CommitPluginOutcome{
OutcomeType: ReportNotYetTransmitted,
OffRampMaxSeqNums: previousOutcome.OffRampMaxSeqNums,
ReportTransmissionCheckAttempts: previousOutcome.ReportTransmissionCheckAttempts + 1,
}.Encode()
}

// getConsensusObservation TODO: doc
Expand Down Expand Up @@ -401,11 +416,7 @@ func mostFrequentElem[T comparable](elems []T) (T, int) {
func counts[T comparable](elems []T) map[T]int {
m := make(map[T]int)
for _, elem := range elems {
if _, exists := m[elem]; exists {
m[elem]++
} else {
m[elem] = 1
}
m[elem]++
}

return m
Expand Down
57 changes: 55 additions & 2 deletions commit_rmn_ocb/plugin.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package commitrmnocb

import (
"context"
"fmt"
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/smartcontractkit/libocr/commontypes"
Expand All @@ -11,6 +14,8 @@ import (
"github.com/smartcontractkit/chainlink-ccip/internal/reader"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"

"github.com/smartcontractkit/chainlink-ccip/commit"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

Expand All @@ -25,6 +30,8 @@ type Plugin struct {
tokenPricesReader reader.TokenPrices
ccipReader reader.CCIP
homeChain reader.HomeChain
bgSyncCancelFunc context.CancelFunc
bgSyncWG *sync.WaitGroup
}

func NewPlugin(
Expand All @@ -39,7 +46,7 @@ func NewPlugin(
ccipReader reader.CCIP,
homeChain reader.HomeChain,
) *Plugin {
return &Plugin{
p := &Plugin{
reportingCfg: reportingCfg,
nodeID: nodeID,
oracleIDToP2pID: oracleIDToP2pID,
Expand All @@ -51,13 +58,42 @@ func NewPlugin(
ccipReader: ccipReader,
homeChain: homeChain,
}

bgSyncCtx, bgSyncCf := context.WithCancel(context.Background())
p.bgSyncCancelFunc = bgSyncCf
p.bgSyncWG = &sync.WaitGroup{}
p.bgSyncWG.Add(1)
commit.BackgroundReaderSync(
bgSyncCtx,
p.bgSyncWG,
log,
ccipReader,
syncTimeout(cfg.SyncTimeout),
time.NewTicker(syncFrequency(p.cfg.SyncFrequency)).C,
)

return p
}

func (p *Plugin) Close() error {
timeout := 10 * time.Second
ctx, cf := context.WithTimeout(context.Background(), timeout)
defer cf()

if err := p.ccipReader.Close(ctx); err != nil {
return fmt.Errorf("close ccip reader: %w", err)
}

p.bgSyncCancelFunc()
p.bgSyncWG.Wait()
return nil
}

// TODO: doc
// SelectingRangesForReport doesn't depend on the previous outcome, explain how this is resilient (to being unable
// to parse previous outcome)
func (p *Plugin) decodeOutcome(outcome ocr3types.Outcome) (CommitPluginOutcome, CommitPluginState) {
if outcome == nil || len(outcome) == 0 {
if len(outcome) == 0 {
return CommitPluginOutcome{}, SelectingRangesForReport
}

Expand Down Expand Up @@ -99,3 +135,20 @@ func (p *Plugin) supportsDestChain(oracle commontypes.OracleID) (bool, error) {
}
return destChainConfig.SupportedNodes.Contains(p.oracleIDToP2pID[oracle]), nil
}

func syncFrequency(configuredValue time.Duration) time.Duration {
if configuredValue.Milliseconds() == 0 {
return 10 * time.Second
}
return configuredValue
}

func syncTimeout(configuredValue time.Duration) time.Duration {
if configuredValue.Milliseconds() == 0 {
return 3 * time.Second
}
return configuredValue
}

// Interface compatibility checks.
var _ ocr3types.ReportingPlugin[[]byte] = &Plugin{}
Loading

0 comments on commit 1608eb2

Please sign in to comment.