From fde07e92fe22eabb9cd9504d7aff969eb5b5a024 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 26 Feb 2024 19:56:00 +0100 Subject: [PATCH] does not finish write futures while the channel is being closed --- libp2p/muxers/mplex/lpchannel.nim | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index f81c4afa5e..bc58bb9243 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -61,6 +61,7 @@ type closeCode*: MessageType # cached in/out close code resetCode*: MessageType # cached in/out reset code writes*: int # In-flight writes + closing: Future[void] func shortLog*(s: LPChannel): auto = try: @@ -99,7 +100,7 @@ proc reset*(s: LPChannel) {.async.} = if s.isClosed: trace "Already closed", s return - + s.closing = newFuture[void]() s.isClosed = true s.closedLocal = true s.localReset = not s.remoteReset @@ -117,10 +118,10 @@ proc reset*(s: LPChannel) {.async.} = await s.conn.close() trace "Can't send reset message", s, conn = s.conn, msg = exc.msg - asyncSpawn resetMessage() + await resetMessage() await s.closeImpl() # noraises, nocancels - + s.closing.complete() trace "Channel reset", s method close*(s: LPChannel) {.async.} = @@ -130,6 +131,8 @@ method close*(s: LPChannel) {.async.} = if s.closedLocal: trace "Already closed", s return + + s.closing = newFuture[void]() s.closedLocal = true trace "Closing channel", s, conn = s.conn, len = s.len @@ -147,7 +150,7 @@ method close*(s: LPChannel) {.async.} = trace "Cannot send close message", s, id = s.id, msg = exc.msg await s.closeUnderlying() # maybe already eofed - + s.closing.complete() trace "Closed channel", s, len = s.len method initStream*(s: LPChannel) = @@ -200,6 +203,7 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = if s.remoteReset: raise newLPStreamResetError() if s.closedLocal: + await s.closing raise newLPStreamClosedError() if s.conn.closed: raise newLPStreamConnDownError() @@ -293,8 +297,9 @@ proc init*( msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn, closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn, resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn, - dir: if initiator: Direction.Out else: Direction.In) - + dir: if initiator: Direction.Out else: Direction.In, + closing: newFuture[void]()) + chann.closing.complete() chann.initStream() when chronicles.enabledLogLevel == LogLevel.TRACE: