-
Notifications
You must be signed in to change notification settings - Fork 55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Staggered message sending with elimination of peers during transmission based on idontwants #1100
base: p2p-research
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -432,10 +432,19 @@ proc validateAndRelay(g: GossipSub, | |||||
break | ||||||
toSendPeers.excl(peersWhoSentIdontwant) # avoids len(s) == length` the length of the HashSet changed while iterating over it [AssertionDefect] | ||||||
|
||||||
#We first send to the outbound peers to avoid peers sending same message to each other | ||||||
var outboundPeers: seq[PubSubPeer] | ||||||
for mpeer in toSendPeers: | ||||||
if mpeer.outbound(): | ||||||
outboundPeers.add(mpeer) | ||||||
if outboundPeers.len > 0: | ||||||
g.broadcast(outboundPeers, RPCMsg(messages: @[msg]), isHighPriority = false, some(saltedId)) | ||||||
toSendPeers.excl(outboundPeers.toHashSet) | ||||||
|
||||||
# In theory, if topics are the same in all messages, we could batch - we'd | ||||||
# also have to be careful to only include validated messages | ||||||
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false) | ||||||
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer | ||||||
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false, some(saltedId)) | ||||||
trace "forwarded message to peers", peers = toSendPeers.len + outboundPeers.len, msgId, peer | ||||||
|
||||||
if g.knownTopics.contains(topic): | ||||||
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic]) | ||||||
|
@@ -544,6 +553,11 @@ method rpcHandler*(g: GossipSub, | |||||
|
||||||
libp2p_gossipsub_duplicate.inc() | ||||||
|
||||||
#Dont relay to the peers from which we already received | ||||||
#We do it for large messages only | ||||||
if msg.data.len > msgId.len * 10: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if the message is in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually, right - the seen check has been done and we need to stop the message after the seen check . hm . it would likely be better that we keep track of in-flight / queued messages and cancel these sends specifically on idontwant rather than storing all idontwant hashes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't we already have this here?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Haven't this been already tried here? #851 (comment) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This data structure is probably to store ID of msgs other peers in the network have already seen but we haven't yet. Then when we do see it, we don't send it to them. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO our send inserts the message in non-priority queue, and may consume some time before its aired. if we receive the message during this interval, we can still cancel transmit There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True, but
idontwant when dequeuing from non-prio are enough. We don't seem to need this code for that.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we could even end up not sending the msg to the current peer as it is enqueued and before sending we check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, this part does not have much impact! |
||||||
peer.heDontWants[^1].incl(msgIdSalted) | ||||||
|
||||||
# onto the next message | ||||||
continue | ||||||
|
||||||
|
@@ -715,15 +729,23 @@ method publish*(g: GossipSub, | |||||
|
||||||
g.mcache.put(msgId, msg) | ||||||
|
||||||
#We first send to the outbound peers | ||||||
var outboundPeers: seq[PubSubPeer] | ||||||
for mpeer in peers: | ||||||
if mpeer.outbound(): | ||||||
outboundPeers.add(mpeer) | ||||||
if outboundPeers.len > 0: | ||||||
g.broadcast(outboundPeers, RPCMsg(messages: @[msg]), isHighPriority = true) | ||||||
peers.excl(outboundPeers.toHashSet) | ||||||
g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true) | ||||||
|
||||||
if g.knownTopics.contains(topic): | ||||||
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic]) | ||||||
libp2p_pubsub_messages_published.inc( (peers.len + outboundPeers.len).int64, labelValues = [topic]) | ||||||
else: | ||||||
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"]) | ||||||
libp2p_pubsub_messages_published.inc( (peers.len + outboundPeers.len).int64, labelValues = ["generic"]) | ||||||
|
||||||
trace "Published message to peers", peers=peers.len | ||||||
return peers.len | ||||||
trace "Published message to peers", peers=peers.len + outboundPeers.len | ||||||
return (peers.len + outboundPeers.len) | ||||||
|
||||||
proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} = | ||||||
if id notin g.peers: | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,9 @@ import rpc/[messages, message, protobuf], | |
../../stream/connection, | ||
../../crypto/crypto, | ||
../../protobuf/minprotobuf, | ||
../../utility | ||
../../utility, | ||
../../utils/semaphore | ||
import atomics | ||
|
||
export peerid, connection, deques | ||
|
||
|
@@ -37,6 +39,12 @@ when defined(pubsubpeer_queue_metrics): | |
|
||
declareCounter(libp2p_pubsub_disconnects_over_non_priority_queue_limit, "number of peers disconnected due to over non-prio queue capacity") | ||
|
||
var | ||
libp2p_gossipsub_staggerDontWantSave2: Atomic[int] | ||
libp2p_gossipsub_staggerDontWantSave3: Atomic[int] | ||
|
||
export libp2p_gossipsub_staggerDontWantSave2, libp2p_gossipsub_staggerDontWantSave3 | ||
|
||
const | ||
DefaultMaxNumElementsInNonPriorityQueue* = 1024 | ||
|
||
|
@@ -59,11 +67,15 @@ type | |
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init | ||
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} | ||
|
||
MessageWithSaltedId = object | ||
message: seq[byte] | ||
sid: SaltedId | ||
|
||
RpcMessageQueue* = ref object | ||
# Tracks async tasks for sending high-priority peer-published messages. | ||
sendPriorityQueue: Deque[Future[void]] | ||
# Queue for lower-priority messages, like "IWANT" replies and relay messages. | ||
nonPriorityQueue: AsyncQueue[seq[byte]] | ||
nonPriorityQueue: AsyncQueue[MessageWithSaltedId] | ||
# Task for processing non-priority message queue. | ||
sendNonPriorityTask: Future[void] | ||
|
||
|
@@ -91,6 +103,7 @@ type | |
behaviourPenalty*: float64 # the eventual penalty score | ||
overheadRateLimitOpt*: Opt[TokenBucket] | ||
|
||
semTxLimit: ptr AsyncSemaphore #Control Max simultaneous transmissions to speed up indivisual receptions | ||
rpcmessagequeue: RpcMessageQueue | ||
maxNumElementsInNonPriorityQueue*: int # The max number of elements allowed in the non-priority queue. | ||
disconnected: bool | ||
|
@@ -107,6 +120,10 @@ when defined(libp2p_agents_metrics): | |
#so we have to read the parents short agent.. | ||
p.sendConn.getWrapped().shortAgent | ||
|
||
proc newMessageWithSaltedId(msg: seq[byte], saltedId: SaltedId): MessageWithSaltedId = | ||
result.message = msg | ||
result.sid = saltedId | ||
|
||
proc getAgent*(peer: PubSubPeer): string = | ||
return | ||
when defined(libp2p_agents_metrics): | ||
|
@@ -311,24 +328,49 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} = | |
debug "No send connection", p, msg = shortLog(msg) | ||
return | ||
|
||
trace "sending encoded msg to peer", conn, encoded = shortLog(msg) | ||
await sendMsgContinue(conn, conn.writeLp(msg)) | ||
|
||
proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] = | ||
proc sendMsg(p: PubSubPeer, msg: seq[byte], saltedId: Option[SaltedId] = none(SaltedId)): Future[void] {.async.}= | ||
if p.sendConn == nil or p.sendConn.closed(): | ||
await sendMsgSlow(p, msg) | ||
if p.sendConn != nil and not p.sendConn.closed(): | ||
# Fast path that avoids copying msg (which happens for {.async.}) | ||
let conn = p.sendConn | ||
|
||
trace "sending encoded msg to peer", conn, encoded = shortLog(msg) | ||
let f = conn.writeLp(msg) | ||
if not f.completed(): | ||
sendMsgContinue(conn, f) | ||
else: | ||
f | ||
else: | ||
sendMsgSlow(p, msg) | ||
|
||
proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[void] = | ||
if msg.len < 2000: | ||
try: | ||
let f = conn.writeLp(msg) | ||
await f | ||
except: | ||
await conn.close() | ||
else: | ||
await p.semTxLimit[].acquire() | ||
#We may have received DontWant for the message | ||
if saltedId.isSome: | ||
for heDontWant in p.heDontWants: | ||
if saltedId.get in heDontWant: | ||
p.semTxLimit[].release() | ||
atomicInc(libp2p_gossipsub_staggerDontWantSave2) | ||
return | ||
try: | ||
let f = conn.writeLp(msg) | ||
let turns = (msg.len div 100_000) + 1 | ||
for i in 1..turns: | ||
await f or sleepAsync(200.milliseconds) #sleep time should be adaptive to the peer bandwidth | ||
if not f.completed and saltedId.isSome: | ||
for heDontWant in p.heDontWants: | ||
if saltedId.get in heDontWant: | ||
atomicInc(libp2p_gossipsub_staggerDontWantSave3) | ||
break | ||
if not f.completed: | ||
await f.cancelAndWait() | ||
#asyncSpawn sendMsgContinue(conn, f) | ||
p.semTxLimit[].release() | ||
|
||
except LPStreamError as exc: | ||
p.semTxLimit[].release() | ||
await conn.close() | ||
|
||
proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool, saltedId: Option[SaltedId] = none(SaltedId)): Future[void] = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is saltedId optional? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
same SendMsg for control and published messages, so for some messages we did not need saltedID There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently, we can improve this design as I believe high-prio are only control and msgs published by the local peer. It means no other peer can send an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In fact, If we were enqueueing those msgs and we didn't dequeue them fast enough, peers could receive them via peers we sent first. But as we always send them immediately, it seems this can't happen. Makes sense? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
yes, we dont need to check hedontwants for the messages published by the local peer,,, but i guess IDontWant should be transmitted with the highest priority There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In fact, i believe, IDontWant should be issued before this step. Because even if a message is not valid, we can react early to stop the spread of such message. And in case of valid messages, we get to learn early that our peers have received this message There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant that checking There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes for relayed messages only |
||
## Asynchronously sends an encoded message to a specified `PubSubPeer`. | ||
## | ||
## Parameters: | ||
|
@@ -354,7 +396,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v | |
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len | ||
Future[void].completed() | ||
elif isHighPriority or emptyQueues: | ||
let f = p.sendMsg(msg) | ||
let f = p.sendMsg(msg, saltedId) | ||
if not f.finished: | ||
p.rpcmessagequeue.sendPriorityQueue.addLast(f) | ||
when defined(pubsubpeer_queue_metrics): | ||
|
@@ -369,10 +411,13 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v | |
else: | ||
Future[void].completed() | ||
else: | ||
let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg) | ||
when defined(pubsubpeer_queue_metrics): | ||
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) | ||
f | ||
if not saltedId.isSome: | ||
Future[void].completed() | ||
else: | ||
let f = p.rpcmessagequeue.nonPriorityQueue.addLast(newMessageWithSaltedId(msg, saltedId.get)) | ||
when defined(pubsubpeer_queue_metrics): | ||
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) | ||
f | ||
|
||
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] = | ||
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances. | ||
|
@@ -409,7 +454,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: | |
else: | ||
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg) | ||
|
||
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.raises: [].} = | ||
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool, saltedId: Option[SaltedId] = none(SaltedId)) {.raises: [].} = | ||
## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization. | ||
## | ||
## Parameters: | ||
|
@@ -438,11 +483,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {. | |
|
||
if encoded.len > p.maxMessageSize and msg.messages.len > 1: | ||
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize): | ||
asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority) | ||
asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority, saltedId) | ||
else: | ||
# If the message size is within limits, send it as is | ||
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg) | ||
asyncSpawn p.sendEncoded(encoded, isHighPriority) | ||
asyncSpawn p.sendEncoded(encoded, isHighPriority, saltedId) | ||
|
||
proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = | ||
for sentIHave in p.sentIHaves.mitems(): | ||
|
@@ -467,7 +512,8 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = | |
discard await race(p.rpcmessagequeue.sendPriorityQueue[^1]) | ||
when defined(pubsubpeer_queue_metrics): | ||
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) | ||
await p.sendMsg(msg) | ||
await p.sendMsg(msg.message, some(msg.sid)) | ||
#asyncSpawn p.sendMsg(msg.message, some(msg.sid)) | ||
|
||
proc startSendNonPriorityTask(p: PubSubPeer) = | ||
debug "starting sendNonPriorityTask", p | ||
|
@@ -489,7 +535,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = | |
proc new(T: typedesc[RpcMessageQueue]): T = | ||
return T( | ||
sendPriorityQueue: initDeque[Future[void]](), | ||
nonPriorityQueue: newAsyncQueue[seq[byte]]() | ||
nonPriorityQueue: newAsyncQueue[MessageWithSaltedId]() | ||
) | ||
|
||
proc new*( | ||
|
@@ -499,6 +545,7 @@ proc new*( | |
onEvent: OnEvent, | ||
codec: string, | ||
maxMessageSize: int, | ||
sem: ptr AsyncSemaphore, | ||
maxNumElementsInNonPriorityQueue: int = DefaultMaxNumElementsInNonPriorityQueue, | ||
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T = | ||
|
||
|
@@ -516,3 +563,4 @@ proc new*( | |
result.sentIHaves.addFirst(default(HashSet[MessageId])) | ||
result.heDontWants.addFirst(default(HashSet[SaltedId])) | ||
result.startSendNonPriorityTask() | ||
result.semTxLimit = sem |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please elaborate on why that is the case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was intended to avoid A sending to B and B sending to A at the same time,,, but yes, its not making much impact,,, due to the preceding IDontWants