From eebeb892b7ca3cabc92b09b490b189e8bdf11db4 Mon Sep 17 00:00:00 2001 From: Jean-Louis Fuchs Date: Wed, 13 Mar 2024 19:47:03 +0100 Subject: [PATCH] feat: run_command log error before raising --- pyaptly/util.py | 58 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/pyaptly/util.py b/pyaptly/util.py index 696bd7c..344ee4c 100644 --- a/pyaptly/util.py +++ b/pyaptly/util.py @@ -3,12 +3,15 @@ import logging import os import subprocess +import sys +import traceback from pathlib import Path - -from colorama import Fore, init from subprocess import PIPE, CalledProcessError # noqa: F401 +from tempfile import NamedTemporaryFile from typing import Optional, Sequence +from colorama import Fore, init + _DEFAULT_KEYSERVER: str = "hkps://keys.openpgp.org" _PYTEST_KEYSERVER: Optional[str] = None @@ -19,7 +22,7 @@ Command call cmd: {cmd} {color_begin}-> {returncode}{color_end} """.strip() -OUTPUT_LOG = " {out_type}: '{output}'" +OUTPUT_LOG = " {out_type}:{white_space}'{output}'" _indent = " " * 15 _isatty_cache: bool | None = None @@ -28,6 +31,13 @@ lg = logging.getLogger(__name__) +def write_traceback(): # pragma: no cover + with NamedTemporaryFile("w", delete=False) as tmp: + tmp.write(traceback.format_exc()) + tmp.close() + return tmp.name + + def isatty(): global _isatty_cache if _isatty_cache is None: @@ -75,27 +85,48 @@ def run_command( """ added_stdout = False added_stderr = False - # TODO assert PIPE or None if "stdout" not in kwargs: kwargs["stdout"] = PIPE added_stdout = True + else: + assert kwargs["stdout"] in (PIPE, None) if "stderr" not in kwargs: kwargs["stderr"] = PIPE added_stderr = True + else: + assert kwargs["stdout"] in (PIPE, None) # pragma: no cover + # If we want to log stdout/err before raising CalledProcessError we have to + # check ourselves + check = False + if "check" in kwargs: + check = kwargs["check"] + del kwargs["check"] result = None + tb = "" if decode and "encoding" not in kwargs: kwargs["encoding"] = "UTF-8" try: result = subprocess.run(cmd_args, **kwargs) + if check and result.returncode: + raise CalledProcessError( + result.returncode, + result.args, + output=result.stdout, + stderr=result.stderr, + ) + except Exception: + if "pytest" not in sys.modules: + tb = write_traceback() # pragma: no cover + raise finally: if result: - log_msg = format_run_result(result, result.returncode) + log_msg = format_run_result(result, result.returncode, tb) if result.returncode == 0: lg.info(log_msg) else: if not hide_error or lg.root.level <= 20: lg.error(log_msg) - # Do not change returned result by debug mode + # This function should not alter the returned result if added_stdout: delattr(result, "stdout") if added_stderr: @@ -132,7 +163,7 @@ def indent_out(output: bytes | str) -> str: return "\n".join(result) -def format_run_result(result: subprocess.CompletedProcess, returncode: int): +def format_run_result(result: subprocess.CompletedProcess, returncode: int, tb: str): """Format a CompletedProcess result log.""" color_begin = "" color_end = "" @@ -153,11 +184,20 @@ def format_run_result(result: subprocess.CompletedProcess, returncode: int): stderr=indent_out(result.stderr), ) ] - for out_type, output in [("stdout", result.stdout), ("stderr", result.stderr)]: + for out_type, output in [ + ("stdout", result.stdout), + ("stderr", result.stderr), + ("traceback", tb), + ]: output = output.strip() if output: output = indent_out(output) - msg.append(OUTPUT_LOG.format(out_type=out_type, output=output)) + white_space = " " * (11 - len(out_type)) + msg.append( + OUTPUT_LOG.format( + out_type=out_type, white_space=white_space, output=output + ) + ) pass return "\n".join(msg)