Skip to content

Commit

Permalink
RMN - minor code improvements (#289)
Browse files Browse the repository at this point in the history
* stream names and updates processing refactoring

* rm unused GroupBy method

---------

Co-authored-by: Makram <[email protected]>
  • Loading branch information
dimkouv and makramkd authored Nov 1, 2024
1 parent d88cf46 commit 4b133b0
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 104 deletions.
62 changes: 38 additions & 24 deletions commit/merkleroot/rmn/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,37 +145,22 @@ func (c *controller) ComputeReportSignatures(
updateRequests []*rmnpb.FixedDestLaneUpdateRequest,
rmnRemoteCfg rmntypes.RemoteConfig,
) (*ReportSignatures, error) {
rmnNodeInfo := make(map[rmntypes.NodeID]rmntypes.HomeNodeInfo)

rmnNodes, err := c.rmnHomeReader.GetRMNNodesInfo(rmnRemoteCfg.ConfigDigest)
if err != nil {
return nil, fmt.Errorf("get rmn nodes info: %w", err)
}

c.lggr.Infow("got RMN nodes info", "nodes", rmnNodes)
c.lggr.Infow("requested updates", "updates", updateRequests)

// Group the lane update requests by their source chain and mark the RMN nodes that can sign each update
// based on whether it supports the source chain or not.
updatesPerChain := make(map[uint64]updateRequestWithMeta)
for _, updateReq := range updateRequests {
if _, exists := updatesPerChain[updateReq.LaneSource.SourceChainSelector]; exists {
return nil, errors.New("controller implementation assumes each lane update is for a different chain")
}

updatesPerChain[updateReq.LaneSource.SourceChainSelector] = updateRequestWithMeta{
Data: updateReq,
RmnNodes: mapset.NewSet[rmntypes.NodeID](),
}
rmnNodeInfo := make(map[rmntypes.NodeID]rmntypes.HomeNodeInfo, len(rmnNodes))
for _, node := range rmnNodes {
rmnNodeInfo[node.ID] = node
}

for _, node := range rmnNodes {
rmnNodeInfo[node.ID] = node
c.lggr.Infow("got RMN nodes info", "nodes", rmnNodeInfo)
c.lggr.Infow("requested updates", "updates", updateRequests)

// If RMN node supports the chain add it to the list of RMN nodes that can sign the update.
if node.SupportedSourceChains.Contains(cciptypes.ChainSelector(updateReq.LaneSource.SourceChainSelector)) {
updatesPerChain[updateReq.LaneSource.SourceChainSelector].RmnNodes.Add(node.ID)
}
}
updatesPerChain, err := populateUpdatesPerChain(updateRequests, rmnNodes)
if err != nil {
return nil, fmt.Errorf("process update requests: %w", err)
}

homeFMap, err := c.rmnHomeReader.GetF(rmnRemoteCfg.ConfigDigest)
Expand Down Expand Up @@ -240,6 +225,35 @@ func (c *controller) ComputeReportSignatures(
return rmnReportSignatures, nil
}

// populateUpdatesPerChain processes a list of update requests, groups the lane updates by their source chain
// and populates the items with metadata for each update request, including a set of RMN nodes supporting that request.
func populateUpdatesPerChain(
updateRequests []*rmnpb.FixedDestLaneUpdateRequest,
rmnNodes []rmntypes.HomeNodeInfo,
) (map[uint64]updateRequestWithMeta, error) {
updatesPerChain := make(map[uint64]updateRequestWithMeta)

for _, updateReq := range updateRequests {
if _, exists := updatesPerChain[updateReq.LaneSource.SourceChainSelector]; exists {
return nil, errors.New("controller implementation assumes each lane update is for a different chain")
}

updatesPerChain[updateReq.LaneSource.SourceChainSelector] = updateRequestWithMeta{
Data: updateReq,
RmnNodes: mapset.NewSet[rmntypes.NodeID](),
}

for _, node := range rmnNodes {
// If RMN node supports the chain add it to the list of RMN nodes that can sign the update.
if node.SupportedSourceChains.Contains(cciptypes.ChainSelector(updateReq.LaneSource.SourceChainSelector)) {
updatesPerChain[updateReq.LaneSource.SourceChainSelector].RmnNodes.Add(node.ID)
}
}
}

return updatesPerChain, nil
}

func (c *controller) InitConnection(
ctx context.Context,
commitConfigDigest cciptypes.Bytes32,
Expand Down
117 changes: 117 additions & 0 deletions commit/merkleroot/rmn/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/ed25519"
"crypto/sha256"
"errors"
"fmt"
"sort"
"strconv"
Expand Down Expand Up @@ -287,6 +288,122 @@ func Test_selectRoots(t *testing.T) {
}
}

func Test_populateUpdatesPerChain(t *testing.T) {
testCases := []struct {
name string
updateRequests []*rmnpb.FixedDestLaneUpdateRequest
rmnNodes []rmntypes.HomeNodeInfo
rmnNodeInfo map[rmntypes.NodeID]rmntypes.HomeNodeInfo
expectedResult map[uint64]updateRequestWithMeta
expectedError error
}{
{
name: "single update request, single supported node",
updateRequests: []*rmnpb.FixedDestLaneUpdateRequest{
{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}},
},
rmnNodes: []rmntypes.HomeNodeInfo{
{
ID: 1,
SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(1)),
},
},
expectedResult: map[uint64]updateRequestWithMeta{
1: {
Data: &rmnpb.FixedDestLaneUpdateRequest{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}},
RmnNodes: mapset.NewSet[rmntypes.NodeID](rmntypes.NodeID(1)),
},
},
expectedError: nil,
},
{
name: "duplicate sourceChainSelector error",
updateRequests: []*rmnpb.FixedDestLaneUpdateRequest{
{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}},
{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}},
},
rmnNodes: []rmntypes.HomeNodeInfo{
{
ID: 1,
SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(1)),
},
},
rmnNodeInfo: map[rmntypes.NodeID]rmntypes.HomeNodeInfo{},
expectedError: errors.New("controller implementation assumes each lane update is for a different chain"),
},
{
name: "Single Update Request, No Supported Nodes",
updateRequests: []*rmnpb.FixedDestLaneUpdateRequest{
{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}},
},
rmnNodes: []rmntypes.HomeNodeInfo{
{
ID: 2,
SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(2)),
},
},
expectedResult: map[uint64]updateRequestWithMeta{
1: {
Data: &rmnpb.FixedDestLaneUpdateRequest{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}},
RmnNodes: mapset.NewSet[rmntypes.NodeID](),
},
},
expectedError: nil,
},
{
name: "multiple update requests, multiple nodes",
updateRequests: []*rmnpb.FixedDestLaneUpdateRequest{
{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}},
{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 2}},
{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 3}},
},
rmnNodes: []rmntypes.HomeNodeInfo{
{ID: 1, SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(1))},
{ID: 2, SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(1))},
{ID: 3, SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(2))},
{ID: 4, SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](cciptypes.ChainSelector(1))},
},
expectedResult: map[uint64]updateRequestWithMeta{
1: {
Data: &rmnpb.FixedDestLaneUpdateRequest{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 1}},
RmnNodes: mapset.NewSet[rmntypes.NodeID](rmntypes.NodeID(1), rmntypes.NodeID(2), rmntypes.NodeID(4)),
},
2: {
Data: &rmnpb.FixedDestLaneUpdateRequest{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 2}},
RmnNodes: mapset.NewSet[rmntypes.NodeID](rmntypes.NodeID(3)),
},
3: {
Data: &rmnpb.FixedDestLaneUpdateRequest{LaneSource: &rmnpb.LaneSource{SourceChainSelector: 3}},
RmnNodes: mapset.NewSet[rmntypes.NodeID](),
},
},
expectedError: nil,
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
// Run the function
result, err := populateUpdatesPerChain(tt.updateRequests, tt.rmnNodes)

// Check for expected error
if tt.expectedError != nil {
assert.EqualError(t, err, tt.expectedError.Error())
} else {
assert.NoError(t, err)
}

// Check for expected result
assert.Equal(t, tt.expectedResult, result)

// Check if rmnNodeInfo was populated correctly
for id, info := range tt.rmnNodeInfo {
assert.Equal(t, info, tt.rmnNodeInfo[id])
}
})
}
}

func TestClient_ComputeReportSignatures(t *testing.T) {
newTestSetup := func(t *testing.T) testSetup {
lggr := logger.Test(t)
Expand Down
5 changes: 1 addition & 4 deletions commit/merkleroot/rmn/peerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,7 @@ func (r *peerClient) getOrCreateRageP2PStream(rmnNode rmntypes.HomeNodeInfo) (St
}

rmnPeerID := rmnNode.PeerID.String()

// todo: versioning for stream names e.g. for 'v1_7'
streamName := fmt.Sprintf("ccip-rmn/v1_6/%s",
strings.TrimPrefix(r.genericEndpointConfigDigest.String(), "0x"))
streamName := rmnNode.StreamNamePrefix + strings.TrimPrefix(r.genericEndpointConfigDigest.String(), "0x")

r.lggr.Infow("creating new stream",
"streamName", streamName,
Expand Down
1 change: 1 addition & 0 deletions commit/merkleroot/rmn/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type HomeNodeInfo struct {
PeerID ragep2ptypes.PeerID // The peer ID of the node
SupportedSourceChains mapset.Set[cciptypes.ChainSelector] // Set of supported source chains by the node
OffchainPublicKey *ed25519.PublicKey // The public key is used to verify observations
StreamNamePrefix string // RageP2P stream name prefix e.g. "ccip-rmn/v1_6/"
}

// RemoteConfig contains the configuration fetched from the RMNRemote contract.
Expand Down
14 changes: 0 additions & 14 deletions internal/libs/slicelib/generic.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,5 @@
package slicelib

// GroupBy groups a slice based on a specific item property. The returned groups slice is deterministic.
func GroupBy[T any, K comparable](items []T, prop func(T) K) ([]K, map[K][]T) {
groups := make([]K, 0)
grouped := make(map[K][]T)
for _, item := range items {
k := prop(item)
if _, exists := grouped[k]; !exists {
groups = append(groups, k)
}
grouped[k] = append(grouped[k], item)
}
return groups, grouped
}

// CountUnique counts the unique items of the provided slice.
func CountUnique[T comparable](items []T) int {
m := make(map[T]struct{})
Expand Down
62 changes: 0 additions & 62 deletions internal/libs/slicelib/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,68 +6,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestGroupBy(t *testing.T) {
type person struct {
id string
name string
age int
}

testCases := []struct {
name string
items []person
expGroupNames []string
expGroups map[string][]person
}{
{
name: "empty slice",
items: []person{},
expGroupNames: []string{},
expGroups: map[string][]person{},
},
{
name: "no duplicate",
items: []person{
{id: "2", name: "Bob", age: 25},
{id: "1", name: "Alice", age: 23},
{id: "3", name: "Charlie", age: 22},
{id: "4", name: "Dim", age: 13},
},
expGroupNames: []string{"2", "1", "3", "4"}, // should be deterministic
expGroups: map[string][]person{
"1": {{id: "1", name: "Alice", age: 23}},
"2": {{id: "2", name: "Bob", age: 25}},
"3": {{id: "3", name: "Charlie", age: 22}},
"4": {{id: "4", name: "Dim", age: 13}},
},
},
{
name: "with duplicate",
items: []person{
{id: "1", name: "Alice", age: 23},
{id: "1", name: "Bob", age: 25},
{id: "3", name: "Charlie", age: 22},
},
expGroupNames: []string{"1", "3"},
expGroups: map[string][]person{
"1": {{id: "1", name: "Alice", age: 23}, {id: "1", name: "Bob", age: 25}},
"3": {{id: "3", name: "Charlie", age: 22}},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
keys, groups := GroupBy(tc.items, func(p person) string { return p.id })
assert.Equal(t, tc.expGroupNames, keys)
assert.Equal(t, len(tc.expGroups), len(groups))
for _, k := range keys {
assert.Equal(t, tc.expGroups[k], groups[k])
}
})
}
}

func TestCountUnique(t *testing.T) {
testCases := []struct {
name string
Expand Down
1 change: 1 addition & 0 deletions pkg/reader/rmn_home.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func convertOnChainConfigToRMNHomeChainConfig(
PeerID: ragep2ptypes.PeerID(node.PeerID),
OffchainPublicKey: &pubKey,
SupportedSourceChains: mapset.NewSet[cciptypes.ChainSelector](),
StreamNamePrefix: "ccip-rmn/v1_6/", // todo: when contract is updated, this should be fetched from the contract
}
}

Expand Down

0 comments on commit 4b133b0

Please sign in to comment.