From c04ea3fb1c2e9295ef186334d78a6388622be1de Mon Sep 17 00:00:00 2001
From: semuadmin <28569967+semuadmin@users.noreply.github.com>
Date: Mon, 20 May 2024 16:44:01 +0100
Subject: [PATCH 1/5] streamline variant processing
---
README.md | 4 +-
docs/pyubx2.rst | 8 +
src/pyubx2/socket_stream.py | 4 +-
src/pyubx2/ubxmessage.py | 316 ++----------------------------------
src/pyubx2/ubxreader.py | 45 ++---
src/pyubx2/ubxvariants.py | 291 +++++++++++++++++++++++++++++++++
tests/test_exceptions.py | 31 ++--
7 files changed, 350 insertions(+), 349 deletions(-)
create mode 100644 src/pyubx2/ubxvariants.py
diff --git a/README.md b/README.md
index d5b33eb..3015d18 100644
--- a/README.md
+++ b/README.md
@@ -449,9 +449,7 @@ The UBX protocol is principally defined in the modules `ubxtypes_*.py` as a seri
Repeating attribute names are parsed with a two-digit suffix (svid_01, svid_02, etc.). Nested repeating groups are supported. See CFG-VALGET, MON-SPAN, NAV-PVT, NAV-SAT and RXM-RLM by way of examples.
-In most cases, a UBX message's content (payload) is uniquely defined by its class, id and mode; accommodating the message simply requires the addition of an appropriate dictionary entry to the relevant `ubxtypes_*.py` module(s).
-
-However, there are a handful of message types which have multiple possible payload definitions for the same class, id and mode. These exceptional message types require dedicated routines in `ubxmessage.py` which examine elements of the payload itself in order to determine the appropriate dictionary definition. This currently applies to the following message types: CFG-NMEA, NAV-RELPOSNED, RXM-PMP, RXM-PMREQ, RXM-RLM, TIM-VCOCAL.
+In most cases, a UBX message's content (payload) is uniquely defined by its class, id and mode; accommodating the message simply requires the addition of an appropriate dictionary entry to the relevant `ubxtypes_*.py` module(s). However, there are a handful of message types which have multiple possible payload variants for the same class, id and mode. These exceptional message types are handled in `ubxvariants.py`, to which any additional variants should be added.
---
## Troubleshooting
diff --git a/docs/pyubx2.rst b/docs/pyubx2.rst
index 3f04321..a94205b 100644
--- a/docs/pyubx2.rst
+++ b/docs/pyubx2.rst
@@ -92,6 +92,14 @@ pyubx2.ubxtypes\_set module
:undoc-members:
:show-inheritance:
+pyubx2.ubxvariants module
+-------------------------
+
+.. automodule:: pyubx2.ubxvariants
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
Module contents
---------------
diff --git a/src/pyubx2/socket_stream.py b/src/pyubx2/socket_stream.py
index c8573e3..b94ddff 100644
--- a/src/pyubx2/socket_stream.py
+++ b/src/pyubx2/socket_stream.py
@@ -48,7 +48,7 @@ def _recv(self) -> bool:
try:
data = self._socket.recv(self._bufsize)
if len(data) == 0:
- return False
+ return False # pragma: no cover
self._buffer += data
except (OSError, TimeoutError):
return False
@@ -100,7 +100,7 @@ def readline(self) -> bytes:
if line[-1:] == b"\n": # LF
break
else:
- break
+ break # pragma: no cover
return line
diff --git a/src/pyubx2/ubxmessage.py b/src/pyubx2/ubxmessage.py
index d357ffa..f3aa27f 100644
--- a/src/pyubx2/ubxmessage.py
+++ b/src/pyubx2/ubxmessage.py
@@ -1,4 +1,6 @@
"""
+ubxmessage.py
+
Main UBX Message Protocol Class.
Created on 26 Sep 2020
@@ -17,6 +19,7 @@
import pyubx2.ubxtypes_get as ubg
import pyubx2.ubxtypes_poll as ubp
import pyubx2.ubxtypes_set as ubs
+import pyubx2.ubxvariants as umv
from pyubx2.ubxhelpers import (
attsiz,
bytes2val,
@@ -437,7 +440,7 @@ def _get_dict(self, **kwargs) -> dict:
"""
Get payload dictionary corresponding to message mode (GET/SET/POLL)
Certain message types need special handling as alternate payload
- definitions exist for the same ubxClass/ubxID.
+ variants exist for the same ubxClass/ubxID/mode.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
@@ -446,312 +449,29 @@ def _get_dict(self, **kwargs) -> dict:
"""
try:
- if self._mode == ubt.POLL:
- # CFG-TP5 POLL
- if self._ubxClass == b"\x06" and self._ubxID == b"\x31":
- pdict = self._get_cfgtp5_version(**kwargs)
- else:
- pdict = ubp.UBX_PAYLOADS_POLL[self.identity]
+ msg = self._ubxClass + self._ubxID
+ variant = umv.VARIANTS[self._mode].get(msg, False)
+ if variant and msg[0] == 0x13: # MGA
+ pdict = variant(msg, self._mode, **kwargs)
+ elif variant:
+ pdict = variant(**kwargs)
+ elif self._mode == ubt.POLL:
+ pdict = ubp.UBX_PAYLOADS_POLL[self.identity]
elif self._mode == ubt.SET:
- # MGA SET
- if self._ubxClass == b"\x13" and self._ubxID != b"\x80":
- pdict = self._get_mga_version(ubt.SET, **kwargs)
- # RXM-PMP SET
- elif self._ubxClass == b"\x02" and self._ubxID == b"\x72":
- pdict = self._get_rxmpmp_version(**kwargs)
- # RXM-PMREQ SET
- elif self._ubxClass == b"\x02" and self._ubxID == b"\x41":
- pdict = self._get_rxmpmreq_version(**kwargs)
- # TIM-VCOCAL SET
- elif self._ubxClass == b"\x0d" and self._ubxID == b"\x15":
- pdict = self._get_timvcocal_version(**kwargs)
- # CFG-DAT SET
- elif self._ubxClass == b"\x06" and self._ubxID == b"\x06":
- pdict = self._get_cfgdat_version(**kwargs)
- else:
- pdict = ubs.UBX_PAYLOADS_SET[self.identity]
+ pdict = ubs.UBX_PAYLOADS_SET[self.identity]
else:
- # MGA GET
- if self._ubxClass == b"\x13" and self._ubxID != b"\x80":
- pdict = self._get_mga_version(ubt.GET, **kwargs)
- # RXM-PMP GET
- elif self._ubxClass == b"\x02" and self._ubxID == b"\x72":
- pdict = self._get_rxmpmp_version(**kwargs)
- # RXM-RLM GET
- elif self._ubxClass == b"\x02" and self._ubxID == b"\x59":
- pdict = self._get_rxmrlm_version(**kwargs)
- # CFG-NMEA GET
- elif self._ubxClass == b"\x06" and self._ubxID == b"\x17":
- pdict = self._get_cfgnmea_version(**kwargs)
- # NAV-AOPSTATUS GET
- elif self._ubxClass == b"\x01" and self._ubxID == b"\x60":
- pdict = self._get_aopstatus_version(**kwargs)
- # NAV-RELPOSNED GET
- elif self._ubxClass == b"\x01" and self._ubxID == b"\x3C":
- pdict = self._get_relposned_version(**kwargs)
# Unknown GET message, parsed to nominal definition
- elif self.identity[-7:] == "NOMINAL":
+ if self.identity[-7:] == "NOMINAL":
pdict = {}
else:
pdict = ubg.UBX_PAYLOADS_GET[self.identity]
return pdict
except KeyError as err:
- raise KeyError(
- f"{err} - Check 'msgmode' keyword argument is appropriate for data stream"
- ) from err
-
- def _get_cfgtp5_version(self, **kwargs) -> dict:
- """
- Select appropriate CFG-TP5 POLL payload definition by checking
- presence of tpIdx or payload argument.
-
- :param kwargs: optional payload key/value pairs
- :return: dictionary representing payload definition
- :rtype: dict
-
- """
-
- lp = 0
- if "payload" in kwargs:
- lp = len(kwargs["payload"])
- elif "tpIdx" in kwargs:
- lp = 1
- if lp == 1:
- pdict = ubp.UBX_PAYLOADS_POLL["CFG-TP5-TPX"]
- else:
- pdict = ubp.UBX_PAYLOADS_POLL["CFG-TP5"]
- return pdict
-
- def _get_mga_version(self, mode: int, **kwargs) -> dict:
- """
- Select appropriate MGA payload definition by checking
- value of 'type' attribute (1st byte of payload).
-
- :param str mode: mode (0=GET, 1=SET, 2=POLL)
- :param kwargs: optional payload key/value pairs
- :return: dictionary representing payload definition
- :rtype: dict
- :raises: UBXMessageError
-
- """
-
- if "type" in kwargs:
- typ = val2bytes(kwargs["type"], ubt.U1)
- elif "payload" in kwargs:
- typ = kwargs["payload"][0:1]
- else:
- raise ube.UBXMessageError(
- "MGA message definitions must include type or payload keyword"
- )
- identity = ubt.UBX_MSGIDS[self._ubxClass + self._ubxID + typ]
- if mode == ubt.SET:
- pdict = ubs.UBX_PAYLOADS_SET[identity]
- else:
- pdict = ubg.UBX_PAYLOADS_GET[identity]
- return pdict
-
- def _get_rxmpmreq_version(self, **kwargs) -> dict:
- """
- Select appropriate RXM-PMREQ payload definition by checking
- the 'version' keyword or payload length.
-
- :param kwargs: optional payload key/value pairs
- :return: dictionary representing payload definition
- :rtype: dict
- :raises: UBXMessageError
-
- """
-
- lpd = 0
- if "version" in kwargs: # assume longer version
- lpd = 16
- elif "payload" in kwargs:
- lpd = len(kwargs["payload"])
- else:
- raise ube.UBXMessageError(
- "RXM-PMREQ message definitions must include version or payload keyword"
- )
- if lpd == 16:
- pdict = ubs.UBX_PAYLOADS_SET["RXM-PMREQ"] # long
- else:
- pdict = ubs.UBX_PAYLOADS_SET["RXM-PMREQ-S"] # short
- return pdict
-
- def _get_rxmpmp_version(self, **kwargs) -> dict:
- """
- Select appropriate RXM-PMP payload definition by checking
- value of 'version' attribute (1st byte of payload).
-
- :param kwargs: optional payload key/value pairs
- :return: dictionary representing payload definition
- :rtype: dict
- :raises: UBXMessageError
-
- """
-
- if "version" in kwargs:
- ver = val2bytes(kwargs["version"], ubt.U1)
- elif "payload" in kwargs:
- ver = kwargs["payload"][0:1]
- else:
- raise ube.UBXMessageError(
- "RXM-PMP message definitions must include version or payload keyword"
- )
- if ver == b"\x00":
- pdict = ubs.UBX_PAYLOADS_SET["RXM-PMP-V0"]
- else:
- pdict = ubs.UBX_PAYLOADS_SET["RXM-PMP-V1"]
- return pdict
-
- def _get_rxmrlm_version(self, **kwargs) -> dict:
- """
- Select appropriate RXM-RLM payload definition by checking
- value of 'type' attribute (2nd byte of payload).
-
- :param kwargs: optional payload key/value pairs
- :return: dictionary representing payload definition
- :rtype: dict
- :raises: UBXMessageError
-
- """
-
- if "type" in kwargs:
- typ = val2bytes(kwargs["type"], ubt.U1)
- elif "payload" in kwargs:
- typ = kwargs["payload"][1:2]
- else:
- raise ube.UBXMessageError(
- "RXM-RLM message definitions must include type or payload keyword"
- )
- if typ == b"\x01":
- pdict = ubg.UBX_PAYLOADS_GET["RXM-RLM-S"] # short
- else:
- pdict = ubg.UBX_PAYLOADS_GET["RXM-RLM-L"] # long
- return pdict
-
- def _get_cfgnmea_version(self, **kwargs) -> dict:
- """
- Select appropriate payload definition version for older
- generations of CFG-NMEA message by checking payload length.
-
- :param kwargs: optional payload key/value pairs
- :return: dictionary representing payload definition
- :rtype: dict
- :raises: UBXMessageError
-
- """
-
- if "payload" in kwargs:
- lpd = len(kwargs["payload"])
- else:
- raise ube.UBXMessageError(
- "CFG-NMEA message definitions must include payload keyword"
- )
- if lpd == 4:
- pdict = ubg.UBX_PAYLOADS_GET["CFG-NMEAvX"]
- elif lpd == 12:
- pdict = ubg.UBX_PAYLOADS_GET["CFG-NMEAv0"]
- else:
- pdict = ubg.UBX_PAYLOADS_GET["CFG-NMEA"]
- return pdict
-
- def _get_aopstatus_version(self, **kwargs) -> dict:
- """
- Select appropriate payload definition version for older
- generations of NAV-AOPSTATUS message by checking payload length.
-
- :param kwargs: optional payload key/value pairs
- :return: dictionary representing payload definition
- :rtype: dict
- :raises: UBXMessageError
-
- """
-
- if "payload" in kwargs:
- lpd = len(kwargs["payload"])
- else:
- raise ube.UBXMessageError(
- "NAV-AOPSTATUS message definitions must include payload keyword"
- )
- if lpd == 20:
- pdict = ubg.UBX_PAYLOADS_GET["NAV-AOPSTATUS-L"]
- else:
- pdict = ubg.UBX_PAYLOADS_GET["NAV-AOPSTATUS"]
- return pdict
-
- def _get_relposned_version(self, **kwargs) -> dict:
- """
- Select appropriate NAV-RELPOSNED payload definition by checking
- value of 'version' attribute (1st byte of payload).
-
- :param kwargs: optional payload key/value pairs
- :return: dictionary representing payload definition
- :rtype: dict
- :raises: UBXMessageError
-
- """
-
- if "version" in kwargs:
- ver = val2bytes(kwargs["version"], ubt.U1)
- elif "payload" in kwargs:
- ver = kwargs["payload"][0:1]
- else:
raise ube.UBXMessageError(
- "NAV-RELPOSNED message definitions must include version or payload keyword"
- )
- if ver == b"\x00":
- pdict = ubg.UBX_PAYLOADS_GET["NAV-RELPOSNED-V0"]
- else:
- pdict = ubg.UBX_PAYLOADS_GET["NAV-RELPOSNED"]
- return pdict
-
- def _get_timvcocal_version(self, **kwargs) -> dict:
- """
- Select appropriate TIM-VCOCAL SET payload definition by checking
- the payload length.
-
- :param kwargs: optional payload key/value pairs
- :return: dictionary representing payload definition
- :rtype: dict
- :raises: UBXMessageError
-
- """
-
- lpd = 1
- typ = 0
- if "type" in kwargs:
- typ = kwargs["type"]
- elif "payload" in kwargs:
- lpd = len(kwargs["payload"])
- else:
- raise ube.UBXMessageError(
- "TIM-VCOCAL SET message definitions must include type or payload keyword"
- )
- if lpd == 1 and typ == 0:
- pdict = ubs.UBX_PAYLOADS_SET["TIM-VCOCAL-V0"] # stop cal
- else:
- pdict = ubs.UBX_PAYLOADS_SET["TIM-VCOCAL"] # cal
- return pdict
-
- def _get_cfgdat_version(self, **kwargs) -> dict:
- """
- Select appropriate CFG-DAT SET payload definition by checking
- presence of datumNum keyword or payload length of 2 bytes.
-
- :param kwargs: optional payload key/value pairs
- :return: dictionary representing payload definition
- :rtype: dict
-
- """
-
- lpd = 0
- if "payload" in kwargs:
- lpd = len(kwargs["payload"])
- if lpd == 2 or "datumNum" in kwargs:
- pdict = ubs.UBX_PAYLOADS_SET["CFG-DAT-NUM"] # datum num set
- else:
- pdict = ubs.UBX_PAYLOADS_SET["CFG-DAT"] # manual datum set
- return pdict
+ f"Unknown message type {escapeall(self._ubxClass + self._ubxID)}"
+ f", mode {["GET", "SET", "POLL"][self._mode]}. "
+ "Check 'msgmode' setting is appropriate for data stream"
+ ) from err
def _calc_num_repeats(
self, attd: dict, payload: bytes, offset: int, offsetend: int = 0
diff --git a/src/pyubx2/ubxreader.py b/src/pyubx2/ubxreader.py
index fb74de8..07dd042 100644
--- a/src/pyubx2/ubxreader.py
+++ b/src/pyubx2/ubxreader.py
@@ -234,7 +234,6 @@ def _parse_ubx(self, hdr: bytes) -> tuple:
parsed_data = self.parse(
raw_data,
validate=self._validate,
- quitonerror=self._quitonerror,
msgmode=self._msgmode,
parsebitfield=self._parsebf,
scaling=self._scaling,
@@ -325,7 +324,7 @@ def _read_line(self) -> bytes:
data = self._stream.readline() # NMEA protocol is CRLF-terminated
if len(data) == 0:
- raise EOFError() # EOF
+ raise EOFError() # pragma: no cover
if data[-1:] != b"\x0a": # truncated stream
raise UBXStreamError(
"Serial stream terminated unexpectedly. "
@@ -367,7 +366,6 @@ def parse(
message: bytes,
msgmode: int = GET,
validate: int = VALCKSUM,
- quitonerror: int = ERR_LOG,
parsebitfield: bool = True,
scaling: bool = True,
) -> object:
@@ -378,8 +376,6 @@ def parse(
:param int msgmode: GET (0), SET (1), POLL (2) (0)
:param int validate: VALCKSUM (1) = Validate checksum,
VALNONE (0) = ignore invalid checksum (1)
- :param int quitonerror: ERR_IGNORE (0) = ignore errors, ERR_LOG (1) = log continue,
- ERR_RAISE (2) = (re)raise (1)
:param bool parsebitfield: 1 = parse bitfields, 0 = leave as bytes (1)
:param bool scaling: 1 = apply scale factors, 0 = do not apply (1)
:return: UBXMessage object
@@ -392,7 +388,6 @@ def parse(
raise UBXParseError(
f"Invalid message mode {msgmode} - must be 0, 1, 2 or 3"
)
- logger = getLogger(__name__)
lenm = len(message)
hdr = message[0:2]
@@ -426,28 +421,16 @@ def parse(
raise UBXParseError(
(f"Message checksum {ckm}" f" invalid - should be {ckv}")
)
- try:
- # if input message (SET or POLL), determine mode automatically
- if msgmode == SETPOLL:
- msgmode = getinputmode(message) # returns SET or POLL
- if payload is None:
- return UBXMessage(clsid, msgid, msgmode)
- return UBXMessage(
- clsid,
- msgid,
- msgmode,
- payload=payload,
- parsebitfield=parsebitfield,
- scaling=scaling,
- )
- except KeyError as err:
- modestr = ["GET", "SET", "POLL"][msgmode]
- errmsg = (
- f"Unknown message type clsid {clsid}, msgid {msgid}, mode {modestr}\n"
- "Check 'msgmode' keyword argument is appropriate for data stream"
- )
- if quitonerror == ERR_RAISE:
- raise UBXParseError(errmsg) from err
- if quitonerror == ERR_LOG:
- logger.error(errmsg)
- return None
+ # if input message (SET or POLL), determine mode automatically
+ if msgmode == SETPOLL:
+ msgmode = getinputmode(message) # returns SET or POLL
+ if payload is None:
+ return UBXMessage(clsid, msgid, msgmode)
+ return UBXMessage(
+ clsid,
+ msgid,
+ msgmode,
+ payload=payload,
+ parsebitfield=parsebitfield,
+ scaling=scaling,
+ )
diff --git a/src/pyubx2/ubxvariants.py b/src/pyubx2/ubxvariants.py
new file mode 100644
index 0000000..960b283
--- /dev/null
+++ b/src/pyubx2/ubxvariants.py
@@ -0,0 +1,291 @@
+"""
+ubxvariants.py
+
+Various routines to get payload dictionaries for message
+types which exist in multiple variants for the same
+message class, id and mode.
+
+Created on 20 May 2024
+
+:author: semuadmin
+:copyright: SEMU Consulting © 2020
+:license: BSD 3-Clause
+"""
+
+from pyubx2.exceptions import UBXMessageError
+from pyubx2.ubxhelpers import val2bytes
+from pyubx2.ubxtypes_core import GET, POLL, SET, U1, UBX_MSGIDS
+from pyubx2.ubxtypes_get import UBX_PAYLOADS_GET
+from pyubx2.ubxtypes_poll import UBX_PAYLOADS_POLL
+from pyubx2.ubxtypes_set import UBX_PAYLOADS_SET
+
+
+def get_cfgtp5_dict(**kwargs) -> dict:
+ """
+ Select appropriate CFG-TP5 POLL payload definition by checking
+ presence of tpIdx or payload argument.
+
+ :param kwargs: optional payload key/value pairs
+ :return: dictionary representing payload definition
+ :rtype: dict
+
+ """
+
+ lp = 0
+ if "payload" in kwargs:
+ lp = len(kwargs["payload"])
+ elif "tpIdx" in kwargs:
+ lp = 1
+ print(f"DEBUG TP5 dict {kwargs} len payload {lp}")
+ if lp == 1:
+ return UBX_PAYLOADS_POLL["CFG-TP5-TPX"]
+ return UBX_PAYLOADS_POLL["CFG-TP5"] # pragma: no cover
+
+
+def get_mga_dict(msg: bytes, mode: int, **kwargs) -> dict:
+ """
+ Select appropriate MGA payload definition by checking
+ value of 'type' attribute (1st byte of payload).
+
+ :param str mode: mode (0=GET, 1=SET, 2=POLL)
+ :param kwargs: optional payload key/value pairs
+ :return: dictionary representing payload definition
+ :rtype: dict
+ :raises: UBXMessageError
+
+ """
+
+ if "type" in kwargs:
+ typ = val2bytes(kwargs["type"], U1)
+ elif "payload" in kwargs:
+ typ = kwargs["payload"][0:1]
+ else:
+ raise UBXMessageError(
+ "MGA message definitions must include type or payload keyword"
+ )
+ identity = UBX_MSGIDS[msg + typ]
+ if mode == SET:
+ return UBX_PAYLOADS_SET[identity]
+ return UBX_PAYLOADS_GET[identity]
+
+
+def get_rxmpmreq_dict(**kwargs) -> dict:
+ """
+ Select appropriate RXM-PMREQ payload definition by checking
+ the 'version' keyword or payload length.
+
+ :param kwargs: optional payload key/value pairs
+ :return: dictionary representing payload definition
+ :rtype: dict
+ :raises: UBXMessageError
+
+ """
+
+ lpd = 0
+ if "version" in kwargs: # assume longer version
+ lpd = 16
+ elif "payload" in kwargs:
+ lpd = len(kwargs["payload"])
+ else:
+ raise UBXMessageError(
+ "RXM-PMREQ message definitions must include version or payload keyword"
+ )
+ if lpd == 16:
+ return UBX_PAYLOADS_SET["RXM-PMREQ"] # long
+ return UBX_PAYLOADS_SET["RXM-PMREQ-S"] # short
+
+
+def get_rxmpmp_dict(**kwargs) -> dict:
+ """
+ Select appropriate RXM-PMP payload definition by checking
+ value of 'version' attribute (1st byte of payload).
+
+ :param kwargs: optional payload key/value pairs
+ :return: dictionary representing payload definition
+ :rtype: dict
+ :raises: UBXMessageError
+
+ """
+
+ if "version" in kwargs:
+ ver = val2bytes(kwargs["version"], U1)
+ elif "payload" in kwargs:
+ ver = kwargs["payload"][0:1]
+ else:
+ raise UBXMessageError(
+ "RXM-PMP message definitions must include version or payload keyword"
+ )
+ if ver == b"\x00":
+ return UBX_PAYLOADS_SET["RXM-PMP-V0"]
+ return UBX_PAYLOADS_SET["RXM-PMP-V1"]
+
+
+def get_rxmrlm_dict(**kwargs) -> dict:
+ """
+ Select appropriate RXM-RLM payload definition by checking
+ value of 'type' attribute (2nd byte of payload).
+
+ :param kwargs: optional payload key/value pairs
+ :return: dictionary representing payload definition
+ :rtype: dict
+ :raises: UBXMessageError
+
+ """
+
+ if "type" in kwargs:
+ typ = val2bytes(kwargs["type"], U1)
+ elif "payload" in kwargs:
+ typ = kwargs["payload"][1:2]
+ else:
+ raise UBXMessageError(
+ "RXM-RLM message definitions must include type or payload keyword"
+ )
+ if typ == b"\x01":
+ return UBX_PAYLOADS_GET["RXM-RLM-S"] # short
+ return UBX_PAYLOADS_GET["RXM-RLM-L"] # long
+
+
+def get_cfgnmea_dict(**kwargs) -> dict:
+ """
+ Select appropriate payload definition version for older
+ generations of CFG-NMEA message by checking payload length.
+
+ :param kwargs: optional payload key/value pairs
+ :return: dictionary representing payload definition
+ :rtype: dict
+ :raises: UBXMessageError
+
+ """
+
+ if "payload" in kwargs:
+ lpd = len(kwargs["payload"])
+ else:
+ raise UBXMessageError(
+ "CFG-NMEA message definitions must include payload keyword"
+ )
+ if lpd == 4:
+ return UBX_PAYLOADS_GET["CFG-NMEAvX"]
+ if lpd == 12:
+ return UBX_PAYLOADS_GET["CFG-NMEAv0"]
+ return UBX_PAYLOADS_GET["CFG-NMEA"]
+
+
+def get_aopstatus_dict(**kwargs) -> dict:
+ """
+ Select appropriate payload definition version for older
+ generations of NAV-AOPSTATUS message by checking payload length.
+
+ :param kwargs: optional payload key/value pairs
+ :return: dictionary representing payload definition
+ :rtype: dict
+ :raises: UBXMessageError
+
+ """
+
+ if "payload" in kwargs:
+ lpd = len(kwargs["payload"])
+ else:
+ raise UBXMessageError(
+ "NAV-AOPSTATUS message definitions must include payload keyword"
+ )
+ if lpd == 20:
+ return UBX_PAYLOADS_GET["NAV-AOPSTATUS-L"]
+ return UBX_PAYLOADS_GET["NAV-AOPSTATUS"]
+
+
+def get_relposned_dict(**kwargs) -> dict:
+ """
+ Select appropriate NAV-RELPOSNED payload definition by checking
+ value of 'version' attribute (1st byte of payload).
+
+ :param kwargs: optional payload key/value pairs
+ :return: dictionary representing payload definition
+ :rtype: dict
+ :raises: UBXMessageError
+
+ """
+
+ if "version" in kwargs:
+ ver = val2bytes(kwargs["version"], U1)
+ elif "payload" in kwargs:
+ ver = kwargs["payload"][0:1]
+ else:
+ raise UBXMessageError(
+ "NAV-RELPOSNED message definitions must include version or payload keyword"
+ )
+ if ver == b"\x00":
+ return UBX_PAYLOADS_GET["NAV-RELPOSNED-V0"]
+ return UBX_PAYLOADS_GET["NAV-RELPOSNED"]
+
+
+def get_timvcocal_dict(**kwargs) -> dict:
+ """
+ Select appropriate TIM-VCOCAL SET payload definition by checking
+ the payload length.
+
+ :param kwargs: optional payload key/value pairs
+ :return: dictionary representing payload definition
+ :rtype: dict
+ :raises: UBXMessageError
+
+ """
+
+ lpd = 1
+ typ = 0
+ if "type" in kwargs:
+ typ = kwargs["type"]
+ elif "payload" in kwargs:
+ lpd = len(kwargs["payload"])
+ else:
+ raise UBXMessageError(
+ "TIM-VCOCAL SET message definitions must include type or payload keyword"
+ )
+ if lpd == 1 and typ == 0:
+ return UBX_PAYLOADS_SET["TIM-VCOCAL-V0"] # stop cal
+ return UBX_PAYLOADS_SET["TIM-VCOCAL"] # cal
+
+
+def get_cfgdat_dict(**kwargs) -> dict:
+ """
+ Select appropriate CFG-DAT SET payload definition by checking
+ presence of datumNum keyword or payload length of 2 bytes.
+
+ :param kwargs: optional payload key/value pairs
+ :return: dictionary representing payload definition
+ :rtype: dict
+
+ """
+
+ lpd = 0
+ if "payload" in kwargs:
+ lpd = len(kwargs["payload"])
+ if lpd == 2 or "datumNum" in kwargs:
+ return UBX_PAYLOADS_SET["CFG-DAT-NUM"] # datum num set
+ return UBX_PAYLOADS_SET["CFG-DAT"] # manual datum set
+
+
+VARIANTS = {
+ POLL: {b"\x06\x31": get_cfgtp5_dict}, # CFG-TP5
+ SET: {
+ b"\x13\x00": get_mga_dict, # MGA GPS
+ b"\x13\x02": get_mga_dict, # MGA GAL
+ b"\x13\x03": get_mga_dict, # MGA BDS
+ b"\x13\x05": get_mga_dict, # MGA QZSS
+ b"\x13\x06": get_mga_dict, # MGA GLO
+ b"\x13\x21": get_mga_dict, # MGA FLASH
+ b"\x13\x40": get_mga_dict, # MGA INI
+ b"\x02\x72": get_rxmpmp_dict, # RXM-PMP
+ b"\x02\x41": get_rxmpmreq_dict, # RXM-PMREQ
+ b"\x0d\x15": get_timvcocal_dict, # TIM-VCOCAL
+ b"\x06\x06": get_cfgdat_dict, # CFG-DAT
+ },
+ GET: {
+ b"\x13\x21": get_mga_dict, # MGA FLASH
+ b"\x13\x60": get_mga_dict, # MGA ACK NAK
+ b"\x02\x72": get_rxmpmp_dict, # RXM-PMP
+ b"\x02\x59": get_rxmrlm_dict, # RXM-RLM
+ b"\x06\x17": get_cfgnmea_dict, # CFG-NMEA
+ b"\x01\x60": get_aopstatus_dict, # NAV-AOPSTATUS
+ b"\x01\x3C": get_relposned_dict, # NAV-RELPOSNED
+ },
+}
diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py
index 9f25eb7..d223f15 100644
--- a/tests/test_exceptions.py
+++ b/tests/test_exceptions.py
@@ -13,6 +13,7 @@
import os
from io import BytesIO
import unittest
+from logging import ERROR
from pyubx2 import (
UBXMessage,
@@ -25,6 +26,7 @@
SET,
POLL,
VALCKSUM,
+ ERR_LOG,
ERR_RAISE,
)
from pyubx2.ubxhelpers import (
@@ -332,9 +334,10 @@ def testParseMode(self): # test invalid parse message mode
)
def testParseMode2(self): # test parser with incorrect mode for input message
- EXPECTED_ERROR = "Unknown message type clsid (.*), msgid (.*), mode GET"
- with self.assertRaisesRegex(UBXParseError, EXPECTED_ERROR):
- UBXReader.parse(self.mga_ini, validate=VALCKSUM, quitonerror=ERR_RAISE)
+ EXPECTED_ERROR = "Unknown message type b'\\x13\\x40', mode GET. Check 'msgmode' setting is appropriate for data stream"
+ with self.assertRaises(UBXMessageError) as context:
+ UBXReader.parse(self.mga_ini, validate=VALCKSUM)
+ self.assertTrue(EXPECTED_ERROR in str(context.exception))
def testStreamMode(self): # test invalid stream message mode
EXPECTED_ERROR = "Invalid stream mode 4 - must be 0, 1, 2 or 3"
@@ -352,19 +355,17 @@ def testBytes2Val(self): # test invalid attribute type
bytes2val(b"\x01", "Z001")
def testWRONGMSGMODE(self): # test parse of SET message with GET msgmode
- EXPECTED_ERROR = (
- "Unknown message type clsid (.*), msgid (.*), mode GET\n"
- + "Check 'msgmode' keyword argument is appropriate for data stream"
- )
+ EXPECTED_ERROR = "Unknown message type b'\\x06\\x8a', mode GET. Check 'msgmode' setting is appropriate for data stream"
EXPECTED_RESULT = ""
- res = UBXMessage(
- "CFG",
- "CFG-VALSET",
- SET,
- payload=b"\x00\x03\x00\x00\x01\x00\x52\x40\x80\x25\x00\x00",
- )
- with self.assertRaisesRegex(UBXParseError, EXPECTED_ERROR):
- msg = UBXReader.parse(res.serialize(), quitonerror=ERR_RAISE)
+ with self.assertRaises(UBXMessageError) as context:
+ res = UBXMessage(
+ "CFG",
+ "CFG-VALSET",
+ SET,
+ payload=b"\x00\x03\x00\x00\x01\x00\x52\x40\x80\x25\x00\x00",
+ )
+ msg = UBXReader.parse(res.serialize())
+ self.assertTrue(EXPECTED_ERROR in str(context.exception))
msg = UBXReader.parse(res.serialize(), msgmode=SET)
self.assertEqual(str(msg), EXPECTED_RESULT)
From 7221f39c3791331ad553cc6b5d86bef3c6b9ad06 Mon Sep 17 00:00:00 2001
From: semuadmin <28569967+semuadmin@users.noreply.github.com>
Date: Mon, 20 May 2024 21:48:24 +0100
Subject: [PATCH 2/5] minor performance tweaks
---
src/pyubx2/ubxmessage.py | 168 ++++++++++++++++++++-------------------
src/pyubx2/ubxreader.py | 8 --
2 files changed, 88 insertions(+), 88 deletions(-)
diff --git a/src/pyubx2/ubxmessage.py b/src/pyubx2/ubxmessage.py
index f3aa27f..f4a72f0 100644
--- a/src/pyubx2/ubxmessage.py
+++ b/src/pyubx2/ubxmessage.py
@@ -14,12 +14,7 @@
import struct
-import pyubx2.exceptions as ube
-import pyubx2.ubxtypes_core as ubt
-import pyubx2.ubxtypes_get as ubg
-import pyubx2.ubxtypes_poll as ubp
-import pyubx2.ubxtypes_set as ubs
-import pyubx2.ubxvariants as umv
+from pyubx2.exceptions import UBXMessageError, UBXTypeError
from pyubx2.ubxhelpers import (
attsiz,
bytes2val,
@@ -34,6 +29,29 @@
nomval,
val2bytes,
)
+from pyubx2.ubxtypes_core import (
+ CH,
+ GET,
+ POLL,
+ SCALROUND,
+ SET,
+ U1,
+ U2,
+ U4,
+ UBX_CLASSES,
+ UBX_HDR,
+ UBX_MSGIDS,
+ X1,
+ X2,
+ X4,
+ X6,
+ X8,
+ X24,
+)
+from pyubx2.ubxtypes_get import UBX_PAYLOADS_GET
+from pyubx2.ubxtypes_poll import UBX_PAYLOADS_POLL
+from pyubx2.ubxtypes_set import UBX_PAYLOADS_SET
+from pyubx2.ubxvariants import VARIANTS
class UBXMessage:
@@ -45,7 +63,6 @@ def __init__(
ubxID,
msgmode: int,
parsebitfield: bool = True,
- scaling: bool = True,
**kwargs,
):
"""Constructor.
@@ -62,7 +79,6 @@ def __init__(
:param object msgID: message ID as str, int or byte
:param int msgmode: message mode (0=GET, 1=SET, 2=POLL)
:param bool parsebitfield: parse bitfields ('X' type attributes) Y/N
- :param bool scaling: apply scale factors Y/N
:param kwargs: optional payload keyword arguments
:raises: UBXMessageError
"""
@@ -73,12 +89,10 @@ def __init__(
self._payload = b""
self._length = b""
self._checksum = b""
-
self._parsebf = parsebitfield # parsing bitfields Y/N?
- self._scaling = scaling # apply scale factors Y/N?
- if msgmode not in (ubt.GET, ubt.SET, ubt.POLL):
- raise ube.UBXMessageError(f"Invalid msgmode {msgmode} - must be 0, 1 or 2")
+ if msgmode not in (GET, SET, POLL):
+ raise UBXMessageError(f"Invalid msgmode {msgmode} - must be 0, 1 or 2")
# accommodate different formats of msgClass and msgID
if isinstance(ubxClass, str) and isinstance(
@@ -126,7 +140,7 @@ def _do_attributes(self, **kwargs):
TypeError,
ValueError,
) as err:
- raise ube.UBXTypeError(
+ raise UBXTypeError(
(
f"Incorrect type for attribute '{anam}' "
f"in {['GET', 'SET', 'POLL'][self._mode]} message "
@@ -134,7 +148,7 @@ def _do_attributes(self, **kwargs):
)
) from err
except (OverflowError,) as err:
- raise ube.UBXTypeError(
+ raise UBXTypeError(
(
f"Overflow error for attribute '{anam}' "
f"in {['GET', 'SET', 'POLL'][self._mode]} message "
@@ -163,7 +177,7 @@ def _set_attribute(
adef, tuple
): # repeating group of attributes or subdefined bitfield
numr, _ = adef
- if numr in (ubt.X1, ubt.X2, ubt.X4, ubt.X6, ubt.X8, ubt.X24): # bitfield
+ if numr in (X1, X2, X4, X6, X8, X24): # bitfield
if self._parsebf: # if we're parsing bitfields
(offset, index) = self._set_attribute_bitfield(
adef, offset, index, **kwargs
@@ -201,8 +215,8 @@ def _set_attribute_group(
# if CFG-VALGET or CFG-VALSET message, use dedicated method to
# parse as configuration key value pairs
if self._ubxClass == b"\x06" and (
- (self._ubxID == b"\x8b" and self._mode == ubt.GET)
- or (self._ubxID == b"\x8a" and self._mode == ubt.SET)
+ (self._ubxID == b"\x8b" and self._mode == GET)
+ or (self._ubxID == b"\x8a" and self._mode == SET)
):
self._set_attribute_cfgval(offset, **kwargs)
else:
@@ -217,7 +231,7 @@ def _set_attribute_group(
if (
self._ubxClass == b"\x10"
and self._ubxID == b"\x02"
- and self._mode == ubt.SET
+ and self._mode == SET
):
if getattr(self, "calibTtagValid", 0):
gsiz += 1
@@ -256,7 +270,7 @@ def _set_attribute_single(
# if attribute is scaled
ares = 1
- if isinstance(adef, list) and self._scaling:
+ if isinstance(adef, list):
ares = adef[1] # attribute resolution (i.e. scaling factor)
adef = adef[0] # attribute definition
@@ -267,7 +281,7 @@ def _set_attribute_single(
anami += f"_{i:02d}"
# determine attribute size (bytes)
- if adef == ubt.CH: # variable length string
+ if adef == CH: # variable length string
asiz = len(self._payload)
else:
asiz = attsiz(adef)
@@ -279,7 +293,7 @@ def _set_attribute_single(
if ares == 1:
val = bytes2val(valb, adef)
else:
- val = round(bytes2val(valb, adef) * ares, ubt.SCALROUND)
+ val = round(bytes2val(valb, adef) * ares, SCALROUND)
else:
# if individual keyword has been provided,
# set to provided value, else set to
@@ -293,14 +307,11 @@ def _set_attribute_single(
if anami[0:3] == "_HP": # high precision component of earlier attribute
# add standard and high precision values in a single attribute
- setattr(
- self, anami[3:], round(getattr(self, anami[3:]) + val, ubt.SCALROUND)
- )
+ setattr(self, anami[3:], round(getattr(self, anami[3:]) + val, SCALROUND))
else:
setattr(self, anami, val)
- offset += asiz
- return offset
+ return offset + asiz
def _set_attribute_bitfield(
self, atyp: str, offset: int, index: list, **kwargs
@@ -336,11 +347,10 @@ def _set_attribute_bitfield(
)
# update payload
- offset += bsiz
if "payload" not in kwargs:
self._payload += bitfield.to_bytes(bsiz, "little")
- return (offset, index)
+ return (offset + bsiz, index)
def _set_attribute_bits(
self,
@@ -375,16 +385,14 @@ def _set_attribute_bits(
atts = attsiz(keyt) # determine flag size in bits
if "payload" in kwargs:
- mask = pow(2, atts) - 1
- val = (bitfield >> bfoffset) & mask
+ val = (bitfield >> bfoffset) & ((1 << atts) - 1)
else:
val = kwargs.get(keyr, 0)
bitfield = bitfield | (val << bfoffset)
if key[0:8] != "reserved": # don't bother to set reserved bits
setattr(self, keyr, val)
- bfoffset += atts
- return (bitfield, bfoffset)
+ return (bitfield, bfoffset + atts)
def _set_attribute_cfgval(self, offset: int, **kwargs):
"""
@@ -401,7 +409,7 @@ def _set_attribute_cfgval(self, offset: int, **kwargs):
if "payload" in kwargs:
self._payload = kwargs["payload"]
else:
- raise ube.UBXMessageError(
+ raise UBXMessageError(
"CFG-VALGET message definitions must include payload keyword"
)
cfglen = len(self._payload[offset:])
@@ -427,14 +435,11 @@ def _do_len_checksum(self):
"""
Calculate and format payload length and checksum as bytes."""
- if self._payload is None:
- self._length = val2bytes(0, ubt.U2)
- self._checksum = calc_checksum(self._ubxClass + self._ubxID + self._length)
- else:
- self._length = val2bytes(len(self._payload), ubt.U2)
- self._checksum = calc_checksum(
- self._ubxClass + self._ubxID + self._length + self._payload
- )
+ payload = b"" if self._payload is None else self._payload
+ self._length = val2bytes(len(payload), U2)
+ self._checksum = calc_checksum(
+ self._ubxClass + self._ubxID + self._length + payload
+ )
def _get_dict(self, **kwargs) -> dict:
"""
@@ -450,24 +455,24 @@ def _get_dict(self, **kwargs) -> dict:
try:
msg = self._ubxClass + self._ubxID
- variant = umv.VARIANTS[self._mode].get(msg, False)
+ variant = VARIANTS[self._mode].get(msg, False)
if variant and msg[0] == 0x13: # MGA
pdict = variant(msg, self._mode, **kwargs)
elif variant:
pdict = variant(**kwargs)
- elif self._mode == ubt.POLL:
- pdict = ubp.UBX_PAYLOADS_POLL[self.identity]
- elif self._mode == ubt.SET:
- pdict = ubs.UBX_PAYLOADS_SET[self.identity]
+ elif self._mode == POLL:
+ pdict = UBX_PAYLOADS_POLL[self.identity]
+ elif self._mode == SET:
+ pdict = UBX_PAYLOADS_SET[self.identity]
else:
# Unknown GET message, parsed to nominal definition
if self.identity[-7:] == "NOMINAL":
pdict = {}
else:
- pdict = ubg.UBX_PAYLOADS_GET[self.identity]
+ pdict = UBX_PAYLOADS_GET[self.identity]
return pdict
except KeyError as err:
- raise ube.UBXMessageError(
+ raise UBXMessageError(
f"Unknown message type {escapeall(self._ubxClass + self._ubxID)}"
f", mode {["GET", "SET", "POLL"][self._mode]}. "
"Check 'msgmode' setting is appropriate for data stream"
@@ -539,11 +544,11 @@ def __str__(self) -> str:
self._ubxClass == b"\x06" and self._ubxID == b"\x01"
):
if att in ["clsID", "msgClass"]:
- clsid = val2bytes(val, ubt.U1)
- val = ubt.UBX_CLASSES.get(clsid, clsid)
+ clsid = val2bytes(val, U1)
+ val = UBX_CLASSES.get(clsid, clsid)
if att == "msgID" and clsid:
- msgid = val2bytes(val, ubt.U1)
- val = ubt.UBX_MSGIDS.get(clsid + msgid, clsid + msgid)
+ msgid = val2bytes(val, U1)
+ val = UBX_MSGIDS.get(clsid + msgid, clsid + msgid)
stg += att + "=" + str(val)
if i < len(self.__dict__) - 1:
stg += ", "
@@ -577,7 +582,7 @@ def __setattr__(self, name, value):
"""
if self._immutable:
- raise ube.UBXMessageError(
+ raise UBXMessageError(
f"Object is immutable. Updates to {name} not permitted after initialisation."
)
@@ -592,11 +597,14 @@ def serialize(self) -> bytes:
"""
- output = ubt.UBX_HDR + self._ubxClass + self._ubxID + self._length
- output += (
- self._checksum if self._payload is None else self._payload + self._checksum
+ return (
+ UBX_HDR
+ + self._ubxClass
+ + self._ubxID
+ + self._length
+ + (b"" if self._payload is None else self._payload)
+ + self._checksum
)
- return output
@property
def identity(self) -> str:
@@ -616,15 +624,15 @@ def identity(self) -> str:
# all MGA messages except MGA-DBD need to be identified by the
# 'type' attribute - the first byte of the payload
if self._ubxClass == b"\x13" and self._ubxID != b"\x80":
- umsg_name = ubt.UBX_MSGIDS[
+ umsg_name = UBX_MSGIDS[
self._ubxClass + self._ubxID + self._payload[0:1]
]
else:
- umsg_name = ubt.UBX_MSGIDS[self._ubxClass + self._ubxID]
+ umsg_name = UBX_MSGIDS[self._ubxClass + self._ubxID]
except KeyError:
# unrecognised u-blox message, parsed to UBX-NOMINAL definition
- if self._ubxClass in ubt.UBX_CLASSES: # known class
- cls = ubt.UBX_CLASSES[self._ubxClass]
+ if self._ubxClass in UBX_CLASSES: # known class
+ cls = UBX_CLASSES[self._ubxClass]
else: # unknown class
cls = "UNKNOWN"
umsg_name = (
@@ -666,7 +674,7 @@ def length(self) -> int:
"""
- return bytes2val(self._length, ubt.U2)
+ return bytes2val(self._length, U2)
@property
def payload(self) -> bytes:
@@ -712,13 +720,13 @@ def config_set(layers: int, transaction: int, cfgData: list) -> object:
num = len(cfgData)
if num > 64:
- raise ube.UBXMessageError(
+ raise UBXMessageError(
f"Number of configuration tuples {num} exceeds maximum of 64"
)
- version = val2bytes(0 if transaction == 0 else 1, ubt.U1)
- layers = val2bytes(layers, ubt.U1)
- transaction = val2bytes(transaction, ubt.U1)
+ version = val2bytes(0 if transaction == 0 else 1, U1)
+ layers = val2bytes(layers, U1)
+ transaction = val2bytes(transaction, U1)
payload = version + layers + transaction + b"\x00"
lis = b""
@@ -729,11 +737,11 @@ def config_set(layers: int, transaction: int, cfgData: list) -> object:
(key, att) = cfgname2key(key) # lookup keyID & attribute type
else:
(_, att) = cfgkey2name(key) # lookup attribute type
- keyb = val2bytes(key, ubt.U4)
+ keyb = val2bytes(key, U4)
valb = val2bytes(val, att)
lis = lis + keyb + valb
- return UBXMessage("CFG", "CFG-VALSET", ubt.SET, payload=payload + lis)
+ return UBXMessage("CFG", "CFG-VALSET", SET, payload=payload + lis)
@staticmethod
def config_del(layers: int, transaction: int, keys: list) -> object:
@@ -755,23 +763,23 @@ def config_del(layers: int, transaction: int, keys: list) -> object:
num = len(keys)
if num > 64:
- raise ube.UBXMessageError(
+ raise UBXMessageError(
f"Number of configuration keys {num} exceeds maximum of 64"
)
- version = val2bytes(0 if transaction == 0 else 1, ubt.U1)
- layers = val2bytes(layers, ubt.U1)
- transaction = val2bytes(transaction, ubt.U1)
+ version = val2bytes(0 if transaction == 0 else 1, U1)
+ layers = val2bytes(layers, U1)
+ transaction = val2bytes(transaction, U1)
payload = version + layers + transaction + b"\x00"
lis = b""
for key in keys:
if isinstance(key, str): # if keyname as a string
(key, _) = cfgname2key(key) # lookup keyID
- keyb = val2bytes(key, ubt.U4)
+ keyb = val2bytes(key, U4)
lis = lis + keyb
- return UBXMessage("CFG", "CFG-VALDEL", ubt.SET, payload=payload + lis)
+ return UBXMessage("CFG", "CFG-VALDEL", SET, payload=payload + lis)
@staticmethod
def config_poll(layer: int, position: int, keys: list) -> object:
@@ -793,20 +801,20 @@ def config_poll(layer: int, position: int, keys: list) -> object:
num = len(keys)
if num > 64:
- raise ube.UBXMessageError(
+ raise UBXMessageError(
f"Number of configuration keys {num} exceeds maximum of 64"
)
- version = val2bytes(0, ubt.U1)
- layer = val2bytes(layer, ubt.U1)
- position = val2bytes(position, ubt.U2)
+ version = val2bytes(0, U1)
+ layer = val2bytes(layer, U1)
+ position = val2bytes(position, U2)
payload = version + layer + position
lis = b""
for key in keys:
if isinstance(key, str): # if keyname as a string
(key, _) = cfgname2key(key) # lookup keyID
- keyb = val2bytes(key, ubt.U4)
+ keyb = val2bytes(key, U4)
lis = lis + keyb
- return UBXMessage("CFG", "CFG-VALGET", ubt.POLL, payload=payload + lis)
+ return UBXMessage("CFG", "CFG-VALGET", POLL, payload=payload + lis)
diff --git a/src/pyubx2/ubxreader.py b/src/pyubx2/ubxreader.py
index 07dd042..05be2c1 100644
--- a/src/pyubx2/ubxreader.py
+++ b/src/pyubx2/ubxreader.py
@@ -65,7 +65,6 @@ def __init__(
protfilter: int = NMEA_PROTOCOL | UBX_PROTOCOL | RTCM3_PROTOCOL,
quitonerror: int = ERR_LOG,
parsebitfield: bool = True,
- scaling: bool = True,
labelmsm: int = 1,
bufsize: int = 4096,
parsing: bool = True,
@@ -82,7 +81,6 @@ def __init__(
:param int quitonerror: ERR_IGNORE (0) = ignore errors, ERR_LOG (1) = log continue,
ERR_RAISE (2) = (re)raise (1)
:param bool parsebitfield: 1 = parse bitfields, 0 = leave as bytes (1)
- :param bool scaling: 1 = apply scale factors, 0 = do not apply (1)
:param int labelmsm: RTCM3 MSM label type 1 = RINEX, 2 = BAND (1)
:param int bufsize: socket recv buffer size (4096)
:param bool parsing: True = parse data, False = don't parse data (output raw only) (True)
@@ -100,7 +98,6 @@ def __init__(
self._errorhandler = errorhandler
self._validate = validate
self._parsebf = parsebitfield
- self._scaling = scaling
self._labelmsm = labelmsm
self._msgmode = msgmode
self._parsing = parsing
@@ -236,7 +233,6 @@ def _parse_ubx(self, hdr: bytes) -> tuple:
validate=self._validate,
msgmode=self._msgmode,
parsebitfield=self._parsebf,
- scaling=self._scaling,
)
else:
parsed_data = None
@@ -286,7 +282,6 @@ def _parse_rtcm3(self, hdr: bytes) -> tuple:
parsed_data = RTCMReader.parse(
raw_data,
validate=self._validate,
- scaling=self._scaling,
labelmsm=self._labelmsm,
)
else:
@@ -367,7 +362,6 @@ def parse(
msgmode: int = GET,
validate: int = VALCKSUM,
parsebitfield: bool = True,
- scaling: bool = True,
) -> object:
"""
Parse UBX byte stream to UBXMessage object.
@@ -377,7 +371,6 @@ def parse(
:param int validate: VALCKSUM (1) = Validate checksum,
VALNONE (0) = ignore invalid checksum (1)
:param bool parsebitfield: 1 = parse bitfields, 0 = leave as bytes (1)
- :param bool scaling: 1 = apply scale factors, 0 = do not apply (1)
:return: UBXMessage object
:rtype: UBXMessage
:raises: Exception (if data stream contains invalid data or unknown message type)
@@ -432,5 +425,4 @@ def parse(
msgmode,
payload=payload,
parsebitfield=parsebitfield,
- scaling=scaling,
)
From 2e08d59449de15aa64b9c1b1344607f48f909a83 Mon Sep 17 00:00:00 2001
From: semuadmin <28569967+semuadmin@users.noreply.github.com>
Date: Tue, 21 May 2024 16:00:57 +0100
Subject: [PATCH 3/5] update to v 1.2.42
---
.vscode/settings.json | 2 +-
RELEASE_NOTES.md | 6 ++++++
pyproject.toml | 2 +-
src/pyubx2/_version.py | 2 +-
4 files changed, 9 insertions(+), 3 deletions(-)
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 8207b9a..4d79b44 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -4,5 +4,5 @@
"editor.formatOnSave": true,
"modulename": "${workspaceFolderBasename}",
"distname": "${workspaceFolderBasename}",
- "moduleversion": "1.2.41",
+ "moduleversion": "1.2.42",
}
\ No newline at end of file
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index ad0a326..a9f1c6c 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,5 +1,11 @@
# pyubx2 Release Notes
+### RELEASE 1.2.42
+
+ENHANCEMENTS:
+
+1. Internal refactoring to improve performance and exception handling.
+
### RELEASE 1.2.41
ENHANCEMENTS:
diff --git a/pyproject.toml b/pyproject.toml
index e0af81f..cb7e611 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,7 +7,7 @@ name = "pyubx2"
authors = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }]
maintainers = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }]
description = "UBX protocol parser and generator"
-version = "1.2.41"
+version = "1.2.42"
license = { file = "LICENSE" }
readme = "README.md"
requires-python = ">=3.8"
diff --git a/src/pyubx2/_version.py b/src/pyubx2/_version.py
index cb58647..30bc74b 100644
--- a/src/pyubx2/_version.py
+++ b/src/pyubx2/_version.py
@@ -8,4 +8,4 @@
:license: BSD 3-Clause
"""
-__version__ = "1.2.41"
+__version__ = "1.2.42"
From e01b6a7304cc405e0607b7a2774067faab106649 Mon Sep 17 00:00:00 2001
From: semuadmin <28569967+semuadmin@users.noreply.github.com>
Date: Thu, 23 May 2024 13:28:23 +0100
Subject: [PATCH 4/5] update min pyrtcm version to 1.1.1
---
pyproject.toml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pyproject.toml b/pyproject.toml
index cb7e611..f12c532 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -33,7 +33,7 @@ classifiers = [
"Topic :: Scientific/Engineering :: GIS",
]
-dependencies = ["pynmeagps >= 1.0.36", "pyrtcm >= 1.1.0"]
+dependencies = ["pynmeagps >= 1.0.36", "pyrtcm >= 1.1.1"]
[project.urls]
homepage = "https://github.com/semuconsulting/pyubx2"
From 5313883814ebdeffaf6a80802397a9c75977fbe7 Mon Sep 17 00:00:00 2001
From: semuadmin <28569967+semuadmin@users.noreply.github.com>
Date: Fri, 24 May 2024 07:51:39 +0100
Subject: [PATCH 5/5] pylint issue
---
src/pyubx2/ubxmessage.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/src/pyubx2/ubxmessage.py b/src/pyubx2/ubxmessage.py
index f4a72f0..3daaa0c 100644
--- a/src/pyubx2/ubxmessage.py
+++ b/src/pyubx2/ubxmessage.py
@@ -472,9 +472,9 @@ def _get_dict(self, **kwargs) -> dict:
pdict = UBX_PAYLOADS_GET[self.identity]
return pdict
except KeyError as err:
+ mode = ["GET", "SET", "POLL"][self._mode]
raise UBXMessageError(
- f"Unknown message type {escapeall(self._ubxClass + self._ubxID)}"
- f", mode {["GET", "SET", "POLL"][self._mode]}. "
+ f"Unknown message type {escapeall(self._ubxClass + self._ubxID)}, mode {mode}. "
"Check 'msgmode' setting is appropriate for data stream"
) from err