Skip to content

Commit

Permalink
Added fallback to type detection to allow automatic ENUM usage + Adde…
Browse files Browse the repository at this point in the history
…d exception raising for unknown types
  • Loading branch information
RobertoRoos committed Jul 25, 2024
1 parent 8086c9f commit 2eb27f9
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 27 deletions.
21 changes: 15 additions & 6 deletions pyads/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
_dict_slice_generator,
bytes_from_dict,
size_of_structure,
ADSError,
)
from .symbol import AdsSymbol
from .utils import decode_ads
Expand Down Expand Up @@ -173,16 +174,16 @@ def _query_plc_datatype_from_name(self, data_name: str,
If cache_symbol_info is True then the SymbolInfo will be cached and adsGetSymbolInfo
will only used once.
"""
info = None
if cache_symbol_info:
info = self._symbol_info_cache.get(data_name)
if info is None:
info = adsGetSymbolInfo(self._port, self._adr, data_name)
self._symbol_info_cache[data_name] = info
else:
if info is None:
info = adsGetSymbolInfo(self._port, self._adr, data_name)
return AdsSymbol.get_type_from_str(info.symbol_type)
if cache_symbol_info:
self._symbol_info_cache[data_name] = info

return AdsSymbol.get_type_from_info(info)

def open(self) -> None:
"""Connect to the TwinCAT message router."""
Expand Down Expand Up @@ -537,6 +538,10 @@ def read_by_name(
plc_datatype = self._query_plc_datatype_from_name(data_name,
cache_symbol_info)

if plc_datatype is None:
# `adsSyncReadReqEx2()` will fail for a None type
raise ADSError(None, f"Failed to detect datatype for `{data_name}`")

return adsSyncReadByNameEx(
self._port,
self._adr,
Expand Down Expand Up @@ -682,6 +687,10 @@ def write_by_name(
plc_datatype = self._query_plc_datatype_from_name(data_name,
cache_symbol_info)

if plc_datatype is None:
# `adsSyncWriteReqEx()` will fail for a None type
raise ADSError(None, f"Failed to detect datatype for `{data_name}`")

return adsSyncWriteByNameEx(
self._port, self._adr, data_name, value, plc_datatype, handle=handle
)
Expand Down
62 changes: 47 additions & 15 deletions pyads/symbol.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
from typing import TYPE_CHECKING, Any, Optional, List, Tuple, Callable, Union, Type

from . import constants # To access all constants, use package notation
from .constants import PLCDataType
from .constants import PLCDataType, ads_type_to_ctype
from .pyads_ex import adsGetSymbolInfo
from .structs import NotificationAttrib
from .structs import NotificationAttrib, SAdsSymbolEntry
from .ads import ADSError

# ads.Connection relies on structs.AdsSymbol (but in type hints only), so use
# this 'if' to only include it when type hinting (False during execution)
Expand Down Expand Up @@ -125,7 +126,7 @@ def __init__(
self.name = name
self.index_offset = index_offset
self.index_group = index_group
self.symbol_type = symbol_type
self.symbol_type = None # Apply `symbol_type` later
self.comment = comment
self._value: Any = None

Expand All @@ -137,15 +138,14 @@ def __init__(
from .ads import size_of_structure
self._structure_size = size_of_structure(self.structure_def * self.array_size)

self.plc_type: Optional[Type[PLCDataType]] = None

if missing_info:
self._create_symbol_from_info() # Perform remote lookup

# Now `self.symbol_type` should have a value, find the actual PLCTYPE
# from it.
# This is relevant for both lookup and full user definition.

self.plc_type: Optional[Type[PLCDataType]] = None
if self.symbol_type is not None:
# Apply user-provided type (overriding auto detect if any):
if symbol_type is not None:
self.symbol_type = symbol_type
if isinstance(self.symbol_type, str): # Perform lookup if string
self.plc_type = AdsSymbol.get_type_from_str(self.symbol_type)
else: # Otherwise `symbol_type` is probably a pyads.PLCTYPE_* constant
Expand All @@ -166,12 +166,8 @@ def _create_symbol_from_info(self) -> None:
if info.comment:
self.comment = info.comment

# info.dataType is an integer mapping to a type in
# constants.ads_type_to_ctype.
# However, this type ignores whether the variable is really an array!
# So are not going to be using this and instead rely on the textual
# type
self.symbol_type = info.symbol_type # Save the type as string
self.plc_type = self.get_type_from_info(info)
self.symbol_type = info.symbol_type # Also save the type as string

def _check_for_open_connection(self) -> None:
"""Assert the current object is ready to read from/write to.
Expand All @@ -195,6 +191,12 @@ def read(self) -> Any:
structure_size=self._structure_size,
array_size=self.array_size)
else:
if self.plc_type is None:
raise ADSError(
None,
f"Cannot read data with unknown datatype for symbol "
f"{self.name} ({self.symbol_type})"
)
self._value = self._plc.read(self.index_group, self.index_offset, self.plc_type)

return self._value
Expand All @@ -218,6 +220,12 @@ def write(self, new_value: Optional[Any] = None) -> None:
self._plc.write_structure_by_name(self.name, new_value, self.structure_def,
structure_size=self._structure_size, array_size=self.array_size)
else:
if self.plc_type is None:
raise ADSError(
None,
f"Cannot write data with unknown datatype for symbol "
f"{self.name} ({self.symbol_type})"
)
self._plc.write(self.index_group, self.index_offset, new_value, self.plc_type)

def __repr__(self) -> str:
Expand Down Expand Up @@ -283,6 +291,30 @@ def _value_callback(self, notification: Any, data_name: Any) -> None:
)
self._value = value

@classmethod
def get_type_from_info(cls, info: SAdsSymbolEntry) -> Optional[Type[PLCDataType]]:
"""Get PLCTYPE_* from symbol info struct
Also see :meth:`~get_type_from_str`.
"""
plc_type = cls.get_type_from_str(info.symbol_type)
if plc_type is not None:
return plc_type

# Failed to detect by name (e.g. type is enum)

# Use `ADST_*` integer to detect type instead
plc_type = ads_type_to_ctype.get(info.dataType, None)
if plc_type is not None:
array_size = int(info.size / sizeof(plc_type))
# However, `dataType` is always a scalar, even if the object is an array:
if array_size > 1:
plc_type = plc_type * array_size

return plc_type

return None

@staticmethod
def get_type_from_str(type_str: str) -> Optional[Type[PLCDataType]]:
"""Get PLCTYPE_* from PLC name string
Expand Down
35 changes: 29 additions & 6 deletions tests/test_symbol.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import pyads
from pyads.testserver import AdsTestServer, AdvancedHandler, PLCVariable
from pyads import constants, AdsSymbol, bytes_from_dict
from pyads import constants, AdsSymbol, ADSError, bytes_from_dict

from tests.test_connection_class import create_notification_struct

Expand Down Expand Up @@ -143,8 +143,8 @@ def test_init_by_name_matrix_style(self):
struct.pack("<21b", *range(21)),
ads_type=constants.ADST_VOID,
symbol_type="matrix_21_int8_T", # Simulink array looks like this
index_group = 123,
index_offset = 100,
index_group=123,
index_offset=100,
)
self.handler.add_variable(var)
self.plc.open()
Expand All @@ -158,7 +158,7 @@ def test_init_by_name_matrix_style(self):

# Verify looked up info
self.assertEqual(constants.PLCTYPE_ARR_SINT(21), symbol.plc_type)
self.assertEqual(var.symbol_type, symbol.symbol_type)
self.assertEqual("matrix_21_int8_T", symbol.symbol_type)

self.assertAdsRequestsCount(0) # No requests

Expand Down Expand Up @@ -265,10 +265,10 @@ def test_init_invalid_type(self):
# Create symbol while providing everything:
symbol = AdsSymbol(self.plc, name=var.name)
self.assertEqual(var.symbol_type, symbol.symbol_type)
with self.assertRaises(TypeError) as cm:
with self.assertRaises(ADSError) as cm:
# Error is thrown inside pyads_ex
symbol.read()
self.assertIn("NoneType", str(cm.exception))
self.assertIn("unknown datatype", str(cm.exception))
self.assertAdsRequestsCount(1) # Only a WRITE followed by a READ

def test_read_write_errors(self):
Expand Down Expand Up @@ -370,6 +370,29 @@ def test_read_structure_array(self):

self.assertEqual(values, read_values)

def test_read_enum(self):
"""Test reading from a symbol when it's an ENUM type"""
self.handler.add_variable(
PLCVariable("TestEnum", 7, constants.ADST_UINT16, symbol_type="E_MyEnum"))

with self.plc:
symbol = self.plc.get_symbol("TestEnum")
value = symbol.read()

self.assertEqual(7, value)

def test_read_enum_array(self):
"""Test reading from a symbol when it's an array of an ENUM type"""
value_bytes = struct.pack("<3h", 1, 2, 3)
self.handler.add_variable(
PLCVariable("TestEnumList", value_bytes, constants.ADST_UINT16, symbol_type="ARRAY [1..3] OF E_MyEnum"))

with self.plc:
symbol = self.plc.get_symbol("TestEnumList")
value = symbol.read()

self.assertEqual([1, 2, 3], value)

def test_write(self):
"""Test symbol value writing"""
with self.plc:
Expand Down

0 comments on commit 2eb27f9

Please sign in to comment.