From d2465134661a7a05d52f7c6a3fc193f27c0759cb Mon Sep 17 00:00:00 2001 From: Peter Story Date: Wed, 3 Jul 2024 16:28:02 -0400 Subject: [PATCH] Refactor code for pipelines Now we can tell if a pipeline was cancelled. We can't depend on exit codes, since PyInstaller can change them. For #19 --- src/pydiode/gui/common.py | 214 ++++++++++++++++++++----------------- src/pydiode/gui/main.py | 22 ++-- src/pydiode/gui/receive.py | 17 +-- src/pydiode/gui/send.py | 17 +-- 4 files changed, 142 insertions(+), 128 deletions(-) diff --git a/src/pydiode/gui/common.py b/src/pydiode/gui/common.py index 8905d9d..5967c5a 100755 --- a/src/pydiode/gui/common.py +++ b/src/pydiode/gui/common.py @@ -6,84 +6,109 @@ SLEEP = 250 -def get_process_errors(name_popen): - """ - Get a error message describing subprocesses that exited irregularly. - - An error shouldn't be described if: - - The process exited normally (exit code 0) - - The user requested cancellation (KeyboardInterrupt in stderr) - - SIGINT is used when the user presses a "Cancel" button (exit code -2). - SIGTERM is issued when subprocesses are stuck (exit code -15). - However, when subprocesses are run from PyInstaller, all non-regular exits - receive code -1. Thus, we also check stderr. - - :param name_popen: A list of tuples. Each tuple contains the process name - and the subprocess.Popen object. All processes have - terminated. - :returns: A string describing the return code and stderr for subprocesses - that exited irregularly. - """ - error_msgs = [] - for name, popen in name_popen: - trimmed_stderr = popen.stderr.read().decode("utf-8").strip() - # Show errors if: - if popen.returncode != 0 and not trimmed_stderr.endswith( - "KeyboardInterrupt" - ): - error_msg = f'"{name}" exited with code {popen.returncode}' - if trimmed_stderr: - error_msg += f' and stderr "{trimmed_stderr}"' - error_msgs.append(error_msg) - if error_msgs: - error_msgs.insert(0, "Error:") - return "\n".join(error_msgs) - - -def print_premature_errors(name_code): - """ - Print a description of subprocesses that exited prematurely. - - :param name_code: A list of tuples. Each tuple contains the process name - and the return code of the process. Some processes have - terminated, others have not. - """ - returncodes = [code for name, code in name_code] - try: - earliest_running = returncodes.index(None) - for name, code in name_code[(earliest_running + 1) :]: - if code is not None: - print(f'"{name}" exited prematurely.', file=sys.stderr) - except ValueError: - pass - - -def stuck_running(returncodes): - """ - Based on process returncodes, is the pipeline stuck? In a pipeline, earlier - processes should exit first. If a later process exits first, an earlier - process's STDOUT cannot be consumed, so it will never exit. Thus, the - pipeline is stuck if a running process comes before an exited process. - - :param: A list of returncodes from a process pipeline. A None returncode - indicates that a process is still running. A numeric returncode - indicates that a process exited. - :return: A boolean, indicating if the pipeline is stuck. - """ - try: - # The index of the earliest still-running process in the pipeline - earliest_running = returncodes.index(None) - # Have any subsequent processes already exited? - return any(c is not None for c in returncodes[(earliest_running + 1) :]) - except ValueError: - # None won't be found if all the processes have exited. - # If so, the pipeline isn't stuck. - return False +class ProcessPipeline: + + def __init__(self): + self.names = [] + self.popens = [] + # Did the user cancel the pipeline? + self.cancelled = False + + def append(self, name, popen): + self.names.append(name) + self.popens.append(popen) + + def send_signal(self, s): + if s == signal.SIGINT: + self.cancelled = True + for popen in self.popens: + popen.send_signal(s) + + def clear(self): + for popen in self.popens: + if popen.stdout: + popen.stdout.close() + if popen.stderr: + popen.stderr.close() + self.names = [] + self.popens = [] + self.cancelled = False + + def poll(self): + for popen in self.popens: + # If a process has terminated, set returncode. + # If a process is still running, returncode will be None. + popen.poll() + + def still_running(self): + return any(popen.returncode is None for popen in self.popens) + + def _returncodes(self): + return [popen.returncode for popen in self.popens] + + def stuck_running(self): + """ + Based on process returncodes, is the pipeline stuck? In a pipeline, + earlier processes should exit first. If a later process exits first, an + earlier process's STDOUT cannot be consumed, so it will never exit. + Thus, the pipeline is stuck if a running process comes before an exited + process. + + :return: A boolean, indicating if the pipeline is stuck. + """ + try: + # The index of the earliest still-running process in the pipeline + earliest_running = self._returncodes().index(None) + # Have any subsequent processes already exited? + return any( + c is not None + for c in self._returncodes()[(earliest_running + 1) :] + ) + except ValueError: + # None won't be found if all the processes have exited. + # If so, the pipeline isn't stuck. + return False + + def print_premature_errors(self): + """ + Print a description of subprocesses that exited prematurely. + """ + try: + earliest_running = self._returncodes().index(None) + name_code = list(zip(self.names, self._returncodes())) + for name, code in name_code[(earliest_running + 1) :]: + if code is not None: + print(f'"{name}" exited prematurely.', file=sys.stderr) + except ValueError: + pass + + def get_process_errors(self): + """ + Get a error message describing subprocesses that exited irregularly. + + Describe an error if: + - The pipeline wasn't cancelled + - The process exited abnormally (non-zero exit code) + + :returns: A string describing the return code and stderr for + subprocesses that exited irregularly. + """ + error_msgs = [] + for name, popen in zip(self.names, self.popens): + trimmed_stderr = popen.stderr.read().decode("utf-8").strip() + # Show errors if: + if not self.cancelled and popen.returncode != 0: + error_msg = f'"{name}" exited with code {popen.returncode}' + if trimmed_stderr: + error_msg += f' and stderr "{trimmed_stderr}"' + error_msgs.append(error_msg) + if error_msgs: + error_msgs.insert(0, "Error:") + return "\n".join(error_msgs) def check_subprocesses( - widget, cancelled, processes, on_exit=None, cancel_signal=signal.SIGINT + widget, cancelled, pipeline, on_exit=None, cancel_signal=signal.SIGINT ): """ Check whether all the subprocesses have exited. If so, display their error @@ -91,8 +116,7 @@ def check_subprocesses( :param widget: Used to schedule another check :param cancelled: Boolean variable indicating cancellation request - :param processes: An array of tuples, each containing a subprocess's name - and its popen object. + :param pipeline: A ProcessPipeline containing subprocess details :param on_exit: Function to call after all subprocesses have exited. Do not call the function if the subprocesses exited with a non-zero exit code, due to cancellation, or due to getting @@ -104,8 +128,7 @@ def check_subprocesses( # If requested, cancel subprocesses if cancelled.get(): # Signal each process to exit - for name, popen in processes: - popen.send_signal(cancel_signal) + pipeline.send_signal(cancel_signal) # Mark this cancellation request as handled cancelled.set(False) # Don't call on_exit if the user requested cancellation @@ -114,17 +137,15 @@ def check_subprocesses( widget.after( SLEEP, lambda: check_subprocesses( - widget, cancelled, processes, on_exit=on_exit + widget, cancelled, pipeline, on_exit=on_exit ), ) else: - # Get returncodes for exited processes, None for running processes - returncodes = [popen.poll() for name, popen in processes] - # Are any of the subprocesses still running? - still_running = any(code is None for code in returncodes) + # Check the status of subprocesses, updating returncodes + pipeline.poll() # If subprocesses are stuck - if stuck_running(returncodes): + if pipeline.stuck_running(): # Request cancellation cancelled.set(True) widget.after( @@ -132,37 +153,30 @@ def check_subprocesses( lambda: check_subprocesses( widget, cancelled, - processes, + pipeline, on_exit=on_exit, cancel_signal=signal.SIGTERM, ), ) # Describe the issue - process_names = [name for name, popen in processes] - print_premature_errors(list(zip(process_names, returncodes))) + pipeline.print_premature_errors() # If subprocesses are still running, keep waiting for them - elif still_running: + elif pipeline.still_running(): widget.after( SLEEP, lambda: check_subprocesses( - widget, cancelled, processes, on_exit=on_exit + widget, cancelled, pipeline, on_exit=on_exit ), ) # Otherwise, all subprocesses have exited else: # If any subprocesses exited irregularly, describe the issue - error_msgs = get_process_errors(processes) + error_msgs = pipeline.get_process_errors() if error_msgs: showerror(title="Error", message=error_msgs) - # Clean up - for name, popen in processes: - if popen.stdout: - popen.stdout.close() - if popen.stderr: - popen.stderr.close() - # The array of subprocesses should be cleared, so it doesn't grow - # each time more subprocesses are started - processes.clear() + # Clear the pipeline, so it doesn't grow as more subprocesses are + # started + pipeline.clear() # Call the on_exit() function, if it was provided if on_exit and not error_msgs: on_exit() diff --git a/src/pydiode/gui/main.py b/src/pydiode/gui/main.py index 5f1c06d..42f1a0a 100644 --- a/src/pydiode/gui/main.py +++ b/src/pydiode/gui/main.py @@ -1,5 +1,6 @@ import configparser from pathlib import Path +import signal import sys from tkinter import BooleanVar, IntVar, Listbox, StringVar, Tk, ttk @@ -7,8 +8,8 @@ receive_or_cancel, receive_test, set_target_directory, - RECEIVE_PROCESSES, - RECEIVE_TEST_PROCESSES, + RECEIVE_PIPELINE, + RECEIVE_TEST_PIPELINE, SavedWindow, ) from pydiode.gui.send import ( @@ -17,8 +18,8 @@ send_or_cancel, send_test, update_tx_btn, - SEND_PROCESSES, - SEND_TEST_PROCESSES, + SEND_PIPELINE, + SEND_TEST_PIPELINE, ) import pydiode.pydiode import pydiode.tar @@ -229,14 +230,11 @@ def gui_main(): # Start handling user input root.mainloop() - # Cancel send and receive subprocesses - for name, popen in ( - SEND_PROCESSES - + SEND_TEST_PROCESSES - + RECEIVE_PROCESSES - + RECEIVE_TEST_PROCESSES - ): - popen.terminate() + # Terminate send and receive subprocesses + SEND_PIPELINE.send_signal(signal.SIGTERM) + SEND_TEST_PIPELINE.send_signal(signal.SIGTERM) + RECEIVE_PIPELINE.send_signal(signal.SIGTERM) + RECEIVE_TEST_PIPELINE.send_signal(signal.SIGTERM) # Save settings config["pydiode"] = { diff --git a/src/pydiode/gui/receive.py b/src/pydiode/gui/receive.py index a22296a..ec2d096 100644 --- a/src/pydiode/gui/receive.py +++ b/src/pydiode/gui/receive.py @@ -6,11 +6,11 @@ from tkinter.filedialog import askdirectory from tkinter.messagebox import showinfo -from pydiode.gui.common import check_subprocesses, SLEEP +from pydiode.gui.common import check_subprocesses, ProcessPipeline, SLEEP -# Arrays of tuples, each containing a subprocess's name and its popen object -RECEIVE_PROCESSES = [] -RECEIVE_TEST_PROCESSES = [] +# Information about our subprocesses +RECEIVE_PIPELINE = ProcessPipeline() +RECEIVE_TEST_PIPELINE = ProcessPipeline() def set_target_directory(target): @@ -207,9 +207,10 @@ def animate(): stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) - RECEIVE_PROCESSES.extend([("pydiode", pydiode), ("tar", tar)]) + RECEIVE_PIPELINE.append("pydiode", pydiode) + RECEIVE_PIPELINE.append("tar", tar) - check_subprocesses(root, cancelled, RECEIVE_PROCESSES, on_exit=repeat) + check_subprocesses(root, cancelled, RECEIVE_PIPELINE, on_exit=repeat) animate() @@ -243,7 +244,7 @@ def update_button(): stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, ) - RECEIVE_TEST_PROCESSES.extend([("pydiode", pydiode)]) + RECEIVE_TEST_PIPELINE.append("pydiode", pydiode) - check_subprocesses(root, cancelled, RECEIVE_TEST_PROCESSES) + check_subprocesses(root, cancelled, RECEIVE_TEST_PIPELINE) update_button() diff --git a/src/pydiode/gui/send.py b/src/pydiode/gui/send.py index b759619..7ada491 100644 --- a/src/pydiode/gui/send.py +++ b/src/pydiode/gui/send.py @@ -4,7 +4,7 @@ import sys from tkinter.filedialog import askopenfilenames -from pydiode.gui.common import check_subprocesses, SLEEP +from pydiode.gui.common import check_subprocesses, ProcessPipeline, SLEEP # Number of bits in a byte BYTE = 8 @@ -18,9 +18,9 @@ INCREMENT_INTERVAL = 25 # Test message TEST_MESSAGE = b"Testing pydiode" -# Arrays of tuples, each containing a subprocess's name and its popen object -SEND_PROCESSES = [] -SEND_TEST_PROCESSES = [] +# Information about our subprocesses +SEND_PIPELINE = ProcessPipeline() +SEND_TEST_PIPELINE = ProcessPipeline() def add_source_files(sources_var, sources_list): @@ -137,8 +137,9 @@ def send_or_cancel( stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) - SEND_PROCESSES.extend([("tar", tar), ("pydiode", pydiode)]) - check_subprocesses(root, cancelled, SEND_PROCESSES) + SEND_PIPELINE.append("tar", tar) + SEND_PIPELINE.append("pydiode", pydiode) + check_subprocesses(root, cancelled, SEND_PIPELINE) increment_size = get_increment_size(sources_list, progress_bar) @@ -195,5 +196,5 @@ def send_test( ) pydiode.stdin.write(TEST_MESSAGE) pydiode.stdin.close() - SEND_TEST_PROCESSES.extend([("pydiode", pydiode)]) - check_subprocesses(root, cancelled, SEND_TEST_PROCESSES) + SEND_TEST_PIPELINE.append("pydiode", pydiode) + check_subprocesses(root, cancelled, SEND_TEST_PIPELINE)