Skip to content

Commit

Permalink
Improved serial throughput (#117)
Browse files Browse the repository at this point in the history
* more efficient serial reading

* working with example; process is very sloww

* fix integrationn tests

* much faster reads for process simply by reducing time.sleep facepalm

* reduce startup sleep

* misc

* it works

* cleanup

* fix missing timeout

* move data_consumer to end; may raise an exception

* move serial_buf search innside while loop; fixes if the ending is longer than the read buffer

* break up serial buffers into consumed and unconsumed

* Fix between-buffer check

* reduce wait

* reduce wait

* reduce wait

* remove wait
  • Loading branch information
BrianPugh authored Mar 17, 2023
1 parent 122df55 commit 9ac6ddd
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 45 deletions.
1 change: 0 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ jobs:
env:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}
BELAY_SLEEP_MULTIPLIER: 2.5

steps:
- name: Set OS Environment Variables (Windows)
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,4 @@ profile.json
rp2040js/

.vscode/
*.lprof
17 changes: 9 additions & 8 deletions belay/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,19 +420,20 @@ def __call__(
):
self._cmd_history.append(cmd)

out = None
data_consumer_buffer = []
out = None # Used to store the parsed response object.
data_consumer_buffer = bytearray()

def data_consumer(data):
"""Handle input data stream immediately."""
nonlocal out
if data == b"\x04":
data = data.replace(b"\x04", b"")
if not data:
return
data_consumer_buffer.append(data.decode())
if b"\n" in data:
line = "".join(data_consumer_buffer)
data_consumer_buffer.clear()

data_consumer_buffer.extend(data)
while (i := data_consumer_buffer.find(b"\n")) >= 0:
i += 1
line = data_consumer_buffer[:i].decode()
data_consumer_buffer[:] = data_consumer_buffer[i:]
try:
out = _parse_belay_response(line)
except NotBelayResponseError:
Expand Down
113 changes: 78 additions & 35 deletions belay/pyboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def stdout_write_bytes(b):
stdout.flush()


def _dummy_data_consumer(data):
pass


class PyboardError(Exception):
"""An issue communicating with the board."""

Expand Down Expand Up @@ -185,29 +189,23 @@ def __init__(self, cmd):
)
time.sleep(0.5)

self.buf = b""
self.buf = bytearray()
self.lock = Lock()

def process_output():
assert self.subp.stdout is not None # noqa: S101
while True:
out = self.subp.stdout.read(1)
if out == "" and self.subp.poll() is not None:
break
if out != "":
if out:
with self.lock:
self.buf += out
self.buf.extend(out)
elif self.subp.poll() is not None:
break

thread = Thread(target=process_output)
thread.daemon = True
thread.start()

sleep_multiplier = float(os.environ.get("BELAY_SLEEP_MULTIPLIER", 1.0))
time.sleep(5.0 * sleep_multiplier) # Give process a chance to boot up.
if platform.system() == "Windows":
# Windows needs more time
time.sleep(6.0 * sleep_multiplier)

atexit.register(self.close)

def close(self):
Expand All @@ -216,12 +214,12 @@ def close(self):

def read(self, size=1):
while len(self.buf) < size:
# let the reading thread do its thing.
# yield to the reading threads
time.sleep(0.0001)

with self.lock:
data = self.buf[:size]
self.buf = self.buf[size:]
self.buf[:] = self.buf[size:]

return data

Expand All @@ -231,9 +229,7 @@ def write(self, data):

@property
def in_waiting(self):
if self.buf:
return 1
return 0
return len(self.buf)


class ProcessPtyToTerminal:
Expand Down Expand Up @@ -316,6 +312,8 @@ def __init__(
raise ValueError('"attempts" cannot be 0.')
self.in_raw_repl = False
self.use_raw_paste = True
self._consumed_buf = bytearray()
self._unconsumed_buf = bytearray()
if device.startswith("exec:"):
self.serial = ProcessToSerial(device[len("exec:") :])
elif device.startswith("execpty:"):
Expand Down Expand Up @@ -363,6 +361,9 @@ def close(self):
def read_until(self, ending, timeout=10, data_consumer=None):
"""Read bytes until a specified ending pattern is reached.
warning: in Belay, ``data_consumer`` may raise an exception; so make sure
internal buffers are correct prior to calling ``data_consumer``.
Parameters
----------
data_consumer: Callable
Expand All @@ -371,28 +372,70 @@ def read_until(self, ending, timeout=10, data_consumer=None):
timeout: Union[None, float]
Timeout in seconds.
If None, no timeout.
Returns
-------
data: bytes
Data read up to, and including, ``ending``.
"""
data = self.serial.read(1)
if data_consumer:
data_consumer(data)
timeout_count = 0
while True:
if data.endswith(ending):
break
elif self.serial.in_waiting > 0:
new_data = self.serial.read(1)
if data_consumer:
data_consumer(new_data)
data = data + new_data
timeout_count = 0
else:
timeout_count += 1
if timeout is not None and timeout_count >= 100 * timeout:
if data_consumer is None:
data_consumer = _dummy_data_consumer

if timeout is None:
timeout = float("inf")
deadline = time.time() + timeout

def find(buf):
# slice up to this index
index = buf.find(ending)
if index >= 0:
index += len(ending)
return index

while True: # loop until ``ending`` is found, or timeout
ending_index = find(self._unconsumed_buf)
if ending_index > 0:
data_for_consumer = self._unconsumed_buf[:ending_index]
self._unconsumed_buf[:] = self._unconsumed_buf[ending_index:]
out = self._consumed_buf + data_for_consumer
self._consumed_buf.clear()
data_consumer(data_for_consumer)
return out
elif self._unconsumed_buf:
# consume the unconsumed buffer
og_consumed_buf_len = len(self._consumed_buf)
self._consumed_buf.extend(self._unconsumed_buf)

if (ending_index := find(self._consumed_buf)) > 0:
# The ``ending`` was split across the buffers
out = self._consumed_buf[:ending_index]
self._unconsumed_buf[:] = self._consumed_buf[ending_index:]
data_for_consumer = self._consumed_buf[
og_consumed_buf_len:ending_index
]
self._consumed_buf.clear()
data_consumer(data_for_consumer)
return out

# ``ending`` has still not been found.
data_for_consumer = self._unconsumed_buf.copy()
self._unconsumed_buf.clear()
data_consumer(data_for_consumer)

while not self._unconsumed_buf:
# loop until new data has arrived.
if time.time() > deadline:
raise PyboardError(
f"Timed out reading until {repr(ending)}\n Received: {repr(data)}"
f"Timed out reading until {repr(ending)}\n"
f" Received: {repr(self._consumed_buf)}"
)
time.sleep(0.01)
return data

if not self.serial.in_waiting:
time.sleep(0.001)

if self.serial.in_waiting:
n_bytes = min(2048, self.serial.in_waiting)
self._unconsumed_buf.extend(self.serial.read(n_bytes))

def cancel_running_program(self):
"""Interrupts any running program."""
Expand Down
1 change: 0 additions & 1 deletion examples/10_generators/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def count():
@device.task
def communicate(x):
new_val = yield "Device: " + str(x)
print(new_val)
new_val = yield "Device: " + str(new_val)
new_val = yield "Device: " + str(new_val)

Expand Down

0 comments on commit 9ac6ddd

Please sign in to comment.