diff --git a/p2p/node/api.go b/p2p/node/api.go index 4f140737e6..9da5d96a0e 100644 --- a/p2p/node/api.go +++ b/p2p/node/api.go @@ -166,14 +166,26 @@ func (p *P2PNode) requestFromPeers(topic *pubsubManager.Topic, requestData inter case <-p.ctx.Done(): return default: - peers := p.peerManager.GetPeers(topic) + // Use stream peers if the node has accumulated + // c_streamPeerThreshold number of streams otherwise look up peers + // from the database and create streams with them + peers := p.peerManager.GetStreamPeers() + if len(peers) < c_streamPeerThreshold { + peersMap := p.peerManager.GetPeers(topic) + peers = make([]peer.ID, 0) + for peer := range peersMap { + peers = append(peers, peer) + } + } else { + peers = peers[:pubsubManager.C_defaultRequestDegree] + } log.Global.WithFields(log.Fields{ "peers": peers, "topic": topic, }).Debug("Requesting data from peers") var requestWg sync.WaitGroup - for peerID := range peers { + for _, peerID := range peers { // if we have exceeded the outbound rate limit for this peer, skip them for now if err := protocol.ProcRequestRate(peerID, false); err != nil { log.Global.Warnf("Exceeded request rate to peer %s", peerID) diff --git a/p2p/node/node.go b/p2p/node/node.go index 9b412df21e..b9b830f89d 100644 --- a/p2p/node/node.go +++ b/p2p/node/node.go @@ -37,7 +37,8 @@ import ( const ( // c_defaultCacheSize is the default size for the p2p cache - c_defaultCacheSize = 32 + c_defaultCacheSize = 32 + c_streamPeerThreshold = 25 ) // P2PNode represents a libp2p node diff --git a/p2p/node/peerManager/peerManager.go b/p2p/node/peerManager/peerManager.go index e5a96260e7..68295ecf71 100644 --- a/p2p/node/peerManager/peerManager.go +++ b/p2p/node/peerManager/peerManager.go @@ -488,6 +488,10 @@ func (pm *BasicPeerManager) GetPeers(topic *pubsubManager.Topic) map[p2p.PeerID] return pm.queryDHT(topic, peerList, topic.GetRequestDegree()-lenPeer) } +func (pm *BasicPeerManager) GetStreamPeers() []peer.ID { + return pm.streamManager.GetStreamPeers() +} + func (pm *BasicPeerManager) queryDHT(topic *pubsubManager.Topic, peerList map[p2p.PeerID]struct{}, peerCount int) map[p2p.PeerID]struct{} { // create a Cid from the slice location shardCid := pubsubManager.TopicToCid(topic) diff --git a/p2p/node/streamManager/streamManager.go b/p2p/node/streamManager/streamManager.go index ebfb53f10d..c7cc68d96e 100644 --- a/p2p/node/streamManager/streamManager.go +++ b/p2p/node/streamManager/streamManager.go @@ -53,6 +53,9 @@ type StreamManager interface { // GetStream returns a valid stream, either creating a new one or returning an existing one GetStream(peer.ID) (network.Stream, error) + // GetStreamPeers returns the peers that the node has open streams with + GetStreamPeers() []peer.ID + // CloseStream goes through all the steps to properly close and remove a stream's resources CloseStream(peer.ID) error @@ -215,6 +218,10 @@ func (sm *basicStreamManager) GetStream(peerID p2p.PeerID) (network.Stream, erro return wrappedStream.stream, err } +func (sm *basicStreamManager) GetStreamPeers() []peer.ID { + return sm.streamCache.Keys() +} + func (sm *basicStreamManager) SetP2PBackend(host quaiprotocol.QuaiP2PNode) { sm.p2pBackend = host }