Skip to content

Commit

Permalink
Handle Cancellation Properly When Using PyInstaller (#20)
Browse files Browse the repository at this point in the history
Refactor subprocess code by introducing ProcessPipeline class.

Now we can tell if a pipeline was cancelled by the user. We can't
depend on exit codes, since PyInstaller can replace non-zero
exit codes with -1.

For #19
  • Loading branch information
peterstory authored Jul 8, 2024
1 parent 63a54d3 commit 773aa63
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 129 deletions.
215 changes: 126 additions & 89 deletions src/pydiode/gui/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,87 +6,134 @@
SLEEP = 250


def get_process_errors(name_popen):
"""
Get a error message describing subprocesses that exited irregularly.
class ProcessPipeline:

:param name_popen: A list of tuples. Each tuple contains the process name
and the subprocess.Popen object. All processes have
terminated.
:returns: A string describing the return code and stderr for subprocesses
that exited irregularly.
"""
error_msgs = []
for name, popen in name_popen:
# Ignore:
# -2: SIGINT, possibly from user-initiated cancellation.
# 0: normal exit.
#
# Show all other exit codes, including:
# -15: SIGTERM, possibly from stuck subprocesses.
if popen.returncode not in {-2, 0}:
trimmed_stderr = popen.stderr.read().decode("utf-8").strip()
error_msg = f'"{name}" exited with code {popen.returncode}'
if trimmed_stderr:
error_msg += f' and stderr "{trimmed_stderr}"'
error_msgs.append(error_msg)
if error_msgs:
error_msgs.insert(0, "Error:")
return "\n".join(error_msgs)
def __init__(self):
self.names = []
self.popens = []
# Did the user cancel the pipeline?
self.cancelled = False

def append(self, name, popen):
self.names.append(name)
self.popens.append(popen)

def print_premature_errors(name_code):
"""
Print a description of subprocesses that exited prematurely.
def send_signal(self, s):
"""
:param s: Send this signal to each process in the pipeline.
"""
if s == signal.SIGINT:
self.cancelled = True
for popen in self.popens:
popen.send_signal(s)

:param name_code: A list of tuples. Each tuple contains the process name
and the return code of the process. Some processes have
terminated, others have not.
"""
returncodes = [code for name, code in name_code]
try:
earliest_running = returncodes.index(None)
for name, code in name_code[(earliest_running + 1) :]:
if code is not None:
print(f'"{name}" exited prematurely.', file=sys.stderr)
except ValueError:
pass
def clear(self):
"""
Clear the pipeline, so it doesn't grow as more subprocesses are
started. This method should be called after all processes have exited.
"""
for popen in self.popens:
if popen.stdout:
popen.stdout.close()
if popen.stderr:
popen.stderr.close()
self.names = []
self.popens = []
self.cancelled = False

def poll(self):
"""
poll each process for its returncode.
"""
for popen in self.popens:
# If a process has terminated, set returncode.
# If a process is still running, returncode will be None.
popen.poll()

def stuck_running(returncodes):
"""
Based on process returncodes, is the pipeline stuck? In a pipeline, earlier
processes should exit first. If a later process exits first, an earlier
process's STDOUT cannot be consumed, so it will never exit. Thus, the
pipeline is stuck if a running process comes before an exited process.
:param: A list of returncodes from a process pipeline. A None returncode
indicates that a process is still running. A numeric returncode
indicates that a process exited.
:return: A boolean, indicating if the pipeline is stuck.
"""
try:
# The index of the earliest still-running process in the pipeline
earliest_running = returncodes.index(None)
# Have any subsequent processes already exited?
return any(c is not None for c in returncodes[(earliest_running + 1) :])
except ValueError:
# None won't be found if all the processes have exited.
# If so, the pipeline isn't stuck.
return False
def still_running(self):
"""
:returns: True if at least one process is still running,
False if all processes have exited.
"""
return any(popen.returncode is None for popen in self.popens)

def _returncodes(self):
"""
:returns: A list of all processes' returncodes.
"""
return [popen.returncode for popen in self.popens]

def stuck_running(self):
"""
Based on process returncodes, is the pipeline stuck? In a pipeline,
earlier processes should exit first. If a later process exits first, an
earlier process's STDOUT cannot be consumed, so it will never exit.
Thus, the pipeline is stuck if a running process comes before an exited
process.
:return: A boolean, indicating if the pipeline is stuck.
"""
try:
# The index of the earliest still-running process in the pipeline
earliest_running = self._returncodes().index(None)
# Have any subsequent processes already exited?
return any(
c is not None
for c in self._returncodes()[(earliest_running + 1) :]
)
except ValueError:
# None won't be found if all the processes have exited.
# If so, the pipeline isn't stuck.
return False

def print_premature_errors(self):
"""
Print a description of subprocesses that exited prematurely.
"""
try:
earliest_running = self._returncodes().index(None)
name_code = list(zip(self.names, self._returncodes()))
for name, code in name_code[(earliest_running + 1) :]:
if code is not None:
print(f'"{name}" exited prematurely.', file=sys.stderr)
except ValueError:
pass

def get_process_errors(self):
"""
Get an error message describing subprocesses that exited irregularly.
Describe an error if:
- The pipeline wasn't cancelled
- The process exited abnormally (non-zero exit code)
:returns: A string describing the return code and stderr for
subprocesses that exited irregularly.
"""
error_msgs = []
for name, popen in zip(self.names, self.popens):
trimmed_stderr = popen.stderr.read().decode("utf-8").strip()
# Show errors if:
if not self.cancelled and popen.returncode != 0:
error_msg = f'"{name}" exited with code {popen.returncode}'
if trimmed_stderr:
error_msg += f' and stderr "{trimmed_stderr}"'
error_msgs.append(error_msg)
if error_msgs:
error_msgs.insert(0, "Error:")
return "\n".join(error_msgs)


def check_subprocesses(
widget, cancelled, processes, on_exit=None, cancel_signal=signal.SIGINT
widget, cancelled, pipeline, on_exit=None, cancel_signal=signal.SIGINT
):
"""
Check whether all the subprocesses have exited. If so, display their error
messages and clean up after them.
:param widget: Used to schedule another check
:param cancelled: Boolean variable indicating cancellation request
:param processes: An array of tuples, each containing a subprocess's name
and its popen object.
:param pipeline: A ProcessPipeline containing subprocess details
:param on_exit: Function to call after all subprocesses have exited. Do
not call the function if the subprocesses exited with a
non-zero exit code, due to cancellation, or due to getting
Expand All @@ -98,65 +145,55 @@ def check_subprocesses(
# If requested, cancel subprocesses
if cancelled.get():
# Signal each process to exit
for name, popen in processes:
popen.send_signal(cancel_signal)
pipeline.send_signal(cancel_signal)
# Mark this cancellation request as handled
cancelled.set(False)
# Don't call on_exit if the user requested cancellation
on_exit = None if cancel_signal == signal.SIGINT else on_exit
on_exit = None if pipeline.cancelled else on_exit
# At the next check, hopefully the processes will have exited
widget.after(
SLEEP,
lambda: check_subprocesses(
widget, cancelled, processes, on_exit=on_exit
widget, cancelled, pipeline, on_exit=on_exit
),
)
else:
# Get returncodes for exited processes, None for running processes
returncodes = [popen.poll() for name, popen in processes]
# Are any of the subprocesses still running?
still_running = any(code is None for code in returncodes)
# Check the status of subprocesses, updating returncodes
pipeline.poll()

# If subprocesses are stuck
if stuck_running(returncodes):
if pipeline.stuck_running():
# Request cancellation
cancelled.set(True)
widget.after(
SLEEP,
lambda: check_subprocesses(
widget,
cancelled,
processes,
pipeline,
on_exit=on_exit,
cancel_signal=signal.SIGTERM,
),
)
# Describe the issue
process_names = [name for name, popen in processes]
print_premature_errors(list(zip(process_names, returncodes)))
pipeline.print_premature_errors()
# If subprocesses are still running, keep waiting for them
elif still_running:
elif pipeline.still_running():
widget.after(
SLEEP,
lambda: check_subprocesses(
widget, cancelled, processes, on_exit=on_exit
widget, cancelled, pipeline, on_exit=on_exit
),
)
# Otherwise, all subprocesses have exited
else:
# If any subprocesses exited irregularly, describe the issue
error_msgs = get_process_errors(processes)
error_msgs = pipeline.get_process_errors()
if error_msgs:
showerror(title="Error", message=error_msgs)
# Clean up
for name, popen in processes:
if popen.stdout:
popen.stdout.close()
if popen.stderr:
popen.stderr.close()
# The array of subprocesses should be cleared, so it doesn't grow
# each time more subprocesses are started
processes.clear()
# Clear the pipeline, so it doesn't grow as more subprocesses are
# started
pipeline.clear()
# Call the on_exit() function, if it was provided
if on_exit and not error_msgs:
on_exit()
22 changes: 10 additions & 12 deletions src/pydiode/gui/main.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import configparser
from pathlib import Path
import signal
import sys
from tkinter import BooleanVar, IntVar, Listbox, StringVar, Tk, ttk

from pydiode.gui.receive import (
receive_or_cancel,
receive_test,
set_target_directory,
RECEIVE_PROCESSES,
RECEIVE_TEST_PROCESSES,
RECEIVE_PIPELINE,
RECEIVE_TEST_PIPELINE,
SavedWindow,
)
from pydiode.gui.send import (
Expand All @@ -17,8 +18,8 @@
send_or_cancel,
send_test,
update_tx_btn,
SEND_PROCESSES,
SEND_TEST_PROCESSES,
SEND_PIPELINE,
SEND_TEST_PIPELINE,
)
import pydiode.pydiode
import pydiode.tar
Expand Down Expand Up @@ -229,14 +230,11 @@ def gui_main():
# Start handling user input
root.mainloop()

# Cancel send and receive subprocesses
for name, popen in (
SEND_PROCESSES
+ SEND_TEST_PROCESSES
+ RECEIVE_PROCESSES
+ RECEIVE_TEST_PROCESSES
):
popen.terminate()
# Terminate send and receive subprocesses
SEND_PIPELINE.send_signal(signal.SIGTERM)
SEND_TEST_PIPELINE.send_signal(signal.SIGTERM)
RECEIVE_PIPELINE.send_signal(signal.SIGTERM)
RECEIVE_TEST_PIPELINE.send_signal(signal.SIGTERM)

# Save settings
config["pydiode"] = {
Expand Down
17 changes: 9 additions & 8 deletions src/pydiode/gui/receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from tkinter.filedialog import askdirectory
from tkinter.messagebox import showinfo

from pydiode.gui.common import check_subprocesses, SLEEP
from pydiode.gui.common import check_subprocesses, ProcessPipeline, SLEEP

# Arrays of tuples, each containing a subprocess's name and its popen object
RECEIVE_PROCESSES = []
RECEIVE_TEST_PROCESSES = []
# Information about our subprocesses
RECEIVE_PIPELINE = ProcessPipeline()
RECEIVE_TEST_PIPELINE = ProcessPipeline()


def set_target_directory(target):
Expand Down Expand Up @@ -207,9 +207,10 @@ def animate():
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
RECEIVE_PROCESSES.extend([("pydiode", pydiode), ("tar", tar)])
RECEIVE_PIPELINE.append("pydiode", pydiode)
RECEIVE_PIPELINE.append("tar", tar)

check_subprocesses(root, cancelled, RECEIVE_PROCESSES, on_exit=repeat)
check_subprocesses(root, cancelled, RECEIVE_PIPELINE, on_exit=repeat)
animate()


Expand Down Expand Up @@ -243,7 +244,7 @@ def update_button():
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
RECEIVE_TEST_PROCESSES.extend([("pydiode", pydiode)])
RECEIVE_TEST_PIPELINE.append("pydiode", pydiode)

check_subprocesses(root, cancelled, RECEIVE_TEST_PROCESSES)
check_subprocesses(root, cancelled, RECEIVE_TEST_PIPELINE)
update_button()
Loading

0 comments on commit 773aa63

Please sign in to comment.