From c0b6bab834b34738d9668ff1c78e57c21ffb4e6b Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 27 Feb 2024 16:46:09 +0100 Subject: [PATCH] mlex futures --- libp2p/muxers/mplex/lpchannel.nim | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index f81c4afa5e..aabba8b555 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -61,6 +61,7 @@ type closeCode*: MessageType # cached in/out close code resetCode*: MessageType # cached in/out reset code writes*: int # In-flight writes + closing: Future[void] func shortLog*(s: LPChannel): auto = try: @@ -97,31 +98,31 @@ proc closeUnderlying(s: LPChannel): Future[void] {.async.} = proc reset*(s: LPChannel) {.async.} = if s.isClosed: - trace "Already closed", s + debug "Already closed", s return - + s.closing = newFuture[void]() s.isClosed = true s.closedLocal = true s.localReset = not s.remoteReset - trace "Resetting channel", s, len = s.len + debug "Resetting channel", s, len = s.len if s.isOpen and not s.conn.isClosed: # If the connection is still active, notify the other end proc resetMessage() {.async.} = try: - trace "sending reset message", s, conn = s.conn + debug "sending reset message", s, conn = s.conn await s.conn.writeMsg(s.id, s.resetCode) # write reset except CatchableError as exc: # No cancellations await s.conn.close() - trace "Can't send reset message", s, conn = s.conn, msg = exc.msg + debug "Can't send reset message", s, conn = s.conn, msg = exc.msg - asyncSpawn resetMessage() + await resetMessage() await s.closeImpl() # noraises, nocancels - - trace "Channel reset", s + s.closing.complete() + debug "Channel reset", s method close*(s: LPChannel) {.async.} = ## Close channel for writing - a message will be sent to the other peer @@ -200,6 +201,7 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = if s.remoteReset: raise newLPStreamResetError() if s.closedLocal: + await s.closing raise newLPStreamClosedError() if s.conn.closed: raise newLPStreamConnDownError() @@ -213,6 +215,7 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = when defined(libp2p_mplex_metrics): libp2p_mplex_qlenclose.inc() await s.reset() + echo "closing" await s.conn.close() return @@ -293,8 +296,9 @@ proc init*( msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn, closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn, resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn, - dir: if initiator: Direction.Out else: Direction.In) - + dir: if initiator: Direction.Out else: Direction.In, + closing: newFuture[void]()) + chann.closing.complete() chann.initStream() when chronicles.enabledLogLevel == LogLevel.TRACE: