Skip to content

Commit

Permalink
fix typing
Browse files Browse the repository at this point in the history
  • Loading branch information
peckto committed Jul 5, 2022
1 parent ef53160 commit d5f62ca
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 53 deletions.
8 changes: 5 additions & 3 deletions src/gallia/analyzer/categorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def categorize_iden(
categorize failures for scan_identifier.
"""
try:
serv_vec = pd.unique(raw_df[ColNm.serv])
serv_vec = np.unique(raw_df[ColNm.serv])
if not serv_vec.size == 1:
self.log("more than one service in a run", True)
return pd.DataFrame()
Expand Down Expand Up @@ -124,7 +124,7 @@ def categorize_iden(
return pd.DataFrame()
return raw_df

def check_sess_alwd(self, serv: int, sess: int, op_mode: OpMode, ecu_mode) -> bool:
def check_sess_alwd(self, serv: int, sess: int, op_mode: OpMode, ecu_mode: int) -> bool:
"""
check if a certain diagnostic session is available or supported
for a certain service at given analysis mode.
Expand All @@ -149,7 +149,7 @@ def check_resp_alwd(self, serv: int, resp: int) -> bool:
+ self.iso_supp_err_for_all_vec.tolist()
)

def check_sbfn_alwd(self, serv: int, sbfn: int, op_mode: OpMode, ecu_mode) -> bool:
def check_sbfn_alwd(self, serv: int, sbfn: int, op_mode: OpMode, ecu_mode: int) -> bool:
"""
check if a certain sub-function is available or supported
for a certain service at given analysis mode.
Expand Down Expand Up @@ -275,6 +275,8 @@ def get_fail_iden(
supp_serv_vec = self.supp_serv_ven_vec
if op_mode == OpMode.ISO:
supp_serv_vec = self.supp_serv_iso_vec
else:
raise RuntimeError(f'Unsupported op_mode: {op_mode}')

cond_serv_supp = serv in supp_serv_vec
cond_resp_alwd = self.check_resp_alwd(serv, resp)
Expand Down
4 changes: 2 additions & 2 deletions src/gallia/analyzer/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class EmptyTableException(Exception):
exception class for empty table error
"""

def __init__(self):
def __init__(self) -> None:
super().__init__("Empty Table.")


Expand All @@ -17,5 +17,5 @@ class ColumnMismatchException(Exception):
exception class for column mismatch
"""

def __init__(self):
def __init__(self) -> None:
super().__init__("Columns Mismatch.")
54 changes: 18 additions & 36 deletions src/gallia/analyzer/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
"""
import json
from json.decoder import JSONDecodeError
from typing import Tuple
from sqlite3 import OperationalError
from itertools import chain
from typing import cast
import numpy as np
import pandas as pd
from pandas.core.indexing import IndexingError
Expand Down Expand Up @@ -35,8 +35,8 @@ def __init__(self, path: str = "", log_mode: LogMode = LogMode.STD_OUT):
self.ref_ven_df = pd.DataFrame()
self.supp_serv_ven_vec = np.array([])
self.sess_code_vec = np.array([])
self.sess_code_dict = dict()
self.sess_name_dict = dict()
self.sess_code_dict: dict[int, str] = {}
self.sess_name_dict: dict[str, int] = {}
self.load_all_dicts()
if self.connect_db():
self.load_ref_iso()
Expand All @@ -58,7 +58,7 @@ def get_runs(self) -> np.ndarray:
get all a numpy array of all runs in the database.
"""
if self.load_meta(force=True):
return self.run_meta_df.index
return self.run_meta_df.index.to_numpy()
return np.array([])

def get_scan_mode(self, run: int) -> ScanMode:
Expand Down Expand Up @@ -88,10 +88,10 @@ def get_sid(self, run: int) -> int:
return -1
raw_df = self.read_run_db(TblNm.iden, run)
self.check_df(raw_df, TblStruct.iden)
serv_vec = pd.unique(raw_df[ColNm.serv])
serv_vec = np.unique(raw_df[ColNm.serv])
if serv_vec.shape[0] > 1:
self.log("A run has more than one Service ID.", True)
serv_ser = raw_df[ColNm.serv].mode()
serv_ser = raw_df[ColNm.serv].mode(dropna=True)
if serv_ser.shape[0] > 1:
self.log("A run has more than one most frequent Service ID.", True)
except (
Expand All @@ -112,7 +112,7 @@ def get_ecu_mode(self, run: int) -> int:
if not self.load_meta():
return -1
try:
ecu_mode = self.run_meta_df.loc[run, ColNm.ecu_mode]
ecu_mode = cast(int, self.run_meta_df.loc[run, ColNm.ecu_mode])
return ecu_mode
except (KeyError, IndexingError, AttributeError) as exc:
self.log("getting ECU mode failed", True, exc)
Expand All @@ -135,7 +135,7 @@ def get_sess_lu(self) -> np.ndarray:
try:
lu_df = self.read_db(TblNm.ven_lu)
self.check_df(lu_df, TblStruct.ven_lu)
sess_vec = pd.unique(lu_df[ColNm.sess])
sess_vec = np.unique(lu_df[ColNm.sess])
except (
KeyError,
IndexingError,
Expand Down Expand Up @@ -173,36 +173,18 @@ def get_ref_df_from_json(self, path: str) -> pd.DataFrame:
return pd.DataFrame()
return ref_df

def get_dft_err_df(self, run: int) -> Tuple[pd.DataFrame, np.ndarray]:
"""
get data frame that shows most common error(default error)
for each diagnostic session regarding a run.
"""
try:
scan_mode = self.get_scan_mode(run)
if scan_mode == ScanMode.SERV:
raw_df = self.read_run_db(TblNm.serv, run)
self.check_df(raw_df, TblStruct.serv)
else:
raw_df = self.read_run_db(TblNm.iden, run)
self.check_df(raw_df, TblStruct.iden)
except (EmptyTableException, ColumnMismatchException) as exc:
self.log("getting default error data frame failed", True, exc)
return pd.DataFrame()
return self.get_dft_err_df_from_raw(raw_df)

def get_dft_err_df_from_raw(self, raw_df: pd.DataFrame) -> pd.DataFrame:
"""
get summarized data frame that shows most common error(default error)
for each diagnostic session from raw data frame.
"""
try:
sess_vec = pd.unique(raw_df[ColNm.sess])
sess_vec = np.unique(raw_df[ColNm.sess])
dft_err_df = pd.DataFrame([], index=[ColNm.dft], columns=sess_vec)
for sess in sess_vec:
cond = raw_df[ColNm.sess] == sess
dft_err_df.loc[ColNm.dft, sess] = raw_df.loc[cond, ColNm.resp].mode()[0]
dft_err_df.attrs[ColNm.serv] = list(pd.unique(raw_df[ColNm.serv]))
dft_err_df.attrs[ColNm.serv] = list(np.unique(raw_df[ColNm.serv]))
except (
KeyError,
IndexingError,
Expand All @@ -224,7 +206,7 @@ def get_pos_res(self, search_id: int) -> str:
FROM "{TblNm.scan_result}" WHERE "{ColNm.id}" = {str(search_id)};
"""
res_df = self.get_df_by_query(res_sql)
resp = res_df.iloc[0, 0]
resp = cast(str, res_df.iloc[0, 0])
except (KeyError, IndexingError, AttributeError) as exc:
self.log("getting positive response failed", True, exc)
return ""
Expand Down Expand Up @@ -281,7 +263,7 @@ def load_ven_lu(self, force: bool = False, num_modes: int = NUM_ECU_MODES) -> bo
try:
lu_df = self.read_db(TblNm.ven_lu)
self.check_df(lu_df, TblStruct.ven_lu)
supp_serv_vec = np.sort(pd.unique(lu_df[ColNm.serv]))
supp_serv_vec = np.sort(np.unique(lu_df[ColNm.serv]))
mode_vec = np.arange(num_modes)
ven_lu_dict = {}
self.num_modes = 0
Expand All @@ -295,21 +277,21 @@ def load_ven_lu(self, force: bool = False, num_modes: int = NUM_ECU_MODES) -> bo
for serv in supp_serv_vec:
sess_ls = list(
np.sort(
pd.unique(
np.unique(
loi_df.loc[loi_df[ColNm.serv] == serv, ColNm.sess]
)
)
)
sbfn_ls = list(
np.sort(
pd.unique(
np.unique(
loi_df.loc[loi_df[ColNm.serv] == serv, ColNm.sbfn]
)
)
)
iden_ls = list(
np.sort(
pd.unique(
np.unique(
loi_df.loc[
loi_df[ColNm.serv] == serv,
ColNm.iden,
Expand All @@ -323,7 +305,7 @@ def load_ven_lu(self, force: bool = False, num_modes: int = NUM_ECU_MODES) -> bo
)
ven_lu_dict[mode] = ref_df.T
ven_lu_df = pd.concat(ven_lu_dict.values(), axis=1, keys=ven_lu_dict.keys())
self.ref_ven_df: pd.DataFrame = ven_lu_df
self.ref_ven_df = ven_lu_df
self.supp_serv_ven_vec = np.sort(np.array(ven_lu_df.index))
except (
KeyError,
Expand Down Expand Up @@ -413,7 +395,7 @@ def load_lu_iden(self, serv: int, ecu_mode: int) -> bool:
(raw_df[ColNm.serv] == serv) & (raw_df[ColNm.ecu_mode] == ecu_mode)
].copy()
self.lu_iden_df = pd.DataFrame(
pd.unique(
np.unique(
list(
zip(
serv_df[ColNm.sess],
Expand Down Expand Up @@ -503,7 +485,7 @@ def prepare_alwd_sess_boot(
ven_lu_df[ColNm.combi] = list(
zip(ven_lu_df[ColNm.serv], ven_lu_df[ColNm.sess], ven_lu_df[ColNm.boot])
)
entries_vec = pd.unique(ven_lu_df[ColNm.combi])
entries_vec = np.unique(ven_lu_df[ColNm.combi])
for entry in entries_vec:
pair_ls.append((entry[0], entry[1], entry[2]))
pair_df = pd.DataFrame(
Expand Down
6 changes: 3 additions & 3 deletions src/gallia/analyzer/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ def load_sid_oi_from_df(self, raw_df: pd.DataFrame, ecu_mode: int = -1) -> bool:
)
if ecu_mode != -1:
cond_abn &= raw_df[ColNm.ecu_mode] == ecu_mode
self.abn_serv_vec = np.sort(pd.unique(raw_df.loc[cond_abn, ColNm.serv]))
self.abn_serv_vec = np.sort(np.unique(raw_df.loc[cond_abn, ColNm.serv]))
except (KeyError, IndexingError, AttributeError) as exc:
self.log("loading services of interest from data frame failed", True, exc)
return False
Expand All @@ -331,7 +331,7 @@ def load_iden_oi_from_df(self, raw_df: pd.DataFrame, ecu_mode: int = -1) -> bool
load identifiers of interest from input raw data frame.
"""
try:
serv_vec = np.sort(pd.unique(raw_df[ColNm.serv]))
serv_vec = np.sort(np.unique(raw_df[ColNm.serv]))
if not serv_vec.size == 1:
self.log("more than one service in a run", True)
return False
Expand All @@ -348,7 +348,7 @@ def load_iden_oi_from_df(self, raw_df: pd.DataFrame, ecu_mode: int = -1) -> bool
)
if ecu_mode != -1:
cond_abn &= raw_df[ColNm.ecu_mode] == ecu_mode
self.abn_iden_vec = np.sort(pd.unique(raw_df.loc[cond_abn, ColNm.iden]))
self.abn_iden_vec = np.sort(np.unique(raw_df.loc[cond_abn, ColNm.iden]))
except (KeyError, IndexingError, AttributeError) as exc:
self.log(
"loading identifiers of interest from data frame failed", True, exc
Expand Down
18 changes: 9 additions & 9 deletions src/gallia/analyzer/xl_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ExcelGenerator(Operator):
def __init__(self, path: str = "", log_mode: LogMode = LogMode.STD_OUT):
Operator.__init__(self, path, log_mode)
self.msg_head = "[ExcelGenerator] "
self.workbook = self.workbook = op.Workbook()
self.workbook: op.Workbook = op.Workbook()
self.worksheet: Any
self.load_color_code(SrcPath.err_src)

Expand Down Expand Up @@ -77,7 +77,7 @@ def save_close_xl(self, out_path: str) -> bool:
except (InvalidFileException, WorkbookAlreadySaved) as exc:
self.log("saving EXCEL failed", True, exc)
return False
return
return True

def add_sum_sheet_serv(
self, raw_df: pd.DataFrame, entries_vec: np.ndarray, sheet_name: str = ""
Expand Down Expand Up @@ -121,8 +121,8 @@ def add_sum_sheet_iden(
try:
self.worksheet = self.workbook.create_sheet(sheet_name)
ref_col = ColNm.iden
serv = pd.unique(raw_df[ColNm.serv])[0]
sbfn_vec = np.sort(pd.unique(raw_df[ColNm.sbfn]))
serv = np.unique(raw_df[ColNm.serv])[0]
sbfn_vec = np.sort(np.unique(raw_df[ColNm.sbfn]))
dft_err_df = self.get_dft_err_df_from_raw(raw_df)
cur_row, cur_col = self.sum_sheet_fill_origin(ScanMode.IDEN, serv, sbfn_vec)
cur_row, cur_col = self.sum_sheet_fill_index(
Expand Down Expand Up @@ -288,7 +288,7 @@ def sum_sheet_fill_resp(
if scan_mode == ScanMode.SERV:
sbfn_vec = np.arange(1)
if scan_mode == ScanMode.IDEN:
sbfn_vec = np.sort(pd.unique(raw_df[ColNm.sbfn]))
sbfn_vec = np.sort(np.unique(raw_df[ColNm.sbfn]))
for sess in sess_vec:
if dft_err_df[sess][0] == -1:
continue
Expand Down Expand Up @@ -344,7 +344,7 @@ def add_failure_sheet(
width = XlDesign.dim_wide
if scan_mode == ScanMode.IDEN:
fail_vec = np.array([Failure.UNDOC_IDEN, Failure.MISS_IDEN])
sbfn_vec = np.sort(pd.unique(raw_df[ColNm.sbfn]))
sbfn_vec = np.sort(np.unique(raw_df[ColNm.sbfn]))
width = XlDesign.dim_middle
cur_row = self.start_row
cur_col = self.start_col
Expand Down Expand Up @@ -390,7 +390,7 @@ def add_failure_sheet(
lambda x, fl=fail: self.check_fail(x, fl)
) & (raw_df[ColNm.sess] == sess)
if scan_mode == ScanMode.SERV:
serv_vec = np.sort(pd.unique(raw_df.loc[cond, ColNm.serv]))
serv_vec = np.sort(np.unique(raw_df.loc[cond, ColNm.serv]))
for serv in serv_vec:
self.worksheet.cell(
cur_row, cur_col
Expand All @@ -406,7 +406,7 @@ def add_failure_sheet(
zip(raw_df[ColNm.iden], raw_df[ColNm.sbfn])
)
iden_sbfn_vec = np.sort(
pd.unique(raw_df.loc[cond, ColNm.combi])
np.unique(raw_df.loc[cond, ColNm.combi])
)
for iden_sbfn in iden_sbfn_vec:
iden = iden_sbfn[0]
Expand All @@ -422,7 +422,7 @@ def add_failure_sheet(
cur_row += 1
cur_col += 1
else:
iden_vec = np.sort(pd.unique(raw_df.loc[cond, ColNm.iden]))
iden_vec = np.sort(np.unique(raw_df.loc[cond, ColNm.iden]))
for iden in iden_vec:
if iden == -1:
entry = CellCnt.no_ent
Expand Down

0 comments on commit d5f62ca

Please sign in to comment.