Skip to content

Commit

Permalink
Merge pull request #101 from ploxiln/send_confirm_channel_open
Browse files Browse the repository at this point in the history
channel: fix race between sending channel EOF and CLOSE messages
  • Loading branch information
ploxiln authored Nov 26, 2021
2 parents 53c7876 + 7bb15bd commit ece946e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 16 deletions.
49 changes: 35 additions & 14 deletions paramiko/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ def __init__(self, chanid):
self.timeout = None
#: Whether the connection has been closed
self.closed = False
# whether close message was actually sent by transport yet
self._close_sent = False
self.ultra_debug = False
self.lock = threading.Lock()
self.out_buffer_cv = threading.Condition(self.lock)
Expand Down Expand Up @@ -652,9 +654,8 @@ def close(self):
msgs = self._close_internal()
finally:
self.lock.release()
for m in msgs:
if m is not None:
self.transport._send_user_message(m)

self._close_internal_send(*msgs)

def recv_ready(self):
"""
Expand Down Expand Up @@ -947,11 +948,12 @@ def shutdown(self, how):
m = None
self.lock.acquire()
try:
m = self._send_eof()
if self.active:
m = self._eof_internal()
finally:
self.lock.release()
if m is not None:
self.transport._send_user_message(m)
self.transport._send_user_message(m, self._close_not_sent)

def shutdown_read(self):
"""
Expand Down Expand Up @@ -1020,9 +1022,8 @@ def _request_failed(self, m):
msgs = self._close_internal()
finally:
self.lock.release()
for m in msgs:
if m is not None:
self.transport._send_user_message(m)

self._close_internal_send(*msgs)

def _feed(self, m):
if isinstance(m, bytes):
Expand Down Expand Up @@ -1174,9 +1175,8 @@ def _handle_close(self, m):
self.transport._unlink_channel(self.chanid)
finally:
self.lock.release()
for m in msgs:
if m is not None:
self.transport._send_user_message(m)

self._close_internal_send(*msgs)

# ...internals...

Expand Down Expand Up @@ -1229,9 +1229,9 @@ def _set_closed(self):
if self._pipe is not None:
self._pipe.set_forever()

def _send_eof(self):
def _eof_internal(self):
# you are holding the lock.
if self.eof_sent or self.closed:
if self.eof_sent:
return None
m = Message()
m.add_byte(cMSG_CHANNEL_EOF)
Expand All @@ -1244,7 +1244,7 @@ def _close_internal(self):
# you are holding the lock.
if not self.active or self.closed:
return None, None
m1 = self._send_eof()
m1 = self._eof_internal()
m2 = Message()
m2.add_byte(cMSG_CHANNEL_CLOSE)
m2.add_int(self.remote_chanid)
Expand All @@ -1253,6 +1253,27 @@ def _close_internal(self):
# try to send meta-data (exit-status, etc)
return m1, m2

def _close_internal_send(self, eof, close):
if eof is not None:
self.transport._send_user_message(eof, self._close_not_sent)
if close is not None:
self.transport._send_user_message(close, self._set_close_sent)

# Only one close message can be generated, and other messages will not be generated after,
# but messages are sent by different threads taking the transport clear_to_send lock.
# Races are possible: a message generated before close may actually be sent after close.
# This is much more likely for EOF of stdin (recently made automatic by ChannelStdinFile)
# than for any other message, so only EOF uses this.
def _close_not_sent(self):
# caller should hold Transport clear_to_send_lock (not channel lock)
return not self._close_sent

# see _close_not_sent above
def _set_close_sent(self):
# caller should hold Transport clear_to_send_lock (not channel lock)
self._close_sent = True
return True

def _unlink(self):
# server connection could die before we become active:
# still signal the close!
Expand Down
5 changes: 3 additions & 2 deletions paramiko/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -1716,7 +1716,7 @@ def _unlink_channel(self, chanid):
def _send_message(self, data):
self.packetizer.send_message(data)

def _send_user_message(self, data):
def _send_user_message(self, data, confirm_callback=None):
"""
send a message, but block if we're in key negotiation. this is used
for user-initiated requests.
Expand All @@ -1734,7 +1734,8 @@ def _send_user_message(self, data):
if time.time() > start + self.clear_to_send_timeout:
raise SSHException('Key-exchange timed out waiting for key negotiation')
try:
self._send_message(data)
if confirm_callback is None or confirm_callback():
self._send_message(data)
finally:
self.clear_to_send_lock.release()

Expand Down

0 comments on commit ece946e

Please sign in to comment.