Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small fixes here and there #474

Merged
merged 7 commits into from
Jan 4, 2024
14 changes: 10 additions & 4 deletions src/gallia/command/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from argparse import ArgumentParser, Namespace
from datetime import UTC, datetime
from enum import Enum, unique
from logging import Handler
from pathlib import Path
from subprocess import CalledProcessError, run
from tempfile import gettempdir
Expand Down Expand Up @@ -104,6 +105,8 @@ class BaseCommand(ABC):
#: a log message with level critical is logged.
CATCHED_EXCEPTIONS: list[type[Exception]] = []

log_file_handlers: list[Handler]

def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None:
self.id = camel_to_snake(self.__class__.__name__)
self.parser = parser
Expand All @@ -124,6 +127,7 @@ def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None:
self.db_handler: DBHandler | None = None
self.configure_class_parser()
self.configure_parser()
self.log_file_handlers = []

@abstractmethod
def run(self, args: Namespace) -> int:
Expand Down Expand Up @@ -381,10 +385,12 @@ def entry_point(self, args: Namespace) -> int:
args.artifacts_base,
args.artifacts_dir,
)
add_zst_log_handler(
logger_name="gallia",
filepath=self.artifacts_dir.joinpath(FileNames.LOGFILE.value),
file_log_level=get_file_log_level(args),
self.log_file_handlers.append(
add_zst_log_handler(
logger_name="gallia",
filepath=self.artifacts_dir.joinpath(FileNames.LOGFILE.value),
file_log_level=get_file_log_level(args),
)
)

if args.hooks:
Expand Down
4 changes: 0 additions & 4 deletions src/gallia/commands/scan/uds/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,6 @@ async def perform_scan(
f"{g_repr(sid)}: {e!r} occurred, this needs to be investigated!"
)
continue
except Exception as e:
logger.info(f"{g_repr(sid)}: {e!r} occurred")
await self.ecu.reconnect()
continue

if isinstance(resp, NegativeResponse) and resp.response_code in [
UDSErrorCodes.serviceNotSupported,
Expand Down
7 changes: 4 additions & 3 deletions src/gallia/dumpcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ async def start(
src_addr,
dst_addr,
)
# Unix domain sockets are not supported by dumpcap.
case UnixTransport.SCHEME | UnixLinesTransport.SCHEME:
logger.warning("Dumpcap does not support unix domain sockets")
return None
# There is currently no API for transport plugins to
# register a scheme and a corresponding invocation
Expand Down Expand Up @@ -164,11 +164,12 @@ def _can_cmd(
return args

@staticmethod
async def _eth_cmd(target_ip: str) -> list[str]:
async def _eth_cmd(target_ip: str) -> list[str] | None:
try:
host, port = split_host_port(target_ip)
except Exception as e:
raise ValueError(f"Invalid argument for target ip: {target_ip}; {e}") from e
logger.error(f"Invalid argument for target ip: {target_ip}; {e}")
return None

if proxy := os.getenv("all_proxy"):
url = urlparse(proxy)
Expand Down
3 changes: 2 additions & 1 deletion src/gallia/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def add_stderr_log_handler(

def add_zst_log_handler(
logger_name: str, filepath: Path, file_log_level: Loglevel
) -> None:
) -> logging.Handler:
rumpelsepp marked this conversation as resolved.
Show resolved Hide resolved
queue: Queue[Any] = Queue()
logger = get_logger(logger_name)
logger.addHandler(QueueHandler(queue))
Expand All @@ -318,6 +318,7 @@ def add_zst_log_handler(
)
queue_listener.start()
atexit.register(queue_listener.stop)
return zstd_handler


class _PenlogRecordV1(msgspec.Struct, omit_defaults=True):
Expand Down
2 changes: 2 additions & 0 deletions src/gallia/services/uds/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ async def request_unsafe(
except asyncio.TimeoutError as e:
# Send a tester present to indicate that
# we are still there.
# TODO Is this really necessary?
await self._tester_present(suppress_resp=True)
n_timeout += 1
if n_timeout >= max_n_timeout:
Expand Down Expand Up @@ -158,6 +159,7 @@ async def _tester_present(
tags = config.tags if config.tags is not None else []
logger.debug(pdu.hex(), extra={"tags": ["write", "uds"] + tags})
await self.transport.write(pdu, timeout, config.tags)
# TODO This is not fail safe: What if there is an answer???
return None
return await self.tester_present(False, config)

Expand Down
49 changes: 18 additions & 31 deletions src/gallia/services/uds/ecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,42 +370,25 @@ async def wait_for_ecu(
async def _tester_present_worker(self, interval: float) -> None:
assert self.transport
logger.debug("tester present worker started")
while True:
task = asyncio.current_task()
while task is not None and task.cancelling() == 0:
try:
async with self.mutex:
rumpelsepp marked this conversation as resolved.
Show resolved Hide resolved
try:
payload = bytes([0x3E, 0x80])
await self.transport.write(payload)
logger.debug(payload.hex(), extra={"tags": ["write", "uds"]})

# Hold the mutex for 10 ms to synchronize this background
# worker with the main sender task.
await asyncio.sleep(0.01)

# The BCP might send us an error. Everything
# will break if we do not read it back. Since
# this read() call is only intended to flush
# errors caused by the previous write(), it is
# sane to ignore the error here.
try:
await self.transport.read(timeout=0.01)
except asyncio.TimeoutError:
pass
except asyncio.CancelledError:
raise
except ConnectionError:
logger.info("connection lost; tester present waiting…")
await asyncio.sleep(1)
except Exception as e:
logger.debug(f"tester present got {e!r}")

# Release the mutex while sleeping.
await asyncio.sleep(interval)
# TODO Only ping if there was no other UDS traffic for `interval` amount of time
await self.ping(UDSRequestConfig(max_retry=1))
except asyncio.CancelledError:
logger.debug("tester present worker terminated")
break
raise
except ConnectionError:
logger.info("connection lost; tester present waiting…")
except Exception as e:
logger.warning(f"Tester present worker got {e!r}")
logger.debug(
"Tester present worker was cancelled but received no asyncio.CancelledError"
)

async def start_cyclic_tester_present(self, interval: float) -> None:
logger.debug("Starting tester present worker")
self.tester_present_interval = interval
coroutine = self._tester_present_worker(interval)
self.tester_present_task = asyncio.create_task(coroutine)
Expand All @@ -416,14 +399,18 @@ async def start_cyclic_tester_present(self, interval: float) -> None:
await asyncio.sleep(0)

async def stop_cyclic_tester_present(self) -> None:
logger.debug("Stopping tester present worker")
if self.tester_present_task is None:
logger.warning(
"BUG: stop_cyclic_tester_present() called but no task running"
)
return

self.tester_present_task.cancel()
await self.tester_present_task
try:
await self.tester_present_task
except asyncio.CancelledError:
pass

async def update_state(
self, request: service.UDSRequest, response: service.UDSResponse
Expand Down
12 changes: 10 additions & 2 deletions src/gallia/services/uds/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,12 @@ async def handle_client(
if uds_response_raw is not None:
writer.write(hexlify(uds_response_raw) + b"\n")
await writer.drain()
except Exception:
except Exception as e:
logger.error(
f"Unexpected exception when handling client communication: {e!r}"
)
traceback.print_exc()
break
ferdinandjarisch marked this conversation as resolved.
Show resolved Hide resolved

logger.info("Connection closed")
logger.info(
Expand All @@ -897,8 +901,12 @@ async def run(self) -> None:

if uds_response_raw is not None:
await transport.write(uds_response_raw)
except Exception:
except Exception as e:
logger.error(
f"Unexpected exception when handling client communication: {e!r}"
)
traceback.print_exc()
break
ferdinandjarisch marked this conversation as resolved.
Show resolved Hide resolved


class UnixUDSServerTransport(TCPUDSServerTransport):
Expand Down