From d5f62cad9a7cbca9998e96c9175a66f7057df708 Mon Sep 17 00:00:00 2001
From: "Specht, Tobias"
Date: Tue, 5 Jul 2022 08:45:57 +0200
Subject: [PATCH] fix typing

---
 src/gallia/analyzer/categorizer.py  |  8 +++--
 src/gallia/analyzer/exceptions.py   |  4 +--
 src/gallia/analyzer/operator.py     | 54 ++++++++++-------------------
 src/gallia/analyzer/reporter.py     |  6 ++--
 src/gallia/analyzer/xl_generator.py | 18 +++++-----
 5 files changed, 37 insertions(+), 53 deletions(-)

diff --git a/src/gallia/analyzer/categorizer.py b/src/gallia/analyzer/categorizer.py
index ae90e69bd..e7f312481 100644
--- a/src/gallia/analyzer/categorizer.py
+++ b/src/gallia/analyzer/categorizer.py
@@ -95,7 +95,7 @@ def categorize_iden(
         categorize failures for scan_identifier.
         """
         try:
-            serv_vec = pd.unique(raw_df[ColNm.serv])
+            serv_vec = np.unique(raw_df[ColNm.serv])
             if not serv_vec.size == 1:
                 self.log("more than one service in a run", True)
                 return pd.DataFrame()
@@ -124,7 +124,7 @@ def categorize_iden(
                 return pd.DataFrame()
         return raw_df
 
-    def check_sess_alwd(self, serv: int, sess: int, op_mode: OpMode, ecu_mode) -> bool:
+    def check_sess_alwd(self, serv: int, sess: int, op_mode: OpMode, ecu_mode: int) -> bool:
         """
         check if a certain diagnostic session is available or supported
         for a certain service at given analysis mode.
@@ -149,7 +149,7 @@ def check_resp_alwd(self, serv: int, resp: int) -> bool:
             + self.iso_supp_err_for_all_vec.tolist()
         )
 
-    def check_sbfn_alwd(self, serv: int, sbfn: int, op_mode: OpMode, ecu_mode) -> bool:
+    def check_sbfn_alwd(self, serv: int, sbfn: int, op_mode: OpMode, ecu_mode: int) -> bool:
         """
         check if a certain sub-function is available or supported
         for a certain service at given analysis mode.
@@ -275,6 +275,8 @@ def get_fail_iden(
         supp_serv_vec = self.supp_serv_ven_vec
         if op_mode == OpMode.ISO:
             supp_serv_vec = self.supp_serv_iso_vec
+        else:
+            raise RuntimeError(f'Unsupported op_mode: {op_mode}')
 
         cond_serv_supp = serv in supp_serv_vec
         cond_resp_alwd = self.check_resp_alwd(serv, resp)
diff --git a/src/gallia/analyzer/exceptions.py b/src/gallia/analyzer/exceptions.py
index 3b9ee6d9f..8d767a237 100644
--- a/src/gallia/analyzer/exceptions.py
+++ b/src/gallia/analyzer/exceptions.py
@@ -8,7 +8,7 @@ class EmptyTableException(Exception):
     exception class for empty table error
     """
 
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__("Empty Table.")
 
 
@@ -17,5 +17,5 @@ class ColumnMismatchException(Exception):
     exception class for column mismatch
     """
 
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__("Columns Mismatch.")
diff --git a/src/gallia/analyzer/operator.py b/src/gallia/analyzer/operator.py
index 1d55d254e..0a71e4f01 100644
--- a/src/gallia/analyzer/operator.py
+++ b/src/gallia/analyzer/operator.py
@@ -3,9 +3,9 @@
 """
 import json
 from json.decoder import JSONDecodeError
-from typing import Tuple
 from sqlite3 import OperationalError
 from itertools import chain
+from typing import cast
 import numpy as np
 import pandas as pd
 from pandas.core.indexing import IndexingError
@@ -35,8 +35,8 @@ def __init__(self, path: str = "", log_mode: LogMode = LogMode.STD_OUT):
         self.ref_ven_df = pd.DataFrame()
         self.supp_serv_ven_vec = np.array([])
         self.sess_code_vec = np.array([])
-        self.sess_code_dict = dict()
-        self.sess_name_dict = dict()
+        self.sess_code_dict: dict[int, str] = {}
+        self.sess_name_dict: dict[str, int] = {}
         self.load_all_dicts()
         if self.connect_db():
             self.load_ref_iso()
@@ -58,7 +58,7 @@ def get_runs(self) -> np.ndarray:
         get all a numpy array of all runs in the database.
         """
         if self.load_meta(force=True):
-            return self.run_meta_df.index
+            return self.run_meta_df.index.to_numpy()
         return np.array([])
 
     def get_scan_mode(self, run: int) -> ScanMode:
@@ -88,10 +88,10 @@ def get_sid(self, run: int) -> int:
                 return -1
             raw_df = self.read_run_db(TblNm.iden, run)
             self.check_df(raw_df, TblStruct.iden)
-            serv_vec = pd.unique(raw_df[ColNm.serv])
+            serv_vec = np.unique(raw_df[ColNm.serv])
             if serv_vec.shape[0] > 1:
                 self.log("A run has more than one Service ID.", True)
-            serv_ser = raw_df[ColNm.serv].mode()
+            serv_ser = raw_df[ColNm.serv].mode(dropna=True)
             if serv_ser.shape[0] > 1:
                 self.log("A run has more than one most frequent Service ID.", True)
         except (
@@ -112,7 +112,7 @@ def get_ecu_mode(self, run: int) -> int:
         if not self.load_meta():
             return -1
         try:
-            ecu_mode = self.run_meta_df.loc[run, ColNm.ecu_mode]
+            ecu_mode = cast(int, self.run_meta_df.loc[run, ColNm.ecu_mode])
             return ecu_mode
         except (KeyError, IndexingError, AttributeError) as exc:
             self.log("getting ECU mode failed", True, exc)
@@ -135,7 +135,7 @@ def get_sess_lu(self) -> np.ndarray:
         try:
             lu_df = self.read_db(TblNm.ven_lu)
             self.check_df(lu_df, TblStruct.ven_lu)
-            sess_vec = pd.unique(lu_df[ColNm.sess])
+            sess_vec = np.unique(lu_df[ColNm.sess])
         except (
             KeyError,
             IndexingError,
@@ -173,36 +173,18 @@ def get_ref_df_from_json(self, path: str) -> pd.DataFrame:
             return pd.DataFrame()
         return ref_df
 
-    def get_dft_err_df(self, run: int) -> Tuple[pd.DataFrame, np.ndarray]:
-        """
-        get data frame that shows most common error(default error)
-        for each diagnostic session regarding a run.
-        """
-        try:
-            scan_mode = self.get_scan_mode(run)
-            if scan_mode == ScanMode.SERV:
-                raw_df = self.read_run_db(TblNm.serv, run)
-                self.check_df(raw_df, TblStruct.serv)
-            else:
-                raw_df = self.read_run_db(TblNm.iden, run)
-                self.check_df(raw_df, TblStruct.iden)
-        except (EmptyTableException, ColumnMismatchException) as exc:
-            self.log("getting default error data frame failed", True, exc)
-            return pd.DataFrame()
-        return self.get_dft_err_df_from_raw(raw_df)
-
     def get_dft_err_df_from_raw(self, raw_df: pd.DataFrame) -> pd.DataFrame:
         """
         get summarized data frame that shows most common error(default error)
         for each diagnostic session from raw data frame.
         """
         try:
-            sess_vec = pd.unique(raw_df[ColNm.sess])
+            sess_vec = np.unique(raw_df[ColNm.sess])
             dft_err_df = pd.DataFrame([], index=[ColNm.dft], columns=sess_vec)
             for sess in sess_vec:
                 cond = raw_df[ColNm.sess] == sess
                 dft_err_df.loc[ColNm.dft, sess] = raw_df.loc[cond, ColNm.resp].mode()[0]
-            dft_err_df.attrs[ColNm.serv] = list(pd.unique(raw_df[ColNm.serv]))
+            dft_err_df.attrs[ColNm.serv] = list(np.unique(raw_df[ColNm.serv]))
         except (
             KeyError,
             IndexingError,
@@ -224,7 +206,7 @@ def get_pos_res(self, search_id: int) -> str:
                 FROM "{TblNm.scan_result}" WHERE "{ColNm.id}" = {str(search_id)};
             """
             res_df = self.get_df_by_query(res_sql)
-            resp = res_df.iloc[0, 0]
+            resp = cast(str, res_df.iloc[0, 0])
         except (KeyError, IndexingError, AttributeError) as exc:
             self.log("getting positive response failed", True, exc)
             return ""
@@ -281,7 +263,7 @@ def load_ven_lu(self, force: bool = False, num_modes: int = NUM_ECU_MODES) -> bo
         try:
             lu_df = self.read_db(TblNm.ven_lu)
             self.check_df(lu_df, TblStruct.ven_lu)
-            supp_serv_vec = np.sort(pd.unique(lu_df[ColNm.serv]))
+            supp_serv_vec = np.sort(np.unique(lu_df[ColNm.serv]))
             mode_vec = np.arange(num_modes)
             ven_lu_dict = {}
             self.num_modes = 0
@@ -295,21 +277,21 @@ def load_ven_lu(self, force: bool = False, num_modes: int = NUM_ECU_MODES) -> bo
             for serv in supp_serv_vec:
                 sess_ls = list(
                     np.sort(
-                        pd.unique(
+                        np.unique(
                             loi_df.loc[loi_df[ColNm.serv] == serv, ColNm.sess]
                         )
                     )
                 )
                 sbfn_ls = list(
                     np.sort(
-                        pd.unique(
+                        np.unique(
                             loi_df.loc[loi_df[ColNm.serv] == serv, ColNm.sbfn]
                         )
                     )
                 )
                 iden_ls = list(
                     np.sort(
-                        pd.unique(
+                        np.unique(
                             loi_df.loc[
                                 loi_df[ColNm.serv] == serv,
                                 ColNm.iden,
@@ -323,7 +305,7 @@ def load_ven_lu(self, force: bool = False, num_modes: int = NUM_ECU_MODES) -> bo
                 )
             ven_lu_dict[mode] = ref_df.T
         ven_lu_df = pd.concat(ven_lu_dict.values(), axis=1, keys=ven_lu_dict.keys())
-        self.ref_ven_df: pd.DataFrame = ven_lu_df
+        self.ref_ven_df = ven_lu_df
         self.supp_serv_ven_vec = np.sort(np.array(ven_lu_df.index))
         except (
             KeyError,
@@ -413,7 +395,7 @@ def load_lu_iden(self, serv: int, ecu_mode: int) -> bool:
                 (raw_df[ColNm.serv] == serv) & (raw_df[ColNm.ecu_mode] == ecu_mode)
             ].copy()
             self.lu_iden_df = pd.DataFrame(
-                pd.unique(
+                np.unique(
                     list(
                         zip(
                             serv_df[ColNm.sess],
@@ -503,7 +485,7 @@ def prepare_alwd_sess_boot(
         ven_lu_df[ColNm.combi] = list(
             zip(ven_lu_df[ColNm.serv], ven_lu_df[ColNm.sess], ven_lu_df[ColNm.boot])
         )
-        entries_vec = pd.unique(ven_lu_df[ColNm.combi])
+        entries_vec = np.unique(ven_lu_df[ColNm.combi])
         for entry in entries_vec:
             pair_ls.append((entry[0], entry[1], entry[2]))
         pair_df = pd.DataFrame(
diff --git a/src/gallia/analyzer/reporter.py b/src/gallia/analyzer/reporter.py
index de05b31ec..4166a0867 100644
--- a/src/gallia/analyzer/reporter.py
+++ b/src/gallia/analyzer/reporter.py
@@ -306,7 +306,7 @@ def load_sid_oi_from_df(self, raw_df: pd.DataFrame, ecu_mode: int = -1) -> bool:
             )
             if ecu_mode != -1:
                 cond_abn &= raw_df[ColNm.ecu_mode] == ecu_mode
-            self.abn_serv_vec = np.sort(pd.unique(raw_df.loc[cond_abn, ColNm.serv]))
+            self.abn_serv_vec = np.sort(np.unique(raw_df.loc[cond_abn, ColNm.serv]))
         except (KeyError, IndexingError, AttributeError) as exc:
             self.log("loading services of interest from data frame failed", True, exc)
             return False
@@ -331,7 +331,7 @@ def load_iden_oi_from_df(self, raw_df: pd.DataFrame, ecu_mode: int = -1) -> bool
         load identifiers of interest from input raw data frame.
         """
         try:
-            serv_vec = np.sort(pd.unique(raw_df[ColNm.serv]))
+            serv_vec = np.sort(np.unique(raw_df[ColNm.serv]))
             if not serv_vec.size == 1:
                 self.log("more than one service in a run", True)
                 return False
@@ -348,7 +348,7 @@ def load_iden_oi_from_df(self, raw_df: pd.DataFrame, ecu_mode: int = -1) -> bool
             )
             if ecu_mode != -1:
                 cond_abn &= raw_df[ColNm.ecu_mode] == ecu_mode
-            self.abn_iden_vec = np.sort(pd.unique(raw_df.loc[cond_abn, ColNm.iden]))
+            self.abn_iden_vec = np.sort(np.unique(raw_df.loc[cond_abn, ColNm.iden]))
         except (KeyError, IndexingError, AttributeError) as exc:
             self.log(
                 "loading identifiers of interest from data frame failed", True, exc
diff --git a/src/gallia/analyzer/xl_generator.py b/src/gallia/analyzer/xl_generator.py
index 98cbb0a74..253ed66e9 100644
--- a/src/gallia/analyzer/xl_generator.py
+++ b/src/gallia/analyzer/xl_generator.py
@@ -32,7 +32,7 @@ class ExcelGenerator(Operator):
     def __init__(self, path: str = "", log_mode: LogMode = LogMode.STD_OUT):
         Operator.__init__(self, path, log_mode)
         self.msg_head = "[ExcelGenerator] "
-        self.workbook = self.workbook = op.Workbook()
+        self.workbook: op.Workbook = op.Workbook()
         self.worksheet: Any
         self.load_color_code(SrcPath.err_src)
 
@@ -77,7 +77,7 @@ def save_close_xl(self, out_path: str) -> bool:
         except (InvalidFileException, WorkbookAlreadySaved) as exc:
             self.log("saving EXCEL failed", True, exc)
             return False
-        return
+        return True
 
     def add_sum_sheet_serv(
         self, raw_df: pd.DataFrame, entries_vec: np.ndarray, sheet_name: str = ""
@@ -121,8 +121,8 @@ def add_sum_sheet_iden(
         try:
             self.worksheet = self.workbook.create_sheet(sheet_name)
             ref_col = ColNm.iden
-            serv = pd.unique(raw_df[ColNm.serv])[0]
-            sbfn_vec = np.sort(pd.unique(raw_df[ColNm.sbfn]))
+            serv = np.unique(raw_df[ColNm.serv])[0]
+            sbfn_vec = np.sort(np.unique(raw_df[ColNm.sbfn]))
             dft_err_df = self.get_dft_err_df_from_raw(raw_df)
             cur_row, cur_col = self.sum_sheet_fill_origin(ScanMode.IDEN, serv, sbfn_vec)
             cur_row, cur_col = self.sum_sheet_fill_index(
@@ -288,7 +288,7 @@ def sum_sheet_fill_resp(
         if scan_mode == ScanMode.SERV:
             sbfn_vec = np.arange(1)
         if scan_mode == ScanMode.IDEN:
-            sbfn_vec = np.sort(pd.unique(raw_df[ColNm.sbfn]))
+            sbfn_vec = np.sort(np.unique(raw_df[ColNm.sbfn]))
         for sess in sess_vec:
             if dft_err_df[sess][0] == -1:
                 continue
@@ -344,7 +344,7 @@ def add_failure_sheet(
             width = XlDesign.dim_wide
         if scan_mode == ScanMode.IDEN:
             fail_vec = np.array([Failure.UNDOC_IDEN, Failure.MISS_IDEN])
-            sbfn_vec = np.sort(pd.unique(raw_df[ColNm.sbfn]))
+            sbfn_vec = np.sort(np.unique(raw_df[ColNm.sbfn]))
             width = XlDesign.dim_middle
         cur_row = self.start_row
         cur_col = self.start_col
@@ -390,7 +390,7 @@ def add_failure_sheet(
                     lambda x, fl=fail: self.check_fail(x, fl)
                 ) & (raw_df[ColNm.sess] == sess)
                 if scan_mode == ScanMode.SERV:
-                    serv_vec = np.sort(pd.unique(raw_df.loc[cond, ColNm.serv]))
+                    serv_vec = np.sort(np.unique(raw_df.loc[cond, ColNm.serv]))
                     for serv in serv_vec:
                         self.worksheet.cell(
                             cur_row, cur_col
@@ -406,7 +406,7 @@ def add_failure_sheet(
                         zip(raw_df[ColNm.iden], raw_df[ColNm.sbfn])
                     )
                     iden_sbfn_vec = np.sort(
-                        pd.unique(raw_df.loc[cond, ColNm.combi])
+                        np.unique(raw_df.loc[cond, ColNm.combi])
                     )
                     for iden_sbfn in iden_sbfn_vec:
                         iden = iden_sbfn[0]
@@ -422,7 +422,7 @@ def add_failure_sheet(
                             cur_row += 1
                         cur_col += 1
                 else:
-                    iden_vec = np.sort(pd.unique(raw_df.loc[cond, ColNm.iden]))
+                    iden_vec = np.sort(np.unique(raw_df.loc[cond, ColNm.iden]))
                     for iden in iden_vec:
                         if iden == -1:
                             entry = CellCnt.no_ent