diff --git a/release/files_common b/release/files_common index 58585c95b40939..1e16cb9439cf34 100644 --- a/release/files_common +++ b/release/files_common @@ -226,7 +226,6 @@ system/ubloxd/*.cc selfdrive/locationd/__init__.py selfdrive/locationd/SConscript selfdrive/locationd/.gitignore -selfdrive/locationd/laikad.py selfdrive/locationd/locationd.h selfdrive/locationd/locationd.cc selfdrive/locationd/paramsd.py diff --git a/selfdrive/locationd/laikad.py b/selfdrive/locationd/laikad.py deleted file mode 100755 index 10c0e4a7b0882b..00000000000000 --- a/selfdrive/locationd/laikad.py +++ /dev/null @@ -1,468 +0,0 @@ -#!/usr/bin/env python3 -import math -import os -import time -import shutil -from collections import defaultdict -from concurrent.futures import Future, ProcessPoolExecutor -from enum import IntEnum -from typing import List, Optional, Dict, Any - -import numpy as np - -from cereal import log, messaging -from openpilot.common.params import Params, put_nonblocking -from laika import AstroDog -from laika.constants import SECS_IN_HR, SECS_IN_MIN -from laika.downloader import DownloadFailed -from laika.ephemeris import EphemerisType, GPSEphemeris, GLONASSEphemeris, ephemeris_structs, parse_qcom_ephem -from laika.gps_time import GPSTime -from laika.helpers import ConstellationId, get_sv_id -from laika.raw_gnss import GNSSMeasurement, correct_measurements, process_measurements, read_raw_ublox -from laika.raw_gnss import gps_time_from_qcom_report, get_measurements_from_qcom_reports -from laika.opt import calc_pos_fix, get_posfix_sympy_fun, calc_vel_fix, get_velfix_sympy_func -from openpilot.selfdrive.locationd.models.constants import GENERATED_DIR, ObservationKind -from openpilot.selfdrive.locationd.models.gnss_kf import GNSSKalman -from openpilot.selfdrive.locationd.models.gnss_kf import States as GStates -from openpilot.system.hardware.hw import Paths -from openpilot.system.swaglog import cloudlog - -MAX_TIME_GAP = 10 -EPHEMERIS_CACHE = 'LaikadEphemerisV3' -CACHE_VERSION = 0.2 -POS_FIX_RESIDUAL_THRESHOLD = 100.0 - - -class LogEphemerisType(IntEnum): - nav = 0 - nasaUltraRapid = 1 - glonassIacUltraRapid = 2 - qcom = 3 - -class EphemerisSource(IntEnum): - gnssChip = 0 - internet = 1 - cache = 2 - unknown = 3 - -def get_log_eph_type(ephem): - if ephem.eph_type == EphemerisType.NAV: - source_type = LogEphemerisType.nav - elif ephem.eph_type == EphemerisType.QCOM_POLY: - source_type = LogEphemerisType.qcom - else: - assert ephem.file_epoch is not None - file_src = ephem.file_source - if file_src == 'igu': # example nasa: '2214/igu22144_00.sp3.Z' - source_type = LogEphemerisType.nasaUltraRapid - elif file_src == 'Sta': # example nasa: '22166/ultra/Stark_1D_22061518.sp3' - source_type = LogEphemerisType.glonassIacUltraRapid - else: - raise Exception(f"Didn't expect file source {file_src}") - return source_type - -def get_log_eph_source(ephem): - if ephem.file_name == 'qcom' or ephem.file_name == 'ublox': - source = EphemerisSource.gnssChip - elif ephem.file_name == EPHEMERIS_CACHE: - source = EphemerisSource.cache - else: - source = EphemerisSource.internet - return source - - -class Laikad: - def __init__(self, valid_const=(ConstellationId.GPS, ConstellationId.GLONASS), auto_fetch_navs=True, auto_update=False, - valid_ephem_types=(EphemerisType.NAV, EphemerisType.QCOM_POLY), - save_ephemeris=False, use_qcom=False): - """ - valid_const: GNSS constellation which can be used - auto_fetch_navs: If true fetch navs from internet when needed - auto_update: If true download AstroDog will download all files needed. This can be ephemeris or correction data like ionosphere. - valid_ephem_types: Valid ephemeris types to be used by AstroDog - save_ephemeris: If true saves and loads nav and orbit ephemeris to cache. - """ - self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, - clear_old_ephemeris=True, cache_dir=Paths.download_cache_root()) - self.gnss_kf = GNSSKalman(GENERATED_DIR, cython=True, erratic_clock=use_qcom) - - self.auto_fetch_navs = auto_fetch_navs - self.orbit_fetch_executor: Optional[ProcessPoolExecutor] = None - self.orbit_fetch_future: Optional[Future] = None - - self.last_report_time = GPSTime(0, 0) - self.last_fetch_navs_t = GPSTime(0, 0) - self.last_cached_t = GPSTime(0, 0) - self.save_ephemeris = save_ephemeris - self.load_cache() - - self.posfix_functions = {constellation: get_posfix_sympy_fun(constellation) for constellation in (ConstellationId.GPS, ConstellationId.GLONASS)} - self.velfix_function = get_velfix_sympy_func() - self.last_fix_pos = None - self.last_fix_t = None - self.use_qcom = use_qcom - self.first_log_time = None - self.ttff = -1 - self.measurement_lag = 0.630 if self.use_qcom else 0.095 - - # qcom specific stuff - self.qcom_reports_received = 4 - self.qcom_reports = [] - - def load_cache(self): - if not self.save_ephemeris: - return - - cache_bytes = Params().get(EPHEMERIS_CACHE) - if not cache_bytes: - return - - nav_dict = {} - try: - with ephemeris_structs.EphemerisCache.from_bytes(cache_bytes) as ephem_cache: - glonass_navs = [GLONASSEphemeris(data_struct, file_name=EPHEMERIS_CACHE) for data_struct in ephem_cache.glonassEphemerides] - gps_navs = [GPSEphemeris(data_struct, file_name=EPHEMERIS_CACHE) for data_struct in ephem_cache.gpsEphemerides] - for e in sum([glonass_navs, gps_navs], []): - if e.prn not in nav_dict: - nav_dict[e.prn] = [] - nav_dict[e.prn].append(e) - self.astro_dog.add_navs(nav_dict) - except Exception: - cloudlog.exception("Error parsing cache") - cloudlog.debug( - f"Loaded navs ({sum([len(nav_dict[prn]) for prn in nav_dict.keys()])}). Unique orbit and nav sats: {list(nav_dict.keys())} ") - - def cache_ephemeris(self): - - if self.save_ephemeris and (self.last_report_time - self.last_cached_t > SECS_IN_MIN): - nav_list: List = sum([v for k,v in self.astro_dog.navs.items()], []) - #TODO this only saves currently valid ephems, when we download future ephems we should save them too - valid_navs = [e for e in nav_list if e.valid(self.last_report_time)] - if len(valid_navs) > 0: - ephem_cache = ephemeris_structs.EphemerisCache(glonassEphemerides=[e.data for e in valid_navs if e.prn[0]=='R'], - gpsEphemerides=[e.data for e in valid_navs if e.prn[0]=='G']) - put_nonblocking(EPHEMERIS_CACHE, ephem_cache.to_bytes()) - cloudlog.debug("Cache saved") - self.last_cached_t = self.last_report_time - - def create_ephem_statuses(self): - ephemeris_statuses = [] - eph_list: List = sum([v for k,v in self.astro_dog.navs.items()], []) + sum([v for k,v in self.astro_dog.qcom_polys.items()], []) - for eph in eph_list: - status = log.GnssMeasurements.EphemerisStatus.new_message() - status.constellationId = ConstellationId.from_rinex_char(eph.prn[0]).value - status.svId = get_sv_id(eph.prn) - status.type = get_log_eph_type(eph).value - status.source = get_log_eph_source(eph).value - status.tow = eph.epoch.tow - status.gpsWeek = eph.epoch.week - ephemeris_statuses.append(status) - return ephemeris_statuses - - - def get_lsq_fix(self, t, measurements): - if self.last_fix_t is None or abs(self.last_fix_t - t) > 0: - min_measurements = 5 if any(p.constellation_id == ConstellationId.GLONASS for p in measurements) else 4 - - position_solution, pr_residuals, pos_std = calc_pos_fix(measurements, self.posfix_functions, min_measurements=min_measurements) - if len(position_solution) < 3: - return None - position_estimate = position_solution[:3] - position_std = pos_std[:3] - - velocity_solution, prr_residuals, vel_std = calc_vel_fix(measurements, position_estimate, self.velfix_function, min_measurements=min_measurements) - if len(velocity_solution) < 3: - return None - velocity_estimate = velocity_solution[:3] - velocity_std = vel_std[:3] - - return position_estimate, position_std, velocity_estimate, velocity_std - - def is_good_report(self, gnss_msg): - if gnss_msg.which() in ['drMeasurementReport', 'measurementReport'] and self.use_qcom: - # TODO: Understand and use remaining unknown constellations - try: - if gnss_msg.which() == 'drMeasurementReport': - constellation_id = ConstellationId.from_qcom_source(gnss_msg.drMeasurementReport.source) - else: - constellation_id = ConstellationId.from_qcom_source(gnss_msg.measurementReport.source) - good_constellation = constellation_id in [ConstellationId.GPS, ConstellationId.SBAS, ConstellationId.GLONASS] - report_time = gps_time_from_qcom_report(gnss_msg) - except NotImplementedError: - return False - # Garbage timestamps with week > 32767 are sometimes sent by module. - # This is an issue with gpsTime and GLONASS time. - good_week = report_time.week < np.iinfo(np.int16).max - return good_constellation and good_week - elif gnss_msg.which() == 'measurementReport' and not self.use_qcom: - return True - else: - return False - - def read_report(self, gnss_msg): - if self.use_qcom: - # QCOM reports are per constellation, so we need to aggregate them - # Additionally, the pseudoranges are broken in the measurementReports - # and the doppler filteredSpeed is broken in the drMeasurementReports - report_time = gps_time_from_qcom_report(gnss_msg) - if report_time - self.last_report_time == 0: - self.qcom_reports.append(gnss_msg) - self.last_report_time = report_time - elif report_time - self.last_report_time > 0: - self.qcom_reports_received = max(1, len(self.qcom_reports)) - self.qcom_reports = [gnss_msg] - self.last_report_time = report_time - else: - # Sometimes DR reports get sent one iteration late (1second), they need to be ignored - cloudlog.warning(f"Received report with time {report_time} before last report time {self.last_report_time}") - - if len(self.qcom_reports) == self.qcom_reports_received: - new_meas = get_measurements_from_qcom_reports(self.qcom_reports) - else: - new_meas = [] - else: - report = gnss_msg.measurementReport - self.last_report_time = GPSTime(report.gpsWeek, report.rcvTow) - new_meas = read_raw_ublox(report) - return self.last_report_time, new_meas - - def is_ephemeris(self, gnss_msg): - if self.use_qcom: - return gnss_msg.which() == 'drSvPoly' - else: - return gnss_msg.which() in ('ephemeris', 'glonassEphemeris') - - def read_ephemeris(self, gnss_msg): - if self.use_qcom: - try: - ephem = parse_qcom_ephem(gnss_msg.drSvPoly) - self.astro_dog.add_qcom_polys({ephem.prn: [ephem]}) - except Exception: - cloudlog.exception("Error parsing qcom svPoly ephemeris from qcom module") - return - - else: - if gnss_msg.which() == 'ephemeris': - data_struct = ephemeris_structs.Ephemeris.new_message(**gnss_msg.ephemeris.to_dict()) - try: - ephem = GPSEphemeris(data_struct, file_name='ublox') - except Exception: - cloudlog.exception("Error parsing GPS ephemeris from ublox") - return - elif gnss_msg.which() == 'glonassEphemeris': - data_struct = ephemeris_structs.GlonassEphemeris.new_message(**gnss_msg.glonassEphemeris.to_dict()) - try: - ephem = GLONASSEphemeris(data_struct, file_name='ublox') - except Exception: - cloudlog.exception("Error parsing GLONASS ephemeris from ublox") - return - else: - cloudlog.error(f"Unsupported ephemeris type: {gnss_msg.which()}") - return - self.astro_dog.add_navs({ephem.prn: [ephem]}) - self.cache_ephemeris() - - def process_report(self, new_meas, t): - # Filter measurements with unexpected pseudoranges for GPS and GLONASS satellites - new_meas = [m for m in new_meas if 1e7 < m.observables['C1C'] < 3e7] - processed_measurements = process_measurements(new_meas, self.astro_dog) - if self.last_fix_pos is not None: - est_pos = self.last_fix_pos - correct_delay = True - else: - est_pos = self.gnss_kf.x[GStates.ECEF_POS].tolist() - correct_delay = False - corrected_measurements = correct_measurements(processed_measurements, est_pos, self.astro_dog, correct_delay=correct_delay) - # If many measurements weren't corrected, position may be garbage, so reset - if len(processed_measurements) >= 8 and len(corrected_measurements) < 5: - cloudlog.error("Didn't correct enough measurements, resetting estimate position") - self.last_fix_pos = None - self.last_fix_t = None - return corrected_measurements - - def calc_fix(self, t, measurements): - instant_fix = self.get_lsq_fix(t, measurements) - if instant_fix is None: - return None - else: - position_estimate, position_std, velocity_estimate, velocity_std = instant_fix - self.last_fix_t = t - self.last_fix_pos = position_estimate - self.lat_fix_pos_std = position_std - return position_estimate, position_std, velocity_estimate, velocity_std - - def process_gnss_msg(self, gnss_msg, gnss_mono_time: int, block=False): - out_msg = messaging.new_message("gnssMeasurements") - t = gnss_mono_time * 1e-9 - msg_dict: Dict[str, Any] = {"measTime": gnss_mono_time - int(1e9 * self.measurement_lag)} - if self.first_log_time is None: - self.first_log_time = 1e-9 * gnss_mono_time - if self.is_ephemeris(gnss_msg): - self.read_ephemeris(gnss_msg) - elif self.is_good_report(gnss_msg): - report_t, new_meas = self.read_report(gnss_msg) - if report_t.week > 0: - if self.auto_fetch_navs: - self.fetch_navs(report_t, block) - - corrected_measurements = self.process_report(new_meas, t) - msg_dict['correctedMeasurements'] = [create_measurement_msg(m) for m in corrected_measurements] - - fix = self.calc_fix(t, corrected_measurements) - measurement_msg = log.LiveLocationKalman.Measurement.new_message - if fix is not None: - position_estimate, position_std, velocity_estimate, velocity_std = fix - if self.ttff <= 0: - self.ttff = max(1e-3, t - self.first_log_time) - msg_dict["positionECEF"] = measurement_msg(value=position_estimate, std=position_std.tolist(), valid=bool(self.last_fix_t == t)) - msg_dict["velocityECEF"] = measurement_msg(value=velocity_estimate, std=velocity_std.tolist(), valid=bool(self.last_fix_t == t)) - - self.update_localizer(self.last_fix_pos, t, corrected_measurements) - P_diag = self.gnss_kf.P.diagonal() - kf_valid = all(self.kf_valid(t)) - msg_dict["kalmanPositionECEF"] = measurement_msg(value=self.gnss_kf.x[GStates.ECEF_POS].tolist(), - std=np.sqrt(P_diag[GStates.ECEF_POS]).tolist(), - valid=kf_valid) - msg_dict["kalmanVelocityECEF"] = measurement_msg(value=self.gnss_kf.x[GStates.ECEF_VELOCITY].tolist(), - std=np.sqrt(P_diag[GStates.ECEF_VELOCITY]).tolist(), - valid=kf_valid) - - msg_dict['gpsWeek'] = self.last_report_time.week - msg_dict['gpsTimeOfWeek'] = self.last_report_time.tow - msg_dict['timeToFirstFix'] = self.ttff - msg_dict['ephemerisStatuses'] = self.create_ephem_statuses() - out_msg.gnssMeasurements = msg_dict - return out_msg - - def update_localizer(self, est_pos, t: float, measurements: List[GNSSMeasurement]): - # Check time and outputs are valid - valid = self.kf_valid(t) - if not all(valid): - if not valid[0]: # Filter not initialized - pass - elif not valid[1]: - cloudlog.error("Time gap of over 10s detected, gnss kalman reset") - elif not valid[2]: - cloudlog.error("Gnss kalman filter state is nan") - if est_pos is not None and len(est_pos) > 0: - cloudlog.info(f"Reset kalman filter with {est_pos}") - self.init_gnss_localizer(est_pos) - else: - return - if len(measurements) > 0: - kf_add_observations(self.gnss_kf, t, measurements) - else: - # Ensure gnss filter is updated even with no new measurements - self.gnss_kf.predict(t) - - def kf_valid(self, t: float) -> List[bool]: - filter_time = self.gnss_kf.filter.get_filter_time() - return [not math.isnan(filter_time), - abs(t - filter_time) < MAX_TIME_GAP, - all(np.isfinite(self.gnss_kf.x[GStates.ECEF_POS]))] - - def init_gnss_localizer(self, est_pos): - x_initial, p_initial_diag = np.copy(GNSSKalman.x_initial), np.copy(np.diagonal(GNSSKalman.P_initial)) - x_initial[GStates.ECEF_POS] = est_pos - p_initial_diag[GStates.ECEF_POS] = 1000 ** 2 - self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag) - - def fetch_navs(self, t: GPSTime, block): - # Download new navs if 1 hour of navs data left - if t + SECS_IN_HR not in self.astro_dog.navs_fetched_times and (abs(t - self.last_fetch_navs_t) > SECS_IN_MIN): - astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types, self.astro_dog.cache_dir - ret = None - - if block: # Used for testing purposes - ret = get_orbit_data(t, *astro_dog_vars) - elif self.orbit_fetch_future is None: - self.orbit_fetch_executor = ProcessPoolExecutor(max_workers=1) - self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars) - elif self.orbit_fetch_future.done(): - ret = self.orbit_fetch_future.result() - self.orbit_fetch_executor = self.orbit_fetch_future = None - - if ret is not None: - if ret[0] is None: - self.last_fetch_navs_t = ret[2] - else: - self.astro_dog.navs, self.astro_dog.navs_fetched_times, self.last_fetch_navs_t = ret - self.cache_ephemeris() - - -def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types, cache_dir): - astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, cache_dir=cache_dir) - cloudlog.info(f"Start to download/parse navs for time {t.as_datetime()}") - start_time = time.monotonic() - try: - astro_dog.get_navs(t) - cloudlog.info(f"Done parsing navs. Took {time.monotonic() - start_time:.1f}s") - cloudlog.debug(f"Downloaded navs ({sum([len(v) for v in astro_dog.navs])}): {list(astro_dog.navs.keys())}" + - f"With time range: {[f'{start.as_datetime()}, {end.as_datetime()}' for (start,end) in astro_dog.orbit_fetched_times._ranges]}") - return astro_dog.navs, astro_dog.navs_fetched_times, t - except (DownloadFailed, RuntimeError, ValueError, IOError) as e: - cloudlog.warning(f"No orbit data found or parsing failure: {e}") - return None, None, t - - -def create_measurement_msg(meas: GNSSMeasurement): - c = log.GnssMeasurements.CorrectedMeasurement.new_message() - c.constellationId = meas.constellation_id.value - c.svId = meas.sv_id - c.glonassFrequency = meas.glonass_freq if meas.constellation_id == ConstellationId.GLONASS else 0 - c.pseudorange = float(meas.observables_final['C1C']) - c.pseudorangeStd = float(meas.observables_std['C1C']) - c.pseudorangeRate = float(meas.observables_final['D1C']) - c.pseudorangeRateStd = float(meas.observables_std['D1C']) - c.satPos = meas.sat_pos_final.tolist() - c.satVel = meas.sat_vel.tolist() - c.satVel = meas.sat_vel.tolist() - return c - -def kf_add_observations(gnss_kf: GNSSKalman, t: float, measurements: List[GNSSMeasurement]): - ekf_data = defaultdict(list) - for m in measurements: - m_arr = m.as_array() - if m.constellation_id == ConstellationId.GPS: - ekf_data[ObservationKind.PSEUDORANGE_GPS].append(m_arr) - elif m.constellation_id == ConstellationId.GLONASS: - ekf_data[ObservationKind.PSEUDORANGE_GLONASS].append(m_arr) - ekf_data[ObservationKind.PSEUDORANGE_RATE_GPS] = ekf_data[ObservationKind.PSEUDORANGE_GPS] - ekf_data[ObservationKind.PSEUDORANGE_RATE_GLONASS] = ekf_data[ObservationKind.PSEUDORANGE_GLONASS] - for kind, data in ekf_data.items(): - if len(data) > 0: - gnss_kf.predict_and_observe(t, kind, data) - - -def clear_tmp_cache(): - if os.path.exists(Paths.download_cache_root()): - shutil.rmtree(Paths.download_cache_root()) - os.mkdir(Paths.download_cache_root()) - - -def main(): - #clear_tmp_cache() - - use_qcom = not Params().get_bool("UbloxAvailable") - if use_qcom: - raw_name = "qcomGnss" - else: - raw_name = "ubloxGnss" - raw_gnss_sock = messaging.sub_sock(raw_name, conflate=False) - pm = messaging.PubMaster(['gnssMeasurements']) - - # disable until set as main gps source, to better analyze startup time - # TODO ensure low CPU usage before enabling - use_internet = False # "LAIKAD_NO_INTERNET" not in os.environ - - replay = "REPLAY" in os.environ - laikad = Laikad(save_ephemeris=not replay, auto_fetch_navs=use_internet, use_qcom=use_qcom) - - while True: - for in_msg in messaging.drain_sock(raw_gnss_sock, wait_for_one=True): - out_msg = laikad.process_gnss_msg(getattr(in_msg, raw_name), in_msg.logMonoTime, replay) - pm.send('gnssMeasurements', out_msg) - - -if __name__ == "__main__": - main() diff --git a/selfdrive/locationd/test/test_laikad.py b/selfdrive/locationd/test/test_laikad.py deleted file mode 100755 index db8802ed364997..00000000000000 --- a/selfdrive/locationd/test/test_laikad.py +++ /dev/null @@ -1,319 +0,0 @@ -#!/usr/bin/env python3 -import pytest -import time -import unittest -from cereal import log -from openpilot.common.params import Params -from datetime import datetime -from unittest import mock - - -from laika.constants import SECS_IN_DAY -from laika.downloader import DownloadFailed -from laika.ephemeris import EphemerisType -from laika.gps_time import GPSTime -from laika.helpers import ConstellationId -from laika.raw_gnss import GNSSMeasurement, read_raw_ublox, read_raw_qcom -from openpilot.selfdrive.locationd.laikad import EPHEMERIS_CACHE, Laikad -from openpilot.selfdrive.test.openpilotci import get_url -from openpilot.tools.lib.logreader import LogReader - -from openpilot.selfdrive.test.process_replay.process_replay import get_process_config, replay_process - -GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC = GPSTime.from_datetime(datetime(2022, month=1, day=29, hour=12)) -UBLOX_TEST_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36" -QCOM_TEST_ROUTE = "616dc83ca1f7f11e|2023-07-11--10-52-31" - - -def get_log_ublox(): - logs = LogReader(get_url(UBLOX_TEST_ROUTE, 0)) - - ublox_cfg = get_process_config("ubloxd") - all_logs = replay_process(ublox_cfg, logs) - low_gnss = [] - for m in all_logs: - if m.which() != "ubloxGnss" or m.ubloxGnss.which() != 'measurementReport': - continue - - MAX_MEAS = 7 - if m.ubloxGnss.measurementReport.numMeas > MAX_MEAS: - mb = log.Event.new_message(ubloxGnss=m.ubloxGnss.to_dict()) - mb.logMonoTime = m.logMonoTime - mb.ubloxGnss.measurementReport.numMeas = MAX_MEAS - mb.ubloxGnss.measurementReport.measurements = list(m.ubloxGnss.measurementReport.measurements)[:MAX_MEAS] - mb.ubloxGnss.measurementReport.measurements[0].pseudorange += 1000 - low_gnss.append(mb.as_reader()) - else: - low_gnss.append(m) - return all_logs, low_gnss - - -def get_log_qcom(): - logs = LogReader(get_url(QCOM_TEST_ROUTE, 0)) - all_logs = [m for m in logs if m.which() == 'qcomGnss'] - return all_logs - - -def verify_messages(lr, laikad, return_one_success=False): - good_msgs = [] - for m in lr: - if m.which() == 'ubloxGnss': - gnss_msg = m.ubloxGnss - elif m.which() == 'qcomGnss': - gnss_msg = m.qcomGnss - else: - continue - msg = laikad.process_gnss_msg(gnss_msg, m.logMonoTime, block=True) - if msg is not None and len(msg.gnssMeasurements.correctedMeasurements) > 0: - good_msgs.append(msg) - if return_one_success: - return msg - return good_msgs - - -def get_first_gps_time(logs): - for m in logs: - if m.which() == 'ubloxGnss': - if m.ubloxGnss.which == 'measurementReport': - new_meas = read_raw_ublox(m.ubloxGnss.measurementReport) - if len(new_meas) > 0: - return new_meas[0].recv_time - elif m.which() == "qcomGnss": - if m.qcomGnss.which == 'measurementReport': - new_meas = read_raw_qcom(m.qcomGnss.measurementReport) - if len(new_meas) > 0: - return new_meas[0].recv_time - - -def get_measurement_mock(gpstime, sat_ephemeris): - meas = GNSSMeasurement(ConstellationId.GPS, 1, gpstime.week, gpstime.tow, {'C1C': 0., 'D1C': 0.}, {'C1C': 0., 'D1C': 0.}) - # Fake measurement being processed - meas.observables_final = meas.observables - meas.sat_ephemeris = sat_ephemeris - return meas - - -@pytest.mark.slow -class TestLaikad(unittest.TestCase): - - @classmethod - def setUpClass(cls): - logs, low_gnss = get_log_ublox() - cls.logs = logs - cls.low_gnss = low_gnss - cls.logs_qcom = get_log_qcom() - first_gps_time = get_first_gps_time(logs) - cls.first_gps_time = first_gps_time - - def setUp(self): - Params().remove(EPHEMERIS_CACHE) - - def test_fetch_navs_non_blocking(self): - gpstime = GPSTime.from_datetime(datetime(2021, month=3, day=1)) - laikad = Laikad() - laikad.fetch_navs(gpstime, block=False) - laikad.orbit_fetch_future.result(30) - - # Get results and save orbits to laikad: - laikad.fetch_navs(gpstime, block=False) - ephem = laikad.astro_dog.navs['G01'][0] - self.assertIsNotNone(ephem) - - laikad.fetch_navs(gpstime+2*SECS_IN_DAY, block=False) - laikad.orbit_fetch_future.result(30) - # Get results and save orbits to laikad: - laikad.fetch_navs(gpstime + 2 * SECS_IN_DAY, block=False) - - ephem2 = laikad.astro_dog.navs['G01'][0] - self.assertIsNotNone(ephem) - self.assertNotEqual(ephem, ephem2) - - - def test_fetch_navs_with_wrong_clocks(self): - laikad = Laikad() - - def check_has_navs(): - self.assertGreater(len(laikad.astro_dog.navs), 0) - ephem = laikad.astro_dog.navs['G01'][0] - self.assertIsNotNone(ephem) - real_current_time = GPSTime.from_datetime(datetime(2021, month=3, day=1)) - wrong_future_clock_time = real_current_time + SECS_IN_DAY - - laikad.fetch_navs(wrong_future_clock_time, block=True) - check_has_navs() - self.assertEqual(laikad.last_fetch_navs_t, wrong_future_clock_time) - - # Test fetching orbits with earlier time - assert real_current_time < laikad.last_fetch_navs_t - - laikad.astro_dog.orbits = {} - laikad.fetch_navs(real_current_time, block=True) - check_has_navs() - self.assertEqual(laikad.last_fetch_navs_t, real_current_time) - - def test_laika_online(self): - laikad = Laikad(auto_update=True, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT) - correct_msgs = verify_messages(self.logs, laikad) - - correct_msgs_expected = 560 - self.assertEqual(correct_msgs_expected, len(correct_msgs)) - self.assertEqual(correct_msgs_expected, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid])) - - def test_kf_becomes_valid(self): - laikad = Laikad(auto_update=False) - m = self.logs[0] - self.assertFalse(all(laikad.kf_valid(m.logMonoTime * 1e-9))) - kf_valid = False - for m in self.logs: - if m.which() != 'ubloxGnss': - continue - - laikad.process_gnss_msg(m.ubloxGnss, m.logMonoTime, block=True) - kf_valid = all(laikad.kf_valid(m.logMonoTime * 1e-9)) - if kf_valid: - break - self.assertTrue(kf_valid) - - def test_laika_online_nav_only(self): - for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs], strict=True): - laikad = Laikad(auto_update=True, valid_ephem_types=EphemerisType.NAV, use_qcom=use_qcom) - # Disable fetch_orbits to test NAV only - correct_msgs = verify_messages(logs, laikad) - correct_msgs_expected = 55 if use_qcom else 560 - valid_fix_expected = 55 if use_qcom else 560 - - self.assertEqual(correct_msgs_expected, len(correct_msgs)) - self.assertEqual(valid_fix_expected, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid])) - - @mock.patch('laika.downloader.download_and_cache_file') - def test_laika_offline(self, downloader_mock): - downloader_mock.side_effect = DownloadFailed("Mock download failed") - laikad = Laikad(auto_update=False) - laikad.fetch_navs(GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC, block=True) - - @mock.patch('laika.downloader.download_and_cache_file') - def test_download_failed_russian_source(self, downloader_mock): - downloader_mock.side_effect = DownloadFailed - laikad = Laikad(auto_update=False) - correct_msgs = verify_messages(self.logs, laikad) - expected_msgs = 376 - self.assertEqual(expected_msgs, len(correct_msgs)) - self.assertEqual(expected_msgs, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid])) - - def test_laika_get_orbits(self): - laikad = Laikad(auto_update=False) - # Pretend process has loaded the orbits on startup by using the time of the first gps message. - laikad.fetch_navs(self.first_gps_time, block=True) - self.dict_has_values(laikad.astro_dog.navs) - - @unittest.skip("Use to debug live data") - def test_laika_get_navs_now(self): - laikad = Laikad(auto_update=False) - laikad.fetch_navs(GPSTime.from_datetime(datetime.utcnow()), block=True) - prn = "G01" - self.assertGreater(len(laikad.astro_dog.navs[prn]), 0) - prn = "R01" - self.assertGreater(len(laikad.astro_dog.navs[prn]), 0) - - def test_get_navs_in_process(self): - for auto_fetch_navs in [True, False]: - for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs], strict=True): - laikad = Laikad(auto_update=False, use_qcom=use_qcom, auto_fetch_navs=auto_fetch_navs) - has_navs = False - has_fix = False - seen_chip_eph = False - seen_internet_eph = False - - for m in logs: - if m.which() != 'ubloxGnss' and m.which() != 'qcomGnss': - continue - - gnss_msg = m.qcomGnss if use_qcom else m.ubloxGnss - out_msg = laikad.process_gnss_msg(gnss_msg, m.logMonoTime, block=False) - if laikad.orbit_fetch_future is not None: - laikad.orbit_fetch_future.result() - vals = laikad.astro_dog.navs.values() - has_navs = len(vals) > 0 and max([len(v) for v in vals]) > 0 - vals = laikad.astro_dog.qcom_polys.values() - has_polys = len(vals) > 0 and max([len(v) for v in vals]) > 0 - has_fix = has_fix or out_msg.gnssMeasurements.positionECEF.valid - if len(out_msg.gnssMeasurements.ephemerisStatuses): - seen_chip_eph = seen_chip_eph or any(x.source == 'gnssChip' for x in out_msg.gnssMeasurements.ephemerisStatuses) - seen_internet_eph = seen_internet_eph or any(x.source == 'internet' for x in out_msg.gnssMeasurements.ephemerisStatuses) - - self.assertTrue(has_navs or has_polys) - self.assertTrue(has_fix) - self.assertTrue(seen_chip_eph or auto_fetch_navs) - self.assertTrue(seen_internet_eph or not auto_fetch_navs) - self.assertEqual(len(laikad.astro_dog.navs_fetched_times._ranges), 0) - self.assertEqual(None, laikad.orbit_fetch_future) - - def test_cache(self): - use_qcom = True - for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs], strict=True): - Params().remove(EPHEMERIS_CACHE) - laikad = Laikad(auto_update=True, save_ephemeris=True, use_qcom=use_qcom) - def wait_for_cache(): - max_time = 2 - while Params().get(EPHEMERIS_CACHE) is None: - time.sleep(0.1) - max_time -= 0.1 - if max_time < 0: - self.fail("Cache has not been written after 2 seconds") - - # Test cache with no ephemeris - laikad.last_report_time = GPSTime(1,0) - laikad.cache_ephemeris() - if Params().get(EPHEMERIS_CACHE) is not None: - self.fail("Cache should not have been written without valid ephem") - - #laikad.astro_dog.get_navs(self.first_gps_time) - msg = verify_messages(logs, laikad, return_one_success=True) - laikad.cache_ephemeris() - wait_for_cache() - - # Check both nav and orbits separate - laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV, - save_ephemeris=True, use_qcom=use_qcom, auto_fetch_navs=False) - # Verify navs are loaded from cache - self.dict_has_values(laikad.astro_dog.navs) - # Verify cache is working for only nav by running a segment - msg = verify_messages(logs, laikad, return_one_success=True) - self.assertTrue(len(msg.gnssMeasurements.ephemerisStatuses)) - self.assertTrue(any(x.source=='cache' for x in msg.gnssMeasurements.ephemerisStatuses)) - self.assertIsNotNone(msg) - - #TODO test cache with only orbits - #with patch('selfdrive.locationd.laikad.get_orbit_data', return_value=None) as mock_method: - # # Verify no orbit downloads even if orbit fetch times is reset since the cache has recently been saved and we don't want to download high frequently - # laikad.astro_dog.orbit_fetched_times = TimeRangeHolder() - # laikad.fetch_navs(self.first_gps_time, block=False) - # mock_method.assert_not_called() - - # # Verify cache is working for only orbits by running a segment - # laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT, save_ephemeris=True) - # msg = verify_messages(self.logs, laikad, return_one_success=True) - # self.assertIsNotNone(msg) - # # Verify orbit data is not downloaded - # mock_method.assert_not_called() - #break - - def test_low_gnss_meas(self): - cnt = 0 - laikad = Laikad() - for m in self.low_gnss: - msg = laikad.process_gnss_msg(m.ubloxGnss, m.logMonoTime, block=True) - if msg is None: - continue - gm = msg.gnssMeasurements - if len(gm.correctedMeasurements) != 0 and gm.positionECEF.valid: - cnt += 1 - self.assertEqual(cnt, 560) - - def dict_has_values(self, dct): - self.assertGreater(len(dct), 0) - self.assertGreater(min([len(v) for v in dct.values()]), 0) - - -if __name__ == "__main__": - unittest.main() diff --git a/selfdrive/manager/process_config.py b/selfdrive/manager/process_config.py index 883351cdbc6308..320958d3440515 100644 --- a/selfdrive/manager/process_config.py +++ b/selfdrive/manager/process_config.py @@ -68,7 +68,6 @@ def only_offroad(started, params, CP: car.CarParams) -> bool: PythonProcess("controlsd", "selfdrive.controls.controlsd", only_onroad), PythonProcess("deleter", "system.loggerd.deleter", always_run), PythonProcess("dmonitoringd", "selfdrive.monitoring.dmonitoringd", driverview, enabled=(not PC or WEBCAM)), - PythonProcess("laikad", "selfdrive.locationd.laikad", only_onroad), PythonProcess("rawgpsd", "system.sensord.rawgps.rawgpsd", qcomgps, enabled=TICI), PythonProcess("navd", "selfdrive.navd.navd", only_onroad), PythonProcess("pandad", "selfdrive.boardd.pandad", always_run), diff --git a/selfdrive/test/process_replay/README.md b/selfdrive/test/process_replay/README.md index 6992d46f1dc9a1..bec3eb9016f814 100644 --- a/selfdrive/test/process_replay/README.md +++ b/selfdrive/test/process_replay/README.md @@ -17,7 +17,6 @@ Currently the following processes are tested: * locationd * paramsd * ubloxd -* laikad * torqued ### Usage @@ -71,7 +70,7 @@ lr = LogReader(...) output_logs = replay_process_with_name('locationd', lr) # or list of names -output_logs = replay_process_with_name(['ubloxd', 'locationd', 'laikad'], lr) +output_logs = replay_process_with_name(['ubloxd', 'locationd'], lr) ``` Supported processes: @@ -83,7 +82,6 @@ Supported processes: * locationd * paramsd * ubloxd -* laikad * torqued * modeld * dmonitoringmodeld diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 9aab54199102ec..0cb8ac9cac237a 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -448,16 +448,6 @@ def controlsd_config_callback(params, cfg, lr): params.put("ReplayControlsState", controlsState.as_builder().to_bytes()) -def laikad_config_pubsub_callback(params, cfg, lr): - ublox = params.get_bool("UbloxAvailable") - main_key = "ubloxGnss" if ublox else "qcomGnss" - sub_keys = ({"qcomGnss", } if ublox else {"ubloxGnss", }) - - cfg.pubs = set(cfg.pubs) - sub_keys - cfg.main_pub = main_key - cfg.main_pub_drained = True - - def locationd_config_pubsub_callback(params, cfg, lr): ublox = params.get_bool("UbloxAvailable") sub_keys = ({"gpsLocation", } if ublox else {"gpsLocationExternal", }) @@ -543,17 +533,6 @@ def locationd_config_pubsub_callback(params, cfg, lr): subs=["ubloxGnss", "gpsLocationExternal"], ignore=["logMonoTime"], ), - ProcessConfig( - proc_name="laikad", - pubs=["ubloxGnss", "qcomGnss"], - subs=["gnssMeasurements"], - ignore=["logMonoTime"], - config_callback=laikad_config_pubsub_callback, - tolerance=NUMPY_TOLERANCE, - processing_time=0.002, - timeout=60*10, # first messages are blocked on internet assistance - main_pub="ubloxGnss", # config_callback will switch this to qcom if needed - ), ProcessConfig( proc_name="torqued", pubs=["liveLocationKalman", "carState", "carControl"], diff --git a/selfdrive/test/process_replay/test_fuzzy.py b/selfdrive/test/process_replay/test_fuzzy.py index f3f48d0159f7e7..4b8629fc7d42f1 100755 --- a/selfdrive/test/process_replay/test_fuzzy.py +++ b/selfdrive/test/process_replay/test_fuzzy.py @@ -13,7 +13,7 @@ # These processes currently fail because of unrealistic data breaking assumptions # that openpilot makes causing error with NaN, inf, int size, array indexing ... # TODO: Make each one testable -NOT_TESTED = ['controlsd', 'plannerd', 'calibrationd', 'dmonitoringd', 'paramsd', 'laikad', 'dmonitoringmodeld', 'modeld'] +NOT_TESTED = ['controlsd', 'plannerd', 'calibrationd', 'dmonitoringd', 'paramsd', 'dmonitoringmodeld', 'modeld'] TEST_CASES = [(cfg.proc_name, copy.deepcopy(cfg)) for cfg in pr.CONFIGS if cfg.proc_name not in NOT_TESTED] class TestFuzzProcesses(unittest.TestCase): diff --git a/selfdrive/test/profiling/profiler.py b/selfdrive/test/profiling/profiler.py index 1d380ba5eb718a..6d0cc204c55e86 100755 --- a/selfdrive/test/profiling/profiler.py +++ b/selfdrive/test/profiling/profiler.py @@ -83,14 +83,12 @@ def run(sm, pm, can_sock): from openpilot.selfdrive.controls.radard import radard_thread from openpilot.selfdrive.locationd.paramsd import main as paramsd_thread from openpilot.selfdrive.controls.plannerd import main as plannerd_thread - from openpilot.selfdrive.locationd.laikad import main as laikad_thread procs = { 'radard': radard_thread, 'controlsd': controlsd_thread, 'paramsd': paramsd_thread, 'plannerd': plannerd_thread, - 'laikad': laikad_thread, } proc = sys.argv[1] diff --git a/selfdrive/test/test_onroad.py b/selfdrive/test/test_onroad.py index 0e1293ebc3386e..e4f6bccf6cd71f 100755 --- a/selfdrive/test/test_onroad.py +++ b/selfdrive/test/test_onroad.py @@ -56,7 +56,6 @@ "selfdrive.navd.navd": 0.4, "system.loggerd.uploader": 3.0, "system.loggerd.deleter": 0.1, - "selfdrive.locationd.laikad": (1.0, 80.0), # TODO: better GPS setup in testing closet } PROCS.update({ diff --git a/selfdrive/test/update_ci_routes.py b/selfdrive/test/update_ci_routes.py index 8157066334b81e..326cb4fa568416 100755 --- a/selfdrive/test/update_ci_routes.py +++ b/selfdrive/test/update_ci_routes.py @@ -8,7 +8,6 @@ from tqdm import tqdm from openpilot.selfdrive.car.tests.routes import routes as test_car_models_routes -from openpilot.selfdrive.locationd.test.test_laikad import UBLOX_TEST_ROUTE, QCOM_TEST_ROUTE from openpilot.selfdrive.test.process_replay.test_processes import source_segments as replay_segments from openpilot.selfdrive.test.openpilotci import (DATA_CI_ACCOUNT, DATA_CI_ACCOUNT_URL, DATA_CI_CONTAINER, get_azure_credential, get_container_sas) @@ -90,7 +89,6 @@ def sync_to_ci_public(route: str) -> bool: if not len(to_sync): # sync routes from the car tests routes and process replay - to_sync.extend([UBLOX_TEST_ROUTE, QCOM_TEST_ROUTE]) to_sync.extend([rt.route for rt in test_car_models_routes]) to_sync.extend([s[1].rsplit('--', 1)[0] for s in replay_segments]) diff --git a/system/sensord/rawgps/test_rawgps.py b/system/sensord/rawgps/test_rawgps.py index 02777d5a1d34ee..2a0ee656e2a5e7 100755 --- a/system/sensord/rawgps/test_rawgps.py +++ b/system/sensord/rawgps/test_rawgps.py @@ -5,13 +5,11 @@ import datetime import unittest import subprocess -import numpy as np import cereal.messaging as messaging from openpilot.system.hardware import TICI from openpilot.system.sensord.rawgps.rawgpsd import at_cmd, wait_for_modem from openpilot.selfdrive.manager.process_config import managed_processes -from openpilot.common.transformations.coordinates import ecef_from_geodetic GOOD_SIGNAL = bool(int(os.getenv("GOOD_SIGNAL", '0'))) @@ -123,22 +121,5 @@ def test_late_assistance_loading(self): managed_processes['rawgpsd'].stop() self.check_assistance(True) - @unittest.skipIf(not GOOD_SIGNAL, "No good GPS signal") - def test_fix(self): - managed_processes['rawgpsd'].start() - managed_processes['laikad'].start() - assert self._wait_for_output(60) - assert self.sm.updated['qcomGnss'] - assert self.sm.updated['gpsLocation'] - assert self.sm['gpsLocation'].flags == 1 - module_fix = ecef_from_geodetic([self.sm['gpsLocation'].latitude, - self.sm['gpsLocation'].longitude, - self.sm['gpsLocation'].altitude]) - assert self.sm['gnssMeasurements'].positionECEF.valid - total_diff = np.array(self.sm['gnssMeasurements'].positionECEF.value) - module_fix - self.assertLess(np.linalg.norm(total_diff), 100) - managed_processes['laikad'].stop() - managed_processes['rawgpsd'].stop() - if __name__ == "__main__": unittest.main(failfast=True) diff --git a/tools/gpstest/fuzzy_testing.py b/tools/gpstest/fuzzy_testing.py index 3bad2770cd294b..532fd2d34cc5e0 100755 --- a/tools/gpstest/fuzzy_testing.py +++ b/tools/gpstest/fuzzy_testing.py @@ -33,8 +33,7 @@ def run_remote_checker(lat, lon, alt, duration, ip_addr): return False, None, None matched, log, info = con.root.exposed_run_checker(lat, lon, alt, - timeout=duration, - use_laikad=True) + timeout=duration) con.close() # TODO: might wanna fetch more logs here con = None @@ -43,7 +42,7 @@ def run_remote_checker(lat, lon, alt, duration, ip_addr): stats = defaultdict(int) # type: ignore -keys = ['success', 'failed', 'ublox_fail', 'laikad_fail', 'proc_crash', 'checker_crash'] +keys = ['success', 'failed', 'ublox_fail', 'proc_crash', 'checker_crash'] def print_report(): print("\nFuzzy testing report summary:") @@ -62,10 +61,7 @@ def update_stats(matched, log, info): if log == "CHECKER CRASHED": stats['checker_crash'] += 1 if log == "TIMEOUT": - if "LAIKAD" in info: - stats['laikad_fail'] += 1 - else: # "UBLOX" in info - stats['ublox_fail'] += 1 + stats['ublox_fail'] += 1 def main(ip_addr, continuous_mode, timeout, pos): diff --git a/tools/gpstest/rpc_server.py b/tools/gpstest/rpc_server.py deleted file mode 100644 index 798237142d7511..00000000000000 --- a/tools/gpstest/rpc_server.py +++ /dev/null @@ -1,185 +0,0 @@ -import os -import time -import shutil -from datetime import datetime -from collections import defaultdict -from openpilot.system.hardware.hw import Paths - -import rpyc -from rpyc.utils.server import ThreadedServer - -#from openpilot.common.params import Params -import cereal.messaging as messaging -from openpilot.selfdrive.manager.process_config import managed_processes -from laika.lib.coordinates import ecef2geodetic - -DELTA = 0.001 -ALT_DELTA = 30 -MATCH_NUM = 10 -REPORT_STATS = 10 - -EPHEM_CACHE = "/data/params/d/LaikadEphemerisV3" - -SERVER_LOG_FILE = "/tmp/fuzzy_server.log" -server_log = open(SERVER_LOG_FILE, "w+") - -def slog(msg): - server_log.write(f"{datetime.now().strftime('%H:%M:%S.%f')} | {msg}\n") - server_log.flush() - -def handle_laikad(msg): - if not hasattr(msg, 'correctedMeasurements'): - return None - - num_corr = len(msg.correctedMeasurements) - pos_ecef = msg.positionECEF.value - pos_geo = [] - if len(pos_ecef) > 0: - pos_geo = ecef2geodetic(pos_ecef) - - pos_std = msg.positionECEF.std - pos_valid = msg.positionECEF.valid - - slog(f"{num_corr} {pos_geo} {pos_ecef} {pos_std} {pos_valid}") - return pos_geo, (num_corr, pos_geo, list(pos_ecef), list(msg.positionECEF.std)) - -hw_msgs = 0 -ephem_msgs: dict = defaultdict(int) -def handle_ublox(msg): - global hw_msgs - - d = msg.to_dict() - - if 'hwStatus2' in d: - hw_msgs += 1 - - if 'ephemeris' in d: - ephem_msgs[msg.ephemeris.svId] += 1 - - num_meas = None - if 'measurementReport' in d: - num_meas = msg.measurementReport.numMeas - - return [hw_msgs, ephem_msgs, num_meas] - - -def start_procs(procs): - for p in procs: - managed_processes[p].start() - time.sleep(1) - -def kill_procs(procs, no_retry=False): - for p in procs: - managed_processes[p].stop() - time.sleep(1) - - if not no_retry: - for p in procs: - mp = managed_processes[p].proc - if mp is not None and mp.is_alive(): - managed_processes[p].stop() - time.sleep(3) - -def check_alive_procs(procs): - for p in procs: - mp = managed_processes[p].proc - if mp is None or not mp.is_alive(): - return False, p - return True, None - - -class RemoteCheckerService(rpyc.Service): - def on_connect(self, conn): - pass - - def on_disconnect(self, conn): - #kill_procs(self.procs, no_retry=False) - # this execution is delayed, it will kill the next run of laikad - # TODO: add polling to wait for everything is killed - pass - - def run_checker(self, slat, slon, salt, sockets, procs, timeout): - global hw_msgs, ephem_msgs - hw_msgs = 0 - ephem_msgs = defaultdict(int) - - slog(f"Run test: {slat} {slon} {salt}") - - # quectel_mod = Params().get_bool("UbloxAvailable") - - match_cnt = 0 - msg_cnt = 0 - stats_laikad = [] - stats_ublox = [] - - self.procs = procs - start_procs(procs) - sm = messaging.SubMaster(sockets) - - start_time = time.monotonic() - while True: - sm.update() - - if sm.updated['ubloxGnss']: - stats_ublox.append(handle_ublox(sm['ubloxGnss'])) - - if sm.updated['gnssMeasurements']: - pos_geo, stats = handle_laikad(sm['gnssMeasurements']) - if pos_geo is None or len(pos_geo) == 0: - continue - - match = all(abs(g-s) < DELTA for g,s in zip(pos_geo[:2], [slat, slon], strict=True)) - match &= abs(pos_geo[2] - salt) < ALT_DELTA - if match: - match_cnt += 1 - if match_cnt >= MATCH_NUM: - return True, "MATCH", f"After: {round(time.monotonic() - start_time, 4)}" - - # keep some stats for error reporting - stats_laikad.append(stats) - - if (msg_cnt % 10) == 0: - a, p = check_alive_procs(procs) - if not a: - return False, "PROC CRASH", f"{p}" - msg_cnt += 1 - - if (time.monotonic() - start_time) > timeout: - h = f"LAIKAD: {stats_laikad[-REPORT_STATS:]}" - if len(h) == 0: - h = f"UBLOX: {stats_ublox[-REPORT_STATS:]}" - return False, "TIMEOUT", h - - - def exposed_run_checker(self, slat, slon, salt, timeout=180, use_laikad=True): - try: - procs = [] - sockets = [] - - if use_laikad: - procs.append("laikad") # pigeond, ubloxd # might wanna keep them running - sockets += ['ubloxGnss', 'gnssMeasurements'] - - if os.path.exists(EPHEM_CACHE): - os.remove(EPHEM_CACHE) - shutil.rmtree(Paths.download_cache_root(), ignore_errors=True) - - ret = self.run_checker(slat, slon, salt, sockets, procs, timeout) - kill_procs(procs) - return ret - - except Exception as e: - # always make sure processes get killed - kill_procs(procs) - return False, "CHECKER CRASHED", f"{str(e)}" - - - def exposed_kill_procs(self): - kill_procs(self.procs, no_retry=True) - - -if __name__ == "__main__": - print(f"Sever Log written to: {SERVER_LOG_FILE}") - t = ThreadedServer(RemoteCheckerService, port=18861) - t.start() - diff --git a/tools/gpstest/test_laikad.py b/tools/gpstest/test_laikad.py deleted file mode 100755 index abb47d4bfce717..00000000000000 --- a/tools/gpstest/test_laikad.py +++ /dev/null @@ -1,105 +0,0 @@ -#!/usr/bin/env python3 -import os -import time -import unittest - -import cereal.messaging as messaging -import openpilot.system.sensord.pigeond as pd - -from openpilot.common.params import Params -from openpilot.system.hardware import TICI -from openpilot.selfdrive.manager.process_config import managed_processes -from openpilot.selfdrive.test.helpers import with_processes - - -def wait_for_location(sm, timeout, con=10): - cons_meas = 0 - start_time = time.monotonic() - while (time.monotonic() - start_time) < timeout: - sm.update() - if not sm.updated["gnssMeasurements"]: - continue - - msg = sm["gnssMeasurements"] - cons_meas = (cons_meas + 1) if 'positionECEF' in msg.to_dict() else 0 - if cons_meas >= con: - return True - return False - - -class TestLaikad(unittest.TestCase): - @classmethod - def setUpClass(self): - if not TICI: - raise unittest.SkipTest - - ublox_available = Params().get_bool("UbloxAvailable") - if not ublox_available: - raise unittest.SkipTest - - def setUp(self): - # ensure laikad cold start - Params().remove("LaikadEphemerisV3") - os.environ["LAIKAD_NO_INTERNET"] = "1" - managed_processes['laikad'].start() - - def tearDown(self): - managed_processes['laikad'].stop() - - - @with_processes(['pigeond', 'ubloxd']) - def test_laikad_cold_start(self): - time.sleep(5) - - start_time = time.monotonic() - sm = messaging.SubMaster(["gnssMeasurements"]) - - success = wait_for_location(sm, 60*2, con=10) - duration = time.monotonic() - start_time - - assert success, "Waiting for location timed out (2min)!" - assert duration < 60, f"Received Location {duration}!" - - - @with_processes(['ubloxd']) - def test_laikad_ublox_reset_start(self): - time.sleep(2) - - pigeon, pm = pd.create_pigeon() - pd.init_baudrate(pigeon) - assert pigeon.reset_device(), "Could not reset device!" - - laikad_sock = messaging.sub_sock("gnssMeasurements", timeout=0.1) - ublox_gnss_sock = messaging.sub_sock("ubloxGnss", timeout=0.1) - - pd.init_baudrate(pigeon) - pd.initialize_pigeon(pigeon) - pd.run_receiving(pigeon, pm, 180) - - ublox_msgs = messaging.drain_sock(ublox_gnss_sock) - laikad_msgs = messaging.drain_sock(laikad_sock) - - gps_ephem_cnt = 0 - glonass_ephem_cnt = 0 - for um in ublox_msgs: - if um.ubloxGnss.which() == 'ephemeris': - gps_ephem_cnt += 1 - elif um.ubloxGnss.which() == 'glonassEphemeris': - glonass_ephem_cnt += 1 - - assert gps_ephem_cnt > 0, "NO gps ephemeris collected!" - assert glonass_ephem_cnt > 0, "NO glonass ephemeris collected!" - - pos_meas = 0 - duration = -1 - for lm in laikad_msgs: - pos_meas = (pos_meas + 1) if 'positionECEF' in lm.gnssMeasurements.to_dict() else 0 - if pos_meas > 5: - duration = (lm.logMonoTime - laikad_msgs[0].logMonoTime)*1e-9 - break - - assert pos_meas > 5, "NOT enough positions at end of read!" - assert duration < 120, "Laikad took too long to get a Position!" - -if __name__ == "__main__": - unittest.main()