Skip to content

Commit

Permalink
add auto-detection of SET/POLL mode
Browse files Browse the repository at this point in the history
  • Loading branch information
semuadmin committed Feb 14, 2024
1 parent 8da5894 commit 26ef9d3
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 18 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ conda install -c conda-forge pyubx2
| POLL (0x02) | query input *to* the receiver | `ubxtypes_poll.py` |

If you're simply streaming and/or parsing the *output* of a UBX receiver, the mode is implicitly GET. If you want to create
or parse an *input* (command or query) message, you must set the mode parameter to SET or POLL.
or parse an *input* (command or query) message, you must set the mode parameter to SET or POLL. If the parser mode is set to
0x03 (SETPOLL), `pyubx2` will automatically determine the applicable input mode (SET or POLL) based on the message payload.

---
## <a name="reading">Reading (Streaming)</a>
Expand Down
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ CHANGES:

1. Add val2sphp helper method to convert high precision (9dp) coordinate to separate standard and high precision components, as required by some CFG and NAV messages.
1. Add utc2itow helper method to convert utc datetime to GPS week number and time of week.
1. Add getinputmode helper to determinate mode of input UBX message (SET or POLL)
1. Add getinputmode helper to determinate mode of input UBX message (SET or POLL). Add new UBXReader msgmode of SETPOLL (0x03), which will automatically determine input mode.

### RELEASE 1.2.37

Expand Down
4 changes: 2 additions & 2 deletions src/pyubx2/ubxmessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ def __init__(
self._parsebf = parsebitfield # parsing bitfields Y/N?
self._scaling = scaling # apply scale factors Y/N?

if msgmode not in (0, 1, 2):
raise ube.UBXMessageError(f"Invalid msgmode {msgmode} - must be 0, 1 or 2.")
if msgmode not in (ubt.GET, ubt.SET, ubt.POLL):
raise ube.UBXMessageError(f"Invalid msgmode {msgmode} - must be 0, 1 or 2")

# accommodate different formats of msgClass and msgID
if isinstance(ubxClass, str) and isinstance(
Expand Down
24 changes: 17 additions & 7 deletions src/pyubx2/ubxreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
'protfilter' governs which protocols (NMEA, UBX or RTCM3) are processed
'quitonerror' governs how errors are handled
'msgmode' indicates the type of UBX datastream (input GET, output SET, query POLL)
'msgmode' indicates the type of UBX datastream (output GET, input SET, query POLL)
If msgmode set to SETPOLL, input mode will be automatically detected by parser.
Created on 2 Oct 2020
Expand All @@ -32,14 +34,17 @@
UBXTypeError,
)
from pyubx2.socket_stream import SocketStream
from pyubx2.ubxhelpers import bytes2val, calc_checksum, val2bytes
from pyubx2.ubxhelpers import bytes2val, calc_checksum, getinputmode, val2bytes
from pyubx2.ubxmessage import UBXMessage
from pyubx2.ubxtypes_core import (
ERR_LOG,
ERR_RAISE,
GET,
NMEA_PROTOCOL,
POLL,
RTCM3_PROTOCOL,
SET,
SETPOLL,
U2,
UBX_HDR,
UBX_PROTOCOL,
Expand Down Expand Up @@ -69,7 +74,7 @@ def __init__(
"""Constructor.
:param datastream stream: input data stream
:param int msgmode: 0=GET, 1=SET, 2=POLL (0)
:param int msgmode: 0=GET, 1=SET, 2=POLL, 3=SETPOLL (0)
:param int validate: 0 = ignore invalid checksum, 1 = validate checksum (1)
:param int protfilter: protocol filter 1 = NMEA, 2 = UBX, 4 = RTCM3 (3)
:param int quitonerror: 0 = ignore errors, 1 = log continue, 2 = (re)raise (1)
Expand Down Expand Up @@ -97,9 +102,9 @@ def __init__(
self._msgmode = msgmode
self._parsing = parsing

if self._msgmode not in (0, 1, 2):
if self._msgmode not in (GET, SET, POLL, SETPOLL):
raise UBXStreamError(
f"Invalid stream mode {self._msgmode} - must be 0, 1 or 2"
f"Invalid stream mode {self._msgmode} - must be 0, 1, 2 or 3"
)

def __iter__(self):
Expand Down Expand Up @@ -374,8 +379,10 @@ def parse(
"""
# pylint: disable=too-many-arguments

if msgmode not in (0, 1, 2):
raise UBXParseError(f"Invalid message mode {msgmode} - must be 0, 1 or 2")
if msgmode not in (GET, SET, POLL, SETPOLL):
raise UBXParseError(
f"Invalid message mode {msgmode} - must be 0, 1, 2 or 3"
)

lenm = len(message)
hdr = message[0:2]
Expand Down Expand Up @@ -410,6 +417,9 @@ def parse(
(f"Message checksum {ckm}" f" invalid - should be {ckv}")
)
try:
# if input message (SET or POLL), determine mode automatically
if msgmode == SETPOLL:
msgmode = getinputmode(message) # returns SET or POLL
if payload is None:
return UBXMessage(clsid, msgid, msgmode)
return UBXMessage(
Expand Down
1 change: 1 addition & 0 deletions src/pyubx2/ubxtypes_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
GET = 0
SET = 1
POLL = 2
SETPOLL = 3
VALNONE = 0
VALCKSUM = 1
NMEA_PROTOCOL = 1
Expand Down
12 changes: 6 additions & 6 deletions tests/test_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ def tearDown(self):
pass

def testInvMode(self): # test invalid mode
EXPECTED_ERROR = "Invalid msgmode 3 - must be 0, 1 or 2"
EXPECTED_ERROR = "Invalid msgmode 4 - must be 0, 1 or 2"
with self.assertRaisesRegex(UBXMessageError, EXPECTED_ERROR):
UBXMessage("CFG", "CFG-MSG", 3, msgClass=240, msgID=5)
UBXMessage("CFG", "CFG-MSG", 4, msgClass=240, msgID=5)

def testAckCkT(self): # bad checksum
EXPECTED_ERROR = "Message checksum (.*) invalid - should be (.*)"
Expand Down Expand Up @@ -320,10 +320,10 @@ def testFill_CFGVALGET1(
UBXMessage("CFG", "CFG-VALGET", GET, version=0, layer=0)

def testParseMode(self): # test invalid parse message mode
EXPECTED_ERROR = "Invalid message mode 3 - must be 0, 1 or 2"
EXPECTED_ERROR = "Invalid message mode 4 - must be 0, 1, 2 or 3"
with self.assertRaisesRegex(UBXParseError, EXPECTED_ERROR):
UBXReader.parse(
b"\xb5b\x05\x01\x02\x00\x06\x01\x0f\x38", validate=VALCKSUM, msgmode=3
b"\xb5b\x05\x01\x02\x00\x06\x01\x0f\x38", validate=VALCKSUM, msgmode=4
)

def testParseMode2(self): # test parser with incorrect mode for input message
Expand All @@ -332,9 +332,9 @@ def testParseMode2(self): # test parser with incorrect mode for input message
UBXReader.parse(self.mga_ini, validate=VALCKSUM, quitonerror=ERR_RAISE)

def testStreamMode(self): # test invalid stream message mode
EXPECTED_ERROR = "Invalid stream mode 3 - must be 0, 1 or 2"
EXPECTED_ERROR = "Invalid stream mode 4 - must be 0, 1, 2 or 3"
with self.assertRaisesRegex(UBXStreamError, EXPECTED_ERROR):
UBXReader(None, validate=VALCKSUM, msgmode=3)
UBXReader(None, validate=VALCKSUM, msgmode=4)

def testVal2Bytes(self): # test invalid attribute type
EXPECTED_ERROR = "Unknown attribute type Z001"
Expand Down
16 changes: 15 additions & 1 deletion tests/test_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import unittest

from pyubx2 import UBXMessage, UBXReader, VALCKSUM, VALNONE, SET
from pyubx2 import UBXMessage, UBXReader, VALCKSUM, VALNONE, SET, SETPOLL


class ParseTest(unittest.TestCase):
Expand Down Expand Up @@ -270,6 +270,20 @@ def testMGAINI2(self): # test parser of MGA-INI input message with args
"<UBX(MGA-INI-POS-LLH, type=1, version=0, reserved0=513, lat=6.7305985, lon=6.7305985, alt=67305985, posAcc=67305985)>",
)

def testSETPOLL(self): # test auto detection of SET or POLL mode
msg = UBXMessage.config_poll(0,0,["CFG_UART1_BAUDRATE", 0x40530001]).serialize()
res = UBXReader.parse(msg, msgmode=SETPOLL)
self.assertEqual(
str(res),
"<UBX(CFG-VALGET, version=0, layer=0, position=0, keys_01=1079115777, keys_02=1079181313)>",
)
msg = UBXMessage.config_set(0,0,[("CFG_UART1_BAUDRATE", 9600), (0x40530001, 38400)]).serialize()
res = UBXReader.parse(msg, msgmode=SETPOLL)
self.assertEqual(
str(res),
"<UBX(CFG-VALSET, version=0, ram=0, bbr=0, flash=0, action=0, reserved0=0, CFG_UART1_BAUDRATE=9600, CFG_UART2_BAUDRATE=38400)>",
)

def testESFSTATUS(self): # test parser of ESF-STATUS message
res = UBXReader.parse(self.esf_status)
# print(res)
Expand Down
2 changes: 2 additions & 0 deletions tests/test_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ def testgetinputmode(self):
self.assertEqual(res, SET)
res = getinputmode(UBXMessage("CFG","CFG-INF", POLL, protocolID=1).serialize())
self.assertEqual(res, POLL)
res = getinputmode(UBXMessage("CFG","CFG-INF", SET, protocolID=1, infMsgMask_01=1,infMsgMask_02=1).serialize())
self.assertEqual(res, SET)

if __name__ == "__main__":
# import sys;sys.argv = ['', 'Test.testName']
Expand Down

0 comments on commit 26ef9d3

Please sign in to comment.