diff --git a/pysipp/__init__.py b/pysipp/__init__.py index 46eb248..fc703b8 100644 --- a/pysipp/__init__.py +++ b/pysipp/__init__.py @@ -20,16 +20,11 @@ ''' import sys from os.path import dirname -from . import launch, report, plugin, netplug, agent +from . import plugin, netplug, agent from .load import iter_scen_dirs from .agent import client, server -class SIPpFailure(RuntimeError): - """SIPp commands failed - """ - - __package__ = 'pysipp' __author__ = 'Tyler Goodlet (tgoodlet@gmail.com)' @@ -106,12 +101,11 @@ def scenario(dirpath=None, proxyaddr=None, autolocalsocks=True, # same as above scen = plugin.mng.hook.pysipp_conf_scen_protocol( agents=[uas, uac], confpy=None, - scenkwargs=scenkwargs ) - if proxyaddr: - assert isinstance( - proxyaddr, tuple), 'proxyaddr must be a (addr, port) tuple' + if proxyaddr is not None: + assert isinstance(proxyaddr, tuple), ( + 'proxyaddr must be a (addr, port) tuple') scen.clientdefaults.proxyaddr = proxyaddr return scen @@ -196,61 +190,5 @@ def pysipp_conf_scen(agents, scen): ua.rtp_echo = True -@plugin.hookimpl -def pysipp_new_runner(): - """Provision and assign a default cmd runner - """ - return launch.PopenRunner() - - -@plugin.hookimpl -def pysipp_run_protocol(scen, runner, block, timeout, raise_exc): - """"Run all rendered commands with the provided runner or the built-in - PopenRunner which runs commands locally. - """ - # use provided runner or default provided by hook - runner = runner or plugin.mng.hook.pysipp_new_runner() - agents = scen.prepare() - - def finalize(cmds2procs=None, timeout=180, raise_exc=True): - """Wait for all remaining agents in the scenario to finish executing - and perform error and logfile reporting. - """ - cmds2procs = cmds2procs or runner.get(timeout=timeout) - agents2procs = list(zip(agents, cmds2procs.values())) - msg = report.err_summary(agents2procs) - if msg: - # report logs and stderr - report.emit_logfiles(agents2procs) - if raise_exc: - # raise RuntimeError on agent failure(s) - # (HINT: to rerun type `scen()` from the debugger) - raise SIPpFailure(msg) - - return cmds2procs - - try: - # run all agents (raises RuntimeError on timeout) - cmds2procs = runner( - (ua.render() for ua in agents), - block=block, timeout=timeout - ) - except launch.TimeoutError: # sucessful timeout - cmds2procs = finalize(timeout=0, raise_exc=False) - if raise_exc: - raise - else: - # async - if not block: - # XXX async run must bundle up results for later processing - scen.finalize = finalize - return finalize - - # sync - finalize(cmds2procs, raise_exc=raise_exc) - - return runner - - # register the default hook set plugin.mng.register(sys.modules[__name__]) diff --git a/pysipp/agent.py b/pysipp/agent.py index 988eaeb..ceb211f 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -5,10 +5,14 @@ import re import itertools import tempfile +from functools import partial from copy import deepcopy from distutils import spawn from collections import namedtuple, OrderedDict -from . import command, plugin, utils + +import trio + +from . import command, plugin, utils, launch, report log = utils.get_logger() @@ -60,20 +64,21 @@ def name(self): ipcaddr = tuple_property(('ipc_host', 'ipc_port')) call_load = tuple_property(('rate', 'limit', 'call_count')) - def __call__(self, block=True, timeout=180, runner=None, raise_exc=True, - **kwargs): + + def __call__(self, *args, **kwargs): + return self.run(*args, **kwargs) + + def run( + self, + timeout=180, + **kwargs + ): # create and configure a temp scenario scen = plugin.mng.hook.pysipp_conf_scen_protocol( agents=[self], confpy=None, scenkwargs={}, ) - # run the standard protocol - # (attach allocted runner for reuse/post-portem) - return plugin.mng.hook.pysipp_run_protocol( - scen=scen, block=block, timeout=timeout, - runner=runner, - raise_exc=raise_exc, **kwargs - ) + return scen.run(timeout=timeout, **kwargs) def is_client(self): return 'uac' in self.name.lower() @@ -254,8 +259,13 @@ class ScenarioType(object): If called it will invoke the standard run hooks. """ - def __init__(self, agents, defaults, clientdefaults=None, - serverdefaults=None, confpy=None, enable_screen_file=True): + def __init__( + self, agents, defaults, clientdefaults=None, + serverdefaults=None, confpy=None, enable_screen_file=True + ): + # placeholder for process "runner" + self._runner = None + # agents iterable in launch-order self._agents = agents ua_attrs = UserAgent.keys() @@ -431,10 +441,30 @@ def from_agents(self, agents=None, autolocalsocks=True, **scenkwargs): return type(self)( self.prepare(agents), self._defaults, confpy=self.mod) - def __call__(self, agents=None, block=True, timeout=180, runner=None, - raise_exc=True, copy_agents=False, **kwargs): - return plugin.mng.hook.pysipp_run_protocol( - scen=self, - block=block, timeout=timeout, runner=runner, - raise_exc=raise_exc, **kwargs + async def arun( + self, + timeout=180, + runner=None, + ): + agents = self.prepare() + runner = runner or launch.TrioRunner() + + return await launch.run_all_agents(runner, agents, timeout=timeout) + + def run( + self, + timeout=180, + **kwargs + ): + """Run scenario blocking to completion.""" + return trio.run( + partial( + self.arun, + timeout=timeout, + **kwargs + ) ) + + def __call__(self, *args, **kwargs): + # TODO: deprecation warning here + return self.run(*args, **kwargs) diff --git a/pysipp/launch.py b/pysipp/launch.py index a750128..63b2d29 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -1,17 +1,18 @@ """ Launchers for invoking SIPp user agents """ -import subprocess -import os import shlex -import select -import threading import signal +import subprocess import time from . import utils from pprint import pformat from collections import OrderedDict, namedtuple +import trio + +from . import report + log = utils.get_logger() Streams = namedtuple("Streams", "stdout stderr") @@ -21,30 +22,29 @@ class TimeoutError(Exception): "SIPp process timeout exception" -class PopenRunner(object): - """Run a sequence of SIPp agents asynchronously. If any process terminates - with a non-zero exit code, immediately kill all remaining processes and - collect std streams. +class SIPpFailure(RuntimeError): + """SIPp commands failed + """ + - Adheres to an interface similar to `multiprocessing.pool.AsyncResult`. +class TrioRunner(object): + """Run a sequence of SIPp cmds asynchronously. If any process terminates + with a non-zero exit code, immediately canacel all remaining processes and + collect std streams. """ def __init__( self, - subprocmod=subprocess, - osmod=os, - poller=select.epoll, ): - # these could optionally be rpyc proxy objs - self.spm = subprocmod - self.osm = osmod - self.poller = poller() - # collector thread placeholder - self._waiter = None # store proc results self._procs = OrderedDict() - def __call__(self, cmds, block=True, rate=300, **kwargs): - if self._waiter and self._waiter.is_alive(): + async def run( + self, + cmds, + rate=300, + **kwargs + ): + if self.is_alive(): raise RuntimeError( "Not all processes from a prior run have completed" ) @@ -52,85 +52,78 @@ def __call__(self, cmds, block=True, rate=300, **kwargs): raise RuntimeError( "Process results have not been cleared from previous run" ) - sp = self.spm - os = self.osm - DEVNULL = open(os.devnull, 'wb') - fds2procs = OrderedDict() - # run agent commands in sequence for cmd in cmds: log.debug( "launching cmd:\n\"{}\"\n".format(cmd)) - proc = sp.Popen( + proc = await trio.open_process( shlex.split(cmd), - stdout=DEVNULL, - stderr=sp.PIPE + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE ) - fd = proc.stderr.fileno() - log.debug("registering fd '{}' for pid '{}'".format( - fd, proc.pid)) - fds2procs[fd] = self._procs[cmd] = proc - # register for stderr hangup events - self.poller.register(proc.stderr.fileno(), select.EPOLLHUP) + self._procs[cmd] = proc + # limit launch rate time.sleep(1. / rate) - # launch waiter - self._waiter = threading.Thread(target=self._wait, args=(fds2procs,)) - self._waiter.daemon = True - self._waiter.start() - - return self.get(**kwargs) if block else self._procs + return self._procs - def _wait(self, fds2procs): - log.debug("started waiter for procs {}".format(fds2procs)) + async def get(self, timeout=180): + '''Block up to `timeout` seconds for all agents to complete. + Either return (cmd, proc) pairs or raise `TimeoutError` on timeout + ''' signalled = None - left = len(fds2procs) - collected = 0 - while collected < left: - pairs = self.poller.poll() # wait on hangup events - log.debug("received hangup for pairs '{}'".format(pairs)) - for fd, status in pairs: - collected += 1 - proc = fds2procs[fd] - # attach streams so they can be read more then once - log.debug("collecting streams for {}".format(proc)) - proc.streams = Streams(*proc.communicate()) # timeout=2)) - if proc.returncode != 0 and not signalled: + + # taken mostly verbatim from ``trio.run_process()`` + async def read_output(stream): + chunks = [] + async with stream: + try: + while True: + chunk = await stream.receive_some(32768) + if not chunk: + break + chunks.append(chunk) + except trio.ClosedResourceError: + pass + + return b"".join(chunks) + + async def wait_on_proc(proc): + nonlocal signalled + async with proc as proc: + rc = await proc.wait() + if rc != 0 and not signalled: # stop all other agents if there is a failure signalled = self.stop() - log.debug("terminating waiter thread") + # collect stderr output + proc.stderr_output = await read_output(proc.stderr) + + try: + with trio.fail_after(timeout): + async with trio.open_nursery() as n: + for cmd, proc in self._procs.items(): + # async wait on each process to complete + n.start_soon(wait_on_proc, proc) + + return self._procs + + except trio.TooSlowError: + # kill all SIPp processes + signalled = self.stop() + # all procs were killed by SIGUSR1 + raise TimeoutError( + "pids '{}' failed to complete after '{}' seconds".format( + pformat([p.pid for p in signalled.values()]), timeout) + ) - def get(self, timeout=180): - '''Block up to `timeout` seconds for all agents to complete. - Either return (cmd, proc) pairs or raise `TimeoutError` on timeout + def iterprocs(self): + '''Iterate all processes which are still alive yielding + (cmd, proc) pairs ''' - if self._waiter.is_alive(): - self._waiter.join(timeout=timeout) - - if self._waiter.is_alive(): - # kill them mfin SIPps - signalled = self.stop() - self._waiter.join(timeout=10) - - if self._waiter.is_alive(): - # try to stop a few more times - for _ in range(3): - signalled = self.stop() - self._waiter.join(timeout=1) - - if self._waiter.is_alive(): - # some procs failed to terminate via signalling - raise RuntimeError("Unable to kill all agents!?") - - # all procs were killed by SIGUSR1 - raise TimeoutError( - "pids '{}' failed to complete after '{}' seconds".format( - pformat([p.pid for p in signalled.values()]), timeout) - ) - - return self._procs + return ((cmd, proc) for cmd, proc in self._procs.items() + if proc and proc.poll() is None) def stop(self): '''Stop all agents with SIGUSR1 as per SIPp's signal handling @@ -151,25 +144,44 @@ def _signalall(self, signum): signalled[cmd] = proc return signalled - def iterprocs(self): - '''Iterate all processes which are still alive yielding - (cmd, proc) pairs - ''' - return ((cmd, proc) for cmd, proc in self._procs.items() - if proc and proc.poll() is None) - def is_alive(self): '''Return bool indicating whether some agents are still alive ''' return any(self.iterprocs()) - def ready(self): - '''Return bool indicating whether all agents have completed - ''' - return not self.is_alive() - def clear(self): '''Clear all processes from the last run ''' - assert self.ready(), "Not all processes have completed" + assert not self.is_alive(), "Not all processes have completed" self._procs.clear() + + +async def run_all_agents(runner, agents, timeout=180): + """Run a sequencec of agents using a ``TrioRunner``. + """ + async def finalize(): + # this might raise TimeoutError + cmds2procs = await runner.get(timeout=timeout) + agents2procs = list(zip(agents, cmds2procs.values())) + msg = report.err_summary(agents2procs) + if msg: + # report logs and stderr + await report.emit_logfiles(agents2procs) + raise SIPpFailure(msg) + + return cmds2procs + + try: + await runner.run( + (ua.render() for ua in agents), + timeout=timeout + ) + await finalize() + return runner + except TimeoutError as terr: + # print error logs even when we timeout + try: + await finalize() + except SIPpFailure as err: + assert 'exit code -9' in str(err) + raise terr diff --git a/pysipp/report.py b/pysipp/report.py index 9643892..b4dbb49 100644 --- a/pysipp/report.py +++ b/pysipp/report.py @@ -16,6 +16,7 @@ 99: "Normal exit without calls processed", -1: "Fatal error", -2: "Fatal error binding a socket", + -9: "Signalled to stop with SIGUSR1", -10: "Signalled to stop with SIGUSR1", 254: "Connection Error: socket already in use", 255: "Command or syntax error: check stderr output", @@ -41,7 +42,7 @@ def err_summary(agents2procs): return msg -def emit_logfiles(agents2procs, level='warn', max_lines=100): +async def emit_logfiles(agents2procs, level='warn', max_lines=100): """Log all available SIPp log-file contents """ emit = getattr(log, level) @@ -49,7 +50,7 @@ def emit_logfiles(agents2procs, level='warn', max_lines=100): # print stderr emit("stderr for '{}' @ {}\n{}\n".format( - ua.name, ua.srcaddr, proc.streams.stderr)) + ua.name, ua.srcaddr, proc.stderr_output)) # FIXME: no idea, but some logs are not being printed without this # logging mod bug? time.sleep(0.01)