From a30e10ef5115cc4a7a704abb7e9d9fb7f17f1e98 Mon Sep 17 00:00:00 2001 From: nogo <110664798+0xnogo@users.noreply.github.com> Date: Wed, 30 Oct 2024 16:22:08 +0400 Subject: [PATCH] Bootstrapper improvements (#14953) * MessageHasher logging and bugfixing * EVMRmnCrypto logging * Bootstraper oracle updates * RMNPeerClient logging improvement * add RMNIntegrationTest * upgrade cl-ccip * skip the test * remove exec part * fix linter errs and add comments on addrs * fix typo * include exec report and fix exec plugin codec * fix exec codec unit test * use remote proxy * use rmnProxy * ccip bump * bootstrap change * lint --------- Co-authored-by: dimkouv --- .../ccip/oraclecreator/bootstrap.go | 346 ++++++++++++------ .../ccip/oraclecreator/bootstrap_test.go | 100 +++++ 2 files changed, 330 insertions(+), 116 deletions(-) create mode 100644 core/capabilities/ccip/oraclecreator/bootstrap_test.go diff --git a/core/capabilities/ccip/oraclecreator/bootstrap.go b/core/capabilities/ccip/oraclecreator/bootstrap.go index 005b2cb4005..5e4331197d0 100644 --- a/core/capabilities/ccip/oraclecreator/bootstrap.go +++ b/core/capabilities/ccip/oraclecreator/bootstrap.go @@ -5,11 +5,14 @@ import ( "crypto/sha256" "encoding/binary" "encoding/hex" + "errors" "fmt" "io" "sync" "time" + mapset "github.com/deckarep/golang-set/v2" + "github.com/ethereum/go-ethereum/common/hexutil" ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" @@ -19,10 +22,11 @@ import ( libocr3 "github.com/smartcontractkit/libocr/offchainreporting2plus" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" - "github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn" "github.com/smartcontractkit/chainlink-ccip/pkg/consts" ccipreaderpkg "github.com/smartcontractkit/chainlink-ccip/pkg/reader" cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" + + "github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/ocrimpls" @@ -36,13 +40,79 @@ import ( var _ cctypes.OracleCreator = &bootstrapOracleCreator{} +// bootstrapOracle wraps a CCIPOracle (the bootstrapper) and manages RMN-specific resources +type bootstrapOracle struct { + baseOracle cctypes.CCIPOracle + peerGroupDialer *peerGroupDialer + rmnHomeReader ccipreaderpkg.RMNHome + mu sync.Mutex +} + +func newBootstrapOracle( + baseOracle cctypes.CCIPOracle, + peerGroupDialer *peerGroupDialer, + rmnHomeReader ccipreaderpkg.RMNHome, +) cctypes.CCIPOracle { + return &bootstrapOracle{ + baseOracle: baseOracle, + peerGroupDialer: peerGroupDialer, + rmnHomeReader: rmnHomeReader, + } +} + +func (o *bootstrapOracle) Start() error { + o.mu.Lock() + defer o.mu.Unlock() + + // Start RMNHome reader first + if err := o.rmnHomeReader.Start(context.Background()); err != nil { + return fmt.Errorf("failed to start RMNHome reader: %w", err) + } + + o.peerGroupDialer.Start() + + // Then start the base oracle (bootstrapper) + if err := o.baseOracle.Start(); err != nil { + // Clean up RMN components if base fails to start + _ = o.rmnHomeReader.Close() + _ = o.peerGroupDialer.Close() + return fmt.Errorf("failed to start base oracle: %w", err) + } + + return nil +} + +func (o *bootstrapOracle) Close() error { + o.mu.Lock() + defer o.mu.Unlock() + + var errs []error + + if err := o.baseOracle.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to close base oracle: %w", err)) + } + + if err := o.peerGroupDialer.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to close peer group dialer: %w", err)) + } + + if err := o.rmnHomeReader.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to close RMN home reader: %w", err)) + } + + if len(errs) > 0 { + return errors.Join(errs...) + } + return nil +} + type bootstrapOracleCreator struct { - peerWrapper *ocrcommon.SingletonPeerWrapper - bootstrapperLocators []commontypes.BootstrapperLocator - db ocr3types.Database - monitoringEndpointGen telemetry.MonitoringEndpointGenerator - lggr logger.Logger - contractReader types.ContractReader + peerWrapper *ocrcommon.SingletonPeerWrapper + bootstrapperLocators []commontypes.BootstrapperLocator + db ocr3types.Database + monitoringEndpointGen telemetry.MonitoringEndpointGenerator + lggr logger.Logger + homeChainContractReader types.ContractReader } func NewBootstrapOracleCreator( @@ -51,15 +121,15 @@ func NewBootstrapOracleCreator( db ocr3types.Database, monitoringEndpointGen telemetry.MonitoringEndpointGenerator, lggr logger.Logger, - contractReader types.ContractReader, + homeChainContractReader types.ContractReader, ) cctypes.OracleCreator { return &bootstrapOracleCreator{ - peerWrapper: peerWrapper, - bootstrapperLocators: bootstrapperLocators, - db: db, - monitoringEndpointGen: monitoringEndpointGen, - lggr: lggr, - contractReader: contractReader, + peerWrapper: peerWrapper, + bootstrapperLocators: bootstrapperLocators, + db: db, + monitoringEndpointGen: monitoringEndpointGen, + lggr: lggr, + homeChainContractReader: homeChainContractReader, } } @@ -79,15 +149,6 @@ func (i *bootstrapOracleCreator) Create(_ uint32, config cctypes.OCR3ConfigWithM return nil, fmt.Errorf("failed to get chain ID from selector: %w", err) } - ctx := context.Background() - rmnHomeReader, err := i.getRmnHomeReader(ctx, config) - if err != nil { - return nil, fmt.Errorf("failed to get RMNHome reader: %w", err) - } - if err = rmnHomeReader.Start(ctx); err != nil { - return nil, fmt.Errorf("failed to start RMNHome reader: %w", err) - } - destChainFamily := chaintype.EVM destRelayID := types.NewRelayID(string(destChainFamily), fmt.Sprintf("%d", chainID)) @@ -96,6 +157,12 @@ func (i *bootstrapOracleCreator) Create(_ uint32, config cctypes.OCR3ConfigWithM oraclePeerIDs = append(oraclePeerIDs, n.P2pID) } + ctx := context.Background() + rmnHomeReader, err := i.getRmnHomeReader(ctx, config) + if err != nil { + return nil, fmt.Errorf("failed to get RMNHome reader: %w", err) + } + pgd := newPeerGroupDialer( i.lggr.Named("PeerGroupDialer"), i.peerWrapper.PeerGroupFactory, @@ -104,7 +171,6 @@ func (i *bootstrapOracleCreator) Create(_ uint32, config cctypes.OCR3ConfigWithM oraclePeerIDs, config.ConfigDigest, ) - pgd.Start() bootstrapperArgs := libocr3.BootstrapperArgs{ BootstrapperFactory: i.peerWrapper.Peer2, @@ -138,7 +204,7 @@ func (i *bootstrapOracleCreator) Create(_ uint32, config cctypes.OCR3ConfigWithM []io.Closer{pgd, rmnHomeReader}, ) - return bootstrapperWithCustomClose, nil + return newBootstrapOracle(bootstrapperWithCustomClose, pgd, rmnHomeReader), nil } func (i *bootstrapOracleCreator) getRmnHomeReader(ctx context.Context, config cctypes.OCR3ConfigWithMeta) (ccipreaderpkg.RMNHome, error) { @@ -147,11 +213,11 @@ func (i *bootstrapOracleCreator) getRmnHomeReader(ctx context.Context, config cc Name: consts.ContractNameRMNHome, } - if err1 := i.contractReader.Bind(ctx, []types.BoundContract{rmnHomeBoundContract}); err1 != nil { + if err1 := i.homeChainContractReader.Bind(ctx, []types.BoundContract{rmnHomeBoundContract}); err1 != nil { return nil, fmt.Errorf("failed to bind RMNHome contract: %w", err1) } rmnHomeReader := ccipreaderpkg.NewRMNHomePoller( - i.contractReader, + i.homeChainContractReader, rmnHomeBoundContract, i.lggr, 5*time.Second, @@ -159,7 +225,7 @@ func (i *bootstrapOracleCreator) getRmnHomeReader(ctx context.Context, config cc return rmnHomeReader, nil } -// peerGroupDialer keeps watching for config changes and calls NewPeerGroup when needed. +// peerGroupDialer keeps watching for RMNHome config changes and calls NewPeerGroup when needed. // Required for managing RMN related peer group connections. type peerGroupDialer struct { lggr logger.Logger @@ -177,10 +243,22 @@ type peerGroupDialer struct { syncInterval time.Duration - mu *sync.Mutex - syncCtxCf context.CancelFunc + mu *sync.Mutex + syncCancel context.CancelFunc +} + +type syncAction struct { + actionType actionType + configDigest cciptypes.Bytes32 } +type actionType string + +const ( + ActionCreate actionType = "create" + ActionClose actionType = "close" +) + func newPeerGroupDialer( lggr logger.Logger, peerGroupFactory rmn.PeerGroupFactory, @@ -201,15 +279,15 @@ func newPeerGroupDialer( activePeerGroups: []rmn.PeerGroup{}, - syncInterval: time.Minute, // todo: make it configurable + syncInterval: 12 * time.Second, // todo: make it configurable - mu: &sync.Mutex{}, - syncCtxCf: nil, + mu: &sync.Mutex{}, + syncCancel: nil, } } func (d *peerGroupDialer) Start() { - if d.syncCtxCf != nil { + if d.syncCancel != nil { d.lggr.Warnw("peer group dialer already started, should not be called twice") return } @@ -217,7 +295,7 @@ func (d *peerGroupDialer) Start() { d.lggr.Infow("Starting peer group dialer") ctx, cf := context.WithCancel(context.Background()) - d.syncCtxCf = cf + d.syncCancel = cf go func() { d.sync() @@ -240,58 +318,149 @@ func (d *peerGroupDialer) Close() error { d.closeExistingPeerGroups() - if d.syncCtxCf != nil { - d.syncCtxCf() + if d.syncCancel != nil { + d.syncCancel() } return nil } +// Pure function for calculating sync actions +func calculateSyncActions( + currentConfigDigests []cciptypes.Bytes32, + activeConfigDigest cciptypes.Bytes32, + candidateConfigDigest cciptypes.Bytes32, +) []syncAction { + current := mapset.NewSet[cciptypes.Bytes32]() + for _, digest := range currentConfigDigests { + current.Add(digest) + } + + desired := mapset.NewSet[cciptypes.Bytes32]() + if !activeConfigDigest.IsEmpty() { + desired.Add(activeConfigDigest) + } + if !candidateConfigDigest.IsEmpty() { + desired.Add(candidateConfigDigest) + } + + closeCount := current.Difference(desired).Cardinality() + createCount := desired.Difference(current).Cardinality() + actions := make([]syncAction, 0, closeCount+createCount) + + // Configs to close: in current but not in desired + for digest := range current.Difference(desired).Iterator().C { + actions = append(actions, syncAction{ + actionType: ActionClose, + configDigest: digest, + }) + } + + // Configs to create: in desired but not in current + for digest := range desired.Difference(current).Iterator().C { + actions = append(actions, syncAction{ + actionType: ActionCreate, + configDigest: digest, + }) + } + + return actions +} + func (d *peerGroupDialer) sync() { d.mu.Lock() defer d.mu.Unlock() - if !d.shouldSync() { - d.lggr.Debugw("No need to sync peer groups") + activeDigest, candidateDigest := d.rmnHomeReader.GetAllConfigDigests() + actions := calculateSyncActions(d.activeConfigDigests, activeDigest, candidateDigest) + if len(actions) == 0 { + d.lggr.Debugw("No peer group actions needed") return } - d.lggr.Infow("Syncing peer groups") - d.closeExistingPeerGroups() + d.lggr.Infow("Syncing peer groups", "actions", actions) + + // Handle each action + for _, action := range actions { + switch action.actionType { + case ActionClose: + d.closePeerGroup(action.configDigest) + case ActionCreate: + if err := d.createPeerGroup(action.configDigest); err != nil { + d.lggr.Errorw("Failed to create peer group", + "configDigest", action.configDigest, + "err", err) + // Consider closing all groups on error + d.closeExistingPeerGroups() + return + } + } + } +} - if err := d.createNewPeerGroups(); err != nil { - d.lggr.Errorw("failed to create new peer groups", "err", err) - d.closeExistingPeerGroups() // close potentially opened peer groups +// Helper function to close specific peer group +func (d *peerGroupDialer) closePeerGroup(configDigest cciptypes.Bytes32) { + for i, digest := range d.activeConfigDigests { + if digest == configDigest { + if err := d.activePeerGroups[i].Close(); err != nil { + d.lggr.Warnw("Failed to close peer group", + "configDigest", configDigest, + "err", err) + } else { + d.lggr.Infow("Closed peer group successfully", + "configDigest", configDigest) + } + // Remove from active groups and digests + d.activePeerGroups = append(d.activePeerGroups[:i], d.activePeerGroups[i+1:]...) + d.activeConfigDigests = append(d.activeConfigDigests[:i], d.activeConfigDigests[i+1:]...) + return + } } } -func (d *peerGroupDialer) shouldSync() bool { - if len(d.activePeerGroups) == 0 { - return true +func (d *peerGroupDialer) createPeerGroup(rmnHomeConfigDigest cciptypes.Bytes32) error { + rmnNodesInfo, err := d.rmnHomeReader.GetRMNNodesInfo(rmnHomeConfigDigest) + if err != nil { + return fmt.Errorf("get RMN nodes info: %w", err) } - activeConfigDigest, candidateConfigDigest := d.rmnHomeReader.GetAllConfigDigests() - var configDigests [][32]byte + // Create generic endpoint config digest by hashing commit config digest and rmn home config digest + h := sha256.Sum256(append(d.commitConfigDigest[:], rmnHomeConfigDigest[:]...)) + genericEndpointConfigDigest := writePrefix(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo, h) - if !activeConfigDigest.IsEmpty() { - configDigests = append(configDigests, activeConfigDigest) + // Combine oracle peer IDs with RMN node peer IDs + peerIDs := make([]string, 0, len(d.oraclePeerIDs)+len(rmnNodesInfo)) + for _, p := range d.oraclePeerIDs { + peerIDs = append(peerIDs, p.String()) } - if !candidateConfigDigest.IsEmpty() { - configDigests = append(configDigests, candidateConfigDigest) + for _, n := range rmnNodesInfo { + peerIDs = append(peerIDs, n.PeerID.String()) } - if len(configDigests) != len(d.activeConfigDigests) { - return true - } - for i, rmnHomeConfigDigest := range configDigests { - if rmnHomeConfigDigest != d.activeConfigDigests[i] { - return true - } + lggr := d.lggr.With( + "genericEndpointConfigDigest", genericEndpointConfigDigest.String(), + "peerIDs", peerIDs, + "bootstrappers", d.bootstrapLocators, + ) + + lggr.Infow("Creating new peer group") + peerGroup, err := d.peerGroupFactory.NewPeerGroup( + [32]byte(genericEndpointConfigDigest), + peerIDs, + d.bootstrapLocators, + ) + if err != nil { + return fmt.Errorf("new peer group: %w", err) } + lggr.Infow("Created new peer group successfully") + + d.activePeerGroups = append(d.activePeerGroups, peerGroup) + d.activeConfigDigests = append(d.activeConfigDigests, genericEndpointConfigDigest) - return false + return nil } +// closeExistingPeerGroups closes all active peer groups func (d *peerGroupDialer) closeExistingPeerGroups() { for _, pg := range d.activePeerGroups { if err := pg.Close(); err != nil { @@ -305,61 +474,6 @@ func (d *peerGroupDialer) closeExistingPeerGroups() { d.activeConfigDigests = []cciptypes.Bytes32{} } -func (d *peerGroupDialer) createNewPeerGroups() error { - activeConfigDigest, candidateConfigDigest := d.rmnHomeReader.GetAllConfigDigests() - var configDigests [][32]byte - - if !activeConfigDigest.IsEmpty() { - configDigests = append(configDigests, activeConfigDigest) - } - if !candidateConfigDigest.IsEmpty() { - configDigests = append(configDigests, candidateConfigDigest) - } - - d.lggr.Infow("Creating new peer groups", "configDigests", configDigests) - - for _, rmnHomeConfigDigest := range configDigests { - rmnNodesInfo, err := d.rmnHomeReader.GetRMNNodesInfo(rmnHomeConfigDigest) - if err != nil { - return fmt.Errorf("get RMN nodes info: %w", err) - } - - h := sha256.Sum256(append(d.commitConfigDigest[:], rmnHomeConfigDigest[:]...)) - genericEndpointConfigDigest := writePrefix(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo, h) - - peerIDs := make([]string, 0, len(d.oraclePeerIDs)) - for _, p := range d.oraclePeerIDs { - peerIDs = append(peerIDs, p.String()) - } - for _, n := range rmnNodesInfo { - peerIDs = append(peerIDs, n.PeerID.String()) - } - - lggr := d.lggr.With( - "genericEndpointConfigDigest", genericEndpointConfigDigest.String(), - "peerIDs", peerIDs, - "bootstrappers", d.bootstrapLocators, - ) - - lggr.Infow("Bootstrapper is creating new peer group") - peerGroup, err := d.peerGroupFactory.NewPeerGroup( - [32]byte(genericEndpointConfigDigest), - peerIDs, - d.bootstrapLocators, - ) - if err != nil { - lggr.Errorw("failed to create new peer group", "err", err) - return fmt.Errorf("new peer group: %w", err) - } - lggr.Infow("Created new peer group successfully") - - d.activePeerGroups = append(d.activePeerGroups, peerGroup) - d.activeConfigDigests = append(d.activeConfigDigests, genericEndpointConfigDigest) - } - - return nil -} - func writePrefix(prefix ocr2types.ConfigDigestPrefix, hash cciptypes.Bytes32) cciptypes.Bytes32 { var prefixBytes [2]byte binary.BigEndian.PutUint16(prefixBytes[:], uint16(prefix)) diff --git a/core/capabilities/ccip/oraclecreator/bootstrap_test.go b/core/capabilities/ccip/oraclecreator/bootstrap_test.go new file mode 100644 index 00000000000..1b1cc9dc0d2 --- /dev/null +++ b/core/capabilities/ccip/oraclecreator/bootstrap_test.go @@ -0,0 +1,100 @@ +package oraclecreator + +import ( + "bytes" + "sort" + "testing" + + "github.com/stretchr/testify/require" + + cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" +) + +func TestCalculateSyncActions(t *testing.T) { + tests := []struct { + name string + currentDigests []cciptypes.Bytes32 + activeDigest cciptypes.Bytes32 + candidateDigest cciptypes.Bytes32 + expectedActions []syncAction + }{ + { + name: "no changes needed", + currentDigests: []cciptypes.Bytes32{{1}, {2}}, + activeDigest: cciptypes.Bytes32{1}, + candidateDigest: cciptypes.Bytes32{2}, + expectedActions: nil, + }, + { + name: "need to close candidate", + currentDigests: []cciptypes.Bytes32{{1}, {2}}, + activeDigest: cciptypes.Bytes32{1}, + candidateDigest: cciptypes.Bytes32{}, // empty + expectedActions: []syncAction{ + {actionType: ActionClose, configDigest: cciptypes.Bytes32{2}}, + }, + }, + { + name: "need to create candidate", + currentDigests: []cciptypes.Bytes32{{1}}, + activeDigest: cciptypes.Bytes32{1}, + candidateDigest: cciptypes.Bytes32{2}, + expectedActions: []syncAction{ + {actionType: ActionCreate, configDigest: cciptypes.Bytes32{2}}, + }, + }, + { + name: "both configs empty", + currentDigests: []cciptypes.Bytes32{{1}, {2}}, + activeDigest: cciptypes.Bytes32{}, + candidateDigest: cciptypes.Bytes32{}, + expectedActions: []syncAction{ + {actionType: ActionClose, configDigest: cciptypes.Bytes32{1}}, + {actionType: ActionClose, configDigest: cciptypes.Bytes32{2}}, + }, + }, + { + name: "replace both configs", + currentDigests: []cciptypes.Bytes32{{1}, {2}}, + activeDigest: cciptypes.Bytes32{3}, + candidateDigest: cciptypes.Bytes32{4}, + expectedActions: []syncAction{ + {actionType: ActionClose, configDigest: cciptypes.Bytes32{1}}, + {actionType: ActionClose, configDigest: cciptypes.Bytes32{2}}, + {actionType: ActionCreate, configDigest: cciptypes.Bytes32{3}}, + {actionType: ActionCreate, configDigest: cciptypes.Bytes32{4}}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actions := calculateSyncActions( + tt.currentDigests, + tt.activeDigest, + tt.candidateDigest, + ) + + require.Equal(t, len(tt.expectedActions), len(actions)) + + // Sort both slices to ensure consistent comparison + sort.Slice(actions, func(i, j int) bool { + if actions[i].actionType != actions[j].actionType { + return actions[i].actionType < actions[j].actionType + } + return bytes.Compare(actions[i].configDigest[:], actions[j].configDigest[:]) < 0 + }) + sort.Slice(tt.expectedActions, func(i, j int) bool { + if tt.expectedActions[i].actionType != tt.expectedActions[j].actionType { + return tt.expectedActions[i].actionType < tt.expectedActions[j].actionType + } + return bytes.Compare(tt.expectedActions[i].configDigest[:], tt.expectedActions[j].configDigest[:]) < 0 + }) + + for i := range actions { + require.Equal(t, tt.expectedActions[i].actionType, actions[i].actionType) + require.Equal(t, tt.expectedActions[i].configDigest, actions[i].configDigest) + } + }) + } +}