Skip to content

Commit

Permalink
Deduplication of the peer group logic (#284)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnogo authored Oct 31, 2024
1 parent 35a8474 commit ac32780
Show file tree
Hide file tree
Showing 12 changed files with 416 additions and 92 deletions.
6 changes: 4 additions & 2 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ packages:
github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn:
interfaces:
Controller:
PeerGroup:
PeerGroupFactory:
Stream:
github.com/smartcontractkit/chainlink-ccip/execute/internal/gas:
interfaces:
Expand All @@ -28,6 +26,10 @@ packages:
CCIPReader:
PriceReader:
RMNHome:
github.com/smartcontractkit/chainlink-ccip/pkg/peergroup:
interfaces:
PeerGroupFactory:
PeerGroup:
github.com/smartcontractkit/chainlink-ccip/pkg/contractreader:
interfaces:
Extended:
Expand Down
13 changes: 6 additions & 7 deletions commit/merkleroot/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/smartcontractkit/chainlink-ccip/pkg/consts"
readerpkg "github.com/smartcontractkit/chainlink-ccip/pkg/reader"
cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"

ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

func (w *Processor) ObservationQuorum(
Expand Down Expand Up @@ -86,21 +88,18 @@ func (w *Processor) initializeRMNController(ctx context.Context, prevOutcome Out
return fmt.Errorf("failed to get RMN nodes info: %w", err)
}

peerIDs := make([]string, 0, len(rmnNodesInfo))
for _, node := range rmnNodesInfo {
w.lggr.Infow("Adding RMN node to peerIDs", "node", node.PeerID.String())
peerIDs = append(peerIDs, node.PeerID.String())
}
oraclePeerIDs := make([]ragep2ptypes.PeerID, 0, len(w.oracleIDToP2pID))
for _, p2pID := range w.oracleIDToP2pID {
w.lggr.Infow("Adding oracle node to peerIDs", "p2pID", p2pID.String())
peerIDs = append(peerIDs, p2pID.String())
oraclePeerIDs = append(oraclePeerIDs, p2pID)
}

if err := w.rmnController.InitConnection(
ctx,
cciptypes.Bytes32(w.reportingCfg.ConfigDigest),
prevOutcome.RMNRemoteCfg.ConfigDigest,
peerIDs,
oraclePeerIDs,
rmnNodesInfo,
); err != nil {
return fmt.Errorf("failed to init connection to RMN: %w", err)
}
Expand Down
5 changes: 4 additions & 1 deletion commit/merkleroot/observation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/ragep2p/types"
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
Expand Down Expand Up @@ -623,13 +624,15 @@ func Test_Processor_initializeRMNController(t *testing.T) {
{ID: 1, PeerID: types.PeerID{1, 2, 3}},
{ID: 10, PeerID: types.PeerID{1, 2, 31}},
}
oracleIDs := []ragep2ptypes.PeerID{}
rmnHomeReader.EXPECT().GetRMNNodesInfo(cfg.ConfigDigest).Return(rmnNodes, nil)

rmnController.EXPECT().InitConnection(
ctx,
cciptypes.Bytes32(p.reportingCfg.ConfigDigest),
cfg.ConfigDigest,
[]string{rmnNodes[0].PeerID.String(), rmnNodes[1].PeerID.String()},
oracleIDs,
rmnNodes,
).Return(nil)

err = p.initializeRMNController(ctx, Outcome{RMNRemoteCfg: cfg})
Expand Down
9 changes: 6 additions & 3 deletions commit/merkleroot/rmn/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"google.golang.org/protobuf/proto"

chainsel "github.com/smartcontractkit/chain-selectors"
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

Expand Down Expand Up @@ -59,7 +60,8 @@ type Controller interface {
ctx context.Context,
commitConfigDigest cciptypes.Bytes32,
rmnHomeConfigDigest cciptypes.Bytes32,
peerIDs []string, // union of oraclePeerIDs and rmnNodePeerIDs (oracles required for peer discovery)
oraclePeerIDs []ragep2ptypes.PeerID,
rmnNodes []rmntypes.HomeNodeInfo,
) error

// Close closes the connection to the generic peer group endpoint and all the underlying streams.
Expand Down Expand Up @@ -242,9 +244,10 @@ func (c *controller) InitConnection(
ctx context.Context,
commitConfigDigest cciptypes.Bytes32,
rmnHomeConfigDigest cciptypes.Bytes32,
peerIDs []string,
oraclePeerIDs []ragep2ptypes.PeerID,
rmnNodes []rmntypes.HomeNodeInfo,
) error {
return c.peerClient.InitConnection(ctx, commitConfigDigest, rmnHomeConfigDigest, peerIDs)
return c.peerClient.InitConnection(ctx, commitConfigDigest, rmnHomeConfigDigest, oraclePeerIDs, rmnNodes)
}

func (c *controller) Close() error {
Expand Down
8 changes: 7 additions & 1 deletion commit/merkleroot/rmn/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"google.golang.org/protobuf/proto"

chainsel "github.com/smartcontractkit/chain-selectors"
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"

readerpkg_mock "github.com/smartcontractkit/chainlink-ccip/mocks/pkg/reader"

Expand Down Expand Up @@ -692,7 +693,12 @@ func (m *mockPeerClient) resetReceivedRequests() {
m.receivedRequests = make(map[rmntypes.NodeID][]*rmnpb.Request)
}

func (m *mockPeerClient) InitConnection(_ context.Context, _ cciptypes.Bytes32, _ cciptypes.Bytes32, _ []string) error {
func (m *mockPeerClient) InitConnection(
_ context.Context,
_ cciptypes.Bytes32,
_ cciptypes.Bytes32,
_ []ragep2ptypes.PeerID,
_ []rmntypes.HomeNodeInfo) error {
return nil
}

Expand Down
71 changes: 25 additions & 46 deletions commit/merkleroot/rmn/peerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ package rmn

import (
"context"
"crypto/sha256"
"encoding/binary"
"fmt"
"strings"
"sync"
"time"

"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/networking"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

rmntypes "github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn/types"
"github.com/smartcontractkit/chainlink-ccip/pkg/peergroup"
cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"

ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

var ErrNoConn = fmt.Errorf("no connection, please call InitConnection before further interaction")
Expand All @@ -31,7 +31,8 @@ type PeerClient interface {
ctx context.Context,
commitConfigDigest cciptypes.Bytes32,
rmnHomeConfigDigest cciptypes.Bytes32,
peerIDs []string, // union of oraclePeerIDs and rmnNodePeerIDs (oracles required for peer discovery)
oraclePeerIDs []ragep2ptypes.PeerID,
rmnNodes []rmntypes.HomeNodeInfo,
) error

Close() error
Expand All @@ -54,9 +55,9 @@ type PeerResponse struct {

type peerClient struct {
lggr logger.Logger
peerGroupFactory PeerGroupFactory
peerGroupCreator *peergroup.Creator
respChan chan PeerResponse
peerGroup PeerGroup // nil initially, until InitConnection is called
peerGroup peergroup.PeerGroup
genericEndpointConfigDigest cciptypes.Bytes32
rageP2PStreams map[rmntypes.NodeID]Stream
bootstrappers []commontypes.BootstrapperLocator
Expand All @@ -67,13 +68,13 @@ type peerClient struct {

func NewPeerClient(
lggr logger.Logger,
peerGroupFactory PeerGroupFactory,
peerGroupFactory peergroup.PeerGroupFactory,
bootstrappers []commontypes.BootstrapperLocator,
ocrRoundInterval time.Duration,
) PeerClient {
return &peerClient{
lggr: lggr,
peerGroupFactory: peerGroupFactory,
peerGroupCreator: peergroup.NewCreator(lggr, peerGroupFactory, bootstrappers),
respChan: make(chan PeerResponse),
peerGroup: nil,
rageP2PStreams: make(map[rmntypes.NodeID]Stream),
Expand All @@ -87,7 +88,9 @@ func NewPeerClient(
func (r *peerClient) InitConnection(
_ context.Context,
commitConfigDigest, rmnHomeConfigDigest cciptypes.Bytes32,
peerIDs []string,
oraclePeerIDs []ragep2ptypes.PeerID,
rmnNodes []rmntypes.HomeNodeInfo,

) error {
if err := r.Close(); err != nil {
return fmt.Errorf("close existing peer group: %w", err)
Expand All @@ -96,25 +99,20 @@ func (r *peerClient) InitConnection(
r.mu.Lock()
defer r.mu.Unlock()

h := sha256.Sum256(append(commitConfigDigest[:], rmnHomeConfigDigest[:]...))
r.genericEndpointConfigDigest = writePrefix(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo, h)
r.lggr.Infow("Creating new peer group",
"genericEndpointConfigDigest", r.genericEndpointConfigDigest.String(),
"peerIDs", peerIDs,
"bootstrappers", r.bootstrappers,
)

peerGroup, err := r.peerGroupFactory.NewPeerGroup(
[32]byte(r.genericEndpointConfigDigest),
peerIDs,
r.bootstrappers,
)

result, err := r.peerGroupCreator.Create(peergroup.CreateOpts{
CommitConfigDigest: commitConfigDigest,
RMNHomeConfigDigest: rmnHomeConfigDigest,
// Note: For RMN peer client, we receive the combined peer IDs directly
// and don't need to separate oracle/RMN peers
OraclePeerIDs: oraclePeerIDs,
RMNNodes: rmnNodes,
})
if err != nil {
return fmt.Errorf("new peer group: %w", err)
return fmt.Errorf("create peer group: %w", err)
}

r.peerGroup = peerGroup
r.peerGroup = result.PeerGroup
r.genericEndpointConfigDigest = result.ConfigDigest
return nil
}

Expand All @@ -131,6 +129,8 @@ func (r *peerClient) Close() error {
return fmt.Errorf("close peer group: %w", err)
}

r.peerGroup = nil
r.rageP2PStreams = make(map[rmntypes.NodeID]Stream)
return nil
}

Expand Down Expand Up @@ -197,28 +197,7 @@ func (r *peerClient) Recv() <-chan PeerResponse {
return r.respChan
}

// writePrefix writes the prefix to the first 2 bytes of the hash.
func writePrefix(prefix ocr2types.ConfigDigestPrefix, hash cciptypes.Bytes32) cciptypes.Bytes32 {
var prefixBytes [2]byte
binary.BigEndian.PutUint16(prefixBytes[:], uint16(prefix))

hCopy := hash
hCopy[0] = prefixBytes[0]
hCopy[1] = prefixBytes[1]

return hCopy
}

// Redeclare interfaces for mocking purposes.

type PeerGroupFactory interface {
networking.PeerGroupFactory
}

type PeerGroup interface {
networking.PeerGroup
}

type Stream interface {
networking.Stream
}
19 changes: 0 additions & 19 deletions commit/merkleroot/rmn/peerclient_test.go

This file was deleted.

25 changes: 14 additions & 11 deletions mocks/commit/merkleroot/rmn/controller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ac32780

Please sign in to comment.