Staggered message sending with elimination of peers during transmission based on idontwants #1100

Draft PR: wants to merge 2 commits into base branch p2p-research
Changes from 1 commit
34 changes: 28 additions & 6 deletions libp2p/protocols/pubsub/gossipsub.nim
@@ -418,10 +418,19 @@ proc validateAndRelay(g: GossipSub,
break
toSendPeers.excl(seenPeers)

# We first send to the outbound peers to avoid peers sending the same message to each other
Contributor:
Could you please elaborate on why that is the case?

Collaborator (author):
It was intended to avoid A sending to B and B sending to A at the same time, but yes, it is not making much impact, due to the preceding IDontWants.

var outboundPeers: seq[PubSubPeer]
for mpeer in toSendPeers:
if mpeer.outbound():
outboundPeers.add(mpeer)
if outboundPeers.len > 0:
g.broadcast(outboundPeers, RPCMsg(messages: @[msg]), isHighPriority = false, some(saltedId))
toSendPeers.excl(outboundPeers.toHashSet)

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false, some(saltedId))
trace "forwarded message to peers", peers = toSendPeers.len + outboundPeers.len, msgId, peer

if g.knownTopics.contains(topic):
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
@@ -529,6 +538,11 @@ method rpcHandler*(g: GossipSub,

libp2p_gossipsub_duplicate.inc()

# Don't relay to the peers from which we already received the message
# We do this for large messages only
if msg.data.len > msgId.len * 10:
Contributor:
If the message is in seen, it should not be sent to anyone, meaning we should not need to track it individually (doing so costs significant amounts of memory).

Contributor:
Actually, right - the seen check has already been done and we need to stop the message after the seen check. Hm, it would likely be better to keep track of in-flight / queued messages and cancel those sends specifically on IDONTWANT rather than storing all IDONTWANT hashes.

diegomrsantos (Contributor), May 22, 2024:
Don't we already have this here?

g.handleIDontWant(peer, control.idontwant)

Contributor:
> it would likely be better to keep track of in-flight / queued messages and cancel those sends specifically on IDONTWANT rather than storing all IDONTWANT hashes

Hasn't this already been tried here? #851 (comment)

diegomrsantos (Contributor), May 22, 2024:
I believe heDontWants is meant to handle idontwant msgs received and we already have the seen data structure to avoid relaying duplicates.

Contributor:
This data structure is probably meant to store the IDs of msgs that other peers in the network have already seen but we haven't yet. Then, when we do see such a msg, we don't send it to them.

Collaborator (author):
IMO our send inserts the message into the non-priority queue, and some time may pass before it is aired. If we receive the message during this interval, we can still cancel the transmission.

Contributor:
True, but

g.handleIDontWant(peer, control.idontwant)
and checking idontwant when dequeuing from non-prio are enough. We don't seem to need this code for that.

Contributor:
Maybe we could even end up not sending the msg to the current peer, since it is enqueued and we check idontwant before sending.

Collaborator (author):
yes, this part does not have much impact!

peer.heDontWants[^1].incl(msgIdSalted)

# onto the next message
continue
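To make the reviewers' alternative above concrete: instead of remembering every IDONTWANT hash, the peer could track its queued/in-flight relays by salted message id and cancel them when an IDONTWANT arrives. A minimal, hypothetical Nim sketch (the PeerSendState type and onIDontWant helper are illustrative only, not part of this PR):

import std/tables
import chronos

type
  SaltedId = seq[byte]                  # stand-in for libp2p's SaltedId

  PeerSendState = ref object
    # queued or in-flight relay transmissions, keyed by salted message id
    inFlight: Table[SaltedId, Future[void]]

proc onIDontWant(state: PeerSendState, ids: seq[SaltedId]) {.async.} =
  ## Cancel any pending transmission the remote peer says it no longer wants.
  for id in ids:
    if id in state.inFlight:
      let fut = state.inFlight[id]
      if not fut.finished():
        await fut.cancelAndWait()       # abandon the queued write
      state.inFlight.del(id)

On the send side, the relay path would register each write future under its salted id before transmitting and remove the entry once the write completes.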

@@ -700,15 +714,23 @@ method publish*(g: GossipSub,

g.mcache.put(msgId, msg)

# We first send to the outbound peers
var outboundPeers: seq[PubSubPeer]
for mpeer in peers:
if mpeer.outbound():
outboundPeers.add(mpeer)
if outboundPeers.len > 0:
g.broadcast(outboundPeers, RPCMsg(messages: @[msg]), isHighPriority = true)
peers.excl(outboundPeers.toHashSet)
g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)

if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
libp2p_pubsub_messages_published.inc( (peers.len + outboundPeers.len).int64, labelValues = [topic])
else:
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"])
libp2p_pubsub_messages_published.inc( (peers.len + outboundPeers.len).int64, labelValues = ["generic"])

trace "Published message to peers", peers=peers.len
return peers.len
trace "Published message to peers", peers=peers.len + outboundPeers.len
return (peers.len + outboundPeers.len)

proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
if id notin g.peers:
21 changes: 14 additions & 7 deletions libp2p/protocols/pubsub/pubsub.nim
@@ -28,7 +28,8 @@ import ./errors as pubsub_errors,
../../peerid,
../../peerinfo,
../../errors,
../../utility
../../utility,
../../utils/semaphore

import stew/results
export results
@@ -79,6 +80,9 @@ declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", lab
declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"])

const
DefaultMaxSimultaneousTx* = 2

type
InitializationError* = object of LPError

@@ -127,6 +131,7 @@ type
rng*: ref HmacDrbgContext

knownTopics*: HashSet[string]
semTxLimit: AsyncSemaphore

method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
## handle peer disconnects
@@ -137,7 +142,7 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =

libp2p_pubsub_peers.set(p.peers.len.int64)

proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.raises: [].} =
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool, saltedId: Option[SaltedId] = none(SaltedId)) {.raises: [].} =
## This procedure attempts to send a `msg` (of type `RPCMsg`) to the specified remote peer in the PubSub network.
##
## Parameters:
@@ -149,13 +154,14 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.rai
## priority messages have been sent.

trace "sending pubsub message to peer", peer, msg = shortLog(msg)
peer.send(msg, p.anonymize, isHighPriority)
peer.send(msg, p.anonymize, isHighPriority, saltedId)

proc broadcast*(
p: PubSub,
sendPeers: auto, # Iterable[PubSubPeer]
msg: RPCMsg,
isHighPriority: bool) {.raises: [].} =
isHighPriority: bool,
sid: Option[SaltedId] = none(SaltedId)) {.raises: [].} =
## This procedure attempts to send a `msg` (of type `RPCMsg`) to a specified group of peers in the PubSub network.
##
## Parameters:
@@ -210,12 +216,12 @@

if anyIt(sendPeers, it.hasObservers):
for peer in sendPeers:
p.send(peer, msg, isHighPriority)
p.send(peer, msg, isHighPriority, sid)
else:
# Fast path that only encodes message once
let encoded = encodeRpcMsg(msg, p.anonymize)
for peer in sendPeers:
asyncSpawn peer.sendEncoded(encoded, isHighPriority)
asyncSpawn peer.sendEncoded(encoded, isHighPriority, sid)

proc sendSubs*(p: PubSub,
peer: PubSubPeer,
@@ -310,7 +316,7 @@ method getOrCreatePeer*(
p.onPubSubPeerEvent(peer, event)

# create new pubsub peer
let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize)
let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize, addr p.semTxLimit)
debug "created new pubsub peer", peerId

p.peers[peerId] = pubSubPeer
@@ -509,6 +515,7 @@ method initPubSub*(p: PubSub)
p.observers = new(seq[PubSubObserver])
if p.msgIdProvider == nil:
p.msgIdProvider = defaultMsgIdProvider
p.semTxLimit = newAsyncSemaphore(DefaultMaxSimultaneousTx)

method addValidator*(p: PubSub,
topic: varargs[string],
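For context on the semTxLimit field added above: the semaphore caps how many transmissions run concurrently, so with DefaultMaxSimultaneousTx = 2 a large relay occupies one of two slots and later sends wait for release, which lets individual receivers finish (and emit IDONTWANT) sooner. A rough illustration of the acquire/release pattern, assuming the module is importable as libp2p/utils/semaphore and using an arbitrary sleep in place of the real write:

import chronos
import libp2p/utils/semaphore          # provides AsyncSemaphore / newAsyncSemaphore (path assumed)

proc transmit(sem: AsyncSemaphore, label: string) {.async.} =
  await sem.acquire()                  # wait for one of the limited transmission slots
  try:
    await sleepAsync(100.milliseconds) # stands in for conn.writeLp(msg)
    echo "sent ", label
  finally:
    sem.release()                      # let the next queued transmission start

proc demo() {.async.} =
  let sem = newAsyncSemaphore(2)       # DefaultMaxSimultaneousTx in this PR
  await allFutures(transmit(sem, "A"), transmit(sem, "B"), transmit(sem, "C"))

when isMainModule:
  waitFor demo()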
100 changes: 74 additions & 26 deletions libp2p/protocols/pubsub/pubsubpeer.nim
@@ -19,7 +19,9 @@ import rpc/[messages, message, protobuf],
../../stream/connection,
../../crypto/crypto,
../../protobuf/minprotobuf,
../../utility
../../utility,
../../utils/semaphore
import atomics

export peerid, connection, deques

@@ -37,6 +39,12 @@ when defined(pubsubpeer_queue_metrics):

declareCounter(libp2p_pubsub_disconnects_over_non_priority_queue_limit, "number of peers disconnected due to over non-prio queue capacity")

var
libp2p_gossipsub_staggerDontWantSave2: Atomic[int]
libp2p_gossipsub_staggerDontWantSave3: Atomic[int]

export libp2p_gossipsub_staggerDontWantSave2, libp2p_gossipsub_staggerDontWantSave3

const
DefaultMaxNumElementsInNonPriorityQueue* = 1024

@@ -59,11 +67,15 @@ type
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}

MessageWithSaltedId = object
message: seq[byte]
sid: SaltedId

RpcMessageQueue* = ref object
# Tracks async tasks for sending high-priority peer-published messages.
sendPriorityQueue: Deque[Future[void]]
# Queue for lower-priority messages, like "IWANT" replies and relay messages.
nonPriorityQueue: AsyncQueue[seq[byte]]
nonPriorityQueue: AsyncQueue[MessageWithSaltedId]
# Task for processing non-priority message queue.
sendNonPriorityTask: Future[void]

@@ -91,6 +103,7 @@ type
behaviourPenalty*: float64 # the eventual penalty score
overheadRateLimitOpt*: Opt[TokenBucket]

semTxLimit: ptr AsyncSemaphore # Controls max simultaneous transmissions to speed up individual receptions
rpcmessagequeue: RpcMessageQueue
maxNumElementsInNonPriorityQueue*: int # The max number of elements allowed in the non-priority queue.
disconnected: bool
@@ -107,6 +120,10 @@
#so we have to read the parents short agent..
p.sendConn.getWrapped().shortAgent

proc newMessageWithSaltedId(msg: seq[byte], saltedId: SaltedId): MessageWithSaltedId =
result.message = msg
result.sid = saltedId

proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
@@ -311,24 +328,49 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} =
debug "No send connection", p, msg = shortLog(msg)
return

trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
await sendMsgContinue(conn, conn.writeLp(msg))

proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] =
proc sendMsg(p: PubSubPeer, msg: seq[byte], saltedId: Option[SaltedId] = none(SaltedId)): Future[void] {.async.}=
if p.sendConn == nil or p.sendConn.closed():
await sendMsgSlow(p, msg)
if p.sendConn != nil and not p.sendConn.closed():
# Fast path that avoids copying msg (which happens for {.async.})
let conn = p.sendConn

trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
let f = conn.writeLp(msg)
if not f.completed():
sendMsgContinue(conn, f)
else:
f
else:
sendMsgSlow(p, msg)

proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[void] =
if msg.len < 2000:
try:
let f = conn.writeLp(msg)
await f
except:
await conn.close()
else:
await p.semTxLimit[].acquire()
# We may have received a DontWant for the message
if saltedId.isSome:
for heDontWant in p.heDontWants:
if saltedId.get in heDontWant:
p.semTxLimit[].release()
atomicInc(libp2p_gossipsub_staggerDontWantSave2)
return
try:
let f = conn.writeLp(msg)
let turns = (msg.len div 100_000) + 1
for i in 1..turns:
await f or sleepAsync(200.milliseconds) # sleep time should be adaptive to the peer's bandwidth
if not f.completed and saltedId.isSome:
for heDontWant in p.heDontWants:
if saltedId.get in heDontWant:
atomicInc(libp2p_gossipsub_staggerDontWantSave3)
break
if not f.completed:
await f.cancelAndWait()
#asyncSpawn sendMsgContinue(conn, f)
p.semTxLimit[].release()

except LPStreamError as exc:
p.semTxLimit[].release()
await conn.close()
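The large-message branch above races the write against a 200 ms timer and, between turns, re-checks the peer's IDONTWANT sets, cancelling the write if the message is no longer wanted. A stripped-down, hypothetical sketch of that wait/check/cancel pattern (the noLongerWanted callback stands in for the heDontWants lookup):

import chronos

proc staggeredWrite(write: Future[void],
                    noLongerWanted: proc(): bool {.gcsafe, raises: [].},
                    turns: int) {.async.} =
  ## Wait for `write` in short turns; give up early if the receiver has
  ## meanwhile said it does not want the message.
  for _ in 1 .. turns:
    await write or sleepAsync(200.milliseconds)
    if not write.completed() and noLongerWanted():
      break
  if not write.completed():
    await write.cancelAndWait()        # abandon the remaining transmission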

proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool, saltedId: Option[SaltedId] = none(SaltedId)): Future[void] =
Contributor:
why is saltedId optional?

Collaborator (author):
> why is saltedId optional?

The same sendMsg is used for control messages and published messages, so for some messages we did not need a saltedId.

diegomrsantos (Contributor), May 22, 2024:
Currently, we can improve this design: I believe high-prio messages are only control messages and msgs published by the local peer. That means no other peer can send an idontwant for them, so we can move the idontwant check to the non-prio branch in this proc. Wdyt?

Contributor:
In fact, if we were enqueueing those msgs and we didn't dequeue them fast enough, peers could receive them via the peers we sent to first. But as we always send them immediately, it seems this can't happen. Makes sense?

Collaborator (author):
> Currently, we can improve this design: ... we can move the idontwant check to the non-prio branch in this proc. Wdyt?

Yes, we don't need to check heDontWants for messages published by the local peer, but I guess IDontWant should be transmitted with the highest priority.

Collaborator (author):
In fact, I believe IDontWant should be issued before this step, because even if a message is not valid, we can react early to stop the spread of such a message. And in the case of valid messages, we get to learn early that our peers have received the message.

diegomrsantos (Contributor), May 22, 2024:
I meant that checking idontwant before sending a msg could be done only for relayed msgs, at line 417 of this file.

Collaborator (author):
yes for relayed messages only

## Asynchronously sends an encoded message to a specified `PubSubPeer`.
##
## Parameters:
@@ -354,7 +396,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
Future[void].completed()
elif isHighPriority or emptyQueues:
let f = p.sendMsg(msg)
let f = p.sendMsg(msg, saltedId)
if not f.finished:
p.rpcmessagequeue.sendPriorityQueue.addLast(f)
when defined(pubsubpeer_queue_metrics):
Expand All @@ -369,10 +411,13 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
else:
Future[void].completed()
else:
let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
when defined(pubsubpeer_queue_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
f
if not saltedId.isSome:
Future[void].completed()
else:
let f = p.rpcmessagequeue.nonPriorityQueue.addLast(newMessageWithSaltedId(msg, saltedId.get))
when defined(pubsubpeer_queue_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
f
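As the reviewers note in the thread above, the IDONTWANT check matters mainly for relayed (non-priority) traffic: a relayed message can sit in the non-priority queue long enough for the peer to obtain it elsewhere. A minimal sketch of the check applied before actually transmitting a dequeued relay, assuming heDontWants is a rolling sequence of HashSet[SaltedId] windows as in this file:

import std/sets

type SaltedId = seq[byte]              # stand-in for libp2p's SaltedId

proc stillWanted(heDontWants: openArray[HashSet[SaltedId]], sid: SaltedId): bool =
  ## Returns false when any recent IDONTWANT window already contains the salted
  ## message id, so the dequeued relay can be dropped without transmitting.
  for window in heDontWants:
    if sid in window:
      return false
  true

In the PR itself this check lives in sendMsg after the semaphore acquire, using the salted id carried through MessageWithSaltedId.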

iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
@@ -409,7 +454,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
else:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.raises: [].} =
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool, saltedId: Option[SaltedId] = none(SaltedId)) {.raises: [].} =
## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization.
##
## Parameters:
@@ -438,11 +483,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.

if encoded.len > p.maxMessageSize and msg.messages.len > 1:
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority)
asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority, saltedId)
else:
# If the message size is within limits, send it as is
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
asyncSpawn p.sendEncoded(encoded, isHighPriority)
asyncSpawn p.sendEncoded(encoded, isHighPriority, saltedId)

proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
Expand All @@ -467,7 +512,8 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
discard await race(p.rpcmessagequeue.sendPriorityQueue[^1])
when defined(pubsubpeer_queue_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
await p.sendMsg(msg)
await p.sendMsg(msg.message, some(msg.sid))
#asyncSpawn p.sendMsg(msg.message, some(msg.sid))

proc startSendNonPriorityTask(p: PubSubPeer) =
debug "starting sendNonPriorityTask", p
@@ -489,7 +535,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
proc new(T: typedesc[RpcMessageQueue]): T =
return T(
sendPriorityQueue: initDeque[Future[void]](),
nonPriorityQueue: newAsyncQueue[seq[byte]]()
nonPriorityQueue: newAsyncQueue[MessageWithSaltedId]()
)

proc new*(
@@ -499,6 +545,7 @@ proc new*(
onEvent: OnEvent,
codec: string,
maxMessageSize: int,
sem: ptr AsyncSemaphore,
maxNumElementsInNonPriorityQueue: int = DefaultMaxNumElementsInNonPriorityQueue,
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =

Expand All @@ -516,3 +563,4 @@ proc new*(
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[SaltedId]))
result.startSendNonPriorityTask()
result.semTxLimit = sem
Loading