From ed1b203458c8e9c7ed4cfb71c746e1dda2c39086 Mon Sep 17 00:00:00 2001 From: "Jarisch, Ferdinand" Date: Tue, 3 Dec 2024 17:07:37 +0100 Subject: [PATCH] feat(doip-discover): Greatly speed up DoIP discover Previously we waited for each valid target_address for the ECU to reply. This commit introduces a parallel approach where diagnostic message replies are treated independently of their requests which resolves virtually all waits for timeouts except at the very end during the target address enumeration. --- src/gallia/commands/discover/doip.py | 131 +++++++++++++-------------- src/gallia/transports/doip.py | 18 +++- 2 files changed, 81 insertions(+), 68 deletions(-) diff --git a/src/gallia/commands/discover/doip.py b/src/gallia/commands/discover/doip.py index ccd0ecadc..d73351139 100644 --- a/src/gallia/commands/discover/doip.py +++ b/src/gallia/commands/discover/doip.py @@ -16,7 +16,6 @@ from gallia.log import get_logger from gallia.services.uds.core.service import TesterPresentRequest, TesterPresentResponse from gallia.transports.doip import ( - DiagnosticMessage, DiagnosticMessageNegativeAckCodes, DoIPConnection, DoIPEntityStatusResponse, @@ -320,16 +319,19 @@ async def enumerate_target_addresses( ) -> None: # noqa: PLR0913 known_targets = [] unreachable_targets = [] - responsive_targets = [] search_space = range(start, stop + 1) - conn = await self.create_DoIP_conn(tgt_hostname, tgt_port, correct_rat, correct_src, 0xAFFE) + target_template = f"doip://{tgt_hostname}:{tgt_port}?protocol_version={self.protocol_version}&activation_type={correct_rat:#x}&src_addr={correct_src:#x}&target_addr={{:#x}}" + conn = await self.create_DoIP_conn( + tgt_hostname, tgt_port, correct_rat, correct_src, 0xAFFE, fast_queue=True + ) + reader_task = asyncio.create_task(self.task_read_diagnostic_messages(conn, target_template)) for target_addr in search_space: logger.debug(f"[๐Ÿšง] Attempting connection to {target_addr:#x}") conn.target_addr = target_addr - current_target = f"doip://{tgt_hostname}:{tgt_port}?protocol_version={self.protocol_version}&activation_type={correct_rat:#x}&src_addr={correct_src:#x}&target_addr={target_addr:#x}" + current_target = target_template.format(target_addr) try: req = TesterPresentRequest(suppress_response=False) @@ -343,37 +345,7 @@ async def enumerate_target_addresses( ) as f: await f.write(f"{current_target}\n") - logger.info(f"[โณ] Waiting for reply of target {target_addr:#x}") - # Hardcoded loop to detect potential broadcasts - while True: - pot_broadcast, data = await asyncio.wait_for( - self.read_diag_request_custom(conn), - TimingAndCommunicationParameters.DiagnosticMessageMessageTimeout / 1000 - if timeout is None - else timeout, - ) - if pot_broadcast is None: - break - - logger.notice( - f"[๐Ÿค‘] B-B-B-B-B-B-BROADCAST at TargetAddress {target_addr:#x}! Got reply from {pot_broadcast:#x}" - ) - async with aiofiles.open( - self.artifacts_dir.joinpath("6_unsolicited_replies.txt"), "a" - ) as f: - await f.write( - f"target_addr={target_addr:#x} yielded reply from {pot_broadcast:#x}; could also be late answer triggered by previous address!\n" - ) - - resp = TesterPresentResponse.parse_static(data) - logger.notice(f"[๐Ÿฅณ] It cannot get nicer: {target_addr:#x} responded: {resp}") - responsive_targets.append(current_target) - async with aiofiles.open( - self.artifacts_dir.joinpath("4_responsive_targets.txt"), "a" - ) as f: - await f.write(f"{current_target}\n") - if self.db_handler is not None: - await self.db_handler.insert_discovery_result(current_target) + # Here is where "reader_task" comes into play, which monitors incoming DiagnosticMessage replies except DoIPNegativeAckError as e: if e.nack_code == DiagnosticMessageNegativeAckCodes.UnknownTargetAddress: @@ -397,14 +369,6 @@ async def enumerate_target_addresses( await f.write(f"{target_addr:#x}: {e.nack_code.name}\n") continue - except TimeoutError: # This triggers when DoIP ACK but no UDS reply - logger.info(f"[๐Ÿ™Š] Presumably no active ECU on target address {target_addr:#x}") - async with aiofiles.open( - self.artifacts_dir.joinpath("5_unresponsive_targets.txt"), "a" - ) as f: - await f.write(f"{current_target}\n") - continue - except ConnectionError as e: # Whenever this triggers, but sometimes connections are closed not by us logger.warning(f"[๐Ÿซฆ] Sexy, but unexpected: {target_addr:#x} triggered {e!r}") @@ -421,9 +385,6 @@ async def enumerate_target_addresses( ) continue - await conn.close() - await asyncio.sleep(tcp_connect_delay) - logger.notice( f"[โš”๏ธ] It's dangerous to test alone, take one of these {len(known_targets)} known targets:" ) @@ -439,16 +400,65 @@ async def enumerate_target_addresses( for item in unreachable_targets: logger.notice(item) - logger.notice( - f"[๐Ÿ’ฐ] For even more profit, try one of the {len(responsive_targets)} targets that actually responded:" - ) - for item in responsive_targets: - logger.notice(item) + logger.info("[๐Ÿ˜ด] Giving all ECUs a chance to reply...") + await asyncio.sleep(TimingAndCommunicationParameters.DiagnosticMessageMessageTimeout / 1000) + reader_task.cancel() + await reader_task + await conn.close() logger.notice( f"[๐Ÿงญ] Check out the content of the log files at {self.artifacts_dir} as well!" ) + async def task_read_diagnostic_messages( + self, conn: DoIPConnection, target_template: str + ) -> None: + responsive_targets = [] + potential_broadcasts = [] + try: + while True: + _, payload = await conn.read_diag_request_raw() + (source_address, data) = (payload.SourceAddress, payload.UserData) + current_target = target_template.format(source_address) + + resp = TesterPresentResponse.parse_static(data) + logger.notice(f"[๐Ÿฅ‡] It cannot get nicer: {source_address:#x} responded: {resp}") + + if current_target not in responsive_targets: + responsive_targets.append(current_target) + async with aiofiles.open( + self.artifacts_dir.joinpath("4_responsive_targets.txt"), "a" + ) as f: + await f.write(f"{current_target}\n") + if self.db_handler is not None: + await self.db_handler.insert_discovery_result(current_target) + + if ( + abs(source_address - conn.target_addr) > 10 + and conn.target_addr not in potential_broadcasts + ): + potential_broadcasts.append(conn.target_addr) + + except asyncio.CancelledError: + logger.debug("Diagnostic Message reader got cancelled") + except Exception as e: + logger.error(f"Diagnostic Message reader died with {e!r}") + + finally: + logger.notice( + f"[๐Ÿ’ฐ] For even more profit, try one of the {len(responsive_targets)} targets that actually responded:" + ) + for item in responsive_targets: + logger.notice(item) + + # TODO: the discoverer could be extended to search for and validate the broadcast address(es) automatically + if len(potential_broadcasts) > 0: + logger.notice( + "[๐Ÿ•ต๏ธ] You could also investigate these target addresses that appear to be near broadcasts:" + ) + for target_addr in potential_broadcasts: + logger.notice(f"[๐Ÿค‘] B-B-B-B-B-B-BROADCAST around TargetAddress {target_addr:#x}!") + async def create_DoIP_conn( self, hostname: str, @@ -456,6 +466,7 @@ async def create_DoIP_conn( routing_activation_type: int, src_addr: int, target_addr: int, + fast_queue: bool = False, ) -> DoIPConnection: # noqa: PLR0913 while True: try: # Ensure that connections do not remain in TIME_WAIT @@ -466,6 +477,7 @@ async def create_DoIP_conn( target_addr, so_linger=True, protocol_version=self.protocol_version, + separate_diagnostic_message_queue=fast_queue, ) logger.info("[๐Ÿ“ซ] Sending RoutingActivationRequest") await conn.write_routing_activation_request( @@ -473,26 +485,11 @@ async def create_DoIP_conn( ) except Exception as e: # TODO: this probably is too broad logger.warning( - f"[\U0001fae8] Got me some good errors when it should be working (dis an infinite loop): {e!r}" + f"[๐Ÿซจ] Got me some good errors when it should be working (dis an infinite loop): {e!r}" ) continue return conn - async def read_diag_request_custom(self, conn: DoIPConnection) -> tuple[int | None, bytes]: - while True: - hdr, payload = await conn.read_frame() - if not isinstance(payload, DiagnosticMessage): - logger.warning(f"[๐Ÿงจ] Unexpected DoIP message: {hdr} {payload}") - return (None, b"") - if payload.SourceAddress != conn.target_addr: - return (payload.SourceAddress, payload.UserData) - if payload.TargetAddress != conn.src_addr: - logger.warning( - f"[๐ŸคŒ] You talking to me?! Unexpected DoIP target address: {payload.TargetAddress:#04x}" - ) - continue - return (None, payload.UserData) - @staticmethod def get_broadcast_addrs() -> list[AddrInfo]: out = [] diff --git a/src/gallia/transports/doip.py b/src/gallia/transports/doip.py index b71c55b46..5d1d0174a 100644 --- a/src/gallia/transports/doip.py +++ b/src/gallia/transports/doip.py @@ -470,12 +470,15 @@ def __init__( # noqa: PLR0913 src_addr: int, target_addr: int, protocol_version: int, + separate_diagnostic_message_queue: bool = False, ): self.reader = reader self.writer = writer self.src_addr = src_addr self.target_addr = target_addr self.protocol_version = protocol_version + self.separate_diagnostic_message_queue = separate_diagnostic_message_queue + self._diagnostic_message_queue: asyncio.Queue[DoIPDiagFrame] = asyncio.Queue() self._read_queue: asyncio.Queue[DoIPFrame] = asyncio.Queue() self._read_task = asyncio.create_task(self._read_worker()) self._read_task.add_done_callback( @@ -494,6 +497,7 @@ async def connect( # noqa: PLR0913 target_addr: int, so_linger: bool = False, protocol_version: int = ProtocolVersions.ISO_13400_2_2019, + separate_diagnostic_message_queue: bool = False, ) -> Self: reader, writer = await asyncio.open_connection(host, port) @@ -508,7 +512,14 @@ async def connect( # noqa: PLR0913 sock = writer.get_extra_info("socket") sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0)) - return cls(reader, writer, src_addr, target_addr, protocol_version) + return cls( + reader, + writer, + src_addr, + target_addr, + protocol_version, + separate_diagnostic_message_queue, + ) async def _read_frame(self) -> DoIPFrame | tuple[None, None]: # Header is fixed size 8 byte. @@ -547,6 +558,9 @@ async def _read_worker(self) -> None: if hdr.PayloadType == PayloadTypes.AliveCheckRequest: await self.write_alive_check_response() continue + if isinstance(data, DiagnosticMessage) and self.separate_diagnostic_message_queue: + await self._diagnostic_message_queue.put((hdr, data)) + continue await self._read_queue.put((hdr, data)) except asyncio.CancelledError: logger.debug("DoIP read worker got cancelled") @@ -573,6 +587,8 @@ async def read_frame(self) -> DoIPFrame: async def read_diag_request_raw(self) -> DoIPDiagFrame: unexpected_packets: list[tuple[Any, Any]] = [] while True: + if self.separate_diagnostic_message_queue: + return await self._diagnostic_message_queue.get() hdr, payload = await self.read_frame() if not isinstance(payload, DiagnosticMessage): logger.warning(f"expected DoIP DiagnosticMessage, instead got: {hdr} {payload}")