From 7fb015cb657e90e29a1b7a9a7f0b182251f590f5 Mon Sep 17 00:00:00 2001 From: dimitris Date: Wed, 30 Oct 2024 17:48:23 +0200 Subject: [PATCH] RMN Change initial observations logic (#279) --- commit/merkleroot/rmn/controller.go | 37 +++++++++++++++++++---------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/commit/merkleroot/rmn/controller.go b/commit/merkleroot/rmn/controller.go index 9a7688c06..46d4f85a9 100644 --- a/commit/merkleroot/rmn/controller.go +++ b/commit/merkleroot/rmn/controller.go @@ -263,25 +263,36 @@ func (c *controller) getRmnSignedObservations( requestedNodes := make(map[uint64]mapset.Set[rmntypes.NodeID]) // sourceChain -> requested rmnNodeIDs requestsPerNode := make(map[rmntypes.NodeID][]*rmnpb.FixedDestLaneUpdateRequest) // grouped requests for each node - // For each lane update request send an observation request to at most 'F+1' number of rmn nodes. - // At this point we can safely assume that we have at least F+1 supporting each source chain. - for sourceChain, updateRequest := range updateRequestsPerChain { - requestedNodes[sourceChain] = mapset.NewSet[rmntypes.NodeID]() - homeChainF, exist := homeFMap[cciptypes.ChainSelector(sourceChain)] - if !exist { - return nil, fmt.Errorf("no home F for chain %d", sourceChain) + // Send to every RMN node all the lane update requests it supports until all chains have a sufficient amount. + // of initial observers. Upon timer expiration, additional requests are sent to the rest of the RMN nodes. + + chainsWithEnoughRequests := mapset.NewSet[uint64]() + for nodeID := range rmnNodeInfo { + if chainsWithEnoughRequests.Cardinality() == len(updateRequestsPerChain) { + break // We have enough initial observers for all source chains. } - for nodeID := range updateRequest.RmnNodes.Iter() { - if consensus.Threshold(requestedNodes[sourceChain].Cardinality()) >= consensus.FPlus1(homeChainF) { - break // We have enough initial observers for this source chain. + for sourceChain, updateRequest := range updateRequestsPerChain { + // if this node cannot support the source chain, skip it + if !updateRequest.RmnNodes.Contains(nodeID) { + continue } - requestedNodes[sourceChain].Add(nodeID) - if _, exists := requestsPerNode[nodeID]; !exists { - requestsPerNode[nodeID] = make([]*rmnpb.FixedDestLaneUpdateRequest, 0) + // add the node as a requested observer for this source chain + if _, ok := requestedNodes[sourceChain]; !ok { + requestedNodes[sourceChain] = mapset.NewSet[rmntypes.NodeID]() } + requestedNodes[sourceChain].Add(nodeID) requestsPerNode[nodeID] = append(requestsPerNode[nodeID], updateRequest.Data) + + // if we already have enough requests for this source chain, mark it + homeChainF, exist := homeFMap[cciptypes.ChainSelector(sourceChain)] + if !exist { + return nil, fmt.Errorf("no home F for chain %d", sourceChain) + } + if consensus.Threshold(requestedNodes[sourceChain].Cardinality()) >= consensus.FPlus1(homeChainF) { + chainsWithEnoughRequests.Add(sourceChain) + } } }