From 9028d258b7cea2fbe52db4c1144c919eca848b69 Mon Sep 17 00:00:00 2001 From: lklimek <842586+lklimek@users.noreply.github.com> Date: Tue, 12 Mar 2024 17:05:58 +0100 Subject: [PATCH] refactor(p2p): tune channel priorities and move channel definitions to p2p/channel_params.go (#759) * feat(p2p): throttled channel (cherry picked from commit 5e41958bf8e6c0e58f7eeb1363049967d38b8353) * feat: limit mempool broadcast to 5/s (cherry picked from commit cdde233c1372968bfbf03095ab6242f74a721062) * feat(p2p): channel recv rate limiting - not tested (cherry picked from commit f7f7ce7fdc0cfcefac9465a7dc9a9913d80d4afd) * feat(p2p): channel recv rate limiting - continued (cherry picked from commit 54e00f7393f13e8e540ff2cf42ffa5fece166d7d) * chore(p2p): regen channel mocks * feat(config): mempool tx-send-rate-limit, tx-recv-rate-limit, tx-recv-rate-punish-peer * chore: lint * chore(mempool): burst recv twice as big as burst send * chore: lint * chore: remove not needed log * chore: fixes after merge * refactor(p2p): move chan descs to p2p.channel_params and tune priorities * chore: fix after merge * chore(statesync): fix linter issues - remove unused consts --- go.mod | 2 +- go.sum | 4 +- internal/consensus/reactor.go | 65 +++------------- internal/consensus/reactor_test.go | 16 ++-- internal/evidence/reactor.go | 2 +- internal/p2p/channel_params.go | 116 ++++++++++++++++++++++------- internal/statesync/reactor.go | 48 +----------- node/setup.go | 8 +- 8 files changed, 117 insertions(+), 144 deletions(-) diff --git a/go.mod b/go.mod index 0d8c3a5b9d..2723784b59 100644 --- a/go.mod +++ b/go.mod @@ -306,5 +306,5 @@ require ( github.com/tendermint/go-amino v0.16.0 github.com/tyler-smith/go-bip39 v1.1.0 golang.org/x/exp v0.0.0-20240119083558-1b970713d09a - golang.org/x/time v0.1.0 + golang.org/x/time v0.5.0 ) diff --git a/go.sum b/go.sum index 2d03878a93..4f74de4350 100644 --- a/go.sum +++ b/go.sum @@ -1200,8 +1200,8 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA= -golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 48499df38b..222f92e978 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -30,54 +30,7 @@ var ( _ p2p.Wrapper = (*tmcons.Message)(nil) ) -// GetChannelDescriptor produces an instance of a descriptor for this -// package's required channels. -func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { - return map[p2p.ChannelID]*p2p.ChannelDescriptor{ - StateChannel: { - ID: StateChannel, - Priority: 8, - SendQueueCapacity: 64, - RecvMessageCapacity: maxMsgSize, - RecvBufferCapacity: 128, - Name: "state", - }, - DataChannel: { - // TODO: Consider a split between gossiping current block and catchup - // stuff. Once we gossip the whole block there is nothing left to send - // until next height or round. - ID: DataChannel, - Priority: 12, - SendQueueCapacity: 64, - RecvBufferCapacity: 512, - RecvMessageCapacity: maxMsgSize, - Name: "data", - }, - VoteChannel: { - ID: VoteChannel, - Priority: 10, - SendQueueCapacity: 64, - RecvBufferCapacity: 4096, - RecvMessageCapacity: maxMsgSize, - Name: "vote", - }, - VoteSetBitsChannel: { - ID: VoteSetBitsChannel, - Priority: 5, - SendQueueCapacity: 8, - RecvBufferCapacity: 128, - RecvMessageCapacity: maxMsgSize, - Name: "voteSet", - }, - } -} - const ( - StateChannel = p2p.ChannelID(0x20) - DataChannel = p2p.ChannelID(0x21) - VoteChannel = p2p.ChannelID(0x22) - VoteSetBitsChannel = p2p.ChannelID(0x23) - maxMsgSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes. blocksToContributeToBecomeGoodPeer = 10000 @@ -173,23 +126,23 @@ func (r *Reactor) OnStart(ctx context.Context) error { var chBundle channelBundle var err error - chans := getChannelDescriptors() - chBundle.state, err = r.chCreator(ctx, chans[StateChannel]) + chans := p2p.ConsensusChannelDescriptors() + chBundle.state, err = r.chCreator(ctx, chans[p2p.ConsensusStateChannel]) if err != nil { return err } - chBundle.data, err = r.chCreator(ctx, chans[DataChannel]) + chBundle.data, err = r.chCreator(ctx, chans[p2p.ConsensusDataChannel]) if err != nil { return err } - chBundle.vote, err = r.chCreator(ctx, chans[VoteChannel]) + chBundle.vote, err = r.chCreator(ctx, chans[p2p.ConsensusVoteChannel]) if err != nil { return err } - chBundle.voteSet, err = r.chCreator(ctx, chans[VoteSetBitsChannel]) + chBundle.voteSet, err = r.chCreator(ctx, chans[p2p.VoteSetBitsChannel]) if err != nil { return err } @@ -785,13 +738,13 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha } switch envelope.ChannelID { - case StateChannel: + case p2p.ConsensusStateChannel: err = r.handleStateMessage(ctx, envelope, msg, chans.voteSet) - case DataChannel: + case p2p.ConsensusDataChannel: err = r.handleDataMessage(ctx, envelope, msg) - case VoteChannel: + case p2p.ConsensusVoteChannel: err = r.handleVoteMessage(ctx, envelope, msg) - case VoteSetBitsChannel: + case p2p.VoteSetBitsChannel: err = r.handleVoteSetBitsMessage(ctx, envelope, msg) default: err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 07cc40dc37..269ff7a578 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -88,10 +88,10 @@ func setup( blocksyncSubs: make(map[types.NodeID]eventbus.Subscription, numNodes), } - rts.stateChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(StateChannel, size)) - rts.dataChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(DataChannel, size)) - rts.voteChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(VoteChannel, size)) - rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(VoteSetBitsChannel, size)) + rts.stateChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.ConsensusStateChannel, size)) + rts.dataChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.ConsensusDataChannel, size)) + rts.voteChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.ConsensusVoteChannel, size)) + rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.VoteSetBitsChannel, size)) ctx, cancel := context.WithCancel(ctx) t.Cleanup(cancel) @@ -99,13 +99,13 @@ func setup( chCreator := func(nodeID types.NodeID) p2p.ChannelCreator { return func(_ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) { switch desc.ID { - case StateChannel: + case p2p.ConsensusStateChannel: return rts.stateChannels[nodeID], nil - case DataChannel: + case p2p.ConsensusDataChannel: return rts.dataChannels[nodeID], nil - case VoteChannel: + case p2p.ConsensusVoteChannel: return rts.voteChannels[nodeID], nil - case VoteSetBitsChannel: + case p2p.VoteSetBitsChannel: return rts.voteSetBitsChannels[nodeID], nil default: return nil, fmt.Errorf("invalid channel; %v", desc.ID) diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index ff8f6b6309..54dd75a810 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -27,7 +27,7 @@ const ( func GetChannelDescriptor() *p2p.ChannelDescriptor { return &p2p.ChannelDescriptor{ ID: EvidenceChannel, - Priority: 6, + Priority: 3, RecvMessageCapacity: maxMsgSize, RecvBufferCapacity: 32, Name: "evidence", diff --git a/internal/p2p/channel_params.go b/internal/p2p/channel_params.go index 14b3676b1d..41b8272982 100644 --- a/internal/p2p/channel_params.go +++ b/internal/p2p/channel_params.go @@ -17,6 +17,14 @@ import ( ) const ( + // + // Consensus channels + // + ConsensusStateChannel = ChannelID(0x20) + ConsensusDataChannel = ChannelID(0x21) + ConsensusVoteChannel = ChannelID(0x22) + VoteSetBitsChannel = ChannelID(0x23) + ErrorChannel = ChannelID(0x10) // BlockSyncChannel is a channelStore for blocks and status updates BlockSyncChannel = ChannelID(0x40) @@ -41,21 +49,24 @@ const ( lightBlockMsgSize = int(1e7) // ~1MB // paramMsgSize is the maximum size of a paramsResponseMessage paramMsgSize = int(1e5) // ~100kb + // consensusMsgSize is the maximum size of a consensus message + maxMsgSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes. + ) // ChannelDescriptors returns a map of all supported descriptors func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { - return map[ChannelID]*ChannelDescriptor{ + channels := map[ChannelID]*ChannelDescriptor{ ErrorChannel: { ID: ErrorChannel, - Priority: 6, + Priority: 7, RecvMessageCapacity: blockMaxMsgSize, RecvBufferCapacity: 32, Name: "error", }, BlockSyncChannel: { ID: BlockSyncChannel, - Priority: 5, + Priority: 6, SendQueueCapacity: 1000, RecvBufferCapacity: 1024, RecvMessageCapacity: types.MaxBlockSizeBytes + @@ -65,7 +76,7 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { }, MempoolChannel: { ID: MempoolChannel, - Priority: 5, + Priority: 2, // 5 RecvMessageCapacity: mempoolBatchSize(cfg.Mempool.MaxTxBytes), RecvBufferCapacity: 128, Name: "mempool", @@ -76,6 +87,21 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { RecvRateShouldErr: cfg.Mempool.TxRecvRatePunishPeer, EnqueueTimeout: cfg.Mempool.TxEnqueueTimeout, }, + } + + for k, v := range StatesyncChannelDescriptors() { + channels[k] = v + } + for k, v := range ConsensusChannelDescriptors() { + channels[k] = v + } + + return channels +} + +// ChannelDescriptors returns a map of all supported descriptors +func StatesyncChannelDescriptors() map[ChannelID]*ChannelDescriptor { + return map[ChannelID]*ChannelDescriptor{ SnapshotChannel: { ID: SnapshotChannel, Priority: 6, @@ -86,7 +112,7 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { }, ChunkChannel: { ID: ChunkChannel, - Priority: 3, + Priority: 4, SendQueueCapacity: 4, RecvMessageCapacity: chunkMsgSize, RecvBufferCapacity: 128, @@ -94,7 +120,7 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { }, LightBlockChannel: { ID: LightBlockChannel, - Priority: 5, + Priority: 6, SendQueueCapacity: 10, RecvMessageCapacity: lightBlockMsgSize, RecvBufferCapacity: 128, @@ -102,7 +128,7 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { }, ParamsChannel: { ID: ParamsChannel, - Priority: 2, + Priority: 3, SendQueueCapacity: 10, RecvMessageCapacity: paramMsgSize, RecvBufferCapacity: 128, @@ -111,6 +137,48 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor { } } +// GetChannelDescriptor produces an instance of a descriptor for this +// package's required channels. +func ConsensusChannelDescriptors() map[ChannelID]*ChannelDescriptor { + return map[ChannelID]*ChannelDescriptor{ + ConsensusStateChannel: { + ID: ConsensusStateChannel, + Priority: 18, + SendQueueCapacity: 64, + RecvMessageCapacity: maxMsgSize, + RecvBufferCapacity: 128, + Name: "state", + }, + ConsensusDataChannel: { + // TODO: Consider a split between gossiping current block and catchup + // stuff. Once we gossip the whole block there is nothing left to send + // until next height or round. + ID: ConsensusDataChannel, + Priority: 22, + SendQueueCapacity: 64, + RecvBufferCapacity: 512, + RecvMessageCapacity: maxMsgSize, + Name: "data", + }, + ConsensusVoteChannel: { + ID: ConsensusVoteChannel, + Priority: 20, + SendQueueCapacity: 64, + RecvBufferCapacity: 4096, + RecvMessageCapacity: maxMsgSize, + Name: "vote", + }, + VoteSetBitsChannel: { + ID: VoteSetBitsChannel, + Priority: 15, + SendQueueCapacity: 8, + RecvBufferCapacity: 128, + RecvMessageCapacity: maxMsgSize, + Name: "voteSet", + }, + } +} + // ResolveChannelID returns channel ID according to message type // currently only is supported blocksync channelID, the remaining channelIDs should be added as it will be necessary func ResolveChannelID(msg proto.Message) ChannelID { @@ -121,6 +189,7 @@ func ResolveChannelID(msg proto.Message) ChannelID { *blocksync.StatusRequest, *blocksync.StatusResponse: return BlockSyncChannel + // State sync case *statesync.ChunkRequest, *statesync.ChunkResponse: return ChunkChannel @@ -133,30 +202,27 @@ func ResolveChannelID(msg proto.Message) ChannelID { case *statesync.LightBlockRequest, *statesync.LightBlockResponse: return LightBlockChannel - case *consensus.NewRoundStep, - *consensus.NewValidBlock, + // Consensus messages + case *consensus.VoteSetBits: + return VoteSetBitsChannel + case *consensus.Vote, *consensus.Commit: + return ConsensusVoteChannel + case *consensus.ProposalPOL, *consensus.Proposal, - *consensus.ProposalPOL, - *consensus.BlockPart, - *consensus.Vote, + *consensus.BlockPart: + return ConsensusDataChannel + case *consensus.NewRoundStep, *consensus.NewValidBlock, + *consensus.HasCommit, *consensus.HasVote, - *consensus.VoteSetMaj23, - *consensus.VoteSetBits, - *consensus.Commit, - *consensus.HasCommit: - // TODO: enable these channels when they are implemented - //*statesync.SnapshotsRequest, - //*statesync.SnapshotsResponse, - //*statesync.ChunkRequest, - //*statesync.ChunkResponse, - //*statesync.LightBlockRequest, - //*statesync.LightBlockResponse, - //*statesync.ParamsRequest, - //*statesync.ParamsResponse: + *consensus.VoteSetMaj23: + return ConsensusStateChannel + // pex case *p2pproto.PexRequest, *p2pproto.PexResponse, *p2pproto.Echo: + // evidence case *prototypes.Evidence: + // mempool case *mempool.Txs: return MempoolChannel } diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 4dd794268d..602bfcfe0a 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -48,18 +48,6 @@ const ( // recentSnapshots is the number of recent snapshots to send and receive per peer. recentSnapshots = 10 - // snapshotMsgSize is the maximum size of a snapshotResponseMessage - snapshotMsgSize = int(4e6) // ~4MB - - // chunkMsgSize is the maximum size of a chunkResponseMessage - chunkMsgSize = int(16e6) // ~16MB - - // lightBlockMsgSize is the maximum size of a lightBlockResponseMessage - lightBlockMsgSize = int(1e7) // ~1MB - - // paramMsgSize is the maximum size of a paramsResponseMessage - paramMsgSize = int(1e5) // ~100kb - // lightBlockResponseTimeout is how long the dispatcher waits for a peer to // return a light block lightBlockResponseTimeout = 10 * time.Second @@ -83,41 +71,7 @@ const ( ) func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { - return map[p2p.ChannelID]*p2p.ChannelDescriptor{ - SnapshotChannel: { - ID: SnapshotChannel, - Priority: 6, - SendQueueCapacity: 10, - RecvMessageCapacity: snapshotMsgSize, - RecvBufferCapacity: 128, - Name: "snapshot", - }, - ChunkChannel: { - ID: ChunkChannel, - Priority: 3, - SendQueueCapacity: 4, - RecvMessageCapacity: chunkMsgSize, - RecvBufferCapacity: 128, - Name: "chunk", - }, - LightBlockChannel: { - ID: LightBlockChannel, - Priority: 5, - SendQueueCapacity: 10, - RecvMessageCapacity: lightBlockMsgSize, - RecvBufferCapacity: 128, - Name: "light-block", - }, - ParamsChannel: { - ID: ParamsChannel, - Priority: 2, - SendQueueCapacity: 10, - RecvMessageCapacity: paramMsgSize, - RecvBufferCapacity: 128, - Name: "params", - }, - } - + return p2p.StatesyncChannelDescriptors() } // Metricer defines an interface used for the rpc sync info query, please see statesync.metrics diff --git a/node/setup.go b/node/setup.go index 0262749f13..8d3dbf2bef 100644 --- a/node/setup.go +++ b/node/setup.go @@ -367,10 +367,10 @@ func makeNodeInfo( Version: version.TMCoreSemVer, Channels: tmsync.NewConcurrentSlice[uint16]( uint16(p2p.BlockSyncChannel), - uint16(consensus.StateChannel), - uint16(consensus.DataChannel), - uint16(consensus.VoteChannel), - uint16(consensus.VoteSetBitsChannel), + uint16(p2p.ConsensusStateChannel), + uint16(p2p.ConsensusDataChannel), + uint16(p2p.ConsensusVoteChannel), + uint16(p2p.VoteSetBitsChannel), uint16(p2p.MempoolChannel), uint16(evidence.EvidenceChannel), uint16(statesync.SnapshotChannel),