diff --git a/examples/tigerasi_is_busy.py b/examples/tigerasi_is_busy.py new file mode 100644 index 0000000..f8dbff4 --- /dev/null +++ b/examples/tigerasi_is_busy.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +"""Connects to the Tiger Box, moves some axes, returns to starting pose.""" + +from tigerasi.tiger_controller import TigerController + + +PORT_NAME = "COM4" # a string indicating the port name. +# port name can be left as None on Linux if udev rules were installed. + +print("Connecting to Tiger Controller... ", end=" ", flush=True) +box = TigerController(PORT_NAME) +print("done.") + +# Start moving the x and y axes. +box.move_relative(x=-2000, y=-2000) +axis_moving_states = box.are_axes_moving() +axis_moving_states_subset = box.are_axes_moving('x', 'y') +x_is_moving = box.is_axis_moving('x') +print(f"moving states (all axes): {axis_moving_states}") +print(f"moving states (x and y): {axis_moving_states_subset}") +print(f"x axis moving state: {x_is_moving}") +box.move_relative(x=2000, y=2000) \ No newline at end of file diff --git a/src/tigerasi/tiger_controller.py b/src/tigerasi/tiger_controller.py index 047e1df..9d4d0da 100644 --- a/src/tigerasi/tiger_controller.py +++ b/src/tigerasi/tiger_controller.py @@ -1078,35 +1078,57 @@ def get_ttl_output_state(self, wait: bool = True): return bool(int(reply.lstrip(':A '))) def is_moving(self): - """blocks. True if any axes is moving. False otherwise.""" + """True if any axes is moving. False otherwise. Blocks.""" + return self.are_axes_moving() + + def is_axis_moving(self, axis: str): + """True if the specified axis is moving. False otherwise. Blocks.""" + return next(iter(self.are_axes_moving(axis).items()))[-1] # True or False + + @axis_check() + def are_axes_moving(self, *axes: str): + """Return a dict of booleans, keyed by axis, indicating whether the + specified axes are (True) or are not (False) moving. Defaults to all + lettered axes if none are specified. Blocks. Implements + `RDSTAT `_ + command.""" # Wait at least 20[ms] following the last time we sent a command. # (Handles edge case where the last command was sent with wait=False.) time_since_last_cmd = perf_counter() - self._last_cmd_send_time sleep_time = REPLY_WAIT_TIME_S - time_since_last_cmd if sleep_time > 0: sleep(sleep_time) - # Send the inquiry. Handle: ":A \r\n" and ":A\r\n" - reply = self.send(f"{Cmds.STATUS.value}\r").rstrip().rstrip('\r\n') + if not axes: # Default to all lettered axes if none are specified. + axes = [x for x in self.ordered_axes if not x.isnumeric()] + axes_str = ''.join([f" {x.upper()}?" for x in axes]) + # Send the inquiry. Handle: ":A \r\n" and ":A\r\n" and remove ":A " from reply + reply = self.send(f"{Cmds.RDSTAT.value + axes_str}\r").rstrip().rstrip('\r\n').lstrip(ACK).lstrip() # interpret reply. # Sometimes tigerbox replies with ACK to this cmd instead of B or N. # Re-issue cmd if we received an ACK. if reply == ACK: self.log.warning("Received ':A' when we expected 'N' or 'B'. " "Re-issuing command.") - reply = self.send(f"{Cmds.STATUS.value}\r").rstrip('\r\n').strip() - if reply == "B": - return True - elif reply == "N": - return False - else: - raise RuntimeError(f"Error. Cannot tell if device is moving. " + reply = self.send(f"{Cmds.RDSTAT.value + axes_str}\r").rstrip().rstrip('\r\n').lstrip(ACK).lstrip() + + axis_states = list(reply) + if 'B' not in reply and 'N' not in reply: + raise RuntimeError(f"Error. Cannot tell if axes are moving. " f"Received: '{reply}'") + return {x.upper(): state == 'B' for x, state in zip(axes, axis_states)} def wait(self): """Block until tigerbox is idle.""" while self.is_moving(): pass + # TODO: this needs to be tested + @axis_check() + def wait_on_axis(self, *axes: str): + """Block until specified axis is idle.""" + while True in self.is_axis_moving(*axes).items(): + pass + def clear_incoming_message_queue(self): """Clear input buffer and reset skipped replies.""" self.skipped_replies = 0