Skip to content

Commit

Permalink
Improve RMN stream configuration (#277)
Browse files Browse the repository at this point in the history
* rmn stream config
* lint
* add comments on buffer sizes
  • Loading branch information
dimkouv authored Oct 30, 2024
1 parent 528ecac commit 0caf764
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 18 deletions.
19 changes: 1 addition & 18 deletions commit/merkleroot/rmn/peerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/networking"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/smartcontractkit/libocr/ragep2p"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

Expand Down Expand Up @@ -169,23 +168,7 @@ func (r *peerClient) getOrCreateRageP2PStream(rmnNode rmntypes.HomeNodeInfo) (St
)

var err error
stream, err = r.peerGroup.NewStream(
rmnPeerID,
networking.NewStreamArgs1{ // todo: make it configurable
StreamName: streamName,
OutgoingBufferSize: 1,
IncomingBufferSize: 1,
MaxMessageLength: 4_194_304, // 4MB
MessagesLimit: ragep2p.TokenBucketParams{
Rate: 50,
Capacity: 200,
},
BytesLimit: ragep2p.TokenBucketParams{
Rate: 20_971_520, // 20MB
Capacity: 104_857_600, // 100MB
},
},
)
stream, err = r.peerGroup.NewStream(rmnPeerID, newStreamConfig(r.lggr, streamName))
if err != nil {
return nil, fmt.Errorf("new stream %s: %w", streamName, err)
}
Expand Down
164 changes: 164 additions & 0 deletions commit/merkleroot/rmn/streamconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package rmn

import (
"math"
"time"

"github.com/smartcontractkit/libocr/networking"
"github.com/smartcontractkit/libocr/ragep2p"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn/rmnpb"
)

const (
// estimatedMaxNumberOfSourceChains is the estimated maximum number of source chains
// that the current stream configuration supports and can be increased if required.
// This value does not correlate to the maximum number of source chains that CCIP can support.
estimatedMaxNumberOfSourceChains = 500

// initialObservationRequest + observationRequestWithOtherSourcesAfterTimeout + reportSignatureRequest
maxNumOfMsgsPerRound = 3

// values below chosen by research team
rateScale = 1.2
capacityScale = 3

// bufferSize should be set to 1 as advised by the RMN team.
outgoingBufferSize = 1
incomingBufferSize = 1

estimatedRoundInterval = time.Second
)

var (
maxObservationRequestBytes int
maxReportSigRequestBytes int
)

func newStreamConfig(
lggr logger.Logger,
streamName string,
) networking.NewStreamArgs1 {
cfg := networking.NewStreamArgs1{
StreamName: streamName,
OutgoingBufferSize: outgoingBufferSize,
IncomingBufferSize: incomingBufferSize,
MaxMessageLength: maxMessageLength(),
MessagesLimit: messagesLimit(),
BytesLimit: bytesLimit(),
}

lggr.Infow("new stream config",
"streamName", streamName,
"cfg", cfg,
"maxObservationRequestBytes", maxObservationRequestBytes,
"maxReportSigRequestBytes", maxReportSigRequestBytes,
)

return cfg
}

func maxMessageLength() int {
return max(
maxObservationRequestBytes,
maxReportSigRequestBytes,
)
}

func messagesLimit() ragep2p.TokenBucketParams {
return ragep2p.TokenBucketParams{
Rate: rateScale * (float64(maxNumOfMsgsPerRound) / estimatedRoundInterval.Seconds()),
Capacity: maxNumOfMsgsPerRound * capacityScale,
}
}

func bytesLimit() ragep2p.TokenBucketParams {
maxSumLenOutboundPerRound := (2 * maxObservationRequestBytes) + maxReportSigRequestBytes

return ragep2p.TokenBucketParams{
Rate: (float64(maxSumLenOutboundPerRound) / estimatedRoundInterval.Seconds()) * rateScale,
Capacity: uint32(maxSumLenOutboundPerRound) * capacityScale,
}
}

// compute max observation request size and max report signatures request size
func init() {
req := &rmnpb.Request{
Request: &rmnpb.Request_ObservationRequest{
ObservationRequest: &rmnpb.ObservationRequest{
LaneDest: &rmnpb.LaneDest{
DestChainSelector: math.MaxUint64,
OfframpAddress: make([]byte, 32),
},
FixedDestLaneUpdateRequests: make([]*rmnpb.FixedDestLaneUpdateRequest, 0, estimatedMaxNumberOfSourceChains),
},
},
}
for i := 0; i < estimatedMaxNumberOfSourceChains; i++ {
req.GetObservationRequest().FixedDestLaneUpdateRequests = append(
req.GetObservationRequest().FixedDestLaneUpdateRequests, &rmnpb.FixedDestLaneUpdateRequest{
LaneSource: &rmnpb.LaneSource{
SourceChainSelector: math.MaxUint64,
OnrampAddress: make([]byte, 32),
},
ClosedInterval: &rmnpb.ClosedInterval{
MinMsgNr: math.MaxUint64,
MaxMsgNr: math.MaxUint64,
},
},
)
}
reqBytes, err := proto.Marshal(req)
if err != nil {
panic(err)
}
maxObservationRequestBytes = len(reqBytes)

req = &rmnpb.Request{
Request: &rmnpb.Request_ReportSignatureRequest{
ReportSignatureRequest: &rmnpb.ReportSignatureRequest{
Context: &rmnpb.ReportContext{},
AttributedSignedObservations: make([]*rmnpb.AttributedSignedObservation, 0, estimatedMaxNumberOfSourceChains),
},
},
}
fixedDestLaneUpdates := make([]*rmnpb.FixedDestLaneUpdate, 0, estimatedMaxNumberOfSourceChains)
for i := 0; i < estimatedMaxNumberOfSourceChains; i++ {
fixedDestLaneUpdates = append(fixedDestLaneUpdates, &rmnpb.FixedDestLaneUpdate{
LaneSource: &rmnpb.LaneSource{
SourceChainSelector: math.MaxUint64,
OnrampAddress: make([]byte, 32),
},
ClosedInterval: &rmnpb.ClosedInterval{MinMsgNr: math.MaxUint64, MaxMsgNr: math.MaxUint64},
Root: make([]byte, 32),
})
}

for i := 0; i < estimatedMaxNumberOfSourceChains; i++ {
req.GetReportSignatureRequest().AttributedSignedObservations = append(
req.GetReportSignatureRequest().AttributedSignedObservations, &rmnpb.AttributedSignedObservation{
SignedObservation: &rmnpb.SignedObservation{
Observation: &rmnpb.Observation{
RmnHomeContractConfigDigest: make([]byte, 32),
LaneDest: &rmnpb.LaneDest{
DestChainSelector: math.MaxUint64,
OfframpAddress: make([]byte, 32),
},
FixedDestLaneUpdates: fixedDestLaneUpdates,
Timestamp: math.MaxUint64,
},
Signature: make([]byte, 256),
},
SignerNodeIndex: math.MaxUint32,
})
}

reqBytes, err = proto.Marshal(req)
if err != nil {
panic(err)
}
maxReportSigRequestBytes = len(reqBytes)
}
48 changes: 48 additions & 0 deletions commit/merkleroot/rmn/streamconfig_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package rmn

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

func Test_newStreamConfig(t *testing.T) {
lggr := logger.Test(t)

streamName := "myCoolStream"

cfg := newStreamConfig(lggr, streamName)

assert.Equal(t, streamName, cfg.StreamName)

assert.Equal(t, 1, cfg.OutgoingBufferSize)

assert.Equal(t, 1, cfg.IncomingBufferSize)

// message length
assert.Greater(t, cfg.MaxMessageLength, 25*megaByte)
assert.Less(t, cfg.MaxMessageLength, 27*megaByte)

// message rate
assert.Greater(t, cfg.MessagesLimit.Rate, 3.4)
assert.Less(t, cfg.MessagesLimit.Rate, 3.8)

// message capacity
assert.Equal(t, 9, int(cfg.MessagesLimit.Capacity))

// bytes rate
assert.Greater(t, cfg.BytesLimit.Rate, float64(30*megaByte))
assert.Less(t, cfg.BytesLimit.Rate, float64(33*megaByte))

// bytes capacity
assert.Greater(t, int(cfg.BytesLimit.Capacity), 77*megaByte)
assert.Less(t, int(cfg.BytesLimit.Capacity), 80*megaByte)
}

const (
byt = 1
kiloByte = 1024 * byt
megaByte = 1024 * kiloByte
)

0 comments on commit 0caf764

Please sign in to comment.