forked from SpectacularAI/u-blox-capture
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ubx_stdout.py
92 lines (79 loc) · 3.51 KB
/
ubx_stdout.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import argparse
import json
from serial import Serial
from ubxtranslator.ubxtranslator.core import Parser
from ubxtranslator.ubxtranslator.predefined import NAV_CLS
from pandas import Timestamp, Timedelta # Use pandas time objects for nanosecond precision
import os
from gps_converter import buildMeasurement
from ubx_logger import parseUBX
import sys
import time
parser = argparse.ArgumentParser(description="Print UBX-NAV-* solution to stdout")
parser.add_argument("device", help="Serial device")
parser.add_argument("-b", help="Baudrate", type=int, default=460800)
parser.add_argument("-v", help="Verbose, prints errors", action="store_true")
parser.add_argument("--json", help="Output in JSON format", action="store_true")
parser.add_argument("--fixStatus", help="Includes the RTK solution's state", action="store_true")
parser.add_argument("--incomplete", help="Allow printing incomplete solutions", action="store_true")
carrSolnDict = {0: "None", 1: "Float", 2: "Fix"} # carrSoln is defined as 0=None, 1=Float, 2=Fix
# http://docs.ros.org/en/noetic/api/ublox_msgs/html/msg/NavPVT.html
def outputSolution(solution, asJson = False):
measurement = buildMeasurement(solution)
if not measurement: return
fixStatus = carrSolnDict[measurement["fixStatus"]["carrSoln"]] # We are only interested in the 'carrSoln'
outputStr = ""
if asJson:
outputStr = json.dumps({
"latitude": measurement["lat"],
"longitude": measurement["lon"],
"altitude": measurement["altitude"],
"monotonicTime": measurement["time"],
"accuracy": measurement["accuracy"],
"verticalAccuracy": measurement["verticalAccuracy"]})
if args.fixStatus: outputStr = outputStr[:-1]; outputStr += ', "fixSolution": "%s"}' % (fixStatus) # append fixStatus to str 'outputStr' by removing the '}' from the tail before appending.
else:
arr = [
# measurement["time"],
measurement["lat"],
measurement["lon"],
measurement["altitude"],
measurement["verticalAccuracy"],
measurement["accuracy"],]
if args.fixStatus: arr.append(fixStatus)
outputStr = ' '.join(str(val) for val in arr)
print(outputStr)
sys.stdout.flush()
def run(args):
device = Serial(args.device, args.b, timeout=10)
parser = Parser([NAV_CLS])
aList = []
currentiTOW = -1
currentSolution = {}
try:
while not aList:
try:
msg, msg_name, payload = parser.receive_from(device)
monoTime = time.monotonic()
raw = parseUBX(payload)
raw["monoTime"] = monoTime
# Start new solution
if raw["iTOW"] != currentiTOW:
if args.incomplete and currentSolution: # Output partial solution
outputSolution(currentSolution, args.json)
currentSolution = {}
# Add payload to current solution
currentSolution[msg_name] = raw
currentiTOW = raw["iTOW"]
# All three desired payloads received, output them
if all (k in currentSolution for k in ("HPPOSLLH","TIMEUTC","PVT")):
outputSolution(currentSolution, args.json)
currentSolution = {}
except (ValueError, IOError) as err:
if args.v:
print(err)
finally:
device.close()
if __name__ == "__main__":
args = parser.parse_args()
run(args)