From db4e63661333fd4f99107ccb84c850fca21b15fb Mon Sep 17 00:00:00 2001 From: Stefan Tatschner Date: Tue, 17 Dec 2024 09:32:24 +0100 Subject: [PATCH] fix: Remove dependence on special Python builds for CAN functionality The CAN constants are compiled in depending in the enabled kernel features. In case a Python interpreter is built in a special environment (e.g., https://github.com/indygreg/python-build-standalone used by the uv tool), then a few constants might be missing. The absence of these constants does not limit the availability of the CAN feature on a supported kernel. So, lets checkin a generated file with the correct constants to avoid running into this problem. --- Makefile | 5 ++ scripts/gen_constants.py | 53 +++++++++++++++++++ src/gallia/transports/_can_constants.py | 31 +++++++++++ src/gallia/transports/can.py | 68 ++++++++++++++----------- 4 files changed, 126 insertions(+), 31 deletions(-) create mode 100755 scripts/gen_constants.py create mode 100644 src/gallia/transports/_can_constants.py diff --git a/Makefile b/Makefile index a8e4a3d81..03e2b6074 100644 --- a/Makefile +++ b/Makefile @@ -12,6 +12,7 @@ default: @echo " tests run testsuite" @echo " pytest run pytest tests" @echo " bats run bats end to end tests" + @echo " constants generate transport constants for compat reasons" @echo " clean delete build artifacts" .PHONY: lint @@ -48,6 +49,10 @@ pytest: bats: ./tests/bats/run_bats.sh +.PHONY: constants +constants: + ./scripts/gen_constants.py | ruff format - > src/gallia/transports/_can_constants.py + .PHONY: clean clean: $(MAKE) -C docs clean diff --git a/scripts/gen_constants.py b/scripts/gen_constants.py new file mode 100755 index 000000000..247c237c8 --- /dev/null +++ b/scripts/gen_constants.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python + +# SPDX-FileCopyrightText: AISEC Pentesting Team +# +# SPDX-License-Identifier: Apache-2.0 + +import socket + + +TEMPLATE = f"""# This file has been autogenerated by `make constants`. +# !! DO NOT CHANGE MANUALLY !! + +# SPDX-FileCopyrightText: AISEC Pentesting Team +# +# SPDX-License-Identifier: Apache-2.0 + +import struct + + +CANFD_MTU = 72 +CAN_MTU = 16 + +# https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/can.h +CAN_HEADER_FMT = struct.Struct("=IBB2x") + +CANFD_BRS = 0x01 +CANFD_ESI = 0x02 + +CAN_EFF_FLAG = {socket.CAN_EFF_FLAG} +CAN_ERR_FLAG = {socket.CAN_ERR_FLAG} +CAN_RTR_FLAG = {socket.CAN_RTR_FLAG} + +CAN_EFF_MASK = {socket.CAN_EFF_MASK} +CAN_INV_FILTER = 0x20000000 # TODO: Add to CPython +CAN_SFF_MASK = {socket.CAN_SFF_MASK} + +CAN_RAW = {socket.CAN_RAW} +CAN_RAW_FD_FRAMES = {socket.CAN_RAW_FD_FRAMES} +CAN_RAW_FILTER = {socket.CAN_RAW_FILTER} +CAN_RAW_JOIN_FILTERS = {socket.CAN_RAW_JOIN_FILTERS} +SOL_CAN_RAW = {socket.SOL_CAN_RAW} +""" + + +def main() -> None: + print(TEMPLATE) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + pass diff --git a/src/gallia/transports/_can_constants.py b/src/gallia/transports/_can_constants.py new file mode 100644 index 000000000..dd23da3e8 --- /dev/null +++ b/src/gallia/transports/_can_constants.py @@ -0,0 +1,31 @@ +# This file has been autogenerated by `make constants`. +# !! DO NOT CHANGE MANUALLY !! + +# SPDX-FileCopyrightText: AISEC Pentesting Team +# +# SPDX-License-Identifier: Apache-2.0 + +import struct + +CANFD_MTU = 72 +CAN_MTU = 16 + +# https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/can.h +CAN_HEADER_FMT = struct.Struct("=IBB2x") + +CANFD_BRS = 0x01 +CANFD_ESI = 0x02 + +CAN_EFF_FLAG = 2147483648 +CAN_ERR_FLAG = 536870912 +CAN_RTR_FLAG = 1073741824 + +CAN_EFF_MASK = 536870911 +CAN_INV_FILTER = 0x20000000 # TODO: Add to CPython +CAN_SFF_MASK = 2047 + +CAN_RAW = 1 +CAN_RAW_FD_FRAMES = 5 +CAN_RAW_FILTER = 1 +CAN_RAW_JOIN_FILTERS = 6 +SOL_CAN_RAW = 101 diff --git a/src/gallia/transports/can.py b/src/gallia/transports/can.py index 7dafd0f20..7b56aa224 100644 --- a/src/gallia/transports/can.py +++ b/src/gallia/transports/can.py @@ -15,22 +15,31 @@ from pydantic import BaseModel, field_validator from gallia.log import get_logger +from gallia.transports._can_constants import ( + CAN_EFF_FLAG, + CAN_EFF_MASK, + CAN_ERR_FLAG, + CAN_HEADER_FMT, + CAN_INV_FILTER, + CAN_RAW, + CAN_RAW_FD_FRAMES, + CAN_RAW_FILTER, + CAN_RAW_JOIN_FILTERS, + CAN_RTR_FLAG, + CAN_SFF_MASK, + CANFD_BRS, + CANFD_ESI, + CANFD_MTU, + SOL_CAN_RAW, +) from gallia.transports.base import BaseTransport, TargetURI from gallia.utils import auto_int logger = get_logger(__name__) -CANFD_MTU = 72 -CAN_MTU = 16 - @dataclass(kw_only=True, slots=True, frozen=True) class CANMessage: - # https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/can.h - CAN_HEADER_FMT = struct.Struct("=IBB2x") - CANFD_BRS = 0x01 - CANFD_ESI = 0x02 - timestamp: float = 0.0 arbitration_id: int = 0 @@ -54,27 +63,27 @@ def __len__(self) -> int: def _compose_arbitration_id(self) -> int: can_id = self.arbitration_id if self.is_extended_id: - can_id |= s.CAN_EFF_FLAG + can_id |= CAN_EFF_FLAG if self.is_remote_frame: - can_id |= s.CAN_RTR_FLAG + can_id |= CAN_RTR_FLAG if self.is_error_frame: - can_id |= s.CAN_ERR_FLAG + can_id |= CAN_ERR_FLAG return can_id def pack(self) -> bytes: can_id = self._compose_arbitration_id() flags = 0 if self.bitrate_switch: - flags |= self.CANFD_BRS + flags |= CANFD_BRS if self.error_state_indicator: - flags |= self.CANFD_ESI + flags |= CANFD_ESI max_len = 64 if self.is_fd else 8 data = bytes(self.data).ljust(max_len, b"\x00") - return self.CAN_HEADER_FMT.pack(can_id, self.dlc, flags) + data + return CAN_HEADER_FMT.pack(can_id, self.dlc, flags) + data @staticmethod def _dissect_can_frame(frame: bytes) -> tuple[int, int, int, bytes]: - can_id, can_dlc, flags = CANMessage.CAN_HEADER_FMT.unpack_from(frame) + can_id, can_dlc, flags = CAN_HEADER_FMT.unpack_from(frame) if len(frame) != CANFD_MTU: # Flags not valid in non-FD frames flags = 0 @@ -89,17 +98,17 @@ def unpack(cls, frame: bytes) -> Self: # #define CAN_EFF_FLAG 0x80000000U /* EFF/SFF is set in the MSB */ # #define CAN_RTR_FLAG 0x40000000U /* remote transmission request */ # #define CAN_ERR_FLAG 0x20000000U /* error frame */ - is_extended_frame_format = bool(can_id & s.CAN_EFF_FLAG) - is_remote_transmission_request = bool(can_id & s.CAN_RTR_FLAG) - is_error_frame = bool(can_id & s.CAN_ERR_FLAG) + is_extended_frame_format = bool(can_id & CAN_EFF_FLAG) + is_remote_transmission_request = bool(can_id & CAN_RTR_FLAG) + is_error_frame = bool(can_id & CAN_ERR_FLAG) is_fd = len(frame) == CANFD_MTU - bitrate_switch = bool(flags & cls.CANFD_BRS) - error_state_indicator = bool(flags & cls.CANFD_ESI) + bitrate_switch = bool(flags & CANFD_BRS) + error_state_indicator = bool(flags & CANFD_ESI) if is_extended_frame_format: - arbitration_id = can_id & s.CAN_EFF_MASK + arbitration_id = can_id & CAN_EFF_MASK else: - arbitration_id = can_id & s.CAN_SFF_MASK + arbitration_id = can_id & CAN_SFF_MASK return cls( arbitration_id=arbitration_id, @@ -128,9 +137,6 @@ def auto_int(cls, v: str) -> int: class RawCANTransport(BaseTransport, scheme="can-raw"): - # Flags for setsockopt CAN_RAW_FILTER - CAN_INV_FILTER = 0x20000000 - def __init__(self, target: TargetURI, config: RawCANConfig, sock: s.socket) -> None: super().__init__(target) @@ -149,12 +155,12 @@ async def connect( if t.hostname is None: raise ValueError("empty interface") - sock = s.socket(s.PF_CAN, s.SOCK_RAW, s.CAN_RAW) + sock = s.socket(s.PF_CAN, s.SOCK_RAW, CAN_RAW) sock.bind((t.hostname,)) config = RawCANConfig(**t.qs_flat) if config.is_fd is True: - sock.setsockopt(s.SOL_CAN_RAW, s.CAN_RAW_FD_FRAMES, 1) + sock.setsockopt(SOL_CAN_RAW, CAN_RAW_FD_FRAMES, 1) sock.setblocking(False) @@ -163,15 +169,15 @@ async def connect( def set_filter(self, can_ids: list[int], inv_filter: bool = False) -> None: if not can_ids: return - filter_mask = s.CAN_EFF_MASK if self.config.is_extended else s.CAN_SFF_MASK + filter_mask = CAN_EFF_MASK if self.config.is_extended else CAN_SFF_MASK data = b"" for can_id in can_ids: if inv_filter: - can_id |= self.CAN_INV_FILTER # noqa: PLW2901 + can_id |= CAN_INV_FILTER # noqa: PLW2901 data += struct.pack("@II", can_id, filter_mask) - self._sock.setsockopt(s.SOL_CAN_RAW, s.CAN_RAW_FILTER, data) + self._sock.setsockopt(SOL_CAN_RAW, CAN_RAW_FILTER, data) if inv_filter: - self._sock.setsockopt(s.SOL_CAN_RAW, s.CAN_RAW_JOIN_FILTERS, 1) + self._sock.setsockopt(SOL_CAN_RAW, CAN_RAW_JOIN_FILTERS, 1) async def read( self,