Skip to content

Commit

Permalink
Wait finished timeout (#216)
Browse files Browse the repository at this point in the history
* Added timeout handling in native SSHClient wait_finished. 
* Updated libssh client wait_finished timeout to be separate from session timeout - resolves #182. 
* Added single client timeout tests.
* Updated changelog, documentation.
* Added timeout, finished and unfinished commands as timeout exception arguments on parallel client join.
* Updated tunnel cleanup, error handling, timeout test.
  • Loading branch information
pkittenis authored Sep 3, 2020
1 parent a0b0967 commit c7d446e
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 41 deletions.
7 changes: 5 additions & 2 deletions Changelog.rst
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
Change Log
============

1.13.0 (unreleased)
++++++++++++++++++++
1.13.0
++++++

Changes
--------

* Added ``pssh.config.HostConfig`` for providing per-host configuration. Replaces dictionary ``host_config`` which is now deprecated. See `per-host configuration <https://parallel-ssh.readthedocs.io/en/latest/advanced.html#per-host-configuration>`_ documentation.
* ``ParallelSSHClient.scp_send`` and ``scp_recv`` with a directory target path will now copy the source file into that directory, keeping its existing name, instead of failing when recurse is off - #183.
* ``pssh.clients.ssh.SSHClient`` ``wait_finished`` timeout is now separate from ``SSHClient(timeout=<timeout>)`` session timeout.
* ``ParallelSSHClient.join`` with timeout now passes the timeout value and the finished and unfinished commands as ``Timeout`` exception arguments for use by client code.

Fixes
------
Expand All @@ -17,6 +19,7 @@ Fixes
* ``ParallelSSHClient.copy_file`` and ``scp_recv`` with recurse enabled would not create remote directories when copying empty local directories.
* ``ParallelSSHClient.scp_send`` would require SFTP when recurse is off and remote destination path contains directory - #157.
* ``ParallelSSHClient.scp_recv`` could block indefinitely on large files - 200-300MB or more.
* ``SSHClient.wait_finished`` would not apply the timeout value given.


1.12.1
Expand Down
2 changes: 1 addition & 1 deletion pssh/clients/base/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ def join(self, output, consume_output=False, timeout=None,
if unfinished_cmds:
raise Timeout(
"Timeout of %s sec(s) reached with commands "
"still running")
"still running", timeout, finished_cmds, unfinished_cmds)

def _join(self, host_out, consume_output=False, timeout=None,
encoding="utf-8"):
Expand Down
13 changes: 11 additions & 2 deletions pssh/clients/native/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from collections import deque
from warnings import warn

from gevent import sleep, spawn, get_hub
from gevent import sleep, spawn, get_hub, Timeout as GTimeout
from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN
from ssh2.exceptions import SFTPHandleError, SFTPProtocolError, \
Timeout as SSH2Timeout, AgentConnectionError, AgentListIdentitiesError, \
Expand Down Expand Up @@ -264,14 +264,23 @@ def wait_finished(self, channel, timeout=None):
:param channel: The channel to use.
:type channel: :py:class:`ssh2.channel.Channel`
:param timeout: Timeout value in seconds - defaults to no timeout.
:type timeout: float
:raises: :py:class:`pssh.exceptions.Timeout` after <timeout> seconds if
timeout given.
"""
if channel is None:
return
# If wait_eof() returns EAGAIN after a select with a timeout, it means
# it reached timeout without EOF and _select_timeout will raise
# timeout exception causing the channel to appropriately
# not be closed as the command is still running.
self._eagain(channel.wait_eof)
if timeout is not None:
with GTimeout(seconds=timeout, exception=Timeout):
self._eagain(channel.wait_eof)
else:
self._eagain(channel.wait_eof)
# Close channel to indicate no more commands will be sent over it
self.close_channel(channel)

Expand Down
26 changes: 13 additions & 13 deletions pssh/clients/native/tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ def _read_forward_sock(self, forward_sock, channel):
return
try:
data = forward_sock.recv(1024)
except Exception:
logger.exception("Forward socket read error:")
except Exception as ex:
logger.error("Forward socket read error: %s", ex)
sleep(1)
continue
data_len = len(data)
Expand All @@ -121,8 +121,8 @@ def _read_forward_sock(self, forward_sock, channel):
while data_written < data_len:
try:
rc, bytes_written = channel.write(data[data_written:])
except Exception:
logger.exception("Channel write error:")
except Exception as ex:
logger.error("Channel write error: %s", ex)
sleep(1)
continue
data_written += bytes_written
Expand Down Expand Up @@ -184,16 +184,16 @@ def _init_tunnel_client(self):
self.tunnel_open.set()

def cleanup(self):
for _sock in self._sockets:
if not _sock:
continue
try:
for i in range(len(self._sockets)):
_sock = self._sockets[i]
if _sock is not None and not _sock.closed:
_sock.close()
except Exception as ex:
logger.error("Exception while closing sockets - %s", ex)
if self.session is not None:
self._sockets[i] = None
self._sockets = None
if self.client is not None and self.session is not None:
self.client.disconnect()
self.session = None
self.client = None

def _consume_q(self):
while True:
Expand Down Expand Up @@ -259,8 +259,8 @@ def _start_tunnel(self, fw_host, fw_port):
try:
channel = self._open_channel_retries(fw_host, fw_port, local_port)
except Exception as ex:
logger.exception("Could not establish channel to %s:%s:",
fw_host, fw_port)
logger.error("Could not establish channel to %s:%s: %s",
fw_host, fw_port, ex)
self.exception = ex
forward_sock.close()
listen_socket.close()
Expand Down
27 changes: 16 additions & 11 deletions pssh/clients/ssh/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,22 +333,27 @@ def wait_finished(self, channel, timeout=None):
:param channel: The channel to use.
:type channel: :py:class:`ssh.channel.Channel`
:param timeout: Timeout value in seconds - defaults to no timeout.
:type timeout: float
:raises: :py:class:`pssh.exceptions.Timeout` after <timeout> seconds if
timeout given.
"""
if channel is None:
return
timeout = timeout if timeout else self.timeout
logger.debug("Sending EOF on channel %s", channel)
eagain(self.session, channel.send_eof, timeout=timeout)
try:
self._stdout_reader.get(timeout=timeout)
self._stderr_reader.get(timeout=timeout)
except GeventTimeout as ex:
logger.debug("Timed out waiting for readers..")
raise Timeout(ex)
eagain(self.session, channel.send_eof, timeout=self.timeout)
if timeout is not None:
with GeventTimeout(seconds=timeout, exception=Timeout):
logger.debug("Waiting for readers, timeout %s", timeout)
self._stdout_reader.get(timeout=timeout)
self._stderr_reader.get(timeout=timeout)
else:
logger.debug("Readers finished, closing channel")
# Close channel
self.close_channel(channel)
self._stdout_reader.get()
self._stderr_reader.get()
logger.debug("Readers finished, closing channel")
# Close channel
self.close_channel(channel)

def finished(self, channel):
"""Checks if remote command has finished - has server sent client
Expand Down
12 changes: 12 additions & 0 deletions tests/native/test_single_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import subprocess
import shutil
from hashlib import sha256
from datetime import datetime

from gevent import socket, sleep, spawn

Expand Down Expand Up @@ -239,6 +240,17 @@ def test_finished(self):
self.assertTrue(self.client.finished(channel))
self.assertListEqual(stdout, [b'me'])

def test_wait_finished_timeout(self):
channel = self.client.execute('sleep 2')
timeout = 1
self.assertFalse(self.client.finished(channel))
start = datetime.now()
self.assertRaises(Timeout, self.client.wait_finished, channel, timeout=timeout)
dt = datetime.now() - start
self.assertTrue(timeout*1.05 > dt.total_seconds() > timeout)
self.client.wait_finished(channel)
self.assertTrue(self.client.finished(channel))

def test_scp_abspath_recursion(self):
cur_dir = os.path.dirname(__file__)
dir_name_to_copy = 'a_dir'
Expand Down
12 changes: 7 additions & 5 deletions tests/native/test_tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ def test_tunnel_init(self):
self.assertTrue(tunnel.tunnel_open.is_set())
self.assertIsNotNone(tunnel.client)
tunnel.cleanup()
for _sock in tunnel._sockets:
self.assertTrue(_sock.closed)
self.assertIsNone(tunnel._sockets)
finally:
server.stop()

Expand Down Expand Up @@ -302,9 +301,12 @@ def test_tunnel_remote_host_timeout(self):
for _server in (server, remote_server):
_server.stop()
_server.join()
# Gevent timeout cannot be caught by stop_on_errors
self.assertRaises(GTimeout, client.run_command, self.cmd,
greenlet_timeout=1, stop_on_errors=False)
try:
client.run_command(self.cmd, greenlet_timeout=1)
except (GTimeout, Exception):
pass
else:
raise Exception("Command neither failed nor timeout raised")
finally:
for _server in (server, remote_server):
_server.stop()
Expand Down
19 changes: 12 additions & 7 deletions tests/ssh/test_single_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import unittest
import logging

from datetime import datetime

from ssh.session import Session
# from ssh.exceptions import SocketDisconnectError
from pssh.exceptions import AuthenticationException, ConnectionErrorException, \
Expand Down Expand Up @@ -66,13 +68,16 @@ def test_long_running_cmd(self):
exit_code = channel.get_exit_status()
self.assertEqual(exit_code, 2)

def test_client_wait_finished_timeout(self):
client = SSHClient(self.host, port=self.port,
pkey=self.user_key,
num_retries=1,
timeout=0.6)
chan = client.execute('sleep 1')
self.assertRaises(Timeout, client.wait_finished, chan)
def test_wait_finished_timeout(self):
channel = self.client.execute('sleep 2')
timeout = 1
self.assertFalse(self.client.finished(channel))
start = datetime.now()
self.assertRaises(Timeout, self.client.wait_finished, channel, timeout=timeout)
dt = datetime.now() - start
self.assertTrue(timeout*1.05 > dt.total_seconds() > timeout)
self.client.wait_finished(channel)
self.assertTrue(self.client.finished(channel))

def test_client_exec_timeout(self):
client = SSHClient(self.host, port=self.port,
Expand Down

0 comments on commit c7d446e

Please sign in to comment.