Skip to content

Commit

Permalink
refactor(p2p): tune channel priorities and move channel definitions t…
Browse files Browse the repository at this point in the history
…o p2p/channel_params.go (#759)

* feat(p2p): throttled channel

(cherry picked from commit 5e41958)

* feat: limit mempool broadcast to 5/s

(cherry picked from commit cdde233)

* feat(p2p): channel recv rate limiting - not tested

(cherry picked from commit f7f7ce7)

* feat(p2p): channel recv rate limiting - continued

(cherry picked from commit 54e00f7)

* chore(p2p): regen channel mocks

* feat(config): mempool tx-send-rate-limit, tx-recv-rate-limit, tx-recv-rate-punish-peer

* chore: lint

* chore(mempool): burst recv twice as big as burst send

* chore: lint

* chore: remove not needed log

* chore: fixes after merge

* refactor(p2p): move chan descs to p2p.channel_params and tune priorities

* chore: fix after merge

* chore(statesync): fix linter issues - remove unused consts
  • Loading branch information
lklimek authored Mar 12, 2024
1 parent bc54195 commit 9028d25
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 144 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -306,5 +306,5 @@ require (
github.com/tendermint/go-amino v0.16.0
github.com/tyler-smith/go-bip39 v1.1.0
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
golang.org/x/time v0.1.0
golang.org/x/time v0.5.0
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1200,8 +1200,8 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
65 changes: 9 additions & 56 deletions internal/consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,54 +30,7 @@ var (
_ p2p.Wrapper = (*tmcons.Message)(nil)
)

// GetChannelDescriptor produces an instance of a descriptor for this
// package's required channels.
func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
return map[p2p.ChannelID]*p2p.ChannelDescriptor{
StateChannel: {
ID: StateChannel,
Priority: 8,
SendQueueCapacity: 64,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 128,
Name: "state",
},
DataChannel: {
// TODO: Consider a split between gossiping current block and catchup
// stuff. Once we gossip the whole block there is nothing left to send
// until next height or round.
ID: DataChannel,
Priority: 12,
SendQueueCapacity: 64,
RecvBufferCapacity: 512,
RecvMessageCapacity: maxMsgSize,
Name: "data",
},
VoteChannel: {
ID: VoteChannel,
Priority: 10,
SendQueueCapacity: 64,
RecvBufferCapacity: 4096,
RecvMessageCapacity: maxMsgSize,
Name: "vote",
},
VoteSetBitsChannel: {
ID: VoteSetBitsChannel,
Priority: 5,
SendQueueCapacity: 8,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
Name: "voteSet",
},
}
}

const (
StateChannel = p2p.ChannelID(0x20)
DataChannel = p2p.ChannelID(0x21)
VoteChannel = p2p.ChannelID(0x22)
VoteSetBitsChannel = p2p.ChannelID(0x23)

maxMsgSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes.

blocksToContributeToBecomeGoodPeer = 10000
Expand Down Expand Up @@ -173,23 +126,23 @@ func (r *Reactor) OnStart(ctx context.Context) error {
var chBundle channelBundle
var err error

chans := getChannelDescriptors()
chBundle.state, err = r.chCreator(ctx, chans[StateChannel])
chans := p2p.ConsensusChannelDescriptors()
chBundle.state, err = r.chCreator(ctx, chans[p2p.ConsensusStateChannel])
if err != nil {
return err
}

chBundle.data, err = r.chCreator(ctx, chans[DataChannel])
chBundle.data, err = r.chCreator(ctx, chans[p2p.ConsensusDataChannel])
if err != nil {
return err
}

chBundle.vote, err = r.chCreator(ctx, chans[VoteChannel])
chBundle.vote, err = r.chCreator(ctx, chans[p2p.ConsensusVoteChannel])
if err != nil {
return err
}

chBundle.voteSet, err = r.chCreator(ctx, chans[VoteSetBitsChannel])
chBundle.voteSet, err = r.chCreator(ctx, chans[p2p.VoteSetBitsChannel])
if err != nil {
return err
}
Expand Down Expand Up @@ -785,13 +738,13 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha
}

switch envelope.ChannelID {
case StateChannel:
case p2p.ConsensusStateChannel:
err = r.handleStateMessage(ctx, envelope, msg, chans.voteSet)
case DataChannel:
case p2p.ConsensusDataChannel:
err = r.handleDataMessage(ctx, envelope, msg)
case VoteChannel:
case p2p.ConsensusVoteChannel:
err = r.handleVoteMessage(ctx, envelope, msg)
case VoteSetBitsChannel:
case p2p.VoteSetBitsChannel:
err = r.handleVoteSetBitsMessage(ctx, envelope, msg)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
Expand Down
16 changes: 8 additions & 8 deletions internal/consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,24 @@ func setup(
blocksyncSubs: make(map[types.NodeID]eventbus.Subscription, numNodes),
}

rts.stateChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(StateChannel, size))
rts.dataChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(DataChannel, size))
rts.voteChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(VoteChannel, size))
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(VoteSetBitsChannel, size))
rts.stateChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.ConsensusStateChannel, size))
rts.dataChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.ConsensusDataChannel, size))
rts.voteChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.ConsensusVoteChannel, size))
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(p2p.VoteSetBitsChannel, size))

ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)

chCreator := func(nodeID types.NodeID) p2p.ChannelCreator {
return func(_ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) {
switch desc.ID {
case StateChannel:
case p2p.ConsensusStateChannel:
return rts.stateChannels[nodeID], nil
case DataChannel:
case p2p.ConsensusDataChannel:
return rts.dataChannels[nodeID], nil
case VoteChannel:
case p2p.ConsensusVoteChannel:
return rts.voteChannels[nodeID], nil
case VoteSetBitsChannel:
case p2p.VoteSetBitsChannel:
return rts.voteSetBitsChannels[nodeID], nil
default:
return nil, fmt.Errorf("invalid channel; %v", desc.ID)
Expand Down
2 changes: 1 addition & 1 deletion internal/evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
func GetChannelDescriptor() *p2p.ChannelDescriptor {
return &p2p.ChannelDescriptor{
ID: EvidenceChannel,
Priority: 6,
Priority: 3,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 32,
Name: "evidence",
Expand Down
116 changes: 91 additions & 25 deletions internal/p2p/channel_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ import (
)

const (
//
// Consensus channels
//
ConsensusStateChannel = ChannelID(0x20)
ConsensusDataChannel = ChannelID(0x21)
ConsensusVoteChannel = ChannelID(0x22)
VoteSetBitsChannel = ChannelID(0x23)

ErrorChannel = ChannelID(0x10)
// BlockSyncChannel is a channelStore for blocks and status updates
BlockSyncChannel = ChannelID(0x40)
Expand All @@ -41,21 +49,24 @@ const (
lightBlockMsgSize = int(1e7) // ~1MB
// paramMsgSize is the maximum size of a paramsResponseMessage
paramMsgSize = int(1e5) // ~100kb
// consensusMsgSize is the maximum size of a consensus message
maxMsgSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes.

)

// ChannelDescriptors returns a map of all supported descriptors
func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
return map[ChannelID]*ChannelDescriptor{
channels := map[ChannelID]*ChannelDescriptor{
ErrorChannel: {
ID: ErrorChannel,
Priority: 6,
Priority: 7,
RecvMessageCapacity: blockMaxMsgSize,
RecvBufferCapacity: 32,
Name: "error",
},
BlockSyncChannel: {
ID: BlockSyncChannel,
Priority: 5,
Priority: 6,
SendQueueCapacity: 1000,
RecvBufferCapacity: 1024,
RecvMessageCapacity: types.MaxBlockSizeBytes +
Expand All @@ -65,7 +76,7 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
},
MempoolChannel: {
ID: MempoolChannel,
Priority: 5,
Priority: 2, // 5
RecvMessageCapacity: mempoolBatchSize(cfg.Mempool.MaxTxBytes),
RecvBufferCapacity: 128,
Name: "mempool",
Expand All @@ -76,6 +87,21 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
RecvRateShouldErr: cfg.Mempool.TxRecvRatePunishPeer,
EnqueueTimeout: cfg.Mempool.TxEnqueueTimeout,
},
}

for k, v := range StatesyncChannelDescriptors() {
channels[k] = v
}
for k, v := range ConsensusChannelDescriptors() {
channels[k] = v
}

return channels
}

// ChannelDescriptors returns a map of all supported descriptors
func StatesyncChannelDescriptors() map[ChannelID]*ChannelDescriptor {
return map[ChannelID]*ChannelDescriptor{
SnapshotChannel: {
ID: SnapshotChannel,
Priority: 6,
Expand All @@ -86,23 +112,23 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
},
ChunkChannel: {
ID: ChunkChannel,
Priority: 3,
Priority: 4,
SendQueueCapacity: 4,
RecvMessageCapacity: chunkMsgSize,
RecvBufferCapacity: 128,
Name: "chunk",
},
LightBlockChannel: {
ID: LightBlockChannel,
Priority: 5,
Priority: 6,
SendQueueCapacity: 10,
RecvMessageCapacity: lightBlockMsgSize,
RecvBufferCapacity: 128,
Name: "light-block",
},
ParamsChannel: {
ID: ParamsChannel,
Priority: 2,
Priority: 3,
SendQueueCapacity: 10,
RecvMessageCapacity: paramMsgSize,
RecvBufferCapacity: 128,
Expand All @@ -111,6 +137,48 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
}
}

// GetChannelDescriptor produces an instance of a descriptor for this
// package's required channels.
func ConsensusChannelDescriptors() map[ChannelID]*ChannelDescriptor {
return map[ChannelID]*ChannelDescriptor{
ConsensusStateChannel: {
ID: ConsensusStateChannel,
Priority: 18,
SendQueueCapacity: 64,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 128,
Name: "state",
},
ConsensusDataChannel: {
// TODO: Consider a split between gossiping current block and catchup
// stuff. Once we gossip the whole block there is nothing left to send
// until next height or round.
ID: ConsensusDataChannel,
Priority: 22,
SendQueueCapacity: 64,
RecvBufferCapacity: 512,
RecvMessageCapacity: maxMsgSize,
Name: "data",
},
ConsensusVoteChannel: {
ID: ConsensusVoteChannel,
Priority: 20,
SendQueueCapacity: 64,
RecvBufferCapacity: 4096,
RecvMessageCapacity: maxMsgSize,
Name: "vote",
},
VoteSetBitsChannel: {
ID: VoteSetBitsChannel,
Priority: 15,
SendQueueCapacity: 8,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
Name: "voteSet",
},
}
}

// ResolveChannelID returns channel ID according to message type
// currently only is supported blocksync channelID, the remaining channelIDs should be added as it will be necessary
func ResolveChannelID(msg proto.Message) ChannelID {
Expand All @@ -121,6 +189,7 @@ func ResolveChannelID(msg proto.Message) ChannelID {
*blocksync.StatusRequest,
*blocksync.StatusResponse:
return BlockSyncChannel
// State sync
case *statesync.ChunkRequest,
*statesync.ChunkResponse:
return ChunkChannel
Expand All @@ -133,30 +202,27 @@ func ResolveChannelID(msg proto.Message) ChannelID {
case *statesync.LightBlockRequest,
*statesync.LightBlockResponse:
return LightBlockChannel
case *consensus.NewRoundStep,
*consensus.NewValidBlock,
// Consensus messages
case *consensus.VoteSetBits:
return VoteSetBitsChannel
case *consensus.Vote, *consensus.Commit:
return ConsensusVoteChannel
case *consensus.ProposalPOL,
*consensus.Proposal,
*consensus.ProposalPOL,
*consensus.BlockPart,
*consensus.Vote,
*consensus.BlockPart:
return ConsensusDataChannel
case *consensus.NewRoundStep, *consensus.NewValidBlock,
*consensus.HasCommit,
*consensus.HasVote,
*consensus.VoteSetMaj23,
*consensus.VoteSetBits,
*consensus.Commit,
*consensus.HasCommit:
// TODO: enable these channels when they are implemented
//*statesync.SnapshotsRequest,
//*statesync.SnapshotsResponse,
//*statesync.ChunkRequest,
//*statesync.ChunkResponse,
//*statesync.LightBlockRequest,
//*statesync.LightBlockResponse,
//*statesync.ParamsRequest,
//*statesync.ParamsResponse:
*consensus.VoteSetMaj23:
return ConsensusStateChannel
// pex
case *p2pproto.PexRequest,
*p2pproto.PexResponse,
*p2pproto.Echo:
// evidence
case *prototypes.Evidence:
// mempool
case *mempool.Txs:
return MempoolChannel
}
Expand Down
Loading

0 comments on commit 9028d25

Please sign in to comment.