Skip to content

Commit

Permalink
fix: Remove dependence on special Python builds for CAN functionality
Browse files Browse the repository at this point in the history
The CAN constants are compiled in depending in the enabled kernel
features. In case a Python interpreter is built in a special
environment (e.g., https://github.com/indygreg/python-build-standalone
used by the uv tool), then a few constants might be missing. The absence
of these constants does not limit the availability of the CAN feature on
a supported kernel. So, lets checkin a generated file with the correct
constants to avoid running into this problem.
  • Loading branch information
rumpelsepp committed Dec 19, 2024
1 parent 1e408a6 commit b89aea9
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 31 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ default:
@echo " tests run testsuite"
@echo " pytest run pytest tests"
@echo " bats run bats end to end tests"
@echo " constants generate transport constants for compat reasons"
@echo " clean delete build artifacts"

.PHONY: lint
Expand Down Expand Up @@ -48,6 +49,10 @@ pytest:
bats:
./tests/bats/run_bats.sh

.PHONY: constants
constants:
./scripts/gen_constants.py | ruff format - > src/gallia/transports/_can_constants.py

.PHONY: clean
clean:
$(MAKE) -C docs clean
53 changes: 53 additions & 0 deletions scripts/gen_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python

# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0

import socket


TEMPLATE = f"""# This file has been autogenerated by `make constants`.
# !! DO NOT CHANGE MANUALLY !!
# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0
import struct
CANFD_MTU = 72
CAN_MTU = 16
# https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/can.h
CAN_HEADER_FMT = struct.Struct("=IBB2x")
CANFD_BRS = 0x01
CANFD_ESI = 0x02
CAN_EFF_FLAG = {socket.CAN_EFF_FLAG}
CAN_ERR_FLAG = {socket.CAN_ERR_FLAG}
CAN_RTR_FLAG = {socket.CAN_RTR_FLAG}
CAN_EFF_MASK = {socket.CAN_EFF_MASK}
CAN_INV_FILTER = 0x20000000 # TODO: Add to CPython
CAN_SFF_MASK = {socket.CAN_SFF_MASK}
CAN_RAW = {socket.CAN_RAW}
CAN_RAW_FD_FRAMES = {socket.CAN_RAW_FD_FRAMES}
CAN_RAW_FILTER = {socket.CAN_RAW_FILTER}
CAN_RAW_JOIN_FILTERS = {socket.CAN_RAW_JOIN_FILTERS}
SOL_CAN_RAW = {socket.SOL_CAN_RAW}
"""


def main() -> None:
print(TEMPLATE)


if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
pass
31 changes: 31 additions & 0 deletions src/gallia/transports/_can_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# This file has been autogenerated by `make constants`.
# !! DO NOT CHANGE MANUALLY !!

# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0

import struct

CANFD_MTU = 72
CAN_MTU = 16

# https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/can.h
CAN_HEADER_FMT = struct.Struct("=IBB2x")

CANFD_BRS = 0x01
CANFD_ESI = 0x02

CAN_EFF_FLAG = 2147483648
CAN_ERR_FLAG = 536870912
CAN_RTR_FLAG = 1073741824

CAN_EFF_MASK = 536870911
CAN_INV_FILTER = 0x20000000 # TODO: Add to CPython
CAN_SFF_MASK = 2047

CAN_RAW = 1
CAN_RAW_FD_FRAMES = 5
CAN_RAW_FILTER = 1
CAN_RAW_JOIN_FILTERS = 6
SOL_CAN_RAW = 101
68 changes: 37 additions & 31 deletions src/gallia/transports/can.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,31 @@
from pydantic import BaseModel, field_validator

from gallia.log import get_logger
from gallia.transports._can_constants import (
CAN_EFF_FLAG,
CAN_EFF_MASK,
CAN_ERR_FLAG,
CAN_HEADER_FMT,
CAN_INV_FILTER,
CAN_RAW,
CAN_RAW_FD_FRAMES,
CAN_RAW_FILTER,
CAN_RAW_JOIN_FILTERS,
CAN_RTR_FLAG,
CAN_SFF_MASK,
CANFD_BRS,
CANFD_ESI,
CANFD_MTU,
SOL_CAN_RAW,
)
from gallia.transports.base import BaseTransport, TargetURI
from gallia.utils import auto_int

logger = get_logger(__name__)

CANFD_MTU = 72
CAN_MTU = 16


@dataclass(kw_only=True, slots=True, frozen=True)
class CANMessage:
# https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/can.h
CAN_HEADER_FMT = struct.Struct("=IBB2x")
CANFD_BRS = 0x01
CANFD_ESI = 0x02

timestamp: float = 0.0
arbitration_id: int = 0

Expand All @@ -54,27 +63,27 @@ def __len__(self) -> int:
def _compose_arbitration_id(self) -> int:
can_id = self.arbitration_id
if self.is_extended_id:
can_id |= s.CAN_EFF_FLAG
can_id |= CAN_EFF_FLAG
if self.is_remote_frame:
can_id |= s.CAN_RTR_FLAG
can_id |= CAN_RTR_FLAG
if self.is_error_frame:
can_id |= s.CAN_ERR_FLAG
can_id |= CAN_ERR_FLAG
return can_id

def pack(self) -> bytes:
can_id = self._compose_arbitration_id()
flags = 0
if self.bitrate_switch:
flags |= self.CANFD_BRS
flags |= CANFD_BRS
if self.error_state_indicator:
flags |= self.CANFD_ESI
flags |= CANFD_ESI
max_len = 64 if self.is_fd else 8
data = bytes(self.data).ljust(max_len, b"\x00")
return self.CAN_HEADER_FMT.pack(can_id, self.dlc, flags) + data
return CAN_HEADER_FMT.pack(can_id, self.dlc, flags) + data

@staticmethod
def _dissect_can_frame(frame: bytes) -> tuple[int, int, int, bytes]:
can_id, can_dlc, flags = CANMessage.CAN_HEADER_FMT.unpack_from(frame)
can_id, can_dlc, flags = CAN_HEADER_FMT.unpack_from(frame)
if len(frame) != CANFD_MTU:
# Flags not valid in non-FD frames
flags = 0
Expand All @@ -89,17 +98,17 @@ def unpack(cls, frame: bytes) -> Self:
# #define CAN_EFF_FLAG 0x80000000U /* EFF/SFF is set in the MSB */
# #define CAN_RTR_FLAG 0x40000000U /* remote transmission request */
# #define CAN_ERR_FLAG 0x20000000U /* error frame */
is_extended_frame_format = bool(can_id & s.CAN_EFF_FLAG)
is_remote_transmission_request = bool(can_id & s.CAN_RTR_FLAG)
is_error_frame = bool(can_id & s.CAN_ERR_FLAG)
is_extended_frame_format = bool(can_id & CAN_EFF_FLAG)
is_remote_transmission_request = bool(can_id & CAN_RTR_FLAG)
is_error_frame = bool(can_id & CAN_ERR_FLAG)
is_fd = len(frame) == CANFD_MTU
bitrate_switch = bool(flags & cls.CANFD_BRS)
error_state_indicator = bool(flags & cls.CANFD_ESI)
bitrate_switch = bool(flags & CANFD_BRS)
error_state_indicator = bool(flags & CANFD_ESI)

if is_extended_frame_format:
arbitration_id = can_id & s.CAN_EFF_MASK
arbitration_id = can_id & CAN_EFF_MASK
else:
arbitration_id = can_id & s.CAN_SFF_MASK
arbitration_id = can_id & CAN_SFF_MASK

return cls(
arbitration_id=arbitration_id,
Expand Down Expand Up @@ -128,9 +137,6 @@ def auto_int(cls, v: str) -> int:


class RawCANTransport(BaseTransport, scheme="can-raw"):
# Flags for setsockopt CAN_RAW_FILTER
CAN_INV_FILTER = 0x20000000

def __init__(self, target: TargetURI, config: RawCANConfig, sock: s.socket) -> None:
super().__init__(target)

Expand All @@ -149,12 +155,12 @@ async def connect(
if t.hostname is None:
raise ValueError("empty interface")

sock = s.socket(s.PF_CAN, s.SOCK_RAW, s.CAN_RAW)
sock = s.socket(s.PF_CAN, s.SOCK_RAW, CAN_RAW)
sock.bind((t.hostname,))
config = RawCANConfig(**t.qs_flat)

if config.is_fd is True:
sock.setsockopt(s.SOL_CAN_RAW, s.CAN_RAW_FD_FRAMES, 1)
sock.setsockopt(SOL_CAN_RAW, CAN_RAW_FD_FRAMES, 1)

sock.setblocking(False)

Expand All @@ -163,15 +169,15 @@ async def connect(
def set_filter(self, can_ids: list[int], inv_filter: bool = False) -> None:
if not can_ids:
return
filter_mask = s.CAN_EFF_MASK if self.config.is_extended else s.CAN_SFF_MASK
filter_mask = CAN_EFF_MASK if self.config.is_extended else CAN_SFF_MASK
data = b""
for can_id in can_ids:
if inv_filter:
can_id |= self.CAN_INV_FILTER # noqa: PLW2901
can_id |= CAN_INV_FILTER # noqa: PLW2901
data += struct.pack("@II", can_id, filter_mask)
self._sock.setsockopt(s.SOL_CAN_RAW, s.CAN_RAW_FILTER, data)
self._sock.setsockopt(SOL_CAN_RAW, CAN_RAW_FILTER, data)
if inv_filter:
self._sock.setsockopt(s.SOL_CAN_RAW, s.CAN_RAW_JOIN_FILTERS, 1)
self._sock.setsockopt(SOL_CAN_RAW, CAN_RAW_JOIN_FILTERS, 1)

async def read(
self,
Expand Down

0 comments on commit b89aea9

Please sign in to comment.