Skip to content

Commit

Permalink
mlex futures
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Feb 27, 2024
1 parent f5c5b3e commit c0b6bab
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions libp2p/muxers/mplex/lpchannel.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type
closeCode*: MessageType # cached in/out close code
resetCode*: MessageType # cached in/out reset code
writes*: int # In-flight writes
closing: Future[void]

func shortLog*(s: LPChannel): auto =
try:
Expand Down Expand Up @@ -97,31 +98,31 @@ proc closeUnderlying(s: LPChannel): Future[void] {.async.} =

proc reset*(s: LPChannel) {.async.} =
if s.isClosed:
trace "Already closed", s
debug "Already closed", s
return

s.closing = newFuture[void]()
s.isClosed = true
s.closedLocal = true
s.localReset = not s.remoteReset

trace "Resetting channel", s, len = s.len
debug "Resetting channel", s, len = s.len

if s.isOpen and not s.conn.isClosed:
# If the connection is still active, notify the other end
proc resetMessage() {.async.} =
try:
trace "sending reset message", s, conn = s.conn
debug "sending reset message", s, conn = s.conn
await s.conn.writeMsg(s.id, s.resetCode) # write reset
except CatchableError as exc:
# No cancellations
await s.conn.close()
trace "Can't send reset message", s, conn = s.conn, msg = exc.msg
debug "Can't send reset message", s, conn = s.conn, msg = exc.msg

asyncSpawn resetMessage()
await resetMessage()

await s.closeImpl() # noraises, nocancels

trace "Channel reset", s
s.closing.complete()
debug "Channel reset", s

method close*(s: LPChannel) {.async.} =
## Close channel for writing - a message will be sent to the other peer
Expand Down Expand Up @@ -200,6 +201,7 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
if s.remoteReset:
raise newLPStreamResetError()
if s.closedLocal:
await s.closing
raise newLPStreamClosedError()
if s.conn.closed:
raise newLPStreamConnDownError()
Expand All @@ -213,6 +215,7 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
when defined(libp2p_mplex_metrics):
libp2p_mplex_qlenclose.inc()
await s.reset()
echo "closing"
await s.conn.close()
return

Expand Down Expand Up @@ -293,8 +296,9 @@ proc init*(
msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn,
closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn,
resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn,
dir: if initiator: Direction.Out else: Direction.In)

dir: if initiator: Direction.Out else: Direction.In,
closing: newFuture[void]())
chann.closing.complete()
chann.initStream()

when chronicles.enabledLogLevel == LogLevel.TRACE:
Expand Down

0 comments on commit c0b6bab

Please sign in to comment.