Skip to content

Commit

Permalink
feat(doip-discover): Greatly speed up DoIP discover
Browse files Browse the repository at this point in the history
Previously we waited for each valid target_address for the ECU to reply.
This commit introduces a parallel approach where diagnostic message
replies are treated independently of their requests which resolves
virtually all waits for timeouts except at the very end during the
target address enumeration.
  • Loading branch information
ferdinandjarisch authored and rumpelsepp committed Dec 4, 2024
1 parent b3f5a85 commit ed1b203
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 68 deletions.
131 changes: 64 additions & 67 deletions src/gallia/commands/discover/doip.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from gallia.log import get_logger
from gallia.services.uds.core.service import TesterPresentRequest, TesterPresentResponse
from gallia.transports.doip import (
DiagnosticMessage,
DiagnosticMessageNegativeAckCodes,
DoIPConnection,
DoIPEntityStatusResponse,
Expand Down Expand Up @@ -320,16 +319,19 @@ async def enumerate_target_addresses(
) -> None: # noqa: PLR0913
known_targets = []
unreachable_targets = []
responsive_targets = []
search_space = range(start, stop + 1)

conn = await self.create_DoIP_conn(tgt_hostname, tgt_port, correct_rat, correct_src, 0xAFFE)
target_template = f"doip://{tgt_hostname}:{tgt_port}?protocol_version={self.protocol_version}&activation_type={correct_rat:#x}&src_addr={correct_src:#x}&target_addr={{:#x}}"
conn = await self.create_DoIP_conn(
tgt_hostname, tgt_port, correct_rat, correct_src, 0xAFFE, fast_queue=True
)
reader_task = asyncio.create_task(self.task_read_diagnostic_messages(conn, target_template))

for target_addr in search_space:
logger.debug(f"[🚧] Attempting connection to {target_addr:#x}")

conn.target_addr = target_addr
current_target = f"doip://{tgt_hostname}:{tgt_port}?protocol_version={self.protocol_version}&activation_type={correct_rat:#x}&src_addr={correct_src:#x}&target_addr={target_addr:#x}"
current_target = target_template.format(target_addr)

try:
req = TesterPresentRequest(suppress_response=False)
Expand All @@ -343,37 +345,7 @@ async def enumerate_target_addresses(
) as f:
await f.write(f"{current_target}\n")

logger.info(f"[⏳] Waiting for reply of target {target_addr:#x}")
# Hardcoded loop to detect potential broadcasts
while True:
pot_broadcast, data = await asyncio.wait_for(
self.read_diag_request_custom(conn),
TimingAndCommunicationParameters.DiagnosticMessageMessageTimeout / 1000
if timeout is None
else timeout,
)
if pot_broadcast is None:
break

logger.notice(
f"[🤑] B-B-B-B-B-B-BROADCAST at TargetAddress {target_addr:#x}! Got reply from {pot_broadcast:#x}"
)
async with aiofiles.open(
self.artifacts_dir.joinpath("6_unsolicited_replies.txt"), "a"
) as f:
await f.write(
f"target_addr={target_addr:#x} yielded reply from {pot_broadcast:#x}; could also be late answer triggered by previous address!\n"
)

resp = TesterPresentResponse.parse_static(data)
logger.notice(f"[🥳] It cannot get nicer: {target_addr:#x} responded: {resp}")
responsive_targets.append(current_target)
async with aiofiles.open(
self.artifacts_dir.joinpath("4_responsive_targets.txt"), "a"
) as f:
await f.write(f"{current_target}\n")
if self.db_handler is not None:
await self.db_handler.insert_discovery_result(current_target)
# Here is where "reader_task" comes into play, which monitors incoming DiagnosticMessage replies

except DoIPNegativeAckError as e:
if e.nack_code == DiagnosticMessageNegativeAckCodes.UnknownTargetAddress:
Expand All @@ -397,14 +369,6 @@ async def enumerate_target_addresses(
await f.write(f"{target_addr:#x}: {e.nack_code.name}\n")
continue

except TimeoutError: # This triggers when DoIP ACK but no UDS reply
logger.info(f"[🙊] Presumably no active ECU on target address {target_addr:#x}")
async with aiofiles.open(
self.artifacts_dir.joinpath("5_unresponsive_targets.txt"), "a"
) as f:
await f.write(f"{current_target}\n")
continue

except ConnectionError as e:
# Whenever this triggers, but sometimes connections are closed not by us
logger.warning(f"[🫦] Sexy, but unexpected: {target_addr:#x} triggered {e!r}")
Expand All @@ -421,9 +385,6 @@ async def enumerate_target_addresses(
)
continue

await conn.close()
await asyncio.sleep(tcp_connect_delay)

logger.notice(
f"[⚔️] It's dangerous to test alone, take one of these {len(known_targets)} known targets:"
)
Expand All @@ -439,23 +400,73 @@ async def enumerate_target_addresses(
for item in unreachable_targets:
logger.notice(item)

logger.notice(
f"[💰] For even more profit, try one of the {len(responsive_targets)} targets that actually responded:"
)
for item in responsive_targets:
logger.notice(item)
logger.info("[😴] Giving all ECUs a chance to reply...")
await asyncio.sleep(TimingAndCommunicationParameters.DiagnosticMessageMessageTimeout / 1000)
reader_task.cancel()
await reader_task
await conn.close()

logger.notice(
f"[🧭] Check out the content of the log files at {self.artifacts_dir} as well!"
)

async def task_read_diagnostic_messages(
self, conn: DoIPConnection, target_template: str
) -> None:
responsive_targets = []
potential_broadcasts = []
try:
while True:
_, payload = await conn.read_diag_request_raw()
(source_address, data) = (payload.SourceAddress, payload.UserData)
current_target = target_template.format(source_address)

resp = TesterPresentResponse.parse_static(data)
logger.notice(f"[🥇] It cannot get nicer: {source_address:#x} responded: {resp}")

if current_target not in responsive_targets:
responsive_targets.append(current_target)
async with aiofiles.open(
self.artifacts_dir.joinpath("4_responsive_targets.txt"), "a"
) as f:
await f.write(f"{current_target}\n")
if self.db_handler is not None:
await self.db_handler.insert_discovery_result(current_target)

if (
abs(source_address - conn.target_addr) > 10
and conn.target_addr not in potential_broadcasts
):
potential_broadcasts.append(conn.target_addr)

except asyncio.CancelledError:
logger.debug("Diagnostic Message reader got cancelled")
except Exception as e:
logger.error(f"Diagnostic Message reader died with {e!r}")

finally:
logger.notice(
f"[💰] For even more profit, try one of the {len(responsive_targets)} targets that actually responded:"
)
for item in responsive_targets:
logger.notice(item)

# TODO: the discoverer could be extended to search for and validate the broadcast address(es) automatically
if len(potential_broadcasts) > 0:
logger.notice(
"[🕵️] You could also investigate these target addresses that appear to be near broadcasts:"
)
for target_addr in potential_broadcasts:
logger.notice(f"[🤑] B-B-B-B-B-B-BROADCAST around TargetAddress {target_addr:#x}!")

async def create_DoIP_conn(
self,
hostname: str,
port: int,
routing_activation_type: int,
src_addr: int,
target_addr: int,
fast_queue: bool = False,
) -> DoIPConnection: # noqa: PLR0913
while True:
try: # Ensure that connections do not remain in TIME_WAIT
Expand All @@ -466,33 +477,19 @@ async def create_DoIP_conn(
target_addr,
so_linger=True,
protocol_version=self.protocol_version,
separate_diagnostic_message_queue=fast_queue,
)
logger.info("[📫] Sending RoutingActivationRequest")
await conn.write_routing_activation_request(
RoutingActivationRequestTypes(routing_activation_type)
)
except Exception as e: # TODO: this probably is too broad
logger.warning(
f"[\U0001fae8] Got me some good errors when it should be working (dis an infinite loop): {e!r}"
f"[🫨] Got me some good errors when it should be working (dis an infinite loop): {e!r}"
)
continue
return conn

async def read_diag_request_custom(self, conn: DoIPConnection) -> tuple[int | None, bytes]:
while True:
hdr, payload = await conn.read_frame()
if not isinstance(payload, DiagnosticMessage):
logger.warning(f"[🧨] Unexpected DoIP message: {hdr} {payload}")
return (None, b"")
if payload.SourceAddress != conn.target_addr:
return (payload.SourceAddress, payload.UserData)
if payload.TargetAddress != conn.src_addr:
logger.warning(
f"[🤌] You talking to me?! Unexpected DoIP target address: {payload.TargetAddress:#04x}"
)
continue
return (None, payload.UserData)

@staticmethod
def get_broadcast_addrs() -> list[AddrInfo]:
out = []
Expand Down
18 changes: 17 additions & 1 deletion src/gallia/transports/doip.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,12 +470,15 @@ def __init__( # noqa: PLR0913
src_addr: int,
target_addr: int,
protocol_version: int,
separate_diagnostic_message_queue: bool = False,
):
self.reader = reader
self.writer = writer
self.src_addr = src_addr
self.target_addr = target_addr
self.protocol_version = protocol_version
self.separate_diagnostic_message_queue = separate_diagnostic_message_queue
self._diagnostic_message_queue: asyncio.Queue[DoIPDiagFrame] = asyncio.Queue()
self._read_queue: asyncio.Queue[DoIPFrame] = asyncio.Queue()
self._read_task = asyncio.create_task(self._read_worker())
self._read_task.add_done_callback(
Expand All @@ -494,6 +497,7 @@ async def connect( # noqa: PLR0913
target_addr: int,
so_linger: bool = False,
protocol_version: int = ProtocolVersions.ISO_13400_2_2019,
separate_diagnostic_message_queue: bool = False,
) -> Self:
reader, writer = await asyncio.open_connection(host, port)

Expand All @@ -508,7 +512,14 @@ async def connect( # noqa: PLR0913
sock = writer.get_extra_info("socket")
sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0))

return cls(reader, writer, src_addr, target_addr, protocol_version)
return cls(
reader,
writer,
src_addr,
target_addr,
protocol_version,
separate_diagnostic_message_queue,
)

async def _read_frame(self) -> DoIPFrame | tuple[None, None]:
# Header is fixed size 8 byte.
Expand Down Expand Up @@ -547,6 +558,9 @@ async def _read_worker(self) -> None:
if hdr.PayloadType == PayloadTypes.AliveCheckRequest:
await self.write_alive_check_response()
continue
if isinstance(data, DiagnosticMessage) and self.separate_diagnostic_message_queue:
await self._diagnostic_message_queue.put((hdr, data))
continue
await self._read_queue.put((hdr, data))
except asyncio.CancelledError:
logger.debug("DoIP read worker got cancelled")
Expand All @@ -573,6 +587,8 @@ async def read_frame(self) -> DoIPFrame:
async def read_diag_request_raw(self) -> DoIPDiagFrame:
unexpected_packets: list[tuple[Any, Any]] = []
while True:
if self.separate_diagnostic_message_queue:
return await self._diagnostic_message_queue.get()
hdr, payload = await self.read_frame()
if not isinstance(payload, DiagnosticMessage):
logger.warning(f"expected DoIP DiagnosticMessage, instead got: {hdr} {payload}")
Expand Down

0 comments on commit ed1b203

Please sign in to comment.