From c7d446efc91093fc9d015e9278e2f6eb5716eedc Mon Sep 17 00:00:00 2001 From: Panos Date: Thu, 3 Sep 2020 13:59:58 +0100 Subject: [PATCH] Wait finished timeout (#216) * Added timeout handling in native SSHClient wait_finished. * Updated libssh client wait_finished timeout to be separate from session timeout - resolves #182. * Added single client timeout tests. * Updated changelog, documentation. * Added timeout, finished and unfinished commands as timeout exception arguments on parallel client join. * Updated tunnel cleanup, error handling, timeout test. --- Changelog.rst | 7 +++++-- pssh/clients/base/parallel.py | 2 +- pssh/clients/native/single.py | 13 +++++++++++-- pssh/clients/native/tunnel.py | 26 +++++++++++++------------- pssh/clients/ssh/single.py | 27 ++++++++++++++++----------- tests/native/test_single_client.py | 12 ++++++++++++ tests/native/test_tunnel.py | 12 +++++++----- tests/ssh/test_single_client.py | 19 ++++++++++++------- 8 files changed, 77 insertions(+), 41 deletions(-) diff --git a/Changelog.rst b/Changelog.rst index f8d753ed..27596f61 100644 --- a/Changelog.rst +++ b/Changelog.rst @@ -1,14 +1,16 @@ Change Log ============ -1.13.0 (unreleased) -++++++++++++++++++++ +1.13.0 +++++++ Changes -------- * Added ``pssh.config.HostConfig`` for providing per-host configuration. Replaces dictionary ``host_config`` which is now deprecated. See `per-host configuration `_ documentation. * ``ParallelSSHClient.scp_send`` and ``scp_recv`` with directory target path will now copy source file to directory keeping existing name instead of failing when recurse is off - #183. +* ``pssh.clients.ssh.SSHClient`` ``wait_finished`` timeout is now separate from ``SSHClient(timeout=)`` session timeout. +* ``ParallelSSHClient.join`` with timeout now has finished and unfinished commands as ``Timeout`` exception arguments for use by client code. Fixes ------ @@ -17,6 +19,7 @@ Fixes * ``ParallelSSHClient.copy_file`` and ``scp_recv`` with recurse enabled would not create remote directories when copying empty local directories. * ``ParallelSSHClient.scp_send`` would require SFTP when recurse is off and remote destination path contains directory - #157. * ``ParallelSSHClient.scp_recv`` could block infinitely on large - 200-300MB or more - files. +* ``SSHClient.wait_finished`` would not apply timeout value given. 1.12.1 diff --git a/pssh/clients/base/parallel.py b/pssh/clients/base/parallel.py index daa8fd68..63a787e1 100644 --- a/pssh/clients/base/parallel.py +++ b/pssh/clients/base/parallel.py @@ -343,7 +343,7 @@ def join(self, output, consume_output=False, timeout=None, if unfinished_cmds: raise Timeout( "Timeout of %s sec(s) reached with commands " - "still running") + "still running", timeout, finished_cmds, unfinished_cmds) def _join(self, host_out, consume_output=False, timeout=None, encoding="utf-8"): diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index 77e2ba9f..30bb76e0 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -20,7 +20,7 @@ from collections import deque from warnings import warn -from gevent import sleep, spawn, get_hub +from gevent import sleep, spawn, get_hub, Timeout as GTimeout from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN from ssh2.exceptions import SFTPHandleError, SFTPProtocolError, \ Timeout as SSH2Timeout, AgentConnectionError, AgentListIdentitiesError, \ @@ -264,6 +264,11 @@ def wait_finished(self, channel, timeout=None): :param channel: The channel to use. :type channel: :py:class:`ssh2.channel.Channel` + :param timeout: Timeout value in seconds - defaults to no timeout. + :type timeout: float + + :raises: :py:class:`pssh.exceptions.Timeout` after seconds if + timeout given. """ if channel is None: return @@ -271,7 +276,11 @@ def wait_finished(self, channel, timeout=None): # it reached timeout without EOF and _select_timeout will raise # timeout exception causing the channel to appropriately # not be closed as the command is still running. - self._eagain(channel.wait_eof) + if timeout is not None: + with GTimeout(seconds=timeout, exception=Timeout): + self._eagain(channel.wait_eof) + else: + self._eagain(channel.wait_eof) # Close channel to indicate no more commands will be sent over it self.close_channel(channel) diff --git a/pssh/clients/native/tunnel.py b/pssh/clients/native/tunnel.py index f5f0de9a..6ab6807c 100644 --- a/pssh/clients/native/tunnel.py +++ b/pssh/clients/native/tunnel.py @@ -110,8 +110,8 @@ def _read_forward_sock(self, forward_sock, channel): return try: data = forward_sock.recv(1024) - except Exception: - logger.exception("Forward socket read error:") + except Exception as ex: + logger.error("Forward socket read error: %s", ex) sleep(1) continue data_len = len(data) @@ -121,8 +121,8 @@ def _read_forward_sock(self, forward_sock, channel): while data_written < data_len: try: rc, bytes_written = channel.write(data[data_written:]) - except Exception: - logger.exception("Channel write error:") + except Exception as ex: + logger.error("Channel write error: %s", ex) sleep(1) continue data_written += bytes_written @@ -184,16 +184,16 @@ def _init_tunnel_client(self): self.tunnel_open.set() def cleanup(self): - for _sock in self._sockets: - if not _sock: - continue - try: + for i in range(len(self._sockets)): + _sock = self._sockets[i] + if _sock is not None and not _sock.closed: _sock.close() - except Exception as ex: - logger.error("Exception while closing sockets - %s", ex) - if self.session is not None: + self._sockets[i] = None + self._sockets = None + if self.client is not None and self.session is not None: self.client.disconnect() self.session = None + self.client = None def _consume_q(self): while True: @@ -259,8 +259,8 @@ def _start_tunnel(self, fw_host, fw_port): try: channel = self._open_channel_retries(fw_host, fw_port, local_port) except Exception as ex: - logger.exception("Could not establish channel to %s:%s:", - fw_host, fw_port) + logger.error("Could not establish channel to %s:%s: %s", + fw_host, fw_port, ex) self.exception = ex forward_sock.close() listen_socket.close() diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index 936db726..ed33a3a7 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -333,22 +333,27 @@ def wait_finished(self, channel, timeout=None): :param channel: The channel to use. :type channel: :py:class:`ssh.channel.Channel` + :param timeout: Timeout value in seconds - defaults to no timeout. + :type timeout: float + + :raises: :py:class:`pssh.exceptions.Timeout` after seconds if + timeout given. """ if channel is None: return - timeout = timeout if timeout else self.timeout logger.debug("Sending EOF on channel %s", channel) - eagain(self.session, channel.send_eof, timeout=timeout) - try: - self._stdout_reader.get(timeout=timeout) - self._stderr_reader.get(timeout=timeout) - except GeventTimeout as ex: - logger.debug("Timed out waiting for readers..") - raise Timeout(ex) + eagain(self.session, channel.send_eof, timeout=self.timeout) + if timeout is not None: + with GeventTimeout(seconds=timeout, exception=Timeout): + logger.debug("Waiting for readers, timeout %s", timeout) + self._stdout_reader.get(timeout=timeout) + self._stderr_reader.get(timeout=timeout) else: - logger.debug("Readers finished, closing channel") - # Close channel - self.close_channel(channel) + self._stdout_reader.get() + self._stderr_reader.get() + logger.debug("Readers finished, closing channel") + # Close channel + self.close_channel(channel) def finished(self, channel): """Checks if remote command has finished - has server sent client diff --git a/tests/native/test_single_client.py b/tests/native/test_single_client.py index cfddae73..83a86751 100644 --- a/tests/native/test_single_client.py +++ b/tests/native/test_single_client.py @@ -22,6 +22,7 @@ import subprocess import shutil from hashlib import sha256 +from datetime import datetime from gevent import socket, sleep, spawn @@ -239,6 +240,17 @@ def test_finished(self): self.assertTrue(self.client.finished(channel)) self.assertListEqual(stdout, [b'me']) + def test_wait_finished_timeout(self): + channel = self.client.execute('sleep 2') + timeout = 1 + self.assertFalse(self.client.finished(channel)) + start = datetime.now() + self.assertRaises(Timeout, self.client.wait_finished, channel, timeout=timeout) + dt = datetime.now() - start + self.assertTrue(timeout*1.05 > dt.total_seconds() > timeout) + self.client.wait_finished(channel) + self.assertTrue(self.client.finished(channel)) + def test_scp_abspath_recursion(self): cur_dir = os.path.dirname(__file__) dir_name_to_copy = 'a_dir' diff --git a/tests/native/test_tunnel.py b/tests/native/test_tunnel.py index f757bc57..c35bf722 100644 --- a/tests/native/test_tunnel.py +++ b/tests/native/test_tunnel.py @@ -151,8 +151,7 @@ def test_tunnel_init(self): self.assertTrue(tunnel.tunnel_open.is_set()) self.assertIsNotNone(tunnel.client) tunnel.cleanup() - for _sock in tunnel._sockets: - self.assertTrue(_sock.closed) + self.assertIsNone(tunnel._sockets) finally: server.stop() @@ -302,9 +301,12 @@ def test_tunnel_remote_host_timeout(self): for _server in (server, remote_server): _server.stop() _server.join() - # Gevent timeout cannot be caught by stop_on_errors - self.assertRaises(GTimeout, client.run_command, self.cmd, - greenlet_timeout=1, stop_on_errors=False) + try: + client.run_command(self.cmd, greenlet_timeout=1) + except (GTimeout, Exception): + pass + else: + raise Exception("Command neither failed nor timeout raised") finally: for _server in (server, remote_server): _server.stop() diff --git a/tests/ssh/test_single_client.py b/tests/ssh/test_single_client.py index ff0b8438..e565045c 100644 --- a/tests/ssh/test_single_client.py +++ b/tests/ssh/test_single_client.py @@ -18,6 +18,8 @@ import unittest import logging +from datetime import datetime + from ssh.session import Session # from ssh.exceptions import SocketDisconnectError from pssh.exceptions import AuthenticationException, ConnectionErrorException, \ @@ -66,13 +68,16 @@ def test_long_running_cmd(self): exit_code = channel.get_exit_status() self.assertEqual(exit_code, 2) - def test_client_wait_finished_timeout(self): - client = SSHClient(self.host, port=self.port, - pkey=self.user_key, - num_retries=1, - timeout=0.6) - chan = client.execute('sleep 1') - self.assertRaises(Timeout, client.wait_finished, chan) + def test_wait_finished_timeout(self): + channel = self.client.execute('sleep 2') + timeout = 1 + self.assertFalse(self.client.finished(channel)) + start = datetime.now() + self.assertRaises(Timeout, self.client.wait_finished, channel, timeout=timeout) + dt = datetime.now() - start + self.assertTrue(timeout*1.05 > dt.total_seconds() > timeout) + self.client.wait_finished(channel) + self.assertTrue(self.client.finished(channel)) def test_client_exec_timeout(self): client = SSHClient(self.host, port=self.port,