Skip to content

Commit

Permalink
tests: on_target: extract values from uart log
Browse files Browse the repository at this point in the history
Signed-off-by: David Dilner <[email protected]>
  • Loading branch information
dilner committed Nov 13, 2024
1 parent 534c8cb commit 376d41b
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 17 deletions.
13 changes: 5 additions & 8 deletions tests/on_target/tests/test_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import pytest
import time
import os
import re
from utils.flash_tools import flash_device, reset_device
import sys
sys.path.append(os.getcwd())
Expand Down Expand Up @@ -46,13 +45,11 @@ def run_location(t91x_board, hex_file, location_method):
t91x_board.uart.wait_for_str(patterns_location, timeout=180)

# Extract coordinates from UART output
location_pattern = re.compile( \
values = t91x_board.uart.extract_value( \
r"location_event_handler: Got location: lat: ([\d.]+), lon: ([\d.]+), acc: ([\d.]+), method: ([\d.]+)")
match = location_pattern.search(t91x_board.uart.log)
assert match, "Failed to extract coordinates from UART output"
lat, lon, acc, method = match.groups()
assert abs(float(lat) - 61.5) < 2 and abs(float(lon) - 10.5) < 1, \
f"Coordinates ({lat}, {lon}) are not in the same part of the world as the test servers."
assert values
lat, lon, acc, method = values
assert abs(float(lat) - 61.5) < 2 and abs(float(lon) - 10.5) < 1
method = int(method)
expected_method = 4 if location_method == "Wi-Fi" else 1
assert method == expected_method, f"Unexpected location method used {method}, expected: {expected_method}"
assert method == expected_method
51 changes: 51 additions & 0 deletions tests/on_target/utils/test_uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,54 @@ def test_wait_ordered_7_none(time_sleep, time_time):
with pytest.raises(AssertionError) as ex_info:
u.wait_for_str_ordered(["abc", "def", "ghi", "jkl"], timeout=2)
assert "abc missing" in str(ex_info.value)

@patch("time.time", side_effect=counter())
@patch("time.sleep")
def test_wait_8_get_current_size(time_sleep, time_time):
u = mocked_uart()
u.log = "foo123\nbar123\nbaz123\n"
current_log_size = u.wait_for_str(["bar"], timeout=3)
assert current_log_size == len(u.log)

@patch("time.time", side_effect=counter())
@patch("time.sleep")
def test_wait_8_get_current_size(time_sleep, time_time):
u = mocked_uart()
u.log = "foo123\nbar123\nbaz123\n"
current_log_size = u.wait_for_str(["bar"], timeout=3)
assert current_log_size == len(u.log)

@patch("time.time", side_effect=counter())
@patch("time.sleep")
def test_wait_9_start_position(time_sleep, time_time):
u = mocked_uart()
u.log = "foo123\nbar123\nbaz123\n"
bar_pos = u.log.find("bar")
with pytest.raises(AssertionError) as ex_info:
u.wait_for_str(["baz", "foo", "bar"], timeout=3, start_pos=bar_pos)
assert "foo" in str(ex_info.value)
u.wait_for_str(["bar", "baz"], timeout=3, start_pos=bar_pos)

@patch("time.time", side_effect=counter())
@patch("time.sleep")
def test_wait_10_extract_one_value(time_sleep, time_time):
u = mocked_uart()
u.log = "foo: 123.45\n bar: 23.45 \n baz: 0.1234\n"
assert float(u.extract_value(r"bar: (\d.+)")[0]) == 23.45


@patch("time.time", side_effect=counter())
@patch("time.sleep")
def test_wait_10_extract_three_values(time_sleep, time_time):
u = mocked_uart()
u.log = "foo: 123.45 bar: 23.45 baz: 0.1234"
extrated_values = u.extract_value(r"foo: (\d.+) bar: (\d.+) baz: (\d.+)")
assert [float(x) for x in extrated_values] == [123.45, 23.45, 0.1234]

@patch("time.time", side_effect=counter())
@patch("time.sleep")
def test_wait_11_extract_missing_values(time_sleep, time_time):
u = mocked_uart()
u.log = "foo: 123.45 baz: 23.45 bar: 0.1234"
extrated_values = u.extract_value(r"foo: (\d.+) foo: (\d.+) foo: (\d.+)")
assert extrated_values is None
33 changes: 24 additions & 9 deletions tests/on_target/utils/uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import queue
import os
import sys
import re
sys.path.append(os.getcwd())
from utils.logger import get_logger
from typing import Union
Expand Down Expand Up @@ -155,12 +156,15 @@ def start(self, timeout: int = DEFAULT_UART_TIMEOUT) -> None:
self._selfdestruct = threading.Timer(timeout , self.selfdestruct)
self._selfdestruct.start()

def get_size(self) -> int:
# Return the current size of the log
return len(self.log)

def wait_for_str_ordered(
self, msgs: list, error_msg: str = "", timeout: int = DEFAULT_WAIT_FOR_STR_TIMEOUT
) -> None:
start = time.time()
start_t = time.time()
while True:
time.sleep(1)
missing = None
pos = 0
for msg in msgs:
Expand All @@ -172,26 +176,34 @@ def wait_for_str_ordered(
pos += 1
else:
break
if start + timeout < time.time():
if start_t + timeout < time.time():
raise AssertionError(
f"{missing if missing else msgs} missing in UART log in the expected order. {error_msg}\nUart log:\n{self.log}"
)
if self._evt.is_set():
raise RuntimeError(f"Uart thread stopped, log:\n{self.log}")
time.sleep(1)

def wait_for_str(self, msgs: Union[str, list], error_msg: str = "", timeout: int = DEFAULT_WAIT_FOR_STR_TIMEOUT) -> None:
start = time.time()
def wait_for_str(self, msgs: Union[str, list], error_msg: str = "", timeout: int = DEFAULT_WAIT_FOR_STR_TIMEOUT, start_pos: int = 0) -> None:
start_t = time.time()
msgs = msgs if isinstance(msgs, (list, tuple)) else [msgs]

while True:
time.sleep(1)
missing_msgs = [x for x in msgs if x not in self.log]
missing_msgs = [x for x in msgs if x not in self.log[start_pos:]]
if missing_msgs == []:
break
if start + timeout < time.time():
return self.get_size()
if start_t + timeout < time.time():
raise AssertionError(f"{missing_msgs} missing in UART log. {error_msg}\nUart log:\n{self.log}")
if self._evt.is_set():
raise RuntimeError(f"Uart thread stopped, log:\n{self.log}")
time.sleep(1)

def extract_value(self, pattern: str, start_pos: int = 0):
pattern = re.compile(pattern)
match = pattern.search(self.log[start_pos:])
if match:
return match.groups()
return None

class UartBinary(Uart):
def __init__(
Expand Down Expand Up @@ -246,3 +258,6 @@ def save_to_file(self, filename: str) -> None:
return
with open(filename, "wb" ) as f:
f.write(self.data)

def get_size(self) -> int:
return len(self.data)

0 comments on commit 376d41b

Please sign in to comment.