Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve checksum efficiency and avoid checking for data #584

Open
wants to merge 1 commit into
base: melodic-devel
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 158 additions & 51 deletions rosserial_python/src/rosserial_python/SerialClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
import struct
import time
from Queue import Queue
import select
import os
import fcntl
import io
#import line_profiler as Profiler

from serial import Serial, SerialException, SerialTimeoutException

Expand Down Expand Up @@ -253,10 +258,31 @@ def listen(self):
self.startSerialClient()
rospy.loginfo("startSerialClient() exited")

def fileno(self):
return self.socket.fileno()

def startSerialClient(self):
client = SerialClient(self)
# profile = Profiler.LineProfiler()
try:
# profile.add_function(SerialClient.run)
# profile.add_function(SerialClient.setupPublisher)
# profile.add_function(SerialClient.setupSubscriber)
# profile.add_function(SerialClient.setupServiceServerPublisher)
# profile.add_function(SerialClient.setupServiceClientPublisher)
# profile.add_function(SerialClient.setupServiceServerSubscriber)
# profile.add_function(SerialClient.setupServiceClientSubscriber)
# profile.add_function(SerialClient.handleParameterRequest)
# profile.add_function(SerialClient.handleLoggingRequest)
# profile.add_function(SerialClient.tryRead)
# profile.add_function(SerialClient._tryRead)

# wrapped_run = profile(lambda: client.run())
# wrapped_run()
# profile.dump_stats('/tmp/serial.profile')
# profile.print_stats()
client.run()

except KeyboardInterrupt:
pass
except RuntimeError:
Expand Down Expand Up @@ -288,22 +314,33 @@ def write(self, data):
totalsent = 0

while totalsent < length:
sent = self.socket.send(data[totalsent:])
try:
sent = self.socket.send(data[totalsent:])
except io.BlockingIOError as e:
sent = e.characters_written
if sent == 0:
# being a quick hack, we don't turn
# on/off poll flags and handle everything
# in a single threaded loop.
# of course, this will make a bad
# situation worse.
time.sleep(0.001)
continue

if sent == 0:
raise RuntimeError("RosSerialServer.write() socket connection broken")
totalsent = totalsent + sent

def read(self, rqsted_length):
"""Read as much as we can up to rqsted_length."""
self.msg = ''
if not self.isConnected:
return self.msg

while len(self.msg) < rqsted_length:
chunk = self.socket.recv(rqsted_length - len(self.msg))
if chunk == '':
raise RuntimeError("RosSerialServer.read() socket connection broken")
self.msg = self.msg + chunk
return self.msg
chunk = self.socket.recv(rqsted_length)
if chunk == '':
raise RuntimeError("RosSerialServer.read() socket connection broken")
return chunk

def inWaiting(self):
try: # the caller checks just for <1, so we'll peek at just one byte
Expand All @@ -321,6 +358,18 @@ class SerialClient(object):
ServiceServer responds to requests from the serial device.
"""

# Named after the Doctor Who Episode
class Blink(object):
"""
Wake up the poll and queue.get and don't let them sleep anymore.
"""
def __init__(parent):
self.parent = parent

def invoke(self):
self.parent._pipe_w.write('Wakie, Wakie'.encode())
self.parent.write_queue.put(self)

def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=False):
""" Initialize node, connect to bus, attempt to negotiate topics. """

Expand All @@ -329,6 +378,7 @@ def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=Fal
self.write_lock = threading.RLock()
self.write_queue = Queue()
self.write_thread = None
self.byte_buffer = bytearray()

self.lastsync = rospy.Time(0)
self.lastsync_lost = rospy.Time(0)
Expand Down Expand Up @@ -424,28 +474,67 @@ def txStopRequest(self, signal, frame):
sys.exit(0)

def tryRead(self, length):
while len(self.byte_buffer) < length:
self.byte_buffer.extend(self._tryRead(length-len(self.byte_buffer), 65536))
b = self.byte_buffer[0:length]
self.byte_buffer = self.byte_buffer[length:]
return b

def _tryRead(self, length, chunk):
try:
read_start = time.time()
bytes_remaining = length
result = bytearray()
while bytes_remaining != 0 and time.time() - read_start < self.timeout:
with self.read_lock:
received = self.port.read(bytes_remaining)

# Block here until there is something to do.
waittime = self.timeout - (time.time() - read_start)
ready = []
if waittime <= 0:
break

# Hacky, but better than busy waiting.
ready = self.poll.poll(waittime * 1000)

if self.port.fileno() not in [ i[0] for i in ready ]:
break

try:
with self.read_lock:
received = self.port.read(max(chunk, bytes_remaining))
except IOError as e:
continue

if len(received) != 0:
#line_profile next target: 511 13812 1297061.0 93.9 10.5
self.last_read = rospy.Time.now()
result.extend(received)
bytes_remaining -= len(received)
if bytes_remaining < 0:
bytes_remaining = 0

if bytes_remaining != 0:
raise IOError("Returned short (expected %d bytes, received %d instead)." % (length, length - bytes_remaining))

return bytes(result)
return result
except Exception as e:
raise IOError("Serial Port read failure: %s" % e)

def run(self):
""" Forward recieved messages to appropriate publisher. """

# countdown = 1000 for profiling

# Set Non-Blocking so are read ahead in try_read won't block.
# Means we could get short writes in theory.
flag = fcntl.fcntl(self.port.fileno(), fcntl.F_GETFL)
fcntl.fcntl(self.port.fileno(), fcntl.F_SETFL, flag | os.O_NONBLOCK)

# Get a pipe object used to wake up the sleepers.
# Be careful, these are not initialized elsewhere.
(self._pipe_r, self._pipe_w) = os.pipe()
rospy.on_shutdown(lambda : SerialClient.Blink(self).invoke())

# Launch write thread.
if self.write_thread is None:
self.write_thread = threading.Thread(target=self.processWriteQueue)
Expand All @@ -455,36 +544,56 @@ def run(self):
# Handle reading.
data = ''
read_step = None
while not rospy.is_shutdown():
if (rospy.Time.now() - self.lastsync).to_sec() > (self.timeout * 3):
if self.synced:
rospy.logerr("Lost sync with device, restarting...")
else:
rospy.logerr("Unable to sync with device; possible link problem or link software version mismatch such as hydro rosserial_python with groovy Arduino")
self.lastsync_lost = rospy.Time.now()
self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_NO_SYNC)
self.requestTopics()
self.lastsync = rospy.Time.now()

self.poll = select.poll()
self.poll.register(self.port, select.POLLIN)
self.poll.register(self._pipe_r, select.POLLIN)

while True:
# for profiling.
# countdown = countdown - 1
# if countdown <= 0:
# return

# if countdown % 100 == 0:
# rospy.logerr("countdown=%d" %countdown)

if len(self.byte_buffer) == 0:
waittime = 3 * self.timeout - (rospy.Time.now() - self.lastsync).to_sec()

ready_fds = []

if waittime > 0:
ready_fds = self.poll.poll(waittime * 1000)

if not ready_fds:
if self.synced:
rospy.logerr("Lost sync with device, restarting...")
else:
rospy.logerr("Unable to sync with device; possible link problem or link software version mismatch such as hydro rosserial_python with groovy Arduino")
self.lastsync_lost = rospy.Time.now()
self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR,
ERROR_NO_SYNC)
self.requestTopics()
self.lastsync = rospy.Time.now()
continue
elif self._pipe_r in [ i[0] for i in ready_fds ]:
break

# This try-block is here because we make multiple calls to read(). Any one of them can throw
# an IOError if there's a serial problem or timeout. In that scenario, a single handler at the
# bottom attempts to reconfigure the topics.
try:
with self.read_lock:
if self.port.inWaiting() < 1:
time.sleep(0.001)
continue

# Find sync flag.
flag = [0, 0]
read_step = 'syncflag'
flag[0] = self.tryRead(1)
flag[0] = bytes(self.tryRead(1))
if (flag[0] != '\xff'):
continue

# Find protocol version.
read_step = 'protocol'
flag[1] = self.tryRead(1)
flag[1] = bytes(self.tryRead(1))
if flag[1] != self.protocol_ver:
self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_MISMATCHED_PROTOCOL)
rospy.logerr("Mismatched protocol version in packet (%s): lost sync or rosserial_python is from different ros release than the rosserial client" % repr(flag[1]))
Expand All @@ -503,8 +612,8 @@ def run(self):

# Read message length checksum.
read_step = 'message length checksum'
msg_len_chk = self.tryRead(1)
msg_len_checksum = sum(map(ord, msg_len_bytes)) + ord(msg_len_chk)
msg_len_chk = bytes(self.tryRead(1))
msg_len_checksum = sum(msg_len_bytes) + ord(msg_len_chk)

# Validate message length checksum.
if msg_len_checksum % 256 != 255:
Expand All @@ -529,19 +638,18 @@ def run(self):

# Reada checksum for topic id and msg
read_step = 'data checksum'
chk = self.tryRead(1)
checksum = sum(map(ord, topic_id_header) ) + sum(map(ord, msg)) + ord(chk)
chk = bytes(self.tryRead(1))
checksum = sum(topic_id_header) + sum(msg) + ord(chk)

# Validate checksum.
if checksum % 256 == 255:
self.synced = True
self.lastsync_success = rospy.Time.now()
try:
self.callbacks[topic_id](msg)
self.callbacks[topic_id](bytes(msg))
except KeyError:
rospy.logerr("Tried to publish before configured, topic id %d" % topic_id)
self.requestTopics()
rospy.sleep(0.001)
else:
rospy.loginfo("wrong checksum for topic id and msg")

Expand Down Expand Up @@ -772,24 +880,23 @@ def processWriteQueue(self):
Main loop for the thread that processes outgoing data to write to the serial port.
"""
while not rospy.is_shutdown():
if self.write_queue.empty():
time.sleep(0.01)
else:
data = self.write_queue.get()
while True:
try:
if isinstance(data, tuple):
topic, msg = data
self._send(topic, msg)
elif isinstance(data, basestring):
self._write(data)
else:
rospy.logerr("Trying to write invalid data type: %s" % type(data))
break
except SerialTimeoutException as exc:
rospy.logerr('Write timeout: %s' % exc)
time.sleep(1)

data = self.write_queue.get()
while True:
try:
if isinstance(data, tuple):
topic, msg = data
self._send(topic, msg)
elif isinstance(data, basestring):
self._write(data)
elif isinstance (data, SerialClient.Blink):
break # redundant with below, but good to be explicit
else:
rospy.logerr("Trying to write invalid data type: %s" % type(data))
break
except SerialTimeoutException as exc:
rospy.logerr('Write timeout: %s' % exc)
time.sleep(1)

def sendDiagnostics(self, level, msg_text):
msg = diagnostic_msgs.msg.DiagnosticArray()
status = diagnostic_msgs.msg.DiagnosticStatus()
Expand Down