Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

staggered_sending #1093

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import ./errors as pubsub_errors,
../../peerid,
../../peerinfo,
../../errors,
../../utility
../../utility,
../../utils/semaphore

import metrics
import stew/results
Expand Down Expand Up @@ -80,6 +81,11 @@ declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", lab
declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"])
declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"])

#The number of simultaneous transmissions. A smaller number can speedup message reception and relaying
#ideally this should be an adaptive number, increasing for low bandwidth peers and decreasing for high bandwidth peers
const
DefaultMaxSimultaneousTx* = 2

type
InitializationError* = object of LPError

Expand Down Expand Up @@ -128,6 +134,7 @@ type
rng*: ref HmacDrbgContext

knownTopics*: HashSet[string]
semTxLimit: AsyncSemaphore

method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
## handle peer disconnects
Expand Down Expand Up @@ -311,7 +318,7 @@ method getOrCreatePeer*(
p.onPubSubPeerEvent(peer, event)

# create new pubsub peer
let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize)
let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize, addr p.semTxLimit)
debug "created new pubsub peer", peerId

p.peers[peerId] = pubSubPeer
Expand Down Expand Up @@ -510,6 +517,7 @@ method initPubSub*(p: PubSub)
p.observers = new(seq[PubSubObserver])
if p.msgIdProvider == nil:
p.msgIdProvider = defaultMsgIdProvider
p.semTxLimit = newAsyncSemaphore(DefaultMaxSimultaneousTx)

method addValidator*(p: PubSub,
topic: varargs[string],
Expand Down
44 changes: 30 additions & 14 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import rpc/[messages, message, protobuf],
../../stream/connection,
../../crypto/crypto,
../../protobuf/minprotobuf,
../../utility
../../utility,
../../utils/semaphore

export peerid, connection, deques

Expand Down Expand Up @@ -91,6 +92,7 @@ type
behaviourPenalty*: float64 # the eventual penalty score
overheadRateLimitOpt*: Opt[TokenBucket]

semTxLimit: ptr AsyncSemaphore #Control Max simultaneous transmissions to speed up indivisual receptions
rpcmessagequeue: RpcMessageQueue
maxNumElementsInNonPriorityQueue*: int # The max number of elements allowed in the non-priority queue.
disconnected: bool
Expand Down Expand Up @@ -311,23 +313,35 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} =
debug "No send connection", p, msg = shortLog(msg)
return

trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
await sendMsgContinue(conn, conn.writeLp(msg))

proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] =
proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] {.async.}=
if p.sendConn == nil or p.sendConn.closed():
await sendMsgSlow(p, msg)
if p.sendConn != nil and not p.sendConn.closed():
# Fast path that avoids copying msg (which happens for {.async.})
let conn = p.sendConn

trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
let f = conn.writeLp(msg)
if not f.completed():
sendMsgContinue(conn, f)
else:
f
else:
sendMsgSlow(p, msg)

if msg.len < 2000: #ideally we should only forward idontwant messages through this fast path
let f = conn.writeLp(msg)
await f or sleepAsync(5.milliseconds)
if not f.completed:
asyncSpawn sendMsgContinue(conn, f)
else:
await p.semTxLimit[].acquire()
try:
let f = conn.writeLp(msg)
#ideally sleep time should be based on peer bandwidth and message size
await f or sleepAsync(450.milliseconds)

if not f.completed:
asyncSpawn sendMsgContinue(conn, f)
p.semTxLimit[].release()

except CatchableError as exc:
p.semTxLimit[].release()
await conn.close()


proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[void] =
## Asynchronously sends an encoded message to a specified `PubSubPeer`.
##
Expand Down Expand Up @@ -499,6 +513,7 @@ proc new*(
onEvent: OnEvent,
codec: string,
maxMessageSize: int,
sem: ptr AsyncSemaphore,
maxNumElementsInNonPriorityQueue: int = DefaultMaxNumElementsInNonPriorityQueue,
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =

Expand All @@ -516,3 +531,4 @@ proc new*(
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[SaltedId]))
result.startSendNonPriorityTask()
result.semTxLimit = sem
Loading