Skip to content

Commit

Permalink
Reusing streams for efficient requests
Browse files Browse the repository at this point in the history
  • Loading branch information
gameofpointers committed Nov 19, 2024
1 parent 6f9fa95 commit d238ec5
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 3 deletions.
16 changes: 14 additions & 2 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,26 @@ func (p *P2PNode) requestFromPeers(topic *pubsubManager.Topic, requestData inter
case <-p.ctx.Done():
return
default:
peers := p.peerManager.GetPeers(topic)
// Use stream peers if the node has accumulated
// c_streamPeerThreshold number of streams otherwise look up peers
// from the database and create streams with them
peers := p.peerManager.GetStreamPeers()
if len(peers) < c_streamPeerThreshold {
peersMap := p.peerManager.GetPeers(topic)
peers = make([]peer.ID, 0)
for peer := range peersMap {
peers = append(peers, peer)
}
} else {
peers = peers[:pubsubManager.C_defaultRequestDegree]
}
log.Global.WithFields(log.Fields{
"peers": peers,
"topic": topic,
}).Debug("Requesting data from peers")

var requestWg sync.WaitGroup
for peerID := range peers {
for _, peerID := range peers {
// if we have exceeded the outbound rate limit for this peer, skip them for now
if err := protocol.ProcRequestRate(peerID, false); err != nil {
log.Global.Warnf("Exceeded request rate to peer %s", peerID)
Expand Down
3 changes: 2 additions & 1 deletion p2p/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (

const (
// c_defaultCacheSize is the default size for the p2p cache
c_defaultCacheSize = 32
c_defaultCacheSize = 32
c_streamPeerThreshold = 25
)

// P2PNode represents a libp2p node
Expand Down
4 changes: 4 additions & 0 deletions p2p/node/peerManager/peerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,10 @@ func (pm *BasicPeerManager) GetPeers(topic *pubsubManager.Topic) map[p2p.PeerID]
return pm.queryDHT(topic, peerList, topic.GetRequestDegree()-lenPeer)
}

func (pm *BasicPeerManager) GetStreamPeers() []peer.ID {
return pm.streamManager.GetStreamPeers()
}

func (pm *BasicPeerManager) queryDHT(topic *pubsubManager.Topic, peerList map[p2p.PeerID]struct{}, peerCount int) map[p2p.PeerID]struct{} {
// create a Cid from the slice location
shardCid := pubsubManager.TopicToCid(topic)
Expand Down
7 changes: 7 additions & 0 deletions p2p/node/streamManager/streamManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type StreamManager interface {
// GetStream returns a valid stream, either creating a new one or returning an existing one
GetStream(peer.ID) (network.Stream, error)

// GetStreamPeers returns the peers that the node has open streams with
GetStreamPeers() []peer.ID

// CloseStream goes through all the steps to properly close and remove a stream's resources
CloseStream(peer.ID) error

Expand Down Expand Up @@ -215,6 +218,10 @@ func (sm *basicStreamManager) GetStream(peerID p2p.PeerID) (network.Stream, erro
return wrappedStream.stream, err
}

func (sm *basicStreamManager) GetStreamPeers() []peer.ID {
return sm.streamCache.Keys()
}

func (sm *basicStreamManager) SetP2PBackend(host quaiprotocol.QuaiP2PNode) {
sm.p2pBackend = host
}
Expand Down

0 comments on commit d238ec5

Please sign in to comment.