diff --git a/snap7/util/db.py b/snap7/util/db.py index 7c84261f..ffcb40b5 100644 --- a/snap7/util/db.py +++ b/snap7/util/db.py @@ -137,6 +137,60 @@ logger = getLogger(__name__) +def prepare_tia_export_to_parse(txt_path: str) -> str: + """Return a string that can be ingested by parse_specification + from a .txt file directly copied and pasted from TIA Portal. + It also handles duplicate variable names by progressively appending “_X”. + + Args: + tia_export: path to the .txt + + Returns: + string ready to be parsed + """ + + with open(txt_path, "r") as file: + db_specification = "" + + valid_list = { + "BOOL", + "DWORD", + "INT", + "DINT", + "CHAR", + "STRING", + "DATE_AND_TIME", + "TIME_OF_DAY", + "REAL", + "BYTE", + } + var_names: list[str] = [] + + for line in file: + line = line.lstrip("\t") + parsed_line = line.split("\t") + + var_name = parsed_line[0] + var_type = parsed_line[1].upper() + var_offset = parsed_line[2] + + to_add = "_0" + for name in reversed(var_names): + name = str(name) + if name.rsplit("_")[0] == var_name: + print(name.rsplit("_")[-1]) + to_add = "_" + str(int(name.rsplit("_")[-1]) + 1) + break + + var_name = var_name + to_add + var_names.append(var_name) + + if var_type in valid_list: + new_line = var_offset + "\t" + var_name + "\t" + var_type + db_specification = db_specification + "\n" + new_line + return db_specification + + def parse_specification(db_specification: str) -> Dict[str, Any]: """Create a db specification derived from a dataview of a db in which the byte layout @@ -317,7 +371,9 @@ def make_rows(self) -> None: logger.error(msg) self.index[key] = row - def __getitem__(self, key: str, default: Optional[None] = None) -> Union[None, "Row"]: + def __getitem__( + self, key: str, default: Optional[None] = None + ) -> Union[None, "Row"]: """Access a row of the table through its index. Rows (values) are of type :class:`DB_Row`. @@ -388,7 +444,9 @@ def set_data(self, bytearray_: bytearray) -> None: :obj:`TypeError`: if `bytearray_` is not an instance of :obj:`bytearray` """ if not isinstance(bytearray_, bytearray): - raise TypeError(f"Value bytearray_: {bytearray_} is not from type bytearray") + raise TypeError( + f"Value bytearray_: {bytearray_} is not from type bytearray" + ) self._bytearray = bytearray_ def read(self, client: Client) -> None: @@ -496,7 +554,9 @@ def __init__( self.area = area if not isinstance(bytearray_, (bytearray, DB)): - raise TypeError(f"Value bytearray_ {bytearray_} is not from type (bytearray, DB)") + raise TypeError( + f"Value bytearray_ {bytearray_} is not from type (bytearray, DB)" + ) self._bytearray = bytearray_ self._specification = parse_specification(_specification) @@ -590,17 +650,23 @@ def get_value(self, byte_index: Union[str, int], type_: str) -> ValueType: if type_.startswith("FSTRING"): max_size = re.search(r"\d+", type_) if max_size is None: - raise ValueError("Max size could not be determinate. re.search() returned None") + raise ValueError( + "Max size could not be determinate. re.search() returned None" + ) return get_fstring(bytearray_, byte_index, int(max_size[0])) elif type_.startswith("STRING"): max_size = re.search(r"\d+", type_) if max_size is None: - raise ValueError("Max size could not be determinate. re.search() returned None") + raise ValueError( + "Max size could not be determinate. re.search() returned None" + ) return get_string(bytearray_, byte_index) elif type_.startswith("WSTRING"): max_size = re.search(r"\d+", type_) if max_size is None: - raise ValueError("Max size could not be determinate. re.search() returned None") + raise ValueError( + "Max size could not be determinate. re.search() returned None" + ) return get_wstring(bytearray_, byte_index) else: type_to_func: Dict[str, Callable[[bytearray, int], ValueType]] = { @@ -629,7 +695,9 @@ def get_value(self, byte_index: Union[str, int], type_: str) -> ValueType: return type_to_func[type_](bytearray_, byte_index) raise ValueError - def set_value(self, byte_index: Union[str, int], type_: str, value: Union[bool, str, float]) -> Optional[bytearray]: + def set_value( + self, byte_index: Union[str, int], type_: str, value: Union[bool, str, float] + ) -> Optional[bytearray]: """Sets the value for a specific type in the specified byte index. Args: @@ -649,14 +717,18 @@ def set_value(self, byte_index: Union[str, int], type_: str, value: Union[bool, if type_ == "BOOL" and isinstance(value, bool): byte_index, bool_index = str(byte_index).split(".") - return set_bool(bytearray_, self.get_offset(byte_index), int(bool_index), value) + return set_bool( + bytearray_, self.get_offset(byte_index), int(bool_index), value + ) byte_index = self.get_offset(byte_index) if type_.startswith("FSTRING") and isinstance(value, str): max_size = re.search(r"\d+", type_) if max_size is None: - raise ValueError("Max size could not be determinate. re.search() returned None") + raise ValueError( + "Max size could not be determinate. re.search() returned None" + ) max_size_grouped = max_size.group(0) max_size_int = int(max_size_grouped) set_fstring(bytearray_, byte_index, value, max_size_int) @@ -665,7 +737,9 @@ def set_value(self, byte_index: Union[str, int], type_: str, value: Union[bool, if type_.startswith("STRING") and isinstance(value, str): max_size = re.search(r"\d+", type_) if max_size is None: - raise ValueError("Max size could not be determinate. re.search() returned None") + raise ValueError( + "Max size could not be determinate. re.search() returned None" + ) max_size_grouped = max_size.group(0) max_size_int = int(max_size_grouped) set_string(bytearray_, byte_index, value, max_size_int) @@ -711,7 +785,9 @@ def write(self, client: Client) -> None: :obj:`ValueError`: if the `row_size` is less than 0. """ if not isinstance(self._bytearray, DB): - raise TypeError(f"Value self._bytearray: {self._bytearray} is not from type DB.") + raise TypeError( + f"Value self._bytearray: {self._bytearray} is not from type DB." + ) if self.row_size < 0: raise ValueError("row_size must be greater equal zero.") @@ -741,7 +817,9 @@ def read(self, client: Client) -> None: :obj:`ValueError`: if the `row_size` is less than 0. """ if not isinstance(self._bytearray, DB): - raise TypeError(f"Value self._bytearray:{self._bytearray} is not from type DB.") + raise TypeError( + f"Value self._bytearray:{self._bytearray} is not from type DB." + ) if self.row_size < 0: raise ValueError("row_size must be greater equal zero.") db_nr = self._bytearray.db_number