-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
serial_terminal.py
281 lines (226 loc) · 9.87 KB
/
serial_terminal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
#!/usr/bin/env python3
"""
serial_terminal.py
Gabriel Staples
Originally Written: 14 Nov. 2018
References:
- https://pyserial.readthedocs.io/en/latest/pyserial_api.html
- *****https://www.tutorialspoint.com/python/python_multithreading.htm
- *****https://en.wikibooks.org/wiki/Python_Programming/Threading
- https://stackoverflow.com/questions/1607612/python-how-do-i-make-a-subclass-from-a-superclass
- https://docs.python.org/3/library/queue.html
- https://docs.python.org/3.7/library/threading.html
- https://docs.python.org/3/library/enum.html
To install PySerial: `sudo python3 -m pip install pyserial`
To run this program: `python3 serial_terminal.py`
"""
# Internal Modules
import user_config as config
# External Modules
import queue
import threading
import time
import serial
import datetime
import sys
import enum
import os
import inspect
# Global variables & "constants"

# Prefix that print2() puts in front of messages which come from this program itself.
TERMINAL_PROMPT_STR = "terminal> "
# A run of spaces as wide as the prompt; used to line up the continuation lines of multi-line messages.
TP_SPACES = ' ' * len(TERMINAL_PROMPT_STR)
# Placeholder value; parseArgs() overwrites this with the path of the user configuration module.
user_config_path = 'unk'

# Settings copied in from the user configuration file
REAL_SERIAL = config.REAL_SERIAL
LOGGING_ON = config.LOGGING_ON
LOG_FOLDER = config.LOG_FOLDER
EXIT_COMMAND = config.EXIT_COMMAND
# These two are only defaults: parseArgs() replaces them when they are given on the command line.
port, baudrate = config.port, config.baudrate
def print2(*args_tuple, **kwargs_dict):
    """
    Print from terminal

    A print() wrapper which puts TERMINAL_PROMPT_STR in front of whatever is printed. This helps distinguish
    data being received over serial from data being printed by this program's internals.

    Args:
        *args_tuple: the objects to print, exactly as for print(). The first one is converted with str() and
            prefixed with TERMINAL_PROMPT_STR; the rest are forwarded unchanged. If no objects are given, only
            the prompt is printed.
        **kwargs_dict: keyword arguments forwarded unchanged to print() (ex: `sep`, `end`, `file`, `flush`).
    """
    if not args_tuple:
        # Nothing to prefix; still show the prompt so the line is recognizable as coming from this program.
        print(TERMINAL_PROMPT_STR, **kwargs_dict)
        return

    first, *rest = args_tuple
    # Unpack the remaining arguments individually so print() applies `sep` between them, rather than printing
    # them as one nested tuple. str() lets the first argument be any object, just like for print() itself.
    print(TERMINAL_PROMPT_STR + str(first), *rest, **kwargs_dict)
def read_kbd_input(inputQueue, threadEvent):
    """
    Thread target which forwards every line typed on the keyboard into a queue.

    Args:
        inputQueue: queue onto which each line read by input() is put.
        threadEvent: event this function waits on (and then clears) before it starts reading the keyboard.

    This function loops forever and never returns normally.
    """
    # Hold off until the other thread signals, via "threadEvent.set()", that it is ready for keyboard input.
    threadEvent.wait()
    threadEvent.clear()

    ready_msg = 'Ready for keyboard input. To exit the serial terminal, type "{}".'.format(EXIT_COMMAND)
    print2(ready_msg)

    while True:
        # Block until the user enters a line, then hand it straight to the queue.
        # Note: no lock is needed since only a single Queue method is called here, not a sequence of them
        # which would otherwise have to be treated as one atomic operation.
        inputQueue.put(input())
def main():
    """
    Run the serial terminal until the user types EXIT_COMMAND.

    Steps:
    1. Open the serial port with PySerial (skipped when REAL_SERIAL is off, in which case no serial I/O is done).
    2. Start the daemon thread which reads keyboard input into a queue.
    3. If LOGGING_ON is set, create LOG_FOLDER if needed and open a timestamped log file inside it.
    4. Loop: print (and log) any incoming serial data, formatted according to config.PRINT_FORMAT, and send
       each line typed by the user out over serial with TERMINATING_CHARS appended.

    The serial port and the log file are closed when the loop ends, including when it ends because of an
    exception such as KeyboardInterrupt (Ctrl+C); the exception itself still propagates to the caller.
    """
    TERMINATING_CHARS = '\r' # For terminating serial output

    # Both stay None unless actually opened, so that the cleanup code below knows what it has to close.
    ser = None
    log_file = None

    try:
        # Open serial port
        # Note: The port is immediately opened on object creation when a port is given. See:
        # https://pyserial.readthedocs.io/en/latest/pyserial_api.html.
        if not REAL_SERIAL:
            print2("SIMULATED SERIAL: ")
        print2(('Opening serial port using PySerial.\n' +
                TP_SPACES + 'PySerial serial.Version = {}\n' +
                TP_SPACES + 'port = "{}"\n' +
                TP_SPACES + 'baudrate = {}'
                ).format(serial.VERSION, port, baudrate))
        if REAL_SERIAL:
            ser = serial.Serial(
                port=port,
                baudrate=baudrate,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
            )

        # Keyboard input queue
        inputQueue = queue.Queue()

        # For synchronizing threads.
        threadEvent = threading.Event()

        # Create & start a thread to read keyboard input.
        # Set daemon to True to auto-kill this thread when all other non-daemonic threads are exited. This is
        # desired since this thread has no cleanup to do, which would otherwise require a more graceful approach
        # to clean up then exit.
        inputThread = threading.Thread(target=read_kbd_input, args=(inputQueue, threadEvent), daemon=True)
        inputThread.start()

        # File logging
        if LOGGING_ON:
            # Ensure the log folder exists; this is the same as `mkdir -p "$LOG_FOLDER"` in Bash.
            os.makedirs(LOG_FOLDER, exist_ok=True)
            # Get a filename, in desired format.
            # See: https://stackoverflow.com/a/32490661/4561887 and http://strftime.org/
            filename = datetime.datetime.today().strftime('%Y%m%d-%H%Mhrs%Ssec_serialdata.txt')
            # Join rather than concatenate, so that the file lands inside LOG_FOLDER whether or not LOG_FOLDER
            # ends with a path separator.
            path = os.path.join(LOG_FOLDER, filename)
            log_file = open(path, "w")
            print2(('Logging all incoming serial messages to\n' +
                    TP_SPACES + '"{}".').format(path))

        # Don't let the inputThread continue until we are ready to start the main loop. Let it continue now.
        threadEvent.set()

        # main loop
        while True:
            # Read incoming serial data
            if ser is not None and ser.inWaiting() > 0:
                data_bytes = ser.read(ser.inWaiting())

                # Print as ascii-decoded data:
                if config.PRINT_FORMAT == "ASCII":
                    try:
                        data_str = data_bytes.decode('ascii')
                    except UnicodeDecodeError as e:
                        data_str = ""
                        # For what is inside the `e` exception object, see:
                        # 1. https://docs.python.org/3/library/exceptions.html#UnicodeDecodeError
                        # 2. https://docs.python.org/3/library/exceptions.html#UnicodeError
                        print2("Error: UnicodeDecodeError: the bytes could not be decoded as " +
                               "ASCII.")
                        print2(f" Error: {e}")
                        print2(f" Invalid bytes are: {data_bytes[e.start:e.end]}")
                        print2(f" encoding: {e.encoding}")
                        print2(f" reason: {e.reason}")
                        print2(f" object: {e.object}")
                        print2(f" start: {e.start}")
                        print2(f" end: {e.end}")

                    if config.REPLACE_BACKLASH_r_n:
                        data_str = data_str.replace('\r\n', '\n')

                    print(data_str, end='')
                # OR: print as binary data that has been converted to a string-representable format
                # (ex: make \n and \r printable):
                elif config.PRINT_FORMAT == "REPR":
                    data_str = repr(data_bytes)
                    print(data_str)
                else:
                    # Unrecognized PRINT_FORMAT: nothing was printed, so log nothing either. Without this,
                    # `data_str` would be unbound (or left over from an earlier read) when logging below.
                    data_str = ""

                if log_file is not None:
                    log_file.write(data_str)
                    log_file.flush() # Force immediate write to file instead of buffering

            # Read keyboard inputs
            # Note: if this queue were being read in multiple places we would need to use locks to ensure
            # multi-method-call atomic access. Since this is the only place we can remove from the queue,
            # however, no locks are required.
            if inputQueue.qsize() > 0:
                input_str = inputQueue.get()
                # print2("input_str = {}".format(input_str))

                if input_str == EXIT_COMMAND:
                    print2("Exiting serial terminal.")
                    break

                # TODO: add the ability to read in arrow keys (ex: up arrow to show the last command)
                # This may take a bit of effort, as the below code does not work.
                # elif (input_str == "^[[A"):
                #     print2("You pressed Up.")

                if ser is not None:
                    try:
                        input_str_encoded = (input_str + TERMINATING_CHARS).encode('ascii')
                    except UnicodeEncodeError as e:
                        # Report and drop the line rather than letting one bad keystroke kill the terminal.
                        print2("Error: UnicodeEncodeError: the input could not be encoded as " +
                               "ASCII, so it was not sent.")
                        print2(f" Error: {e}")
                    else:
                        ser.write(input_str_encoded)

            # Sleep for a short time to prevent this thread from sucking up all of your CPU resources on your PC.
            time.sleep(0.01)
    finally:
        # Cleanup before quitting. This runs on a normal exit and on an exception alike. The nested
        # try/finally makes sure the log file still gets closed even if closing the serial port raises.
        try:
            if ser is not None:
                ser.close()
        finally:
            if log_file is not None:
                log_file.close()

    print2("End.")
@enum.unique
class ParseArgsErr(enum.Enum):
    """Outcome of parseArgs(), telling the caller whether to run the terminal or to quit."""

    # Arguments were accepted; go on to run main().
    OK = 0
    # Do not run main() (ex: the help text was requested and has been printed).
    EXIT = 1
def parseArgs():
    """
    Interpret the command-line arguments in sys.argv.

    Command syntax: `serial_terminal (optional)<serial_port> (optional)<baudrate>`
    Passing `h`, `-h`, or `--help` as the first argument prints the usage text instead.

    Side effects:
    - Stores the file path of the user configuration module in the global `user_config_path` and prints it, so
      that the user knows where it is to modify it.
    - Overwrites the globals `port` and `baudrate` with the values given on the command line, if any. Neither
      one is changed unless all of the arguments are valid.

    Returns:
        ParseArgsErr.OK if the program should go on to run the terminal, or ParseArgsErr.EXIT if it should quit
        (help was requested, there were too many arguments, or the baudrate was not an integer).
    """
    global port
    global baudrate
    global user_config_path

    # Obtain location of the user configuration path so that the user knows where it is to modify it.
    # Source: Retrieving python module path: https://stackoverflow.com/a/12154601/4561887
    user_config_path = inspect.getfile(config)
    print2('Using User config file path: \n' +
           TP_SPACES + '"{}".'.format(user_config_path))

    # Interpret incoming arguments. Note that sys.argv[0] is the python filename itself.
    # Ex. command: `python3 this_filename.py /dev/ttyUSB1 115200`
    #   len(sys.argv) = 3
    #   sys.argv[0] = "this_filename.py"
    #   sys.argv[1] = "/dev/ttyUSB1"
    #   sys.argv[2] = "115200"
    argsLen = len(sys.argv)
    maxArgsLen = 3

    # Too many args: quit rather than silently starting the terminal with arguments the user did not intend.
    if argsLen > maxArgsLen:
        print("Error: too many arguments.")
        return ParseArgsErr.EXIT

    # No args: keep the port and baudrate from the user configuration file.
    if argsLen <= 1:
        return ParseArgsErr.OK

    # Help request. Return right away so that a following argument is not parsed as a baudrate.
    if sys.argv[1] in ('h', '-h', '--help'):
        print('Command syntax: `serial_terminal (optional)<serial_port> (optional)<baudrate>`\n'
              'Examples:\n'
              ' `serial_terminal`\n'
              ' `serial_terminal /dev/ttyUSB1`\n'
              ' `serial_terminal /dev/ttyUSB1 115200`')
        return ParseArgsErr.EXIT

    # <baudrate> (3rd argument). Validate it before touching any global.
    new_baudrate = baudrate
    if argsLen > 2:
        try:
            new_baudrate = int(sys.argv[2])
        except ValueError:
            print('Error: invalid baudrate "{}"; it must be an integer.'.format(sys.argv[2]))
            return ParseArgsErr.EXIT

    # <serial_port> (2nd argument)
    port = sys.argv[1]
    baudrate = new_baudrate

    return ParseArgsErr.OK
if __name__ == '__main__':
    # Only start the terminal when argument parsing says to carry on; parseArgs() returns
    # ParseArgsErr.EXIT when it has already printed everything the user asked for (ex: the help text).
    parseArgsErr = parseArgs()
    if parseArgsErr is ParseArgsErr.OK:
        main()