Skip to content

Commit

Permalink
0.9:
Browse files Browse the repository at this point in the history
- Lib passes a SmlFrameSnippet to the builder
  • Loading branch information
- committed Oct 20, 2021
1 parent 3d42c10 commit 149f11a
Show file tree
Hide file tree
Showing 15 changed files with 338 additions and 218 deletions.
2 changes: 1 addition & 1 deletion src/smllib/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.8'
__version__ = '0.9'
5 changes: 3 additions & 2 deletions src/smllib/builder/_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ def __init__(self):

def build(self, obj: list, classes: Dict[Type[SmlBaseObj], 'SmlObjBuilder']) -> T_SML_OBJ:
# check length
if len(self.fields) != len(obj):
lst = obj.value
if len(self.fields) != len(lst):
raise WrongArgCount()

out = self.BUILDS()
for i, a in enumerate(self.fields.items()): # type: int, Tuple[str, SmlObjFieldInfo]
name, field = a
value = obj[i]
value = lst[i].value

# rebuild with choice
if field.choice is not None:
Expand Down
5 changes: 3 additions & 2 deletions src/smllib/builder/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ class SmlMessageBuilder(SmlObjBuilder):
BUILDS = smllib.sml.SmlMessage

def build(self, obj: list, classes: Dict[Type[SmlBaseObj], SmlObjBuilder]) -> smllib.sml.SmlMessage:
if obj[5] is not EndOfSmlMsg:
lst = obj.value
if lst[5].value is not EndOfSmlMsg:
raise EndOfSmlMsgExpected()
obj.pop(5)
lst.pop(5)

return super().build(obj, classes)
7 changes: 4 additions & 3 deletions src/smllib/sml/sml_choice.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@

from smllib.errors import UnsupportedType, WrongArgCount
from smllib.sml import SmlBaseObj
from smllib.sml_frame_snippet import SmlFrameSnippet


class SmlChoice:
def __init__(self, choices: Dict[int, Union[Type[SmlBaseObj], Callable[[List[Any]], Any]]]):
self.choices = choices

def get(self, obj) -> Union[Tuple[None, Any], Tuple[Type[SmlBaseObj], Any]]:
def get(self, obj: List[SmlFrameSnippet]) -> Union[Tuple[None, Any], Tuple[Type[SmlBaseObj], Any]]:
if len(obj) != 2:
raise WrongArgCount()

_type, _value = obj
ret = self.choices.get(_type)
ret = self.choices.get(_type.value)
if ret is None:
raise UnsupportedType(_type)
raise UnsupportedType(_type.value)

if issubclass(ret, SmlBaseObj):
return ret, _value
Expand Down
12 changes: 6 additions & 6 deletions src/smllib/sml/sml_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ def build_time(_in):

# This is a workaround for times that are not reported according to specification
# Instead of a choice list these devices report just the timestamp - however I am unsure about it.
# todo: remove it and see what happens
if isinstance(_in, int):
return _in

_type, _value = _in
type_s, value_s = _in
_type = type_s.value
if _type == 1:
return _value
return value_s.value
if _type == 2:
return datetime.utcfromtimestamp(_value)
return datetime.utcfromtimestamp(value_s.value)
if _type == 3:
ts, offset1, offset2 = _value
return datetime.utcfromtimestamp(ts) + timedelta(minutes=offset1) + timedelta(minutes=offset2)
ts, offset1, offset2 = value_s.value
return datetime.utcfromtimestamp(ts.value) + timedelta(minutes=offset1.value) + timedelta(minutes=offset2.value)

raise UnsupportedType(_type)
74 changes: 43 additions & 31 deletions src/smllib/sml_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from smllib.builder import create_context, CTX_HINT
from smllib.errors import InvalidBufferPos
from smllib.sml import EndOfSmlMsg, SmlListEntry, SmlMessage
from smllib.sml_frame_snippet import SmlFrameSnippet


class SmlFrame:
Expand All @@ -13,65 +14,74 @@ def __init__(self, buffer: bytes, build_ctx: CTX_HINT = None, msg_ctx: Optional[

self.next_pos = 0

self.msg_ctx = msg_ctx
self.msg_ctx = msg_ctx # This is the whole sml message
self.build_ctx: CTX_HINT = build_ctx if build_ctx is not None else create_context()

def get_value(self, pos: Optional[int] = None):
def get_value(self, pos: Optional[int] = None) -> SmlFrameSnippet:
if pos is None:
pos = self.next_pos
snip_start = pos

# check start pos
if pos >= self.buf_len:
raise InvalidBufferPos(f'Start pos bigger than buffer: {pos} > {self.buf_len}')

# advance
v = self.buffer[pos]
start = pos + 1

# ----------------------------------------
# types with fixed size
# ----------------------------------------

# No value
if v == 0x01:
self.next_pos = pos + 1
return None

# Bool
if v == 0x42:
self.next_pos = pos + 2
return bool(self.buffer[pos + 1])
self.next_pos = start
return SmlFrameSnippet(None, snip_start, start, self.buffer)

# End of a SmlMSg
if v == 0x00:
self.next_pos = pos + 1
return EndOfSmlMsg
self.next_pos = start
return SmlFrameSnippet(EndOfSmlMsg, snip_start, start, self.buffer)

# Bool
if v == 0x42:
self.next_pos = start + 1
return SmlFrameSnippet(bool(self.buffer[start]), snip_start, self.next_pos, self.buffer)

# ----------------------------------------
# types with dynamic size
s_pos = pos
# ----------------------------------------
is_long = bool(v & 0x80)
_type = v & 0x70 # type
_size = v & 0x0F # size including the 1-byte tag
while is_long:
s_pos += 1
v = self.buffer[s_pos]
v = self.buffer[start]
_size = _size << 4 | v & 0x0F
is_long = bool(v & 0x80)
start += 1

# type is a list
if _type == 0x70:
self.next_pos = s_pos + 1 # Must be s_pos because we can have lists with a long length
return [None for _ in range(_size)]
self.next_pos = start # Must be s_pos because we can have lists with a long length
return SmlFrameSnippet([None for _ in range(_size)], snip_start)

# End position
e_pos = pos + _size
if e_pos > self.buf_len:
raise InvalidBufferPos(f'Pos bigger than buffer: {e_pos} > {self.buf_len}')
self.next_pos = e_pos
end = pos + _size
if end > self.buf_len:
raise InvalidBufferPos(f'Pos bigger than buffer: {end} > {self.buf_len}')
self.next_pos = end

# 0x50: signed integer, 0x60 unsigned integer
if _type in (0x50, 0x60):
return int.from_bytes(
self.buffer[s_pos + 1:e_pos], byteorder='big', signed=True if _type == 0x50 else False
if _type == 0x50 or _type == 0x60:
return SmlFrameSnippet(
int.from_bytes(self.buffer[start:end], byteorder='big', signed=_type == 0x50),
snip_start, end, self.buffer
)

# 0x00: octet str
if _type == 0x00:
return self.buffer[s_pos + 1:e_pos].hex()
return SmlFrameSnippet(self.buffer[start:end].hex(), snip_start, end, self.buffer)

raise ValueError(f'Unknown data type: {_type:02x}!')

Expand All @@ -87,25 +97,27 @@ def parse_frame(self) -> List[SmlMessage]:
)

# This will always return a list
val = self.get_value()
self._parse_msg(val)
val = self._parse_msg(self.get_value())
ret.append(self.build_ctx[SmlMessage].build(val, self.build_ctx))
return ret

def _parse_msg(self, parent_obj=None):
def _parse_msg(self, parent_obj: SmlFrameSnippet) -> SmlFrameSnippet:
# it's always a list now
for i, _ in enumerate(parent_obj):
parent_obj[i] = v = self.get_value()
if isinstance(v, list):
_lst = parent_obj.value
for i, _ in enumerate(_lst):
_lst[i] = v = self.get_value()
if isinstance(v.value, list):
self._parse_msg(v)
parent_obj.stop_pos(self.next_pos, self.buffer)
return parent_obj

def get_obis(self) -> List[SmlListEntry]:
"""Returns all obis values in the frame without parsing the frame"""
ret = []
start = -1
while (start := self.bytes.find(b'\x77\x07\x01', start + 1)) != -1:
data = self.get_value(start)
if not isinstance(data, list):
if not isinstance(data.value, list):
continue

self._parse_msg(data)
Expand Down
17 changes: 17 additions & 0 deletions src/smllib/sml_frame_snippet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from typing import Optional, Union


class SmlFrameSnippet:
__slots__ = ('pos', 'value', 'msg')

def __init__(self, value: Union[None, bool, int, str, float, list], start: int,
stop: Optional[int] = None, buf: Optional[memoryview] = None):
self.value = value

self.pos = start
self.msg: Optional[memoryview] = None if stop is None else buf[start: stop]

def stop_pos(self, pos: int, buf: memoryview) -> 'SmlFrameSnippet':
assert self.msg is None
self.msg = buf[self.pos: pos]
return self
14 changes: 8 additions & 6 deletions tests/builder/test_build.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
from smllib.builder import SmlCloseResponseBuilder, SmlGetListResponseBuilder, \
SmlListEntryBuilder, SmlMessageBuilder, SmlObjBuilder
from smllib.sml import EndOfSmlMsg, SmlCloseResponse, SmlGetListResponse, SmlListEntry
from tests.helper import in_snip


def test_build_entry():
builder = SmlListEntryBuilder()
obj = builder.build(['obis', None, None, None, None, '76616c', None], {SmlListEntry: builder})
obj = builder.build(in_snip(['obis', None, None, None, None, '76616c', None]), {SmlListEntry: builder})
assert obj.obis == 'obis'
assert obj.value == 'val'


def test_build_entry_list():
data = [
data = in_snip([
None, 'server', None, None,
[['obis1', None, None, None, None, '76616c31', None], ['obis2', None, None, None, None, '76616c32', None]],
None, None
]
])

builder = SmlGetListResponseBuilder()

Expand Down Expand Up @@ -43,9 +44,9 @@ def build(self, obj: list, classes):


def test_build_choice():
data = in_snip(['t1', 1, 0, [0x0201, ['sig']], 1111, EndOfSmlMsg])
builder = SmlMessageBuilder()
obj = builder.build(['t1', 1, 0, [0x0201, ['sig']], 1111, EndOfSmlMsg],
{SmlCloseResponse: SmlCloseResponseBuilder()})
obj = builder.build(data, {SmlCloseResponse: SmlCloseResponseBuilder()})
assert obj.transaction_id == 't1'
assert obj.group_no == 1
assert obj.abort_on_error == 0
Expand All @@ -61,7 +62,8 @@ def build(self, obj: list, classes):
ret.global_signature += '_patched'
return ret

obj = builder.build(['t1', 1, 0, [0x0201, ['sig']], 1111, EndOfSmlMsg], {SmlCloseResponse: PatchedBuilder()})
data = in_snip(['t1', 1, 0, [0x0201, ['sig']], 1111, EndOfSmlMsg])
obj = builder.build(data, {SmlCloseResponse: PatchedBuilder()})
assert obj.transaction_id == 't1'
assert obj.group_no == 1
assert obj.abort_on_error == 0
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sys
from pathlib import Path

# add src dir so tests work
# add src dir so tests work
src = Path(__file__).parent.with_name('src')
assert src.is_dir(), src
if str(src) not in sys.path:
Expand Down
13 changes: 13 additions & 0 deletions tests/helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from smllib.sml_frame import SmlFrameSnippet


def in_snip(obj, pack_top = True) -> SmlFrameSnippet:
if not isinstance(obj, (list, tuple)):
return SmlFrameSnippet(obj, 'from in_snip')

for i, k in enumerate(obj):
obj[i] = in_snip(k)

if pack_top:
return SmlFrameSnippet(obj, 'from in_snip')
return obj
18 changes: 10 additions & 8 deletions tests/sml/test_format.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from smllib.sml import SmlOpenResponse, SmlCloseResponse, SmlGetListResponse, SmlListEntry, SmlMessage
from smllib.builder import SmlOpenResponseBuilder, SmlCloseResponseBuilder, SmlGetListResponseBuilder, SmlListEntryBuilder, SmlMessageBuilder
from smllib.builder import SmlCloseResponseBuilder, \
SmlGetListResponseBuilder, SmlListEntryBuilder, SmlOpenResponseBuilder
from smllib.sml import SmlListEntry
from tests.helper import in_snip


def test_open_response():
r = SmlOpenResponseBuilder().build([None, None, 'ab', 'cd', None, 1], {})
r = SmlOpenResponseBuilder().build(in_snip([None, None, 'ab', 'cd', None, 1]), {})
assert r.format_msg() == '<SmlOpenResponse>\n' \
' codepage : None\n' \
' client_id : None\n' \
Expand All @@ -14,21 +16,21 @@ def test_open_response():


def test_close_response():
r = SmlCloseResponseBuilder().build(['my_sig'], {})
r = SmlCloseResponseBuilder().build(in_snip(['my_sig']), {})
assert r.format_msg() == '<SmlCloseResponse>\n' \
' global_signature: my_sig\n'


def test_list_entry():
data = [
data = in_snip([
None, 'server', None, None,
[['obis1', None, None, None, None, '76616c31', None], ['obis2', None, None, None, None, '76616c32', None]],
None, None
]
])

builder = SmlGetListResponseBuilder()
obj = builder.build(data, {SmlListEntry: SmlListEntryBuilder()})

assert obj.format_msg() == '<SmlGetListResponse>\n' \
' client_id : None\n' \
' sever_id : server\n' \
Expand Down
13 changes: 7 additions & 6 deletions tests/sml/test_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@

from smllib.errors import UnsupportedType
from smllib.sml.sml_time import build_time
from tests.helper import in_snip


def test_sml_time():
assert build_time(None) is None
assert build_time([1, 253]) == 253
assert build_time([2, 1609466461]) == datetime(2021, 1, 1, 2, 1, 1)
assert build_time(in_snip([1, 253], pack_top=False)) == 253
assert build_time(in_snip([2, 1609466461], pack_top=False)) == datetime(2021, 1, 1, 2, 1, 1)

assert build_time([3, [1609466461, 120, 0]]) == datetime(2021, 1, 1, 4, 1, 1)
assert build_time([3, [1622509261, 120, 60]]) == datetime(2021, 6, 1, 4, 1, 1)
assert build_time([3, [1622509261, 120, 30]]) == datetime(2021, 6, 1, 3, 31, 1)
assert build_time(in_snip([3, [1609466461, 120, 0]], pack_top=False)) == datetime(2021, 1, 1, 4, 1, 1)
assert build_time(in_snip([3, [1622509261, 120, 60]], pack_top=False)) == datetime(2021, 6, 1, 4, 1, 1)
assert build_time(in_snip([3, [1622509261, 120, 30]], pack_top=False)) == datetime(2021, 6, 1, 3, 31, 1)


def test_exception():
with pytest.raises(UnsupportedType) as e:
build_time([5, 55])
build_time([in_snip(5), in_snip(55)])
assert e.value.type == 5
assert str(e.value) == 'Unsupported type 0x05'
Loading

0 comments on commit 149f11a

Please sign in to comment.