Skip to content

Commit

Permalink
tests: on_target: add fota resumption test
Browse files Browse the repository at this point in the history
Adding app fota resumption test.

Signed-off-by: Giacomo Dematteis <[email protected]>
  • Loading branch information
DematteisGiacomo committed Aug 28, 2024
1 parent f48cbb4 commit 0ed53c6
Showing 1 changed file with 48 additions and 19 deletions.
67 changes: 48 additions & 19 deletions tests/on_target/tests/test_fota.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,67 @@
WAIT_FOR_FOTA_AVAILABLE = 60 * 2
APP_FOTA_TIMEOUT = 60 * 10


def post_job_and_trigger_fota(t91x_board, bundle_id, fota_type):
if bundle_id:
t91x_board.fota.post_fota_job(device_id=DEVICE_ID, fingerprint=FOTADEVICE_FINGERPRINT, bundle_id=bundle_id)
t91x_board.uart.flush()

start = time.time()
while time.time() - start < WAIT_FOR_FOTA_AVAILABLE:
time.sleep(20)
try:
t91x_board.uart.write("zbus button_press\r\n")
t91x_board.uart.wait_for_str("FOTA job received", timeout=30)
return
except AssertionError:
logger.debug(f"{fota_type} fota not available yet, trying again...")

raise RuntimeError(f"{fota_type} fota not available after {WAIT_FOR_FOTA_AVAILABLE} seconds")


def run_fota_resumption(t91x_board, fota_type):
logger.debug(f"Testing fota resumption on disconnect for {fota_type} fota")
t91x_board.uart.wait_for_str("bytes (50%)")

patterns_lte_offline = ["network: Network connectivity lost"]
patterns_lte_normal = ["network: Network connectivity established", "transport: Connected to Cloud"]

# LTE disconnect
t91x_board.uart.flush()
t91x_board.uart.write("lte offline\r\n")
t91x_board.uart.wait_for_str(patterns_lte_offline, timeout=20)

# LTE reconnect
t91x_board.uart.flush()
t91x_board.uart.write("lte normal\r\n")
t91x_board.uart.wait_for_str(patterns_lte_normal, timeout=120)

post_job_and_trigger_fota(t91x_board, None, fota_type) # We don't need bundle_id here

t91x_board.uart.wait_for_str("fota_download: Refuse fragment, restart with offset")
t91x_board.uart.wait_for_str("fota_download: Downloading from offset:")

@pytest.fixture
def run_fota_fixture(t91x_board, hex_file):
def _run_fota(bundleId, fota_type, fotatimeout=APP_FOTA_TIMEOUT):
def _run_fota(bundleId, fota_type, fotatimeout=APP_FOTA_TIMEOUT, test_fota_resumption=False):
flash_device(os.path.abspath(hex_file))
time.sleep(5)
t91x_board.uart.xfactoryreset()
reset_device()

t91x_board.uart.wait_for_str("Connected to Cloud")

t91x_board.fota.post_fota_job(device_id=DEVICE_ID, fingerprint=FOTADEVICE_FINGERPRINT, bundle_id=bundleId)

t91x_board.uart.flush()
start = time.time()
while time.time() - start < WAIT_FOR_FOTA_AVAILABLE:
time.sleep(20)
try:
t91x_board.uart.write("zbus button_press\r\n")
t91x_board.uart.wait_for_str("FOTA job received", timeout=30)
break
except AssertionError:
logger.debug(f"{fota_type} fota not available yet, trying again...")
continue
else:
raise RuntimeError(f"{fota_type} fota not available after {WAIT_FOR_FOTA_AVAILABLE} seconds")
post_job_and_trigger_fota(t91x_board, bundleId, fota_type)

if test_fota_resumption:
run_fota_resumption(t91x_board, fota_type)

t91x_board.uart.wait_for_str("FOTA download finished", timeout=fotatimeout)
if fota_type == "app":
t91x_board.uart.wait_for_str("App FOTA update confirmed")
elif fota_type == "delta":
t91x_board.uart.wait_for_str("Modem (delta) FOTA complete")
t91x_board.uart.wait_for_str("Connected to Cloud")
t91x_board.uart.wait_for_str("Connected to Cloud", "Failed to connect to Cloud after FOTA")
return _run_fota


Expand All @@ -71,7 +100,7 @@ def test_app_fota(t91x_board, hex_file, run_fota_fixture):

logger.debug(f"Latest bundle: {latest_bundle}")

run_fota_fixture(latest_bundle["bundleId"], "app")
run_fota_fixture(bundleId=latest_bundle["bundleId"], fota_type="app", test_fota_resumption=True)


@pytest.mark.dut1
Expand Down

0 comments on commit 0ed53c6

Please sign in to comment.