Skip to content

Commit

Permalink
feat(fr): Handle FlowControl messages for large messages
Browse files Browse the repository at this point in the history
Co-Authored-By: <[email protected]>
  • Loading branch information
rumpelsepp committed Oct 24, 2024
1 parent 2643ffc commit 620fb00
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions src/gallia/transports/flexray_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,19 @@ async def read_tp_frame(self) -> FlexRayTPFrame:
logger.trace("read FlexRayTPFrame %s", repr(frame))
return frame

def _require_fc_frame(self, block_size: int, read_bytes: int) -> bool:
# 6 bytes already read in first frame.
return ((read_bytes-6) & block_size) == 0

async def _send_flow_control_frame(self):
block_size = self.config.fc_block_size
fc_frame = FlexRayTPFlowControlFrame(
flag=FlexRayTPFlowControlFlag.CONTINUE_TO_SEND,
separation_time=self.config.fc_separation_time,
block_size=block_size,
)
await self.write_tp_frame(fc_frame)

async def _handle_fragmented(self, expected_len: int) -> bytes:
# 6 bytes already read in first frame.
# Headersize is 2 byte.
Expand All @@ -431,6 +444,10 @@ async def _handle_fragmented(self, expected_len: int) -> bytes:
while read_bytes < expected_len:
# Reordering is not implemented.
logger.debug(f"expected_len: {expected_len}; read_bytes: {read_bytes}")

if self._require_fc_frame(self.config.fc_block_size, read_bytes):
await self._send_flow_control_frame()

frame = await self.read_tp_frame()
if not isinstance(frame, FlexRayTPConsecutiveFrame):
raise RuntimeError(f"expected consecutive frame, got: {frame}")
Expand All @@ -455,15 +472,6 @@ async def read_unsafe(
case FlexRayTPSingleFrame():
return frame.data
case FlexRayTPFirstFrame():
fc_frame = FlexRayTPFlowControlFrame(
flag=FlexRayTPFlowControlFlag.CONTINUE_TO_SEND,
separation_time=self.config.fc_separation_time,
# TODO: send again after block_size number of frames is read.
# Maybe move sending the flow control frame into the
# _handle_fragmented() function and create a loop.
block_size=self.config.fc_block_size,
)
await self.write_tp_frame(fc_frame)
data = frame.data + await self._handle_fragmented(frame.size)
data = data[: frame.size]
logger.debug("read data: %s", data.hex())
Expand Down

0 comments on commit 620fb00

Please sign in to comment.