diff --git a/.mockery.yaml b/.mockery.yaml
index 2c8938c95..ad4d491b1 100644
--- a/.mockery.yaml
+++ b/.mockery.yaml
@@ -17,8 +17,6 @@ packages:
github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn:
interfaces:
Controller:
- PeerGroup:
- PeerGroupFactory:
Stream:
github.com/smartcontractkit/chainlink-ccip/execute/internal/gas:
interfaces:
@@ -28,6 +26,10 @@ packages:
CCIPReader:
PriceReader:
RMNHome:
+ github.com/smartcontractkit/chainlink-ccip/pkg/peergroup:
+ interfaces:
+ PeerGroupFactory:
+ PeerGroup:
github.com/smartcontractkit/chainlink-ccip/pkg/contractreader:
interfaces:
Extended:
diff --git a/README.md b/README.md
index e202ea482..ed5732f7a 100644
--- a/README.md
+++ b/README.md
@@ -1,12 +1,23 @@
+
+
+
+
+
+[![License](https://img.shields.io/static/v1?label=license&message=BUSL%201.1&color=green)](https://github.com/smartcontractkit/chainlink-ccip/blob/master/LICENSE)
+[![Code Documentation](https://img.shields.io/static/v1?label=code-docs&message=latest&color=blue)](docs/ccip_protocol.md)
+[![API Documentation](https://img.shields.io/static/v1?label=api-docs&message=latest&color=blue)](https://docs.chain.link/ccip)
+
+
# chainlink-ccip
-This is the repo that implements the OCR3 CCIP plugins. This includes the commit and execution plugins.
+This repo contains [OCR3 plugins][ocr3] for CCIP. See the [documentation](docs/ccip_protocol.md) for more.
## Getting Started
### Go Version
-This repo uses Go 1.22. You can install Go from their [installation page](https://go.dev/doc/install).
+The version of go is specified in the project's [go.mod](go.mod) file.
+You can install Go from their [installation page](https://go.dev/doc/install).
### Running the Linter
@@ -50,3 +61,5 @@ the `ccip-develop` branch of chainlink-ccip. You can do this by:
5. Go back to your chainlink-ccip PR and re-run the integration test workflow.
6. Once the integration test passes, merge your chainlink-ccip PR into `ccip-develop`, however do not delete the branch on the remote.
7. Create a new PR in ccip that points to the newly merged commit in the `ccip-develop` tree and merge that.
+
+[ocr3]: https://github.com/smartcontractkit/libocr/blob/master/offchainreporting2plus/ocr3types/plugin.go#L108
diff --git a/commit/merkleroot/observation.go b/commit/merkleroot/observation.go
index 5097e614b..f5923f190 100644
--- a/commit/merkleroot/observation.go
+++ b/commit/merkleroot/observation.go
@@ -29,6 +29,8 @@ import (
"github.com/smartcontractkit/chainlink-ccip/pkg/consts"
readerpkg "github.com/smartcontractkit/chainlink-ccip/pkg/reader"
cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
+
+ ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
)
func (w *Processor) ObservationQuorum(
@@ -86,21 +88,18 @@ func (w *Processor) initializeRMNController(ctx context.Context, prevOutcome Out
return fmt.Errorf("failed to get RMN nodes info: %w", err)
}
- peerIDs := make([]string, 0, len(rmnNodesInfo))
- for _, node := range rmnNodesInfo {
- w.lggr.Infow("Adding RMN node to peerIDs", "node", node.PeerID.String())
- peerIDs = append(peerIDs, node.PeerID.String())
- }
+ oraclePeerIDs := make([]ragep2ptypes.PeerID, 0, len(w.oracleIDToP2pID))
for _, p2pID := range w.oracleIDToP2pID {
w.lggr.Infow("Adding oracle node to peerIDs", "p2pID", p2pID.String())
- peerIDs = append(peerIDs, p2pID.String())
+ oraclePeerIDs = append(oraclePeerIDs, p2pID)
}
if err := w.rmnController.InitConnection(
ctx,
cciptypes.Bytes32(w.reportingCfg.ConfigDigest),
prevOutcome.RMNRemoteCfg.ConfigDigest,
- peerIDs,
+ oraclePeerIDs,
+ rmnNodesInfo,
); err != nil {
return fmt.Errorf("failed to init connection to RMN: %w", err)
}
diff --git a/commit/merkleroot/observation_test.go b/commit/merkleroot/observation_test.go
index ba00e8ff1..f31c54670 100644
--- a/commit/merkleroot/observation_test.go
+++ b/commit/merkleroot/observation_test.go
@@ -15,6 +15,7 @@ import (
"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/ragep2p/types"
+ ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
@@ -623,13 +624,15 @@ func Test_Processor_initializeRMNController(t *testing.T) {
{ID: 1, PeerID: types.PeerID{1, 2, 3}},
{ID: 10, PeerID: types.PeerID{1, 2, 31}},
}
+ oracleIDs := []ragep2ptypes.PeerID{}
rmnHomeReader.EXPECT().GetRMNNodesInfo(cfg.ConfigDigest).Return(rmnNodes, nil)
rmnController.EXPECT().InitConnection(
ctx,
cciptypes.Bytes32(p.reportingCfg.ConfigDigest),
cfg.ConfigDigest,
- []string{rmnNodes[0].PeerID.String(), rmnNodes[1].PeerID.String()},
+ oracleIDs,
+ rmnNodes,
).Return(nil)
err = p.initializeRMNController(ctx, Outcome{RMNRemoteCfg: cfg})
diff --git a/commit/merkleroot/processor.go b/commit/merkleroot/processor.go
index 90b78c85b..4c893dddf 100644
--- a/commit/merkleroot/processor.go
+++ b/commit/merkleroot/processor.go
@@ -1,14 +1,12 @@
package merkleroot
import (
- "errors"
- "fmt"
-
"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
libocrtypes "github.com/smartcontractkit/libocr/ragep2p/types"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
+ "github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn"
"github.com/smartcontractkit/chainlink-ccip/internal/plugincommon"
@@ -85,23 +83,5 @@ func (p *Processor) Close() error {
return nil
}
- errs := make([]error, 0)
-
- // close rmn controller
- if p.rmnController != nil {
- if err := p.rmnController.Close(); err != nil {
- errs = append(errs, fmt.Errorf("close RMN controller: %w", err))
- p.lggr.Errorw("Failed to close RMN controller", "err", err)
- }
- }
-
- // close rmn home reader
- if p.rmnHomeReader != nil {
- if err := p.rmnHomeReader.Close(); err != nil {
- errs = append(errs, fmt.Errorf("close RMNHome reader: %w", err))
- p.lggr.Errorw("Failed to close RMNHome reader", "err", err)
- }
- }
-
- return errors.Join(errs...)
+ return services.CloseAll(p.rmnController, p.rmnHomeReader)
}
diff --git a/commit/merkleroot/rmn/controller.go b/commit/merkleroot/rmn/controller.go
index 9a7688c06..5cdd2e330 100644
--- a/commit/merkleroot/rmn/controller.go
+++ b/commit/merkleroot/rmn/controller.go
@@ -19,6 +19,7 @@ import (
"google.golang.org/protobuf/proto"
chainsel "github.com/smartcontractkit/chain-selectors"
+ ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
@@ -59,7 +60,8 @@ type Controller interface {
ctx context.Context,
commitConfigDigest cciptypes.Bytes32,
rmnHomeConfigDigest cciptypes.Bytes32,
- peerIDs []string, // union of oraclePeerIDs and rmnNodePeerIDs (oracles required for peer discovery)
+ oraclePeerIDs []ragep2ptypes.PeerID,
+ rmnNodes []rmntypes.HomeNodeInfo,
) error
// Close closes the connection to the generic peer group endpoint and all the underlying streams.
@@ -242,9 +244,10 @@ func (c *controller) InitConnection(
ctx context.Context,
commitConfigDigest cciptypes.Bytes32,
rmnHomeConfigDigest cciptypes.Bytes32,
- peerIDs []string,
+ oraclePeerIDs []ragep2ptypes.PeerID,
+ rmnNodes []rmntypes.HomeNodeInfo,
) error {
- return c.peerClient.InitConnection(ctx, commitConfigDigest, rmnHomeConfigDigest, peerIDs)
+ return c.peerClient.InitConnection(ctx, commitConfigDigest, rmnHomeConfigDigest, oraclePeerIDs, rmnNodes)
}
func (c *controller) Close() error {
@@ -263,25 +266,36 @@ func (c *controller) getRmnSignedObservations(
requestedNodes := make(map[uint64]mapset.Set[rmntypes.NodeID]) // sourceChain -> requested rmnNodeIDs
requestsPerNode := make(map[rmntypes.NodeID][]*rmnpb.FixedDestLaneUpdateRequest) // grouped requests for each node
- // For each lane update request send an observation request to at most 'F+1' number of rmn nodes.
- // At this point we can safely assume that we have at least F+1 supporting each source chain.
- for sourceChain, updateRequest := range updateRequestsPerChain {
- requestedNodes[sourceChain] = mapset.NewSet[rmntypes.NodeID]()
- homeChainF, exist := homeFMap[cciptypes.ChainSelector(sourceChain)]
- if !exist {
- return nil, fmt.Errorf("no home F for chain %d", sourceChain)
+ // Send to every RMN node all the lane update requests it supports until all chains have a sufficient amount.
+ // of initial observers. Upon timer expiration, additional requests are sent to the rest of the RMN nodes.
+
+ chainsWithEnoughRequests := mapset.NewSet[uint64]()
+ for nodeID := range rmnNodeInfo {
+ if chainsWithEnoughRequests.Cardinality() == len(updateRequestsPerChain) {
+ break // We have enough initial observers for all source chains.
}
- for nodeID := range updateRequest.RmnNodes.Iter() {
- if consensus.Threshold(requestedNodes[sourceChain].Cardinality()) >= consensus.FPlus1(homeChainF) {
- break // We have enough initial observers for this source chain.
+ for sourceChain, updateRequest := range updateRequestsPerChain {
+ // if this node cannot support the source chain, skip it
+ if !updateRequest.RmnNodes.Contains(nodeID) {
+ continue
}
- requestedNodes[sourceChain].Add(nodeID)
- if _, exists := requestsPerNode[nodeID]; !exists {
- requestsPerNode[nodeID] = make([]*rmnpb.FixedDestLaneUpdateRequest, 0)
+ // add the node as a requested observer for this source chain
+ if _, ok := requestedNodes[sourceChain]; !ok {
+ requestedNodes[sourceChain] = mapset.NewSet[rmntypes.NodeID]()
}
+ requestedNodes[sourceChain].Add(nodeID)
requestsPerNode[nodeID] = append(requestsPerNode[nodeID], updateRequest.Data)
+
+ // if we already have enough requests for this source chain, mark it
+ homeChainF, exist := homeFMap[cciptypes.ChainSelector(sourceChain)]
+ if !exist {
+ return nil, fmt.Errorf("no home F for chain %d", sourceChain)
+ }
+ if consensus.Threshold(requestedNodes[sourceChain].Cardinality()) >= consensus.FPlus1(homeChainF) {
+ chainsWithEnoughRequests.Add(sourceChain)
+ }
}
}
diff --git a/commit/merkleroot/rmn/controller_test.go b/commit/merkleroot/rmn/controller_test.go
index af10377b4..6a387c0de 100644
--- a/commit/merkleroot/rmn/controller_test.go
+++ b/commit/merkleroot/rmn/controller_test.go
@@ -18,6 +18,7 @@ import (
"google.golang.org/protobuf/proto"
chainsel "github.com/smartcontractkit/chain-selectors"
+ ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
readerpkg_mock "github.com/smartcontractkit/chainlink-ccip/mocks/pkg/reader"
@@ -692,7 +693,12 @@ func (m *mockPeerClient) resetReceivedRequests() {
m.receivedRequests = make(map[rmntypes.NodeID][]*rmnpb.Request)
}
-func (m *mockPeerClient) InitConnection(_ context.Context, _ cciptypes.Bytes32, _ cciptypes.Bytes32, _ []string) error {
+func (m *mockPeerClient) InitConnection(
+ _ context.Context,
+ _ cciptypes.Bytes32,
+ _ cciptypes.Bytes32,
+ _ []ragep2ptypes.PeerID,
+ _ []rmntypes.HomeNodeInfo) error {
return nil
}
diff --git a/commit/merkleroot/rmn/peerclient.go b/commit/merkleroot/rmn/peerclient.go
index 5d883b551..8e4da38d0 100644
--- a/commit/merkleroot/rmn/peerclient.go
+++ b/commit/merkleroot/rmn/peerclient.go
@@ -4,20 +4,21 @@ package rmn
import (
"context"
- "crypto/sha256"
- "encoding/binary"
"fmt"
"strings"
"sync"
+ "time"
"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/networking"
- ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
rmntypes "github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn/types"
+ "github.com/smartcontractkit/chainlink-ccip/pkg/peergroup"
cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
+
+ ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
)
var ErrNoConn = fmt.Errorf("no connection, please call InitConnection before further interaction")
@@ -30,7 +31,8 @@ type PeerClient interface {
ctx context.Context,
commitConfigDigest cciptypes.Bytes32,
rmnHomeConfigDigest cciptypes.Bytes32,
- peerIDs []string, // union of oraclePeerIDs and rmnNodePeerIDs (oracles required for peer discovery)
+ oraclePeerIDs []ragep2ptypes.PeerID,
+ rmnNodes []rmntypes.HomeNodeInfo,
) error
Close() error
@@ -53,28 +55,32 @@ type PeerResponse struct {
type peerClient struct {
lggr logger.Logger
- peerGroupFactory PeerGroupFactory
+ peerGroupCreator *peergroup.Creator
respChan chan PeerResponse
- peerGroup PeerGroup // nil initially, until InitConnection is called
+ peerGroup peergroup.PeerGroup
genericEndpointConfigDigest cciptypes.Bytes32
rageP2PStreams map[rmntypes.NodeID]Stream
bootstrappers []commontypes.BootstrapperLocator
- mu *sync.RWMutex
+ // ocrRoundInterval is the estimated interval between OCR rounds.
+ ocrRoundInterval time.Duration
+ mu *sync.RWMutex
}
func NewPeerClient(
lggr logger.Logger,
- peerGroupFactory PeerGroupFactory,
+ peerGroupFactory peergroup.PeerGroupFactory,
bootstrappers []commontypes.BootstrapperLocator,
+ ocrRoundInterval time.Duration,
) PeerClient {
return &peerClient{
lggr: lggr,
- peerGroupFactory: peerGroupFactory,
+ peerGroupCreator: peergroup.NewCreator(lggr, peerGroupFactory, bootstrappers),
respChan: make(chan PeerResponse),
peerGroup: nil,
rageP2PStreams: make(map[rmntypes.NodeID]Stream),
genericEndpointConfigDigest: cciptypes.Bytes32{},
bootstrappers: bootstrappers,
+ ocrRoundInterval: ocrRoundInterval,
mu: &sync.RWMutex{},
}
}
@@ -82,7 +88,9 @@ func NewPeerClient(
func (r *peerClient) InitConnection(
_ context.Context,
commitConfigDigest, rmnHomeConfigDigest cciptypes.Bytes32,
- peerIDs []string,
+ oraclePeerIDs []ragep2ptypes.PeerID,
+ rmnNodes []rmntypes.HomeNodeInfo,
+
) error {
if err := r.Close(); err != nil {
return fmt.Errorf("close existing peer group: %w", err)
@@ -91,25 +99,20 @@ func (r *peerClient) InitConnection(
r.mu.Lock()
defer r.mu.Unlock()
- h := sha256.Sum256(append(commitConfigDigest[:], rmnHomeConfigDigest[:]...))
- r.genericEndpointConfigDigest = writePrefix(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo, h)
- r.lggr.Infow("Creating new peer group",
- "genericEndpointConfigDigest", r.genericEndpointConfigDigest.String(),
- "peerIDs", peerIDs,
- "bootstrappers", r.bootstrappers,
- )
-
- peerGroup, err := r.peerGroupFactory.NewPeerGroup(
- [32]byte(r.genericEndpointConfigDigest),
- peerIDs,
- r.bootstrappers,
- )
-
+ result, err := r.peerGroupCreator.Create(peergroup.CreateOpts{
+ CommitConfigDigest: commitConfigDigest,
+ RMNHomeConfigDigest: rmnHomeConfigDigest,
+ // Note: For RMN peer client, we receive the combined peer IDs directly
+ // and don't need to separate oracle/RMN peers
+ OraclePeerIDs: oraclePeerIDs,
+ RMNNodes: rmnNodes,
+ })
if err != nil {
- return fmt.Errorf("new peer group: %w", err)
+ return fmt.Errorf("create peer group: %w", err)
}
- r.peerGroup = peerGroup
+ r.peerGroup = result.PeerGroup
+ r.genericEndpointConfigDigest = result.ConfigDigest
return nil
}
@@ -126,6 +129,8 @@ func (r *peerClient) Close() error {
return fmt.Errorf("close peer group: %w", err)
}
+ r.peerGroup = nil
+ r.rageP2PStreams = make(map[rmntypes.NodeID]Stream)
return nil
}
@@ -168,7 +173,7 @@ func (r *peerClient) getOrCreateRageP2PStream(rmnNode rmntypes.HomeNodeInfo) (St
)
var err error
- stream, err = r.peerGroup.NewStream(rmnPeerID, newStreamConfig(r.lggr, streamName))
+ stream, err = r.peerGroup.NewStream(rmnPeerID, newStreamConfig(r.lggr, streamName, r.ocrRoundInterval))
if err != nil {
return nil, fmt.Errorf("new stream %s: %w", streamName, err)
}
@@ -192,28 +197,7 @@ func (r *peerClient) Recv() <-chan PeerResponse {
return r.respChan
}
-// writePrefix writes the prefix to the first 2 bytes of the hash.
-func writePrefix(prefix ocr2types.ConfigDigestPrefix, hash cciptypes.Bytes32) cciptypes.Bytes32 {
- var prefixBytes [2]byte
- binary.BigEndian.PutUint16(prefixBytes[:], uint16(prefix))
-
- hCopy := hash
- hCopy[0] = prefixBytes[0]
- hCopy[1] = prefixBytes[1]
-
- return hCopy
-}
-
// Redeclare interfaces for mocking purposes.
-
-type PeerGroupFactory interface {
- networking.PeerGroupFactory
-}
-
-type PeerGroup interface {
- networking.PeerGroup
-}
-
type Stream interface {
networking.Stream
}
diff --git a/commit/merkleroot/rmn/peerclient_test.go b/commit/merkleroot/rmn/peerclient_test.go
deleted file mode 100644
index ddce737fd..000000000
--- a/commit/merkleroot/rmn/peerclient_test.go
+++ /dev/null
@@ -1,19 +0,0 @@
-package rmn
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-
- ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
-
- cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
-)
-
-func Test_writePrefix(t *testing.T) {
- prefix := 5130 // 0x140a -> {20, 10}
- h := cciptypes.Bytes32{1, 2, 3, 4, 5}
- exp := cciptypes.Bytes32{20, 10, 3, 4, 5}
- got := writePrefix(ocr2types.ConfigDigestPrefix(prefix), h)
- assert.Equal(t, exp, got)
-}
diff --git a/commit/merkleroot/rmn/streamconfig.go b/commit/merkleroot/rmn/streamconfig.go
index 5421ac577..6128f366c 100644
--- a/commit/merkleroot/rmn/streamconfig.go
+++ b/commit/merkleroot/rmn/streamconfig.go
@@ -22,40 +22,38 @@ const (
// initialObservationRequest + observationRequestWithOtherSourcesAfterTimeout + reportSignatureRequest
maxNumOfMsgsPerRound = 3
- // values below chosen by research team
rateScale = 1.2
- capacityScale = 3
+ capacityScale = 5
// bufferSize should be set to 1 as advised by the RMN team.
outgoingBufferSize = 1
incomingBufferSize = 1
-
- estimatedRoundInterval = time.Second
)
var (
- maxObservationRequestBytes int
- maxReportSigRequestBytes int
+ maxObservationResponseBytes int
+ maxReportSigResponseBytes int
)
func newStreamConfig(
lggr logger.Logger,
streamName string,
+ estimatedOcrRoundInterval time.Duration,
) networking.NewStreamArgs1 {
cfg := networking.NewStreamArgs1{
StreamName: streamName,
OutgoingBufferSize: outgoingBufferSize,
IncomingBufferSize: incomingBufferSize,
MaxMessageLength: maxMessageLength(),
- MessagesLimit: messagesLimit(),
- BytesLimit: bytesLimit(),
+ MessagesLimit: messagesLimit(estimatedOcrRoundInterval),
+ BytesLimit: bytesLimit(estimatedOcrRoundInterval),
}
lggr.Infow("new stream config",
"streamName", streamName,
"cfg", cfg,
- "maxObservationRequestBytes", maxObservationRequestBytes,
- "maxReportSigRequestBytes", maxReportSigRequestBytes,
+ "maxObservationResponseBytes", maxObservationResponseBytes,
+ "maxReportSigResponseBytes", maxReportSigResponseBytes,
)
return cfg
@@ -63,20 +61,20 @@ func newStreamConfig(
func maxMessageLength() int {
return max(
- maxObservationRequestBytes,
- maxReportSigRequestBytes,
+ maxObservationResponseBytes,
+ maxReportSigResponseBytes,
)
}
-func messagesLimit() ragep2p.TokenBucketParams {
+func messagesLimit(estimatedRoundInterval time.Duration) ragep2p.TokenBucketParams {
return ragep2p.TokenBucketParams{
Rate: rateScale * (float64(maxNumOfMsgsPerRound) / estimatedRoundInterval.Seconds()),
- Capacity: maxNumOfMsgsPerRound * capacityScale,
+ Capacity: maxNumOfMsgsPerRound * uint32(capacityScale/estimatedRoundInterval.Seconds()),
}
}
-func bytesLimit() ragep2p.TokenBucketParams {
- maxSumLenOutboundPerRound := (2 * maxObservationRequestBytes) + maxReportSigRequestBytes
+func bytesLimit(estimatedRoundInterval time.Duration) ragep2p.TokenBucketParams {
+ maxSumLenOutboundPerRound := maxObservationResponseBytes + maxReportSigResponseBytes
return ragep2p.TokenBucketParams{
Rate: (float64(maxSumLenOutboundPerRound) / estimatedRoundInterval.Seconds()) * rateScale,
@@ -86,45 +84,6 @@ func bytesLimit() ragep2p.TokenBucketParams {
// compute max observation request size and max report signatures request size
func init() {
- req := &rmnpb.Request{
- Request: &rmnpb.Request_ObservationRequest{
- ObservationRequest: &rmnpb.ObservationRequest{
- LaneDest: &rmnpb.LaneDest{
- DestChainSelector: math.MaxUint64,
- OfframpAddress: make([]byte, 32),
- },
- FixedDestLaneUpdateRequests: make([]*rmnpb.FixedDestLaneUpdateRequest, 0, estimatedMaxNumberOfSourceChains),
- },
- },
- }
- for i := 0; i < estimatedMaxNumberOfSourceChains; i++ {
- req.GetObservationRequest().FixedDestLaneUpdateRequests = append(
- req.GetObservationRequest().FixedDestLaneUpdateRequests, &rmnpb.FixedDestLaneUpdateRequest{
- LaneSource: &rmnpb.LaneSource{
- SourceChainSelector: math.MaxUint64,
- OnrampAddress: make([]byte, 32),
- },
- ClosedInterval: &rmnpb.ClosedInterval{
- MinMsgNr: math.MaxUint64,
- MaxMsgNr: math.MaxUint64,
- },
- },
- )
- }
- reqBytes, err := proto.Marshal(req)
- if err != nil {
- panic(err)
- }
- maxObservationRequestBytes = len(reqBytes)
-
- req = &rmnpb.Request{
- Request: &rmnpb.Request_ReportSignatureRequest{
- ReportSignatureRequest: &rmnpb.ReportSignatureRequest{
- Context: &rmnpb.ReportContext{},
- AttributedSignedObservations: make([]*rmnpb.AttributedSignedObservation, 0, estimatedMaxNumberOfSourceChains),
- },
- },
- }
fixedDestLaneUpdates := make([]*rmnpb.FixedDestLaneUpdate, 0, estimatedMaxNumberOfSourceChains)
for i := 0; i < estimatedMaxNumberOfSourceChains; i++ {
fixedDestLaneUpdates = append(fixedDestLaneUpdates, &rmnpb.FixedDestLaneUpdate{
@@ -132,33 +91,54 @@ func init() {
SourceChainSelector: math.MaxUint64,
OnrampAddress: make([]byte, 32),
},
- ClosedInterval: &rmnpb.ClosedInterval{MinMsgNr: math.MaxUint64, MaxMsgNr: math.MaxUint64},
- Root: make([]byte, 32),
+ ClosedInterval: &rmnpb.ClosedInterval{
+ MinMsgNr: math.MaxUint64,
+ MaxMsgNr: math.MaxUint64,
+ },
+ Root: make([]byte, 32),
})
}
- for i := 0; i < estimatedMaxNumberOfSourceChains; i++ {
- req.GetReportSignatureRequest().AttributedSignedObservations = append(
- req.GetReportSignatureRequest().AttributedSignedObservations, &rmnpb.AttributedSignedObservation{
- SignedObservation: &rmnpb.SignedObservation{
- Observation: &rmnpb.Observation{
- RmnHomeContractConfigDigest: make([]byte, 32),
- LaneDest: &rmnpb.LaneDest{
- DestChainSelector: math.MaxUint64,
- OfframpAddress: make([]byte, 32),
- },
- FixedDestLaneUpdates: fixedDestLaneUpdates,
- Timestamp: math.MaxUint64,
+ obsResponse := &rmnpb.Response{
+ RequestId: math.MaxUint64,
+ Response: &rmnpb.Response_SignedObservation{
+ SignedObservation: &rmnpb.SignedObservation{
+ Observation: &rmnpb.Observation{
+ RmnHomeContractConfigDigest: make([]byte, 32),
+ LaneDest: &rmnpb.LaneDest{
+ DestChainSelector: math.MaxUint64,
+ OfframpAddress: make([]byte, 32),
},
- Signature: make([]byte, 256),
+ FixedDestLaneUpdates: fixedDestLaneUpdates,
+ Timestamp: math.MaxUint64,
},
- SignerNodeIndex: math.MaxUint32,
- })
+ Signature: make([]byte, 256),
+ },
+ },
}
- reqBytes, err = proto.Marshal(req)
+ sigResponse := &rmnpb.Response{
+ RequestId: math.MaxUint64,
+ Response: &rmnpb.Response_ReportSignature{
+ ReportSignature: &rmnpb.ReportSignature{
+ Signature: &rmnpb.EcdsaSignature{
+ R: make([]byte, 32),
+ S: make([]byte, 32),
+ },
+ },
+ },
+ }
+
+ obsResponseBytes, err := proto.Marshal(obsResponse)
if err != nil {
panic(err)
}
- maxReportSigRequestBytes = len(reqBytes)
+
+ sigResponseBytes, err := proto.Marshal(sigResponse)
+ if err != nil {
+ panic(err)
+ }
+
+ maxObservationResponseBytes = len(obsResponseBytes)
+ maxReportSigResponseBytes = len(sigResponseBytes)
}
diff --git a/commit/merkleroot/rmn/streamconfig_test.go b/commit/merkleroot/rmn/streamconfig_test.go
index e68a393b8..837efe3c3 100644
--- a/commit/merkleroot/rmn/streamconfig_test.go
+++ b/commit/merkleroot/rmn/streamconfig_test.go
@@ -2,47 +2,80 @@ package rmn
import (
"testing"
+ "time"
"github.com/stretchr/testify/assert"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)
+//nolint:dupl
func Test_newStreamConfig(t *testing.T) {
+ const kB = 1024
lggr := logger.Test(t)
-
streamName := "myCoolStream"
- cfg := newStreamConfig(lggr, streamName)
-
- assert.Equal(t, streamName, cfg.StreamName)
-
- assert.Equal(t, 1, cfg.OutgoingBufferSize)
-
- assert.Equal(t, 1, cfg.IncomingBufferSize)
+ t.Run("one second rounds", func(t *testing.T) {
+ cfg := newStreamConfig(lggr, streamName, time.Second)
+ assert.Equal(t, streamName, cfg.StreamName)
+ assert.Equal(t, 1, cfg.OutgoingBufferSize)
+ assert.Equal(t, 1, cfg.IncomingBufferSize)
+ // message length
+ assert.Greater(t, cfg.MaxMessageLength, 52*kB)
+ assert.Less(t, cfg.MaxMessageLength, 53*kB)
+ // message rate
+ assert.Greater(t, cfg.MessagesLimit.Rate, 3.5)
+ assert.Less(t, cfg.MessagesLimit.Rate, 3.6)
+ // message capacity
+ assert.Equal(t, 15, int(cfg.MessagesLimit.Capacity))
+ // bytes rate
+ assert.Greater(t, cfg.BytesLimit.Rate, float64(63*kB))
+ assert.Less(t, cfg.BytesLimit.Rate, float64(64*kB))
+ // bytes capacity
+ assert.Greater(t, int(cfg.BytesLimit.Capacity), 263*kB)
+ assert.Less(t, int(cfg.BytesLimit.Capacity), 265*kB)
+ })
- // message length
- assert.Greater(t, cfg.MaxMessageLength, 25*megaByte)
- assert.Less(t, cfg.MaxMessageLength, 27*megaByte)
+ t.Run("500ms rounds", func(t *testing.T) {
+ cfg := newStreamConfig(lggr, streamName, 500*time.Millisecond)
+ assert.Equal(t, streamName, cfg.StreamName)
+ assert.Equal(t, 1, cfg.OutgoingBufferSize)
+ assert.Equal(t, 1, cfg.IncomingBufferSize)
+ // message length
+ assert.Greater(t, cfg.MaxMessageLength, 52*kB)
+ assert.Less(t, cfg.MaxMessageLength, 53*kB)
+ // message rate
+ assert.Greater(t, cfg.MessagesLimit.Rate, 7.1)
+ assert.Less(t, cfg.MessagesLimit.Rate, 7.2)
+ // message capacity
+ assert.Equal(t, 30, int(cfg.MessagesLimit.Capacity))
+ // bytes rate
+ assert.Greater(t, cfg.BytesLimit.Rate, float64(126*kB))
+ assert.Less(t, cfg.BytesLimit.Rate, float64(128*kB))
+ // bytes capacity
+ assert.Greater(t, int(cfg.BytesLimit.Capacity), 263*kB)
+ assert.Less(t, int(cfg.BytesLimit.Capacity), 265*kB)
+ })
- // message rate
- assert.Greater(t, cfg.MessagesLimit.Rate, 3.4)
- assert.Less(t, cfg.MessagesLimit.Rate, 3.8)
+ t.Run("4s rounds", func(t *testing.T) {
+ cfg := newStreamConfig(lggr, streamName, 4*time.Second)
+ assert.Equal(t, streamName, cfg.StreamName)
+ assert.Equal(t, 1, cfg.OutgoingBufferSize)
+ assert.Equal(t, 1, cfg.IncomingBufferSize)
+ // message length
+ assert.Greater(t, cfg.MaxMessageLength, 52*kB)
+ assert.Less(t, cfg.MaxMessageLength, 53*kB)
+ // message rate
+ assert.Greater(t, cfg.MessagesLimit.Rate, 0.8)
+ assert.Less(t, cfg.MessagesLimit.Rate, 0.9)
+ // message capacity
+ assert.Equal(t, 3, int(cfg.MessagesLimit.Capacity))
+ // bytes rate
+ assert.Greater(t, cfg.BytesLimit.Rate, float64(15*kB))
+ assert.Less(t, cfg.BytesLimit.Rate, float64(17*kB))
+ // bytes capacity
+ assert.Greater(t, int(cfg.BytesLimit.Capacity), 263*kB)
+ assert.Less(t, int(cfg.BytesLimit.Capacity), 265*kB)
+ })
- // message capacity
- assert.Equal(t, 9, int(cfg.MessagesLimit.Capacity))
-
- // bytes rate
- assert.Greater(t, cfg.BytesLimit.Rate, float64(30*megaByte))
- assert.Less(t, cfg.BytesLimit.Rate, float64(33*megaByte))
-
- // bytes capacity
- assert.Greater(t, int(cfg.BytesLimit.Capacity), 77*megaByte)
- assert.Less(t, int(cfg.BytesLimit.Capacity), 80*megaByte)
}
-
-const (
- byt = 1
- kiloByte = 1024 * byt
- megaByte = 1024 * kiloByte
-)
diff --git a/commit/plugin.go b/commit/plugin.go
index 926130c9c..c2ca22c55 100644
--- a/commit/plugin.go
+++ b/commit/plugin.go
@@ -3,7 +3,6 @@ package commit
import (
"context"
"fmt"
- "io"
"time"
"github.com/smartcontractkit/libocr/commontypes"
@@ -379,12 +378,12 @@ func (p *Plugin) Outcome(
}
func (p *Plugin) Close() error {
- return services.CloseAll([]io.Closer{
+ return services.CloseAll(
p.merkleRootProcessor,
p.tokenPriceProcessor,
p.chainFeeProcessor,
p.discoveryProcessor,
- }...)
+ )
}
func (p *Plugin) decodeOutcome(outcome ocr3types.Outcome) Outcome {
diff --git a/docs/ccip_protocol.md b/docs/ccip_protocol.md
index 3c20dc8d6..62f6d438d 100644
--- a/docs/ccip_protocol.md
+++ b/docs/ccip_protocol.md
@@ -1,3 +1,76 @@
-# CCIP Protocol
+# Cross-Chain Interoperability Protocol (CCIP)
-TODO: information about the CCIP protocol with links to feature details.
+A protocol designed to transfer data and value across blockchains in a
+trust-minimized manner. This is done using a combination of checks on the source
+chain(s), destination chains, and off-chain security. This document outlines
+key security considerations and details the messaging protocol at the data and
+event processing levels. As this repository is for the off-chain
+portion of CCIP, we'll focus on off-chain components and the on-chain
+integration points.
+
+The user interface is not described here. It is largely unchanged compared to
+the earlier architecture and the [public docoumentation][public-docs] provides
+a comprehensive guide to using CCIP.
+
+## OffChain Reporting (OCR)
+
+The [libOCR][ocr-repo] framework provides security to the system with a
+Distributed Oracle Network (DON) utilizing a [byzantine fault tolerant][bft]
+protocol. Security is enhanced by the [Reporting Plugin][ocr-interface] when
+the plugins ensure observation thresholds on data used by the system.
+See the [Consensus and Sub-consensus](consensus.md) page for more
+information about how the CCIP plugins use and extend the typical OCR
+framework.
+
+## Plugins
+
+The protocol is implemented in two phases, each with a corresponding libOCR
+plugin: commit and execute.
+
+### Commit
+
+In the commit phase, user created messages are collected from every source
+chain, a merkle root is generated for messages from each source, and the roots
+are put into a [Commit Report][commit-report-src]. The report is sent to the
+on-chain code for final processing where a commit report log is written to the
+blockchain.
+
+An independant Risk Management Network (RMN) serves as an additional security
+layer to validate merkle roots. RMN can be thought of as a
+validation network with veto power. In the previous version of CCIP RMN was
+an independent component which provided a blessing or curse after the commit
+report was written to the network. With this architecture the plugin queries
+the RMN network directly so that the RMN signatures can be included in the
+initial report.
+
+The report includes additional gas and token price data required by the
+[billing algorithm](billing.md). Strictly speaking, it is not part of the
+protocol and could be implemented separately. For convenience it is included in
+the commit plugin.
+
+More detail about the implementation can be found in the [README](commit#readme).
+
+### Execute
+
+In the execute phase, the plugin searches for commit reports with pending
+executions. All messages for these commit reports are gathered, and a
+[special merkle proof][merklemulti] is computed for all messages ready for
+execution. These proofs are put into the [Execute Plugin Report][exec-report-src].
+
+Due to the [Role DON](consensus.md#role-don) architecture, this
+process has to be done across several rounds of consensus:
+
+1. Destination readers look for commit reports.
+2. Based on the commit reports, source readers fetch the messages.
+3. Based on the messages, the destination reader determines execution order.
+
+The commit report is sent to the on-chain code for final processing. This
+step includes token transfers, data handling and user contract interaction.
+
+[public-docs]: https://docs.chain.link/ccip
+[ocr-repo]: https://github.com/smartcontractkit/libocr/tree/master
+[bft]: https://en.wikipedia.org/wiki/Byzantine_fault
+[ocr-interface]: https://github.com/smartcontractkit/libocr/blob/adbe57025f12b9958907bb203acba14360bf8ed2/offchainreporting2plus/ocr3types/plugin.go#L165
+[commit-report-src]: https://github.com/smartcontractkit/chainlink-ccip/blob/0f6dce5d1fdb67b3127332ac729191f2c1c790ff/pkg/types/ccipocr3/plugin_commit_types.go#L19
+[merklemulti]: https://github.com/smartcontractkit/chainlink-common/tree/main/pkg/merklemulti
+[exec-report-src]: https://github.com/smartcontractkit/chainlink-common/tree/main/pkg/merklemulti
diff --git a/docs/execution_order.md b/docs/execution_order.md
deleted file mode 100644
index e69de29bb..000000000
diff --git a/docs/token_data.md b/docs/token_data.md
deleted file mode 100644
index e69de29bb..000000000
diff --git a/execute/factory.go b/execute/factory.go
index c44a4bac8..ae6cf37a4 100644
--- a/execute/factory.go
+++ b/execute/factory.go
@@ -137,7 +137,7 @@ func (p PluginFactory) NewReportingPlugin(
costlyMessageObserver := exectypes.NewCostlyMessageObserverWithDefaults(
p.lggr,
- false, // TODO: enable
+ true,
ccipReader,
offchainConfig.RelativeBoostPerWaitHour,
p.estimateProvider,
diff --git a/internal/plugincommon/chainfee.go b/internal/plugincommon/chainfee.go
deleted file mode 100644
index f04ca7822..000000000
--- a/internal/plugincommon/chainfee.go
+++ /dev/null
@@ -1 +0,0 @@
-package plugincommon
diff --git a/internal/plugincommon/discovery/processor.go b/internal/plugincommon/discovery/processor.go
index dcd6f9f26..6fee6ea68 100644
--- a/internal/plugincommon/discovery/processor.go
+++ b/internal/plugincommon/discovery/processor.go
@@ -91,26 +91,45 @@ func (cdp *ContractDiscoveryProcessor) ValidateObservation(
return fmt.Errorf("unable to get supported chains for Oracle %d: %w", ao.OracleID, err)
}
- // check that the oracle is allowed to observe the dest chain.
- if !supportedChains.Contains(cdp.dest) {
- return fmt.Errorf("oracle %d is not allowed to observe chain %s", ao.OracleID, cdp.dest)
+ for contract, addrs := range ao.Observation.Addresses {
+ // some contract addresses come from the destination, others are from the source.
+ switch contract {
+ // discovered on the chain that the contract is deployed on.
+ case consts.ContractNameFeeQuoter,
+ consts.ContractNameRouter:
+ for chain := range addrs {
+ if !supportedChains.Contains(chain) {
+ return fmt.Errorf(
+ "oracle %d is not allowed to observe chain %s for %s", ao.OracleID, chain, contract)
+ }
+ }
+ // discovered on the destination chain.
+ case consts.ContractNameOnRamp,
+ consts.ContractNameOffRamp,
+ consts.ContractNameNonceManager,
+ consts.ContractNameRMNRemote:
+
+ if !supportedChains.Contains(cdp.dest) {
+ return fmt.Errorf(
+ "oracle %d is not allowed to observe contract (%s) on the destination chain %s",
+ ao.OracleID, contract, cdp.dest)
+ }
+ default:
+ return fmt.Errorf("unknown contract name %s", contract)
+ }
}
- // NOTE: once oracles can also discover things on source chains, we must
- // check that they can read whatever source chain is used to determine the
- // address, e.g source fee quoter / router.
-
return nil
}
// aggObs is used to store multiple observations for each value being observed.
type aggObs struct {
fChain map[cciptypes.ChainSelector][]int
- onrampAddrs map[cciptypes.ChainSelector][][]byte
- feeQuoterAddrs map[cciptypes.ChainSelector][][]byte
- nonceManagerAddrs map[cciptypes.ChainSelector][][]byte
- rmnRemoteAddrs map[cciptypes.ChainSelector][][]byte
- routerAddrs map[cciptypes.ChainSelector][][]byte
+ onrampAddrs map[cciptypes.ChainSelector][]cciptypes.UnknownAddress
+ feeQuoterAddrs map[cciptypes.ChainSelector][]cciptypes.UnknownAddress
+ nonceManagerAddrs map[cciptypes.ChainSelector][]cciptypes.UnknownAddress
+ rmnRemoteAddrs map[cciptypes.ChainSelector][]cciptypes.UnknownAddress
+ routerAddrs map[cciptypes.ChainSelector][]cciptypes.UnknownAddress
}
// aggregateObservations combines observations for multiple objects into aggObs, which is a convenient
@@ -122,11 +141,11 @@ func aggregateObservations(
) aggObs {
obs := aggObs{
fChain: make(map[cciptypes.ChainSelector][]int),
- onrampAddrs: make(map[cciptypes.ChainSelector][][]byte),
- feeQuoterAddrs: make(map[cciptypes.ChainSelector][][]byte),
- nonceManagerAddrs: make(map[cciptypes.ChainSelector][][]byte),
- rmnRemoteAddrs: make(map[cciptypes.ChainSelector][][]byte),
- routerAddrs: make(map[cciptypes.ChainSelector][][]byte),
+ onrampAddrs: make(map[cciptypes.ChainSelector][]cciptypes.UnknownAddress),
+ feeQuoterAddrs: make(map[cciptypes.ChainSelector][]cciptypes.UnknownAddress),
+ nonceManagerAddrs: make(map[cciptypes.ChainSelector][]cciptypes.UnknownAddress),
+ rmnRemoteAddrs: make(map[cciptypes.ChainSelector][]cciptypes.UnknownAddress),
+ routerAddrs: make(map[cciptypes.ChainSelector][]cciptypes.UnknownAddress),
}
for _, ao := range aos {
for chainSel, f := range ao.Observation.FChain {
diff --git a/internal/plugincommon/discovery/processor_test.go b/internal/plugincommon/discovery/processor_test.go
index 32e1af2eb..ed099b075 100644
--- a/internal/plugincommon/discovery/processor_test.go
+++ b/internal/plugincommon/discovery/processor_test.go
@@ -39,26 +39,26 @@ func TestContractDiscoveryProcessor_Observation_SupportsDest_HappyPath(t *testin
dest: 1,
source: 2,
}
- expectedNonceManager := []byte("nonceManager")
- expectedOnRamp := map[cciptypes.ChainSelector][]byte{
- source: []byte("onRamp"),
+ expectedNonceManager := cciptypes.UnknownAddress("nonceManager")
+ expectedOnRamp := map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
+ source: cciptypes.UnknownAddress("onRamp"),
}
- expectedFeeQuoter := map[cciptypes.ChainSelector][]byte{
- source: []byte("from_source_onramp"),
- dest: []byte("from_dest_offramp"),
+ expectedFeeQuoter := map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
+ source: cciptypes.UnknownAddress("from_source_onramp"),
+ dest: cciptypes.UnknownAddress("from_dest_offramp"),
}
- expectedRMNRemote := []byte("rmnRemote")
- expectedRouter := []byte("router")
+ expectedRMNRemote := cciptypes.UnknownAddress("rmnRemote")
+ expectedRouter := cciptypes.UnknownAddress("router")
expectedContracts := reader.ContractAddresses{
- consts.ContractNameNonceManager: map[cciptypes.ChainSelector][]byte{
+ consts.ContractNameNonceManager: map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
dest: expectedNonceManager,
},
consts.ContractNameOnRamp: expectedOnRamp,
consts.ContractNameFeeQuoter: expectedFeeQuoter,
- consts.ContractNameRMNRemote: map[cciptypes.ChainSelector][]byte{
+ consts.ContractNameRMNRemote: map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
dest: expectedRMNRemote,
},
- consts.ContractNameRouter: map[cciptypes.ChainSelector][]byte{
+ consts.ContractNameRouter: map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
source: expectedRouter,
},
}
@@ -218,22 +218,22 @@ func TestContractDiscoveryProcessor_Outcome_HappyPath(t *testing.T) {
source1: fSource1,
source2: fSource2,
}
- expectedNonceManager := []byte("nonceManager")
- expectedOnRamp := map[cciptypes.ChainSelector][]byte{
- dest: []byte("onRamp"),
+ expectedNonceManager := cciptypes.UnknownAddress("nonceManager")
+ expectedOnRamp := map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
+ dest: cciptypes.UnknownAddress("onRamp"),
}
- expectedRMNRemote := []byte("rmnRemote")
+ expectedRMNRemote := cciptypes.UnknownAddress("rmnRemote")
expectedContracts := reader.ContractAddresses{
- consts.ContractNameNonceManager: map[cciptypes.ChainSelector][]byte{
+ consts.ContractNameNonceManager: map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
dest: expectedNonceManager,
},
consts.ContractNameOnRamp: expectedOnRamp,
- consts.ContractNameRMNRemote: map[cciptypes.ChainSelector][]byte{
+ consts.ContractNameRMNRemote: map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
dest: expectedRMNRemote,
},
- consts.ContractNameFeeQuoter: map[cciptypes.ChainSelector][]byte{
- source1: []byte("feeQuoter1"),
- source2: []byte("feeQuoter2"),
+ consts.ContractNameFeeQuoter: map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
+ source1: cciptypes.UnknownAddress("feeQuoter1"),
+ source2: cciptypes.UnknownAddress("feeQuoter2"),
},
consts.ContractNameRouter: {},
}
@@ -254,7 +254,7 @@ func TestContractDiscoveryProcessor_Outcome_HappyPath(t *testing.T) {
obsSrc := discoverytypes.Observation{
FChain: expectedFChain,
- Addresses: map[string]map[cciptypes.ChainSelector][]byte{
+ Addresses: map[string]map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
consts.ContractNameOnRamp: expectedOnRamp,
consts.ContractNameNonceManager: {
dest: expectedNonceManager,
@@ -302,21 +302,21 @@ func TestContractDiscovery_Outcome_HappyPath_FRoleDONAndFDestChainAreDifferent(t
source1: fSource1,
source2: fSource2,
}
- expectedNonceManager := []byte("nonceManager")
- expectedOnRamp := map[cciptypes.ChainSelector][]byte{
- dest: []byte("onRamp"),
+ expectedNonceManager := cciptypes.UnknownAddress("nonceManager")
+ expectedOnRamp := map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
+ dest: cciptypes.UnknownAddress("onRamp"),
}
- expectedRMNRemote := []byte("rmnRemote")
+ expectedRMNRemote := cciptypes.UnknownAddress("rmnRemote")
expectedContracts := reader.ContractAddresses{
- consts.ContractNameNonceManager: map[cciptypes.ChainSelector][]byte{
+ consts.ContractNameNonceManager: map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
dest: expectedNonceManager,
},
consts.ContractNameOnRamp: expectedOnRamp,
- consts.ContractNameRMNRemote: map[cciptypes.ChainSelector][]byte{
+ consts.ContractNameRMNRemote: map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
dest: expectedRMNRemote,
},
consts.ContractNameRouter: {
- dest: []byte("router"),
+ dest: cciptypes.UnknownAddress("router"),
},
consts.ContractNameFeeQuoter: {}, // no consensus
}
@@ -337,16 +337,16 @@ func TestContractDiscovery_Outcome_HappyPath_FRoleDONAndFDestChainAreDifferent(t
fChainObs := discoverytypes.Observation{
FChain: expectedFChain,
- Addresses: map[string]map[cciptypes.ChainSelector][]byte{
+ Addresses: map[string]map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
consts.ContractNameFeeQuoter: {
- source1: []byte("fee_quoter_1"),
- source2: []byte("fee_quoter_2"),
+ source1: cciptypes.UnknownAddress("fee_quoter_1"),
+ source2: cciptypes.UnknownAddress("fee_quoter_2"),
},
},
}
destObs := discoverytypes.Observation{
FChain: expectedFChain,
- Addresses: map[string]map[cciptypes.ChainSelector][]byte{
+ Addresses: map[string]map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
consts.ContractNameOnRamp: expectedOnRamp,
consts.ContractNameNonceManager: {
dest: expectedNonceManager,
@@ -355,7 +355,7 @@ func TestContractDiscovery_Outcome_HappyPath_FRoleDONAndFDestChainAreDifferent(t
dest: expectedRMNRemote,
},
consts.ContractNameRouter: {
- dest: []byte("router"),
+ dest: cciptypes.UnknownAddress("router"),
},
},
}
@@ -396,11 +396,11 @@ func TestContractDiscoveryProcessor_Outcome_NotEnoughObservations(t *testing.T)
source1: fSource1,
source2: fSource2,
}
- observedNonceManager := []byte("nonceManager")
- observedRMNRemote := []byte("rmnRemote")
- observedOnRamp := map[cciptypes.ChainSelector][]byte{
- source1: []byte("onRamp"),
- source2: []byte("onRamp"),
+ observedNonceManager := cciptypes.UnknownAddress("nonceManager")
+ observedRMNRemote := cciptypes.UnknownAddress("rmnRemote")
+ observedOnRamp := map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
+ source1: cciptypes.UnknownAddress("onRamp"),
+ source2: cciptypes.UnknownAddress("onRamp"),
}
// we expect no contracts here due to not enough observations to come to consensus.
expectedContracts := reader.ContractAddresses{
@@ -430,7 +430,7 @@ func TestContractDiscoveryProcessor_Outcome_NotEnoughObservations(t *testing.T)
}
destObs := discoverytypes.Observation{
FChain: expectedFChain,
- Addresses: map[string]map[cciptypes.ChainSelector][]byte{
+ Addresses: map[string]map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
consts.ContractNameOnRamp: observedOnRamp,
consts.ContractNameNonceManager: {
dest: observedNonceManager,
@@ -479,17 +479,17 @@ func TestContractDiscoveryProcessor_Outcome_ErrorSyncingContracts(t *testing.T)
source1: fSource1,
source2: fSource2,
}
- expectedNonceManager := []byte("nonceManager")
- expectedOnRamp := map[cciptypes.ChainSelector][]byte{
- dest: []byte("onRamp"),
+ expectedNonceManager := cciptypes.UnknownAddress("nonceManager")
+ expectedOnRamp := map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
+ dest: cciptypes.UnknownAddress("onRamp"),
}
- expectedRMNRemote := []byte("rmnRemote")
+ expectedRMNRemote := cciptypes.UnknownAddress("rmnRemote")
expectedContracts := reader.ContractAddresses{
- consts.ContractNameNonceManager: map[cciptypes.ChainSelector][]byte{
+ consts.ContractNameNonceManager: map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
dest: expectedNonceManager,
},
consts.ContractNameOnRamp: expectedOnRamp,
- consts.ContractNameRMNRemote: map[cciptypes.ChainSelector][]byte{
+ consts.ContractNameRMNRemote: map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
dest: expectedRMNRemote,
},
consts.ContractNameFeeQuoter: {},
@@ -513,7 +513,7 @@ func TestContractDiscoveryProcessor_Outcome_ErrorSyncingContracts(t *testing.T)
obs := discoverytypes.Observation{
FChain: expectedFChain,
- Addresses: map[string]map[cciptypes.ChainSelector][]byte{
+ Addresses: map[string]map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
consts.ContractNameOnRamp: expectedOnRamp,
consts.ContractNameNonceManager: {
dest: expectedNonceManager,
@@ -634,35 +634,102 @@ func TestContractDiscoveryProcessor_ValidateObservation_ErrorGettingSupportedCha
}
func TestContractDiscoveryProcessor_ValidateObservation_OracleNotAllowedToObserve(t *testing.T) {
- mockHomeChain := mock_home_chain.NewMockHomeChain(t)
- lggr := logger.Test(t)
dest := cciptypes.ChainSelector(1)
- fRoleDON := 1
- oracleID := commontypes.OracleID(1)
- peerID := ragep2ptypes.PeerID([32]byte{1, 2, 3})
- supportedChains := mapset.NewSet(cciptypes.ChainSelector(2)) // Different chain
- oracleIDToP2PID := map[commontypes.OracleID]ragep2ptypes.PeerID{
- oracleID: peerID,
+ cases := []struct {
+ name string
+ supportedChains []cciptypes.ChainSelector
+ addresses reader.ContractAddresses
+ errStr string
+ }{
+ {
+ name: "no observations no error",
+ },
+ {
+ name: "onramps are only discovered on dest (error)",
+ addresses: map[string]map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
+ consts.ContractNameOnRamp: {
+ dest + 1: cciptypes.UnknownAddress("1"),
+ dest + 2: cciptypes.UnknownAddress("2"),
+ dest + 3: cciptypes.UnknownAddress("3"),
+ },
+ },
+ errStr: "oracle 1 is not allowed to observe contract (OnRamp) on the destination chain ChainSelector(1)",
+ },
+ {
+ name: "onramps are only discovered on dest (pass)",
+ supportedChains: []cciptypes.ChainSelector{dest},
+ addresses: map[string]map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
+ consts.ContractNameOnRamp: {
+ dest + 1: cciptypes.UnknownAddress("1"),
+ dest + 2: cciptypes.UnknownAddress("2"),
+ dest + 3: cciptypes.UnknownAddress("3"),
+ },
+ },
+ },
+ {
+ name: "FeeQuoter is discovered on the same chain (error)",
+ supportedChains: []cciptypes.ChainSelector{dest},
+ addresses: map[string]map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
+ consts.ContractNameFeeQuoter: {
+ dest + 1: cciptypes.UnknownAddress("1"),
+ dest + 2: cciptypes.UnknownAddress("2"),
+ dest + 3: cciptypes.UnknownAddress("3"),
+ },
+ },
+ errStr: "oracle 1 is not allowed to observe chain ChainSelector",
+ },
+ {
+ name: "FeeQuoter is discovered on the same chain (pass)",
+ supportedChains: []cciptypes.ChainSelector{dest + 1, dest + 2, dest + 3},
+ addresses: map[string]map[cciptypes.ChainSelector]cciptypes.UnknownAddress{
+ consts.ContractNameFeeQuoter: {
+ dest + 1: cciptypes.UnknownAddress("1"),
+ dest + 2: cciptypes.UnknownAddress("2"),
+ dest + 3: cciptypes.UnknownAddress("3"),
+ },
+ },
+ },
}
- mockHomeChain.EXPECT().GetSupportedChainsForPeer(peerID).Return(supportedChains, nil)
- defer mockHomeChain.AssertExpectations(t)
-
- cdp := NewContractDiscoveryProcessor(
- lggr,
- nil, // reader, not needed for this test
- mockHomeChain,
- dest,
- fRoleDON,
- oracleIDToP2PID,
- )
-
- ao := plugincommon.AttributedObservation[discoverytypes.Observation]{
- OracleID: oracleID,
+ for _, tc := range cases {
+ tc := tc
+ t.Run(tc.name, func(t *testing.T) {
+ lggr := logger.Test(t)
+ fRoleDON := 1
+ oracleID := commontypes.OracleID(1)
+ peerID := ragep2ptypes.PeerID([32]byte{1, 2, 3})
+
+ oracleIDToP2PID := map[commontypes.OracleID]ragep2ptypes.PeerID{
+ oracleID: peerID,
+ }
+
+ mockHomeChain := mock_home_chain.NewMockHomeChain(t)
+ mockHomeChain.EXPECT().GetSupportedChainsForPeer(peerID).Return(mapset.NewSet(tc.supportedChains...), nil)
+ defer mockHomeChain.AssertExpectations(t)
+
+ cdp := NewContractDiscoveryProcessor(
+ lggr,
+ nil, // reader, not needed for this test
+ mockHomeChain,
+ dest,
+ fRoleDON,
+ oracleIDToP2PID,
+ )
+
+ ao := plugincommon.AttributedObservation[discoverytypes.Observation]{
+ OracleID: oracleID,
+ Observation: discoverytypes.Observation{
+ Addresses: tc.addresses,
+ },
+ }
+
+ err := cdp.ValidateObservation(discoverytypes.Outcome{}, discoverytypes.Query{}, ao)
+ if tc.errStr == "" {
+ assert.NoError(t, err)
+ } else {
+ assert.ErrorContains(t, err, tc.errStr)
+ }
+ })
}
-
- err := cdp.ValidateObservation(discoverytypes.Outcome{}, discoverytypes.Query{}, ao)
- assert.Error(t, err)
- assert.Contains(t, err.Error(), fmt.Sprintf("oracle %d is not allowed to observe chain %s", ao.OracleID, cdp.dest))
}
diff --git a/mocks/commit/merkleroot/rmn/controller.go b/mocks/commit/merkleroot/rmn/controller.go
index 6c5b5679d..5ee5de4c3 100644
--- a/mocks/commit/merkleroot/rmn/controller.go
+++ b/mocks/commit/merkleroot/rmn/controller.go
@@ -9,6 +9,8 @@ import (
mock "github.com/stretchr/testify/mock"
+ ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
+
rmn "github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn"
rmnpb "github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn/rmnpb"
@@ -135,17 +137,17 @@ func (_c *MockController_ComputeReportSignatures_Call) RunAndReturn(run func(con
return _c
}
-// InitConnection provides a mock function with given fields: ctx, commitConfigDigest, rmnHomeConfigDigest, peerIDs
-func (_m *MockController) InitConnection(ctx context.Context, commitConfigDigest ccipocr3.Bytes32, rmnHomeConfigDigest ccipocr3.Bytes32, peerIDs []string) error {
- ret := _m.Called(ctx, commitConfigDigest, rmnHomeConfigDigest, peerIDs)
+// InitConnection provides a mock function with given fields: ctx, commitConfigDigest, rmnHomeConfigDigest, oraclePeerIDs, rmnNodes
+func (_m *MockController) InitConnection(ctx context.Context, commitConfigDigest ccipocr3.Bytes32, rmnHomeConfigDigest ccipocr3.Bytes32, oraclePeerIDs []ragep2ptypes.PeerID, rmnNodes []types.HomeNodeInfo) error {
+ ret := _m.Called(ctx, commitConfigDigest, rmnHomeConfigDigest, oraclePeerIDs, rmnNodes)
if len(ret) == 0 {
panic("no return value specified for InitConnection")
}
var r0 error
- if rf, ok := ret.Get(0).(func(context.Context, ccipocr3.Bytes32, ccipocr3.Bytes32, []string) error); ok {
- r0 = rf(ctx, commitConfigDigest, rmnHomeConfigDigest, peerIDs)
+ if rf, ok := ret.Get(0).(func(context.Context, ccipocr3.Bytes32, ccipocr3.Bytes32, []ragep2ptypes.PeerID, []types.HomeNodeInfo) error); ok {
+ r0 = rf(ctx, commitConfigDigest, rmnHomeConfigDigest, oraclePeerIDs, rmnNodes)
} else {
r0 = ret.Error(0)
}
@@ -162,14 +164,15 @@ type MockController_InitConnection_Call struct {
// - ctx context.Context
// - commitConfigDigest ccipocr3.Bytes32
// - rmnHomeConfigDigest ccipocr3.Bytes32
-// - peerIDs []string
-func (_e *MockController_Expecter) InitConnection(ctx interface{}, commitConfigDigest interface{}, rmnHomeConfigDigest interface{}, peerIDs interface{}) *MockController_InitConnection_Call {
- return &MockController_InitConnection_Call{Call: _e.mock.On("InitConnection", ctx, commitConfigDigest, rmnHomeConfigDigest, peerIDs)}
+// - oraclePeerIDs []ragep2ptypes.PeerID
+// - rmnNodes []types.HomeNodeInfo
+func (_e *MockController_Expecter) InitConnection(ctx interface{}, commitConfigDigest interface{}, rmnHomeConfigDigest interface{}, oraclePeerIDs interface{}, rmnNodes interface{}) *MockController_InitConnection_Call {
+ return &MockController_InitConnection_Call{Call: _e.mock.On("InitConnection", ctx, commitConfigDigest, rmnHomeConfigDigest, oraclePeerIDs, rmnNodes)}
}
-func (_c *MockController_InitConnection_Call) Run(run func(ctx context.Context, commitConfigDigest ccipocr3.Bytes32, rmnHomeConfigDigest ccipocr3.Bytes32, peerIDs []string)) *MockController_InitConnection_Call {
+func (_c *MockController_InitConnection_Call) Run(run func(ctx context.Context, commitConfigDigest ccipocr3.Bytes32, rmnHomeConfigDigest ccipocr3.Bytes32, oraclePeerIDs []ragep2ptypes.PeerID, rmnNodes []types.HomeNodeInfo)) *MockController_InitConnection_Call {
_c.Call.Run(func(args mock.Arguments) {
- run(args[0].(context.Context), args[1].(ccipocr3.Bytes32), args[2].(ccipocr3.Bytes32), args[3].([]string))
+ run(args[0].(context.Context), args[1].(ccipocr3.Bytes32), args[2].(ccipocr3.Bytes32), args[3].([]ragep2ptypes.PeerID), args[4].([]types.HomeNodeInfo))
})
return _c
}
@@ -179,7 +182,7 @@ func (_c *MockController_InitConnection_Call) Return(_a0 error) *MockController_
return _c
}
-func (_c *MockController_InitConnection_Call) RunAndReturn(run func(context.Context, ccipocr3.Bytes32, ccipocr3.Bytes32, []string) error) *MockController_InitConnection_Call {
+func (_c *MockController_InitConnection_Call) RunAndReturn(run func(context.Context, ccipocr3.Bytes32, ccipocr3.Bytes32, []ragep2ptypes.PeerID, []types.HomeNodeInfo) error) *MockController_InitConnection_Call {
_c.Call.Return(run)
return _c
}
diff --git a/mocks/commit/merkleroot/rmn/peer_group.go b/mocks/pkg/peergroup/peer_group.go
similarity index 99%
rename from mocks/commit/merkleroot/rmn/peer_group.go
rename to mocks/pkg/peergroup/peer_group.go
index 2a7022933..15f13e51c 100644
--- a/mocks/commit/merkleroot/rmn/peer_group.go
+++ b/mocks/pkg/peergroup/peer_group.go
@@ -1,6 +1,6 @@
// Code generated by mockery v2.43.2. DO NOT EDIT.
-package rmn
+package peergroup
import (
networking "github.com/smartcontractkit/libocr/networking"
diff --git a/mocks/commit/merkleroot/rmn/peer_group_factory.go b/mocks/pkg/peergroup/peer_group_factory.go
similarity index 99%
rename from mocks/commit/merkleroot/rmn/peer_group_factory.go
rename to mocks/pkg/peergroup/peer_group_factory.go
index 4eb3cc4c2..ef99b06fe 100644
--- a/mocks/commit/merkleroot/rmn/peer_group_factory.go
+++ b/mocks/pkg/peergroup/peer_group_factory.go
@@ -1,6 +1,6 @@
// Code generated by mockery v2.43.2. DO NOT EDIT.
-package rmn
+package peergroup
import (
commontypes "github.com/smartcontractkit/libocr/commontypes"
diff --git a/pkg/peergroup/factory.go b/pkg/peergroup/factory.go
new file mode 100644
index 000000000..e5b5fab8e
--- /dev/null
+++ b/pkg/peergroup/factory.go
@@ -0,0 +1,109 @@
+package peergroup
+
+import (
+ "crypto/sha256"
+ "encoding/binary"
+ "fmt"
+
+ "github.com/smartcontractkit/libocr/networking"
+
+ "github.com/smartcontractkit/libocr/commontypes"
+ ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
+ ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
+
+ "github.com/smartcontractkit/chainlink-common/pkg/logger"
+
+ rmntypes "github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn/types"
+ cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
+)
+
+// Creator handles peer group creation
+type Creator struct {
+ lggr logger.Logger
+ factory PeerGroupFactory
+ bootstrappers []commontypes.BootstrapperLocator
+}
+
+func NewCreator(
+ lggr logger.Logger,
+ factory PeerGroupFactory,
+ bootstrappers []commontypes.BootstrapperLocator,
+) *Creator {
+ return &Creator{
+ lggr: lggr,
+ factory: factory,
+ bootstrappers: bootstrappers,
+ }
+}
+
+// CreateOpts defines options for creating a peer group
+type CreateOpts struct {
+ CommitConfigDigest cciptypes.Bytes32
+ RMNHomeConfigDigest cciptypes.Bytes32
+ OraclePeerIDs []ragep2ptypes.PeerID
+ RMNNodes []rmntypes.HomeNodeInfo
+}
+
+// Result contains the created peer group and its config digest
+type Result struct {
+ PeerGroup PeerGroup
+ ConfigDigest cciptypes.Bytes32
+}
+
+// Create handles peer group creation with the given options
+func (c *Creator) Create(opts CreateOpts) (Result, error) {
+ // Calculate generic endpoint config digest
+ h := sha256.Sum256(append(opts.CommitConfigDigest[:], opts.RMNHomeConfigDigest[:]...))
+ genericEndpointConfigDigest := writePrefix(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo, h)
+
+ // Combine peer IDs
+ peerIDs := make([]string, 0, len(opts.OraclePeerIDs)+len(opts.RMNNodes))
+ for _, p := range opts.OraclePeerIDs {
+ peerIDs = append(peerIDs, p.String())
+ }
+ for _, n := range opts.RMNNodes {
+ peerIDs = append(peerIDs, n.PeerID.String())
+ }
+
+ c.lggr.Infow("Creating new peer group",
+ "genericEndpointConfigDigest", genericEndpointConfigDigest.String(),
+ "peerIDs", peerIDs,
+ "bootstrappers", c.bootstrappers,
+ )
+
+ peerGroup, err := c.factory.NewPeerGroup(
+ [32]byte(genericEndpointConfigDigest),
+ peerIDs,
+ c.bootstrappers,
+ )
+ if err != nil {
+ return Result{}, fmt.Errorf("new peer group: %w", err)
+ }
+
+ c.lggr.Infow("Created new peer group successfully")
+
+ return Result{
+ PeerGroup: peerGroup,
+ ConfigDigest: genericEndpointConfigDigest,
+ }, nil
+}
+
+// writePrefix is kept in the same package for direct use by the Creator
+func writePrefix(prefix ocr2types.ConfigDigestPrefix, hash cciptypes.Bytes32) cciptypes.Bytes32 {
+ var prefixBytes [2]byte
+ binary.BigEndian.PutUint16(prefixBytes[:], uint16(prefix))
+
+ hCopy := hash
+ hCopy[0] = prefixBytes[0]
+ hCopy[1] = prefixBytes[1]
+
+ return hCopy
+}
+
+type PeerGroupFactory interface {
+ networking.PeerGroupFactory
+}
+
+type PeerGroup interface {
+ networking.PeerGroup
+}
diff --git a/pkg/peergroup/factory_test.go b/pkg/peergroup/factory_test.go
new file mode 100644
index 000000000..b0fce8d0d
--- /dev/null
+++ b/pkg/peergroup/factory_test.go
@@ -0,0 +1,239 @@
+package peergroup
+
+import (
+ "encoding/hex"
+ "fmt"
+ "testing"
+
+ "github.com/smartcontractkit/libocr/commontypes"
+ ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
+ ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+
+ "github.com/smartcontractkit/chainlink-common/pkg/logger"
+
+ rmntypes "github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn/types"
+ cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
+
+ // pgmocks "github.com/smartcontractkit/chainlink-ccip/mocks/commit/merkleroot/rmn"
+ "github.com/smartcontractkit/libocr/networking"
+
+ pgfactorymocks "github.com/smartcontractkit/chainlink-ccip/mocks/pkg/peergroup"
+)
+
+type mockPeerGroup struct {
+ mock.Mock
+}
+
+func (m *mockPeerGroup) NewStream(
+ remotePeerID string,
+ newStreamArgs networking.NewStreamArgs) (networking.Stream, error) {
+ args := m.Called(remotePeerID, newStreamArgs)
+ return args.Get(0).(networking.Stream), args.Error(1)
+}
+
+func (m *mockPeerGroup) Close() error {
+ return nil
+}
+
+func Test_writePrefix(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ name string
+ prefix ocr2types.ConfigDigestPrefix
+ hash cciptypes.Bytes32
+ wantFirst byte
+ wantNext byte
+ }{
+ {
+ name: "zero hash",
+ prefix: ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo,
+ hash: cciptypes.Bytes32{},
+ wantFirst: byte(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo >> 8),
+ wantNext: byte(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo),
+ },
+ {
+ name: "non-zero hash",
+ prefix: ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo,
+ hash: cciptypes.Bytes32{0xFF, 0xFF, 0x3},
+ wantFirst: byte(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo >> 8),
+ wantNext: byte(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo),
+ },
+ }
+
+ for _, tt := range tests {
+ tt := tt
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+
+ result := writePrefix(tt.prefix, tt.hash)
+
+ assert.Equal(t, tt.wantFirst, result[0], "first byte should match prefix")
+ assert.Equal(t, tt.wantNext, result[1], "second byte should match prefix")
+
+ // Rest of hash should match input except first two bytes
+ for i := 2; i < len(tt.hash); i++ {
+ assert.Equal(t, tt.hash[i], result[i], "remaining bytes should be unchanged")
+ }
+ })
+ }
+}
+
+func TestCreator_Create(t *testing.T) {
+ t.Parallel()
+
+ oracle1 := peerIDFromStr(t, "1")
+ oracle2 := peerIDFromStr(t, "2")
+ rmn1 := peerIDFromStr(t, "3")
+ rmn2 := peerIDFromStr(t, "4")
+
+ bootstrappers := []commontypes.BootstrapperLocator{{PeerID: "bootstrap1"}}
+ mockPG := &mockPeerGroup{}
+
+ tests := []struct {
+ name string
+ opts CreateOpts
+ expectedPeers []string
+ setupMocks func(*pgfactorymocks.MockPeerGroupFactory)
+ wantErr bool
+ }{
+ {
+ name: "success with both oracle and RMN peers",
+ opts: CreateOpts{
+ CommitConfigDigest: cciptypes.Bytes32{0x1},
+ RMNHomeConfigDigest: cciptypes.Bytes32{0x2},
+ OraclePeerIDs: []ragep2ptypes.PeerID{
+ oracle1,
+ oracle2,
+ },
+ RMNNodes: []rmntypes.HomeNodeInfo{
+ {ID: 1, PeerID: rmn1},
+ {ID: 2, PeerID: rmn2},
+ },
+ },
+ expectedPeers: []string{"oracle1", "oracle2", "rmn1", "rmn2"},
+ setupMocks: func(m *pgfactorymocks.MockPeerGroupFactory) {
+ m.EXPECT().NewPeerGroup(
+ mock.Anything,
+ mock.Anything,
+ mock.Anything,
+ ).Return(mockPG, nil)
+ },
+ },
+ {
+ name: "success with only oracle peers",
+ opts: CreateOpts{
+ CommitConfigDigest: cciptypes.Bytes32{0x1},
+ RMNHomeConfigDigest: cciptypes.Bytes32{0x2},
+ OraclePeerIDs: []ragep2ptypes.PeerID{
+ oracle1,
+ },
+ },
+ expectedPeers: []string{"oracle1"},
+ setupMocks: func(m *pgfactorymocks.MockPeerGroupFactory) {
+ m.EXPECT().NewPeerGroup(
+ mock.Anything,
+ mock.Anything,
+ mock.Anything,
+ ).Return(mockPG, nil)
+ },
+ },
+ {
+ name: "success with empty peer lists",
+ opts: CreateOpts{
+ CommitConfigDigest: cciptypes.Bytes32{0x1},
+ RMNHomeConfigDigest: cciptypes.Bytes32{0x2},
+ },
+ expectedPeers: []string{},
+ setupMocks: func(m *pgfactorymocks.MockPeerGroupFactory) {
+ m.EXPECT().NewPeerGroup(
+ mock.MatchedBy(func(digest ocr2types.ConfigDigest) bool {
+ return digest[0] == byte(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo>>8) &&
+ digest[1] == byte(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo)
+ }),
+ []string{},
+ bootstrappers,
+ ).Return(mockPG, nil)
+ },
+ },
+ {
+ name: "factory returns error",
+ opts: CreateOpts{
+ CommitConfigDigest: cciptypes.Bytes32{0x1},
+ RMNHomeConfigDigest: cciptypes.Bytes32{0x2},
+ },
+ setupMocks: func(m *pgfactorymocks.MockPeerGroupFactory) {
+ m.EXPECT().NewPeerGroup(
+ mock.Anything,
+ mock.Anything,
+ mock.Anything,
+ ).Return(nil, assert.AnError)
+ },
+ wantErr: true,
+ },
+ }
+
+ for _, tt := range tests {
+ tt := tt
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+
+ factory := pgfactorymocks.NewMockPeerGroupFactory(t)
+ tt.setupMocks(factory)
+
+ creator := NewCreator(logger.Nop(), factory, bootstrappers)
+
+ result, err := creator.Create(tt.opts)
+
+ if tt.wantErr {
+ require.Error(t, err)
+ return
+ }
+
+ require.NoError(t, err)
+ assert.NotNil(t, result.PeerGroup)
+ assert.NotEmpty(t, result.ConfigDigest)
+
+ // Config digest should have correct prefix
+ assert.Equal(t,
+ byte(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo>>8),
+ result.ConfigDigest[0],
+ "config digest should have correct prefix first byte",
+ )
+ assert.Equal(t,
+ byte(ocr2types.ConfigDigestPrefixCCIPMultiRoleRMNCombo),
+ result.ConfigDigest[1],
+ "config digest should have correct prefix second byte",
+ )
+ })
+ }
+}
+
+func TestNewCreator(t *testing.T) {
+ t.Parallel()
+
+ lggr := logger.Nop()
+ factory := pgfactorymocks.NewMockPeerGroupFactory(t)
+ bootstrappers := []commontypes.BootstrapperLocator{{PeerID: "bootstrap1"}}
+
+ creator := NewCreator(lggr, factory, bootstrappers)
+
+ assert.NotNil(t, creator)
+ assert.Equal(t, factory, creator.factory)
+ assert.Equal(t, bootstrappers, creator.bootstrappers)
+ assert.NotNil(t, creator.lggr)
+}
+
+func peerIDFromStr(t *testing.T, s string) ragep2ptypes.PeerID {
+ t.Helper()
+ // Pad with zeros to make it 32 bytes
+ padded := fmt.Sprintf("%064s", s)
+ bytes, err := hex.DecodeString(padded)
+ require.NoError(t, err)
+ var result [32]byte
+ copy(result[:], bytes)
+ return ragep2ptypes.PeerID(result)
+}
diff --git a/pkg/reader/ccip.go b/pkg/reader/ccip.go
index 46f8d6483..fb3eddbc1 100644
--- a/pkg/reader/ccip.go
+++ b/pkg/reader/ccip.go
@@ -280,6 +280,56 @@ func (r *ccipChainReader) ExecutedMessageRanges(
return executed, nil
}
+// Temporary struct to properly deserialize cciptypes.Message before we have support for cciptypes.BigInt
+type ccipMessageTokenAmount struct {
+ SourcePoolAddress cciptypes.UnknownAddress
+ DestTokenAddress cciptypes.UnknownAddress
+ ExtraData cciptypes.Bytes
+ Amount *big.Int
+ DestExecData cciptypes.Bytes
+}
+
+func (t *ccipMessageTokenAmount) ToOnRampToken() cciptypes.RampTokenAmount {
+ return cciptypes.RampTokenAmount{
+ SourcePoolAddress: t.SourcePoolAddress,
+ DestTokenAddress: t.DestTokenAddress,
+ ExtraData: t.ExtraData,
+ Amount: cciptypes.NewBigInt(t.Amount),
+ DestExecData: t.DestExecData,
+ }
+}
+
+type ccipMessage struct {
+ Header cciptypes.RampMessageHeader
+ Sender cciptypes.UnknownAddress
+ Data cciptypes.Bytes
+ Receiver cciptypes.UnknownAddress
+ ExtraArgs cciptypes.Bytes
+ FeeToken cciptypes.UnknownAddress
+ FeeTokenAmount *big.Int
+ FeeValueJuels *big.Int
+ TokenAmounts []ccipMessageTokenAmount
+}
+
+func (m *ccipMessage) ToMessage() cciptypes.Message {
+ tk := make([]cciptypes.RampTokenAmount, len(m.TokenAmounts))
+ for i := range m.TokenAmounts {
+ tk[i] = m.TokenAmounts[i].ToOnRampToken()
+ }
+
+ return cciptypes.Message{
+ Header: m.Header,
+ Sender: m.Sender,
+ Data: m.Data,
+ Receiver: m.Receiver,
+ ExtraArgs: m.ExtraArgs,
+ FeeToken: m.FeeToken,
+ FeeTokenAmount: cciptypes.NewBigInt(m.FeeTokenAmount),
+ FeeValueJuels: cciptypes.NewBigInt(m.FeeValueJuels),
+ TokenAmounts: tk,
+ }
+}
+
func (r *ccipChainReader) MsgsBetweenSeqNums(
ctx context.Context, sourceChainSelector cciptypes.ChainSelector, seqNumRange cciptypes.SeqNumRange,
) ([]cciptypes.Message, error) {
@@ -294,7 +344,7 @@ func (r *ccipChainReader) MsgsBetweenSeqNums(
type SendRequestedEvent struct {
DestChainSelector cciptypes.ChainSelector
- Message cciptypes.Message
+ Message ccipMessage
}
seq, err := r.contractReaders[sourceChainSelector].ExtendedQueryKey(
@@ -339,7 +389,7 @@ func (r *ccipChainReader) MsgsBetweenSeqNums(
msg.Message.Header.OnRamp = onRampAddress
if valid {
- msgs = append(msgs, msg.Message)
+ msgs = append(msgs, msg.Message.ToMessage())
}
}
diff --git a/pkg/reader/ccip_interface.go b/pkg/reader/ccip_interface.go
index 7b22128b4..c575aba79 100644
--- a/pkg/reader/ccip_interface.go
+++ b/pkg/reader/ccip_interface.go
@@ -23,7 +23,7 @@ var (
// ContractAddresses is a map of contract names across all chain selectors and their address.
// Currently only one contract per chain per name is supported.
-type ContractAddresses map[string]map[cciptypes.ChainSelector][]byte
+type ContractAddresses map[string]map[cciptypes.ChainSelector]cciptypes.UnknownAddress
func (ca ContractAddresses) Append(contract string, chain cciptypes.ChainSelector, address []byte) ContractAddresses {
resp := ca
@@ -31,7 +31,7 @@ func (ca ContractAddresses) Append(contract string, chain cciptypes.ChainSelecto
resp = make(ContractAddresses)
}
if resp[contract] == nil {
- resp[contract] = make(map[cciptypes.ChainSelector][]byte)
+ resp[contract] = make(map[cciptypes.ChainSelector]cciptypes.UnknownAddress)
}
resp[contract][chain] = address
return resp