From 149f11ad5cd56e61076077f2e08062e9a8f3ffc6 Mon Sep 17 00:00:00 2001 From: - <-> Date: Wed, 20 Oct 2021 18:08:31 +0200 Subject: [PATCH] 0.9: - Lib passes a SmlFrameSnippet to the builder --- src/smllib/__version__.py | 2 +- src/smllib/builder/_builder.py | 5 +- src/smllib/builder/message.py | 5 +- src/smllib/sml/sml_choice.py | 7 +- src/smllib/sml/sml_time.py | 12 +- src/smllib/sml_frame.py | 74 ++++++----- src/smllib/sml_frame_snippet.py | 17 +++ tests/builder/test_build.py | 14 +- tests/conftest.py | 2 +- tests/helper.py | 13 ++ tests/sml/test_format.py | 18 +-- tests/sml/test_time.py | 13 +- tests/test_sml_data_types.py | 218 ++++++++++++++++++++++++++++++++ tests/test_sml_fields.py | 14 +- tests/test_sml_frame.py | 142 --------------------- 15 files changed, 338 insertions(+), 218 deletions(-) create mode 100644 src/smllib/sml_frame_snippet.py create mode 100644 tests/helper.py create mode 100644 tests/test_sml_data_types.py delete mode 100644 tests/test_sml_frame.py diff --git a/src/smllib/__version__.py b/src/smllib/__version__.py index 376df7c..e46aee1 100644 --- a/src/smllib/__version__.py +++ b/src/smllib/__version__.py @@ -1 +1 @@ -__version__ = '0.8' +__version__ = '0.9' diff --git a/src/smllib/builder/_builder.py b/src/smllib/builder/_builder.py index f644600..8343b69 100644 --- a/src/smllib/builder/_builder.py +++ b/src/smllib/builder/_builder.py @@ -13,13 +13,14 @@ def __init__(self): def build(self, obj: list, classes: Dict[Type[SmlBaseObj], 'SmlObjBuilder']) -> T_SML_OBJ: # check length - if len(self.fields) != len(obj): + lst = obj.value + if len(self.fields) != len(lst): raise WrongArgCount() out = self.BUILDS() for i, a in enumerate(self.fields.items()): # type: int, Tuple[str, SmlObjFieldInfo] name, field = a - value = obj[i] + value = lst[i].value # rebuild with choice if field.choice is not None: diff --git a/src/smllib/builder/message.py b/src/smllib/builder/message.py index cce2e31..3e1e1c0 100644 --- a/src/smllib/builder/message.py +++ b/src/smllib/builder/message.py @@ -11,8 +11,9 @@ class SmlMessageBuilder(SmlObjBuilder): BUILDS = smllib.sml.SmlMessage def build(self, obj: list, classes: Dict[Type[SmlBaseObj], SmlObjBuilder]) -> smllib.sml.SmlMessage: - if obj[5] is not EndOfSmlMsg: + lst = obj.value + if lst[5].value is not EndOfSmlMsg: raise EndOfSmlMsgExpected() - obj.pop(5) + lst.pop(5) return super().build(obj, classes) diff --git a/src/smllib/sml/sml_choice.py b/src/smllib/sml/sml_choice.py index 8c252e4..609b9a7 100644 --- a/src/smllib/sml/sml_choice.py +++ b/src/smllib/sml/sml_choice.py @@ -2,20 +2,21 @@ from smllib.errors import UnsupportedType, WrongArgCount from smllib.sml import SmlBaseObj +from smllib.sml_frame_snippet import SmlFrameSnippet class SmlChoice: def __init__(self, choices: Dict[int, Union[Type[SmlBaseObj], Callable[[List[Any]], Any]]]): self.choices = choices - def get(self, obj) -> Union[Tuple[None, Any], Tuple[Type[SmlBaseObj], Any]]: + def get(self, obj: List[SmlFrameSnippet]) -> Union[Tuple[None, Any], Tuple[Type[SmlBaseObj], Any]]: if len(obj) != 2: raise WrongArgCount() _type, _value = obj - ret = self.choices.get(_type) + ret = self.choices.get(_type.value) if ret is None: - raise UnsupportedType(_type) + raise UnsupportedType(_type.value) if issubclass(ret, SmlBaseObj): return ret, _value diff --git a/src/smllib/sml/sml_time.py b/src/smllib/sml/sml_time.py index d4adebf..880505c 100644 --- a/src/smllib/sml/sml_time.py +++ b/src/smllib/sml/sml_time.py @@ -12,17 +12,17 @@ def build_time(_in): # This is a workaround for times that are not reported according to specification # Instead of a choice list these devices report just the timestamp - however I am unsure about it. - # todo: remove it and see what happens if isinstance(_in, int): return _in - _type, _value = _in + type_s, value_s = _in + _type = type_s.value if _type == 1: - return _value + return value_s.value if _type == 2: - return datetime.utcfromtimestamp(_value) + return datetime.utcfromtimestamp(value_s.value) if _type == 3: - ts, offset1, offset2 = _value - return datetime.utcfromtimestamp(ts) + timedelta(minutes=offset1) + timedelta(minutes=offset2) + ts, offset1, offset2 = value_s.value + return datetime.utcfromtimestamp(ts.value) + timedelta(minutes=offset1.value) + timedelta(minutes=offset2.value) raise UnsupportedType(_type) diff --git a/src/smllib/sml_frame.py b/src/smllib/sml_frame.py index 062929d..8c733cc 100644 --- a/src/smllib/sml_frame.py +++ b/src/smllib/sml_frame.py @@ -3,6 +3,7 @@ from smllib.builder import create_context, CTX_HINT from smllib.errors import InvalidBufferPos from smllib.sml import EndOfSmlMsg, SmlListEntry, SmlMessage +from smllib.sml_frame_snippet import SmlFrameSnippet class SmlFrame: @@ -13,65 +14,74 @@ def __init__(self, buffer: bytes, build_ctx: CTX_HINT = None, msg_ctx: Optional[ self.next_pos = 0 - self.msg_ctx = msg_ctx + self.msg_ctx = msg_ctx # This is the whole sml message self.build_ctx: CTX_HINT = build_ctx if build_ctx is not None else create_context() - def get_value(self, pos: Optional[int] = None): + def get_value(self, pos: Optional[int] = None) -> SmlFrameSnippet: if pos is None: pos = self.next_pos + snip_start = pos # check start pos if pos >= self.buf_len: raise InvalidBufferPos(f'Start pos bigger than buffer: {pos} > {self.buf_len}') + # advance v = self.buffer[pos] + start = pos + 1 + + # ---------------------------------------- + # types with fixed size + # ---------------------------------------- # No value if v == 0x01: - self.next_pos = pos + 1 - return None - - # Bool - if v == 0x42: - self.next_pos = pos + 2 - return bool(self.buffer[pos + 1]) + self.next_pos = start + return SmlFrameSnippet(None, snip_start, start, self.buffer) # End of a SmlMSg if v == 0x00: - self.next_pos = pos + 1 - return EndOfSmlMsg + self.next_pos = start + return SmlFrameSnippet(EndOfSmlMsg, snip_start, start, self.buffer) + + # Bool + if v == 0x42: + self.next_pos = start + 1 + return SmlFrameSnippet(bool(self.buffer[start]), snip_start, self.next_pos, self.buffer) + # ---------------------------------------- # types with dynamic size - s_pos = pos + # ---------------------------------------- is_long = bool(v & 0x80) _type = v & 0x70 # type _size = v & 0x0F # size including the 1-byte tag while is_long: - s_pos += 1 - v = self.buffer[s_pos] + v = self.buffer[start] _size = _size << 4 | v & 0x0F is_long = bool(v & 0x80) + start += 1 # type is a list if _type == 0x70: - self.next_pos = s_pos + 1 # Must be s_pos because we can have lists with a long length - return [None for _ in range(_size)] + self.next_pos = start # Must be s_pos because we can have lists with a long length + return SmlFrameSnippet([None for _ in range(_size)], snip_start) # End position - e_pos = pos + _size - if e_pos > self.buf_len: - raise InvalidBufferPos(f'Pos bigger than buffer: {e_pos} > {self.buf_len}') - self.next_pos = e_pos + end = pos + _size + if end > self.buf_len: + raise InvalidBufferPos(f'Pos bigger than buffer: {end} > {self.buf_len}') + self.next_pos = end # 0x50: signed integer, 0x60 unsigned integer - if _type in (0x50, 0x60): - return int.from_bytes( - self.buffer[s_pos + 1:e_pos], byteorder='big', signed=True if _type == 0x50 else False + if _type == 0x50 or _type == 0x60: + return SmlFrameSnippet( + int.from_bytes(self.buffer[start:end], byteorder='big', signed=_type == 0x50), + snip_start, end, self.buffer ) # 0x00: octet str if _type == 0x00: - return self.buffer[s_pos + 1:e_pos].hex() + return SmlFrameSnippet(self.buffer[start:end].hex(), snip_start, end, self.buffer) raise ValueError(f'Unknown data type: {_type:02x}!') @@ -87,17 +97,19 @@ def parse_frame(self) -> List[SmlMessage]: ) # This will always return a list - val = self.get_value() - self._parse_msg(val) + val = self._parse_msg(self.get_value()) ret.append(self.build_ctx[SmlMessage].build(val, self.build_ctx)) return ret - def _parse_msg(self, parent_obj=None): + def _parse_msg(self, parent_obj: SmlFrameSnippet) -> SmlFrameSnippet: # it's always a list now - for i, _ in enumerate(parent_obj): - parent_obj[i] = v = self.get_value() - if isinstance(v, list): + _lst = parent_obj.value + for i, _ in enumerate(_lst): + _lst[i] = v = self.get_value() + if isinstance(v.value, list): self._parse_msg(v) + parent_obj.stop_pos(self.next_pos, self.buffer) + return parent_obj def get_obis(self) -> List[SmlListEntry]: """Returns all obis values in the frame without parsing the frame""" @@ -105,7 +117,7 @@ def get_obis(self) -> List[SmlListEntry]: start = -1 while (start := self.bytes.find(b'\x77\x07\x01', start + 1)) != -1: data = self.get_value(start) - if not isinstance(data, list): + if not isinstance(data.value, list): continue self._parse_msg(data) diff --git a/src/smllib/sml_frame_snippet.py b/src/smllib/sml_frame_snippet.py new file mode 100644 index 0000000..f43e863 --- /dev/null +++ b/src/smllib/sml_frame_snippet.py @@ -0,0 +1,17 @@ +from typing import Optional, Union + + +class SmlFrameSnippet: + __slots__ = ('pos', 'value', 'msg') + + def __init__(self, value: Union[None, bool, int, str, float, list], start: int, + stop: Optional[int] = None, buf: Optional[memoryview] = None): + self.value = value + + self.pos = start + self.msg: Optional[memoryview] = None if stop is None else buf[start: stop] + + def stop_pos(self, pos: int, buf: memoryview) -> 'SmlFrameSnippet': + assert self.msg is None + self.msg = buf[self.pos: pos] + return self diff --git a/tests/builder/test_build.py b/tests/builder/test_build.py index cba8bec..e9158cd 100644 --- a/tests/builder/test_build.py +++ b/tests/builder/test_build.py @@ -1,21 +1,22 @@ from smllib.builder import SmlCloseResponseBuilder, SmlGetListResponseBuilder, \ SmlListEntryBuilder, SmlMessageBuilder, SmlObjBuilder from smllib.sml import EndOfSmlMsg, SmlCloseResponse, SmlGetListResponse, SmlListEntry +from tests.helper import in_snip def test_build_entry(): builder = SmlListEntryBuilder() - obj = builder.build(['obis', None, None, None, None, '76616c', None], {SmlListEntry: builder}) + obj = builder.build(in_snip(['obis', None, None, None, None, '76616c', None]), {SmlListEntry: builder}) assert obj.obis == 'obis' assert obj.value == 'val' def test_build_entry_list(): - data = [ + data = in_snip([ None, 'server', None, None, [['obis1', None, None, None, None, '76616c31', None], ['obis2', None, None, None, None, '76616c32', None]], None, None - ] + ]) builder = SmlGetListResponseBuilder() @@ -43,9 +44,9 @@ def build(self, obj: list, classes): def test_build_choice(): + data = in_snip(['t1', 1, 0, [0x0201, ['sig']], 1111, EndOfSmlMsg]) builder = SmlMessageBuilder() - obj = builder.build(['t1', 1, 0, [0x0201, ['sig']], 1111, EndOfSmlMsg], - {SmlCloseResponse: SmlCloseResponseBuilder()}) + obj = builder.build(data, {SmlCloseResponse: SmlCloseResponseBuilder()}) assert obj.transaction_id == 't1' assert obj.group_no == 1 assert obj.abort_on_error == 0 @@ -61,7 +62,8 @@ def build(self, obj: list, classes): ret.global_signature += '_patched' return ret - obj = builder.build(['t1', 1, 0, [0x0201, ['sig']], 1111, EndOfSmlMsg], {SmlCloseResponse: PatchedBuilder()}) + data = in_snip(['t1', 1, 0, [0x0201, ['sig']], 1111, EndOfSmlMsg]) + obj = builder.build(data, {SmlCloseResponse: PatchedBuilder()}) assert obj.transaction_id == 't1' assert obj.group_no == 1 assert obj.abort_on_error == 0 diff --git a/tests/conftest.py b/tests/conftest.py index e13dec1..bbd8045 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,7 @@ import sys from pathlib import Path -# add src dir so tests work +# add src dir so tests work src = Path(__file__).parent.with_name('src') assert src.is_dir(), src if str(src) not in sys.path: diff --git a/tests/helper.py b/tests/helper.py new file mode 100644 index 0000000..3c4a53e --- /dev/null +++ b/tests/helper.py @@ -0,0 +1,13 @@ +from smllib.sml_frame import SmlFrameSnippet + + +def in_snip(obj, pack_top = True) -> SmlFrameSnippet: + if not isinstance(obj, (list, tuple)): + return SmlFrameSnippet(obj, 'from in_snip') + + for i, k in enumerate(obj): + obj[i] = in_snip(k) + + if pack_top: + return SmlFrameSnippet(obj, 'from in_snip') + return obj diff --git a/tests/sml/test_format.py b/tests/sml/test_format.py index 5d537cc..a1a47ca 100644 --- a/tests/sml/test_format.py +++ b/tests/sml/test_format.py @@ -1,9 +1,11 @@ -from smllib.sml import SmlOpenResponse, SmlCloseResponse, SmlGetListResponse, SmlListEntry, SmlMessage -from smllib.builder import SmlOpenResponseBuilder, SmlCloseResponseBuilder, SmlGetListResponseBuilder, SmlListEntryBuilder, SmlMessageBuilder +from smllib.builder import SmlCloseResponseBuilder, \ + SmlGetListResponseBuilder, SmlListEntryBuilder, SmlOpenResponseBuilder +from smllib.sml import SmlListEntry +from tests.helper import in_snip def test_open_response(): - r = SmlOpenResponseBuilder().build([None, None, 'ab', 'cd', None, 1], {}) + r = SmlOpenResponseBuilder().build(in_snip([None, None, 'ab', 'cd', None, 1]), {}) assert r.format_msg() == '\n' \ ' codepage : None\n' \ ' client_id : None\n' \ @@ -14,21 +16,21 @@ def test_open_response(): def test_close_response(): - r = SmlCloseResponseBuilder().build(['my_sig'], {}) + r = SmlCloseResponseBuilder().build(in_snip(['my_sig']), {}) assert r.format_msg() == '\n' \ ' global_signature: my_sig\n' def test_list_entry(): - data = [ + data = in_snip([ None, 'server', None, None, [['obis1', None, None, None, None, '76616c31', None], ['obis2', None, None, None, None, '76616c32', None]], None, None - ] - + ]) + builder = SmlGetListResponseBuilder() obj = builder.build(data, {SmlListEntry: SmlListEntryBuilder()}) - + assert obj.format_msg() == '\n' \ ' client_id : None\n' \ ' sever_id : server\n' \ diff --git a/tests/sml/test_time.py b/tests/sml/test_time.py index cb626f8..be5051d 100644 --- a/tests/sml/test_time.py +++ b/tests/sml/test_time.py @@ -4,20 +4,21 @@ from smllib.errors import UnsupportedType from smllib.sml.sml_time import build_time +from tests.helper import in_snip def test_sml_time(): assert build_time(None) is None - assert build_time([1, 253]) == 253 - assert build_time([2, 1609466461]) == datetime(2021, 1, 1, 2, 1, 1) + assert build_time(in_snip([1, 253], pack_top=False)) == 253 + assert build_time(in_snip([2, 1609466461], pack_top=False)) == datetime(2021, 1, 1, 2, 1, 1) - assert build_time([3, [1609466461, 120, 0]]) == datetime(2021, 1, 1, 4, 1, 1) - assert build_time([3, [1622509261, 120, 60]]) == datetime(2021, 6, 1, 4, 1, 1) - assert build_time([3, [1622509261, 120, 30]]) == datetime(2021, 6, 1, 3, 31, 1) + assert build_time(in_snip([3, [1609466461, 120, 0]], pack_top=False)) == datetime(2021, 1, 1, 4, 1, 1) + assert build_time(in_snip([3, [1622509261, 120, 60]], pack_top=False)) == datetime(2021, 6, 1, 4, 1, 1) + assert build_time(in_snip([3, [1622509261, 120, 30]], pack_top=False)) == datetime(2021, 6, 1, 3, 31, 1) def test_exception(): with pytest.raises(UnsupportedType) as e: - build_time([5, 55]) + build_time([in_snip(5), in_snip(55)]) assert e.value.type == 5 assert str(e.value) == 'Unsupported type 0x05' diff --git a/tests/test_sml_data_types.py b/tests/test_sml_data_types.py new file mode 100644 index 0000000..a45aea3 --- /dev/null +++ b/tests/test_sml_data_types.py @@ -0,0 +1,218 @@ +from binascii import a2b_hex + +from smllib.sml_frame import EndOfSmlMsg, SmlFrame, SmlFrameSnippet + + +def check(f: SmlFrameSnippet, value, msg: str): + if value is True or value is False or value is None or value is EndOfSmlMsg: + assert f.value is value + else: + if isinstance(value, (tuple, list)): + for i, _value in enumerate(value): + assert f.value[i].value == _value + else: + assert f.value == value + assert f.msg.hex() == msg + + +def test_get_int8(): + f = SmlFrame(b'\x52\xff') + check(f.get_value(0), -1, '52ff') + assert f.next_pos == 2 + + f = SmlFrame(b'\x52\x03') + check(f.get_value(0), 3, '5203') + assert f.next_pos == 2 + + f = SmlFrame(b'\x00\x52\x03') + f.next_pos = 9999 + check(f.get_value(1), 3, '5203') + assert f.next_pos == 3 + + +def test_get_uint8(): + f = SmlFrame(b'\x62\xff') + check(f.get_value(0), 255, '62ff') + assert f.next_pos == 2 + + f = SmlFrame(b'\x62\x03') + check(f.get_value(0), 3, '6203') + assert f.next_pos == 2 + + f = SmlFrame(b'\x00\x62\xff') + f.next_pos = 9999 + check(f.get_value(1), 255, '62ff') + assert f.next_pos == 3 + + +def test_get_int16(): + f = SmlFrame(b'\x53\xff\x00') + check(f.get_value(0), -256, '53ff00') + assert f.next_pos == 3 + + f = SmlFrame(b'\x53\x00\x03') + check(f.get_value(0), 3, '530003') + assert f.next_pos == 3 + + f = SmlFrame(b'\x00\x53\xff\x01') + f.next_pos = 9999 + check(f.get_value(1), -255, '53ff01') + assert f.next_pos == 4 + + +def test_get_uint16(): + f = SmlFrame(b'\x63\xff\x00') + check(f.get_value(0), 65280, '63ff00') + assert f.next_pos == 3 + + f = SmlFrame(b'\x63\x00\x03') + check(f.get_value(0), 3, '630003') + assert f.next_pos == 3 + + f = SmlFrame(b'\x00\x63\xff\x01') + f.next_pos = 9999 + check(f.get_value(1), 65281, '63ff01') + assert f.next_pos == 4 + + +def test_get_int32(): + f = SmlFrame(b'\x55\x00\x00\x0a\x8c') + check(f.get_value(0), 2700, '5500000a8c') + assert f.next_pos == 5 + + f = SmlFrame(b'\x01\x01\x55\x00\x00\x0a\x8c') + f.next_pos = 9999 + check(f.get_value(2), 2700, '5500000a8c') + assert f.next_pos == 7 + + +def test_get_uint32(): + f = SmlFrame(b'\x55\x00\x00\x0a\x8c') + check(f.get_value(0), 2700, '5500000a8c') + assert f.next_pos == 5 + + +# def test_int(): +# +# f = SmlFrame(b'\x65\x0c\x6a\x50\xb5') +# assert f.get_value(0) == 208294069 +# +# # too short stuff +# with pytest.raises(InvalidBufferPos): +# f = SmlFrame(b'\x56\x00\x04\xeb\x09') +# assert f.get_value(0) is None +# with pytest.raises(InvalidBufferPos): +# f = SmlFrame(b'\x65\x0c\x6a') +# assert f.get_value(0) is None +# +# # now with indexes > 0 +# f = SmlFrame(b'\xaa\xbb\x56\x00\x04\xeb\x09\x6c') +# assert f.get_value(2) == 82512236 +# f = SmlFrame(b'\x00\x52\xff') +# assert f.get_value(1) == -1 +# f = SmlFrame(b'\x01\xff\x33\x62\x1e') +# assert f.get_value(3) == 30 +# f = SmlFrame(b'\xab\xcd\x87\x44\x65\x0c\x6a\x50\xb5') +# assert f.get_value(4) == 208294069 +# +# # now with stuff appended +# f = SmlFrame(b'\x56\x00\x04\xeb\x09\x6c\x12\x34') +# assert f.get_value(0) == 82512236 +# f = SmlFrame(b'\xaa\xbb\x52\xff\xcc\xdd') +# assert f.get_value(2) == -1 +# f = SmlFrame(b'\x52\x62\x1e\x99') +# assert f.get_value(1) == 30 +# f = SmlFrame(b'\x65\x0c\x6a\x50\xb5\x77\x88') +# assert f.get_value(0) == 208294069 +# +# # first some values starting at index 0 +# f = SmlFrame(b'\x56\x00\x04\xeb\x09\x6c') +# assert check(f.get_value(0), 82512236, '5600') +# + +def test_none(): + f = SmlFrame(b'\x01\x01\x01') + assert f.get_value().value is None + assert f.next_pos == 1 + + f.next_pos = 9999 + assert f.get_value(1).value is None + assert f.next_pos == 2 + + assert f.get_value().value is None + assert f.next_pos == 3 + + +def test_get_list(): + f = SmlFrame(b'\x71\x01') + f.next_pos = 9999 + check(f._parse_msg(f.get_value(0)), [None], '7101') + assert f.next_pos == 2 + + f = SmlFrame(b'\x72\x01\x00') + f.next_pos = 9999 + check(f._parse_msg(f.get_value(0)), [None, EndOfSmlMsg], '720100') + assert f.next_pos == 3 + + f = SmlFrame(b'\x72\x52\xff\x62\x1e') + check(f._parse_msg(f.get_value(0)), [-1, 30], '7252ff621e') + assert f.next_pos == 5 + + f = SmlFrame(a2b_hex( + '77070100010800ff' + '6500000782' + '01' + '621e' + '52ff' + '59000000000dd359d6' + '01' + 'ff')) + check(f._parse_msg(f.get_value(0)), + ['0100010800ff', 1922, None, 30, -1, 231954902, None], + '77070100010800ff650000078201621e52ff59000000000dd359d601') + assert f.next_pos == 28 + + +def test_str(): + f = SmlFrame(b'\x07\x01\x00\x01\x08\x00\xFF') + f.next_pos = 9999 + check(f.get_value(0), '0100010800ff', '070100010800ff') + assert f.next_pos == 7 + + +def test_long_str(): + f = SmlFrame(a2b_hex('8302010203040101010101010101010101010101010101010101010101010101010101010101010101010101010101010102')) # noqa: E501 + check( + f.get_value(0), + '010203040101010101010101010101010101010101010101010101010101010101010101010101010101010101010102', + '8302010203040101010101010101010101010101010101010101010101010101010101010101010101010101010101010102') + assert f.next_pos == 50 + + f = SmlFrame(a2b_hex('8302010203040101010101010101010101010101010101010101010101010101010101010101010101010101010101010102FF')) # noqa: E501 + check( + f.get_value(0), + '010203040101010101010101010101010101010101010101010101010101010101010101010101010101010101010102', + '8302010203040101010101010101010101010101010101010101010101010101010101010101010101010101010101010102') + assert f.next_pos == 50 + assert f.buffer[f.next_pos] == 0xFF + + +def test_bool(): + f = SmlFrame(b'\x42\x01') + f.next_pos = 9999 + check(f.get_value(0), True, '4201') + assert f.next_pos == 2 + + f = SmlFrame(b'\x42\x00') + check(f.get_value(0), False, '4200') + assert f.next_pos == 2 + + +def test_eom(): + f = SmlFrame(b'\x00') + check(f.get_value(0), EndOfSmlMsg, '00') + assert f.next_pos == 1 + + f = SmlFrame(b'\x00\x00') + check(f.get_value(1), EndOfSmlMsg, '00') + assert f.next_pos == 2 diff --git a/tests/test_sml_fields.py b/tests/test_sml_fields.py index 09dca6a..fb2ba4b 100644 --- a/tests/test_sml_fields.py +++ b/tests/test_sml_fields.py @@ -6,17 +6,13 @@ def test_sml_fields(): f = SmlFrame(a2b_hex('77078181c78203ff010101010449534b0177070100000009ff010101010b')) - val_list = f.get_value(0) - for i, _ in enumerate(val_list): - val_list[i] = f.get_value() + val_list = f._parse_msg(f.get_value(0)) o = SmlListEntryBuilder().build(val_list, create_context()) assert o.obis == '8181c78203ff' assert o.value == 'ISK' f = SmlFrame(a2b_hex('77070100010800ff650000018201621e52ff590000000001122334017707')) - val_list = f.get_value(0) - for i, _ in enumerate(val_list): - val_list[i] = f.get_value() + val_list = f._parse_msg(f.get_value(0)) o = SmlListEntryBuilder().build(val_list, create_context()) assert o.obis == '0100010800ff' @@ -30,15 +26,13 @@ def test_sml_fields(): def test_val_time(): # Frame where time is None f = SmlFrame(a2b_hex('77070100600100ff010101010b0a01484c5902000424a001')) - val_list = f.get_value(0) - f._parse_msg(val_list) + val_list = f._parse_msg(f.get_value(0)) o = SmlListEntryBuilder().build(val_list, create_context()) assert o.val_time is None # Frame where secIndex == 1 and time == 0 # -> 7262016200 f = SmlFrame(a2b_hex('77070100600100ff017262016200620052000b0a01445a47000282c0b001')) - val_list = f.get_value(0) - f._parse_msg(val_list) + val_list = f._parse_msg(f.get_value(0)) o = SmlListEntryBuilder().build(val_list, create_context()) assert o.val_time == 0 diff --git a/tests/test_sml_frame.py b/tests/test_sml_frame.py deleted file mode 100644 index 407c468..0000000 --- a/tests/test_sml_frame.py +++ /dev/null @@ -1,142 +0,0 @@ -from binascii import a2b_hex - -import pytest - -from smllib.sml_frame import InvalidBufferPos, SmlFrame - - -def test_get_int(): - - # first some values starting at index 0 - f = SmlFrame(b'\x56\x00\x04\xeb\x09\x6c') - assert f.get_value(0) == 82512236 - - f = SmlFrame(b'\x52\xff') - assert f.get_value(0) == -1 - - f = SmlFrame(b'\x52\x03') - assert f.get_value(0) == 3 - - f = SmlFrame(b'\x62\x1e') - assert f.get_value(0) == 30 - - f = SmlFrame(b'\x65\x0c\x6a\x50\xb5') - assert f.get_value(0) == 208294069 - - # too short stuff - with pytest.raises(InvalidBufferPos): - f = SmlFrame(b'\x56\x00\x04\xeb\x09') - assert f.get_value(0) is None - with pytest.raises(InvalidBufferPos): - f = SmlFrame(b'\x65\x0c\x6a') - assert f.get_value(0) is None - - # now with indexes > 0 - f = SmlFrame(b'\xaa\xbb\x56\x00\x04\xeb\x09\x6c') - assert f.get_value(2) == 82512236 - f = SmlFrame(b'\x00\x52\xff') - assert f.get_value(1) == -1 - f = SmlFrame(b'\x01\xff\x33\x62\x1e') - assert f.get_value(3) == 30 - f = SmlFrame(b'\xab\xcd\x87\x44\x65\x0c\x6a\x50\xb5') - assert f.get_value(4) == 208294069 - - # now with stuff appended - f = SmlFrame(b'\x56\x00\x04\xeb\x09\x6c\x12\x34') - assert f.get_value(0) == 82512236 - f = SmlFrame(b'\xaa\xbb\x52\xff\xcc\xdd') - assert f.get_value(2) == -1 - f = SmlFrame(b'\x52\x62\x1e\x99') - assert f.get_value(1) == 30 - f = SmlFrame(b'\x65\x0c\x6a\x50\xb5\x77\x88') - assert f.get_value(0) == 208294069 - - -def test_get_next_pos(): - f = SmlFrame(b'\x52\xff') - assert f.get_value(0) == -1 - assert f.next_pos == 2 - - f = SmlFrame(b'\x71\xff') - f.next_pos = 9999 - assert f.get_value(0) == [None] - assert f.next_pos == 1 - - -def test_none(): - f = SmlFrame(b'\x01\x01\x01\xff') - assert f.get_value() is None - assert f.next_pos == 1 - - f.next_pos = 9999 - assert f.get_value(1) is None - assert f.next_pos == 2 - - assert f.get_value() is None - assert f.next_pos == 3 - - -def test_get_list(): - f = SmlFrame(b'\x72\x52\xff\x62\x1e') - - r = f.get_value(0) - assert f.next_pos == 1 - assert r == [None, None] - - f.next_pos = 9999 - assert f.get_value(1) == -1 - assert f.next_pos == 3 - - assert f.get_value() == 30 - assert f.next_pos == 5 - - f = SmlFrame(a2b_hex( - '77070100010800ff' - '6500000782' - '01' - '621e' - '52ff' - '59000000000dd359d6' - '01' - 'ff')) - r = f.get_value(0) - assert f.next_pos == 1 - assert r == [None, None, None, None, None, None, None] - - assert f.get_value() == '0100010800ff' - assert f.get_value() == 1922 - assert f.get_value() is None - assert f.get_value() == 30 - assert f.get_value() == -1 - - assert f.get_value() == 231954902 - assert f.get_value() is None - - v_list = f.get_value(0) - for i, _ in enumerate(v_list): - v_list[i] = f.get_value() - assert v_list == ['0100010800ff', 1922, None, 30, -1, 231954902, None] - - -def test_str(): - f = SmlFrame(b'\x07\x01\x00\x01\x08\x00\xFF') - f.next_pos = 9999 - assert f.get_value(0) == '0100010800ff' - assert f.next_pos == 7 - - -def test_bool(): - f = SmlFrame(b'\x42\x01') - f.next_pos = 9999 - assert f.get_value(0) is True - assert f.next_pos == 2 - - f = SmlFrame(b'\x42\x00') - assert f.get_value(0) is False - - -def test_long_str(): - f = SmlFrame(a2b_hex('8302010203040101010101010101010101010101010101010101010101010101010101010101010101010101010101010102FF')) # noqa: E501 - assert f.get_value(0) == '010203040101010101010101010101010101010101010101010101010101010101010101010101010101010101010102' # noqa: E501 - assert f.next_pos == 50 - assert f.buffer[50] == 0xFF