Skip to content

Commit

Permalink
feat: run_command log error before raising
Browse files Browse the repository at this point in the history
  • Loading branch information
Jean-Louis Fuchs committed Mar 13, 2024
1 parent 828bd66 commit eebeb89
Showing 1 changed file with 49 additions and 9 deletions.
58 changes: 49 additions & 9 deletions pyaptly/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import logging
import os
import subprocess
import sys
import traceback
from pathlib import Path

from colorama import Fore, init
from subprocess import PIPE, CalledProcessError # noqa: F401
from tempfile import NamedTemporaryFile
from typing import Optional, Sequence

from colorama import Fore, init

_DEFAULT_KEYSERVER: str = "hkps://keys.openpgp.org"
_PYTEST_KEYSERVER: Optional[str] = None

Expand All @@ -19,7 +22,7 @@
Command call
cmd: {cmd} {color_begin}-> {returncode}{color_end}
""".strip()
OUTPUT_LOG = " {out_type}: '{output}'"
OUTPUT_LOG = " {out_type}:{white_space}'{output}'"
_indent = " " * 15

_isatty_cache: bool | None = None
Expand All @@ -28,6 +31,13 @@
lg = logging.getLogger(__name__)


def write_traceback(): # pragma: no cover
with NamedTemporaryFile("w", delete=False) as tmp:
tmp.write(traceback.format_exc())
tmp.close()
return tmp.name


def isatty():
global _isatty_cache
if _isatty_cache is None:
Expand Down Expand Up @@ -75,27 +85,48 @@ def run_command(
"""
added_stdout = False
added_stderr = False
# TODO assert PIPE or None
if "stdout" not in kwargs:
kwargs["stdout"] = PIPE
added_stdout = True
else:
assert kwargs["stdout"] in (PIPE, None)
if "stderr" not in kwargs:
kwargs["stderr"] = PIPE
added_stderr = True
else:
assert kwargs["stdout"] in (PIPE, None) # pragma: no cover
# If we want to log stdout/err before raising CalledProcessError we have to
# check ourselves
check = False
if "check" in kwargs:
check = kwargs["check"]
del kwargs["check"]
result = None
tb = ""
if decode and "encoding" not in kwargs:
kwargs["encoding"] = "UTF-8"
try:
result = subprocess.run(cmd_args, **kwargs)
if check and result.returncode:
raise CalledProcessError(
result.returncode,
result.args,
output=result.stdout,
stderr=result.stderr,
)
except Exception:
if "pytest" not in sys.modules:
tb = write_traceback() # pragma: no cover
raise
finally:
if result:
log_msg = format_run_result(result, result.returncode)
log_msg = format_run_result(result, result.returncode, tb)
if result.returncode == 0:
lg.info(log_msg)
else:
if not hide_error or lg.root.level <= 20:
lg.error(log_msg)
# Do not change returned result by debug mode
# This function should not alter the returned result
if added_stdout:
delattr(result, "stdout")
if added_stderr:
Expand Down Expand Up @@ -132,7 +163,7 @@ def indent_out(output: bytes | str) -> str:
return "\n".join(result)


def format_run_result(result: subprocess.CompletedProcess, returncode: int):
def format_run_result(result: subprocess.CompletedProcess, returncode: int, tb: str):
"""Format a CompletedProcess result log."""
color_begin = ""
color_end = ""
Expand All @@ -153,11 +184,20 @@ def format_run_result(result: subprocess.CompletedProcess, returncode: int):
stderr=indent_out(result.stderr),
)
]
for out_type, output in [("stdout", result.stdout), ("stderr", result.stderr)]:
for out_type, output in [
("stdout", result.stdout),
("stderr", result.stderr),
("traceback", tb),
]:
output = output.strip()
if output:
output = indent_out(output)
msg.append(OUTPUT_LOG.format(out_type=out_type, output=output))
white_space = " " * (11 - len(out_type))
msg.append(
OUTPUT_LOG.format(
out_type=out_type, white_space=white_space, output=output
)
)
pass
return "\n".join(msg)

Expand Down

0 comments on commit eebeb89

Please sign in to comment.