Skip to content

Commit

Permalink
RMN Change initial observations logic (#279)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimkouv authored Oct 30, 2024
1 parent 0caf764 commit 7fb015c
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions commit/merkleroot/rmn/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,25 +263,36 @@ func (c *controller) getRmnSignedObservations(
requestedNodes := make(map[uint64]mapset.Set[rmntypes.NodeID]) // sourceChain -> requested rmnNodeIDs
requestsPerNode := make(map[rmntypes.NodeID][]*rmnpb.FixedDestLaneUpdateRequest) // grouped requests for each node

// For each lane update request send an observation request to at most 'F+1' number of rmn nodes.
// At this point we can safely assume that we have at least F+1 supporting each source chain.
for sourceChain, updateRequest := range updateRequestsPerChain {
requestedNodes[sourceChain] = mapset.NewSet[rmntypes.NodeID]()
homeChainF, exist := homeFMap[cciptypes.ChainSelector(sourceChain)]
if !exist {
return nil, fmt.Errorf("no home F for chain %d", sourceChain)
// Send to every RMN node all the lane update requests it supports until all chains have a sufficient amount.
// of initial observers. Upon timer expiration, additional requests are sent to the rest of the RMN nodes.

chainsWithEnoughRequests := mapset.NewSet[uint64]()
for nodeID := range rmnNodeInfo {
if chainsWithEnoughRequests.Cardinality() == len(updateRequestsPerChain) {
break // We have enough initial observers for all source chains.
}

for nodeID := range updateRequest.RmnNodes.Iter() {
if consensus.Threshold(requestedNodes[sourceChain].Cardinality()) >= consensus.FPlus1(homeChainF) {
break // We have enough initial observers for this source chain.
for sourceChain, updateRequest := range updateRequestsPerChain {
// if this node cannot support the source chain, skip it
if !updateRequest.RmnNodes.Contains(nodeID) {
continue
}

requestedNodes[sourceChain].Add(nodeID)
if _, exists := requestsPerNode[nodeID]; !exists {
requestsPerNode[nodeID] = make([]*rmnpb.FixedDestLaneUpdateRequest, 0)
// add the node as a requested observer for this source chain
if _, ok := requestedNodes[sourceChain]; !ok {
requestedNodes[sourceChain] = mapset.NewSet[rmntypes.NodeID]()
}
requestedNodes[sourceChain].Add(nodeID)
requestsPerNode[nodeID] = append(requestsPerNode[nodeID], updateRequest.Data)

// if we already have enough requests for this source chain, mark it
homeChainF, exist := homeFMap[cciptypes.ChainSelector(sourceChain)]
if !exist {
return nil, fmt.Errorf("no home F for chain %d", sourceChain)
}
if consensus.Threshold(requestedNodes[sourceChain].Cardinality()) >= consensus.FPlus1(homeChainF) {
chainsWithEnoughRequests.Add(sourceChain)
}
}
}

Expand Down

0 comments on commit 7fb015c

Please sign in to comment.