diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim
index 0b4d886f48..aadf14763c 100644
--- a/libp2p/protocols/pubsub/floodsub.nim
+++ b/libp2p/protocols/pubsub/floodsub.nim
@@ -57,10 +57,10 @@ proc addSeen*(f: FloodSub, saltedId: SaltedId): bool =
 proc firstSeen*(f: FloodSub, saltedId: SaltedId): Moment =
   f.seen.addedAt(saltedId)
 
-proc handleSubscribe*(f: FloodSub,
-                      peer: PubSubPeer,
-                      topic: string,
-                      subscribe: bool) =
+proc handleSubscribe(f: FloodSub,
+                     peer: PubSubPeer,
+                     topic: string,
+                     subscribe: bool) =
   logScope:
     peer
     topic
@@ -106,10 +106,9 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) =
 
 method rpcHandler*(f: FloodSub,
                    peer: PubSubPeer,
                    data: seq[byte]) {.async.} =
-
   var rpcMsg = decodeRpcMsg(data).valueOr:
     debug "failed to decode msg from peer", peer, err = error
-    raise newException(CatchableError, "")
+    raise newException(CatchableError, "Peer msg couldn't be decoded")
 
   trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
 
   # trigger hooks
diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index ba7e520c83..1a8be68a37 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -266,10 +266,10 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
   procCall FloodSub(g).unsubscribePeer(peer)
 
-proc handleSubscribe*(g: GossipSub,
-                      peer: PubSubPeer,
-                      topic: string,
-                      subscribe: bool) =
+proc handleSubscribe(g: GossipSub,
+                     peer: PubSubPeer,
+                     topic: string,
+                     subscribe: bool) =
   logScope:
     peer
     topic
@@ -395,9 +395,7 @@ proc validateAndRelay(g: GossipSub,
     g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[])
     g.mesh.withValue(topic, peers): toSendPeers.incl(peers[])
-
-    # add direct peers
-    toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(topic))
+    g.subscribedDirectPeers.withValue(topic, peers): toSendPeers.incl(peers[])
 
     # Don't send it to source peer, or peers that
     # sent it during validation
@@ -468,6 +466,11 @@ method rpcHandler*(g: GossipSub,
   var rpcMsg = decodeRpcMsg(data).valueOr:
     debug "failed to decode msg from peer", peer, err = error
     await rateLimit(g, peer, msgSize)
+    # Raising in the handler closes the gossipsub connection (but doesn't
+    # disconnect the peer!)
+    # TODO evaluate behaviour penalty values
+    peer.behaviourPenalty += 0.1
+
     raise newException(CatchableError, "Peer msg couldn't be decoded")
 
   when defined(libp2p_expensive_metrics):
@@ -477,12 +480,13 @@ method rpcHandler*(g: GossipSub,
   trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
 
   await rateLimit(g, peer, g.messageOverhead(rpcMsg, msgSize))
 
-  # trigger hooks
+  # trigger hooks - these may modify the message
   peer.recvObservers(rpcMsg)
 
   if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
     g.send(peer, RPCMsg(pong: rpcMsg.ping), isHighPriority = true)
     peer.pingBudget.dec
+
   for i in 0..
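Note on the rpcHandler hunk above: a decode failure now carries a descriptive exception message and, in gossipsub, bumps peer.behaviourPenalty before raising. The standalone sketch below illustrates why a small additive bump is a reasonable deterrent - gossipsub v1.1 squares the behaviour penalty before weighting it, and decays it over time. All names and constants here are hypothetical; this is not the library's actual scoring code.

# Standalone sketch - NOT the library's scoring implementation.
type TestPeer = object
  behaviourPenalty: float64

proc scoreContribution(p: TestPeer, weight: float64): float64 =
  # squared: one decode failure costs little, repeated ones compound
  p.behaviourPenalty * p.behaviourPenalty * weight

proc decay(p: var TestPeer, factor: float64) =
  # run once per decay interval, e.g. from the heartbeat
  p.behaviourPenalty *= factor

when isMainModule:
  var p = TestPeer()
  for _ in 0 ..< 5:                  # five undecodable messages
    p.behaviourPenalty += 0.1
  echo scoreContribution(p, -1.0)    # about -0.25
  for _ in 0 ..< 10:                 # ten decay intervals at 0.9
    p.decay(0.9)
  echo scoreContribution(p, -1.0)    # about -0.03

One-off decode failures barely move the score, while a peer that repeatedly sends garbage accumulates a quadratic penalty until decay forgives it; the TODO above leaves the exact increment open.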
@@ ... @@ method publish*(g: GossipSub,
-    if g.parameters.bandwidthEstimatebps > 0:
-      let
-        bandwidth = (g.parameters.bandwidthEstimatebps) div 8 div 1000 # Divisions are to convert it to Bytes per ms TODO replace with bandwidth estimate
-        msToTransmit = max(data.len div bandwidth, 1)
-      maxPeersToFlodOpt = Opt.some(max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow))
+
+    let maxPeersToFlood =
+      if g.parameters.bandwidthEstimatebps > 0:
+        let
+          bandwidth = (g.parameters.bandwidthEstimatebps) div 8 div 1000 # Divisions are to convert it to Bytes per ms TODO replace with bandwidth estimate
+          msToTransmit = max(data.len div bandwidth, 1)
+        max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow)
+      else:
+        int.high() # unlimited
 
     for peer in g.gossipsub.getOrDefault(topic):
-      maxPeersToFlodOpt.withValue(maxPeersToFlod):
-        if peers.len >= maxPeersToFlod: break
+      if peers.len >= maxPeersToFlood: break
+
       if peer.score >= g.parameters.publishThreshold:
         trace "publish: including flood/high score peer", peer
         peers.incl(peer)
 
-  if peers.len < g.parameters.dLow:
-    # not subscribed, or bad mesh, send to fanout peers
-    var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
-    if fanoutPeers.len < g.parameters.dLow:
-      g.replenishFanout(topic)
-      fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
+  elif peers.len < g.parameters.dLow:
+    # not subscribed or bad mesh, send to fanout peers
+    # when flood-publishing, fanout won't help since all potential peers have
+    # already been added
+    g.replenishFanout(topic) # Make sure fanout is populated
+
+    var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
 
     g.rng.shuffle(fanoutPeers)
 
     for fanPeer in fanoutPeers:
       peers.incl(fanPeer)
       if peers.len > g.parameters.d: break
 
-    # even if we couldn't publish,
-    # we still attempted to publish
-    # on the topic, so it makes sense
-    # to update the last topic publish
-    # time
+    # Attempting to publish counts as fanout send (even if the message
+    # ultimately is not sent)
     g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL)
 
   if peers.len == 0:
@@ -690,7 +692,9 @@ method publish*(g: GossipSub,
   trace "Created new message", msg = shortLog(msg), peers = peers.len
 
   if g.addSeen(g.salt(msgId)):
-    # custom msgid providers might cause this
+    # If the message was received or published recently, don't re-publish it -
+    # this might happen when not using sequence id:s and / or with a custom
+    # message id provider
     trace "Dropping already-seen message"
     return 0
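The flood-publish hunk above turns the Opt-based cap into a plain int: with a bandwidth estimate configured, a peer's own message is eagerly sent to at most as many peers as can be served within one heartbeat, never fewer than dLow; without an estimate the cap is int.high. A standalone sketch of the same arithmetic, with hypothetical example values (bandwidthEstimatebps in bits per second, heartbeat in milliseconds):

# Standalone sketch of the bandwidth arithmetic above; values are hypothetical.
proc maxPeersToFlood(bandwidthEstimatebps, msgLen, heartbeatMs, dLow: int): int =
  if bandwidthEstimatebps > 0:
    let
      bytesPerMs = bandwidthEstimatebps div 8 div 1000 # bits/s -> bytes/ms
      msToTransmit = max(msgLen div bytesPerMs, 1)
    # as many copies as fit into one heartbeat, but never fewer than dLow
    max(heartbeatMs div msToTransmit, dLow)
  else:
    int.high # no estimate: effectively unlimited, as in the patch

when isMainModule:
  # 100 Mbit/s link, 1 MiB message, 700 ms heartbeat, dLow = 6:
  # ~84 ms per copy -> 8 peers per heartbeat
  echo maxPeersToFlood(100_000_000, 1_048_576, 700, 6) # 8
  # tiny message: the cap is huge and the dLow floor is irrelevant
  echo maxPeersToFlood(100_000_000, 1_000, 700, 6)     # 700

Keeping dLow as a floor means flood publish never degrades below the minimum mesh degree, even for large messages on slow links.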
diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim
index 7b6de15c67..9b82d33dae 100644
--- a/libp2p/protocols/pubsub/gossipsub/behavior.nim
+++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim
@@ -204,7 +204,6 @@ proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRecord])] =
   routingRecords
 
-
 proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
   for prune in prunes:
     let topic = prune.topicID
@@ -611,10 +610,10 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] =
       x notin gossipPeers and
       x.score >= g.parameters.gossipThreshold
 
-  var target = g.parameters.dLazy
-  let factor = (g.parameters.gossipFactor.float * allPeers.len.float).int
-  if factor > target:
-    target = min(factor, allPeers.len)
+  # https://github.com/libp2p/specs/blob/98c5aa9421703fc31b0833ad8860a55db15be063/pubsub/gossipsub/gossipsub-v1.1.md#adaptive-gossip-dissemination
+  let
+    factor = (g.parameters.gossipFactor.float * allPeers.len.float).int
+    target = max(g.parameters.dLazy, factor)
 
   if target < allPeers.len:
     g.rng.shuffle(allPeers)
diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim
index ef8a487455..13dcb8809d 100644
--- a/libp2p/protocols/pubsub/gossipsub/types.nim
+++ b/libp2p/protocols/pubsub/gossipsub/types.nim
@@ -172,8 +172,6 @@ type
     subscribedDirectPeers*: PeerTable # directpeers that we keep alive
     backingOff*: BackoffTable # peers to backoff from when replenishing the mesh
     lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
-    gossip*: Table[string, seq[ControlIHave]] # pending gossip
-    control*: Table[string, ControlMessage] # pending control messages
     mcache*: MCache # messages cache
     validationSeen*: ValidationSeenTable # peers who sent us message in validation
     heartbeatFut*: Future[void] # cancellation future for heartbeat interval
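The getGossipPeers hunk in behavior.nim above rewrites the target computation as the spec's adaptive gossip dissemination rule: send IHAVE to max(dLazy, gossipFactor * candidates) peers, taking everyone when fewer candidates exist. A standalone sketch with a simplified, hypothetical peer representation:

import std/[random, sequtils]

# Sketch of the adaptive gossip target; peer selection details simplified.
proc gossipTargets(candidates: seq[string], dLazy: int,
                   gossipFactor: float): seq[string] =
  let
    factor = (gossipFactor * candidates.len.float).int
    target = max(dLazy, factor)
  if target >= candidates.len:
    return candidates          # gossip to everyone we know about
  var shuffled = candidates
  shuffle(shuffled)            # uses std/random's global RNG
  shuffled[0 ..< target]

when isMainModule:
  randomize()
  let peers = (0 ..< 100).toSeq.mapIt("peer" & $it)
  echo gossipTargets(peers, 6, 0.25).len          # max(6, 25) = 25
  echo gossipTargets(peers[0 ..< 10], 6, 0.25).len # max(6, 2) = 6

The removed min(factor, allPeers.len) clamp appears redundant because the subsequent target < allPeers.len check already covers the case where the target reaches all candidates.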