From e7875553fd6957d2dfc7cfa7e6b2002fe471b5bf Mon Sep 17 00:00:00 2001 From: David Sangrey Date: Fri, 17 Nov 2023 11:58:06 -0500 Subject: [PATCH] Sync --- debug_webserver.py | 19 ++++++++++--------- journal_lock.py | 30 ++++++++++++++++-------------- myNotebook.py | 23 ++++++++++++----------- theme.py | 40 +++++++++++++++++++++------------------- timeout_session.py | 27 ++++++++++++++------------- update.py | 16 +++++++++------- 6 files changed, 82 insertions(+), 73 deletions(-) diff --git a/debug_webserver.py b/debug_webserver.py index b96d420bd8..87f7eaed0e 100644 --- a/debug_webserver.py +++ b/debug_webserver.py @@ -1,4 +1,6 @@ """Simple HTTP listener to be used with debugging various EDMC sends.""" +from __future__ import annotations + import gzip import json import pathlib @@ -6,8 +8,9 @@ import threading import zlib from http import server -from typing import Any, Callable, Literal, Tuple, Union, Dict +from typing import Any, Callable, Literal from urllib.parse import parse_qs + from config import appname from EDMCLogging import get_main_logger @@ -21,9 +24,6 @@ class LoggingHandler(server.BaseHTTPRequestHandler): """HTTP Handler implementation that logs to EDMCs logger and writes data to files on disk.""" - def __init__(self, request, client_address: Tuple[str, int], server) -> None: - super().__init__(request, client_address, server) - def log_message(self, format: str, *args: Any) -> None: """Override default handler logger with EDMC logger.""" logger.info(format % args) @@ -44,7 +44,7 @@ def do_POST(self) -> None: # noqa: N802 # I cant change it elif len(target_path) == 1 and target_path[0] == '/': target_path = 'WEB_ROOT' - response: Union[Callable[[str], str], str, None] = DEFAULT_RESPONSES.get(target_path) + response: Callable[[str], str] | str | None = DEFAULT_RESPONSES.get(target_path) if callable(response): response = response(to_save) @@ -71,11 +71,11 @@ def do_POST(self) -> None: # noqa: N802 # I cant change it logger.warning(f'DATA FOLLOWS\n{data}') return - with output_lock, target_file.open('a') as f: - f.write(to_save + "\n\n") + with output_lock, target_file.open('a') as file: + file.write(to_save + "\n\n") @staticmethod - def get_printable(data: bytes, compression: Union[Literal['deflate'], Literal['gzip'], str, None] = None) -> str: + def get_printable(data: bytes, compression: Literal['deflate'] | Literal['gzip'] | str | None = None) -> str: """ Convert an incoming data stream into a string. @@ -84,6 +84,7 @@ def get_printable(data: bytes, compression: Union[Literal['deflate'], Literal['g :raises ValueError: If compression is unknown :return: printable strings """ + ret: bytes = b'' if compression is None: ret = data @@ -130,7 +131,7 @@ def generate_inara_response(raw_data: str) -> str: return json.dumps(out) -def extract_edsm_data(data: str) -> Dict[str, Any]: +def extract_edsm_data(data: str) -> dict[str, Any]: """Extract relevant data from edsm data.""" res = parse_qs(data) return {name: data[0] for name, data in res.items()} diff --git a/journal_lock.py b/journal_lock.py index 5be029d8e6..4d04992f81 100644 --- a/journal_lock.py +++ b/journal_lock.py @@ -5,13 +5,16 @@ Licensed under the GNU General Public License. See LICENSE file. """ +from __future__ import annotations + import pathlib import sys import tkinter as tk from enum import Enum from os import getpid as os_getpid from tkinter import ttk -from typing import TYPE_CHECKING, Callable, Optional +from typing import TYPE_CHECKING, Callable + from config import config from EDMCLogging import get_main_logger @@ -37,10 +40,10 @@ class JournalLock: def __init__(self) -> None: """Initialise where the journal directory and lock file are.""" - self.journal_dir: Optional[str] = config.get_str('journaldir') or config.default_journal_dir - self.journal_dir_path: Optional[pathlib.Path] = None + self.journal_dir: str | None = config.get_str('journaldir') or config.default_journal_dir + self.journal_dir_path: pathlib.Path | None = None self.set_path_from_journaldir() - self.journal_dir_lockfile_name: Optional[pathlib.Path] = None + self.journal_dir_lockfile_name: pathlib.Path | None = None # We never test truthiness of this, so let it be defined when first assigned. Avoids type hint issues. # self.journal_dir_lockfile: Optional[IO] = None self.locked = False @@ -61,7 +64,6 @@ def open_journal_dir_lockfile(self) -> bool: """Open journal_dir lockfile ready for locking.""" self.journal_dir_lockfile_name = self.journal_dir_path / 'edmc-journal-lock.txt' # type: ignore logger.trace_if('journal-lock', f'journal_dir_lockfile_name = {self.journal_dir_lockfile_name!r}') - self.journal_dir_lockfile = None # Initialize with None try: self.journal_dir_lockfile = open(self.journal_dir_lockfile_name, mode='w+', encoding='utf-8') @@ -105,7 +107,7 @@ def _obtain_lock(self) -> JournalLockResult: import msvcrt try: - msvcrt.locking(self.journal_dir_lockfile.fileno(), msvcrt.LK_NBLCK, 4096) # type: ignore + msvcrt.locking(self.journal_dir_lockfile.fileno(), msvcrt.LK_NBLCK, 4096) except Exception as e: logger.info(f"Exception: Couldn't lock journal directory \"{self.journal_dir}\"" @@ -123,15 +125,15 @@ def _obtain_lock(self) -> JournalLockResult: return JournalLockResult.LOCKED try: - fcntl.flock(self.journal_dir_lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) # type: ignore + fcntl.flock(self.journal_dir_lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) except Exception as e: logger.info(f"Exception: Couldn't lock journal directory \"{self.journal_dir}\", " f"assuming another process running: {e!r}") return JournalLockResult.ALREADY_LOCKED - self.journal_dir_lockfile.write(f"Path: {self.journal_dir}\nPID: {os_getpid()}\n") # type: ignore - self.journal_dir_lockfile.flush() # type: ignore + self.journal_dir_lockfile.write(f"Path: {self.journal_dir}\nPID: {os_getpid()}\n") + self.journal_dir_lockfile.flush() logger.trace_if('journal-lock', 'Done') self.locked = True @@ -156,8 +158,8 @@ def release_lock(self) -> bool: try: # Need to seek to the start first, as lock range is relative to # current position - self.journal_dir_lockfile.seek(0) # type: ignore - msvcrt.locking(self.journal_dir_lockfile.fileno(), msvcrt.LK_UNLCK, 4096) # type: ignore + self.journal_dir_lockfile.seek(0) + msvcrt.locking(self.journal_dir_lockfile.fileno(), msvcrt.LK_UNLCK, 4096) except Exception as e: logger.info(f"Exception: Couldn't unlock journal directory \"{self.journal_dir}\": {e!r}") @@ -175,7 +177,7 @@ def release_lock(self) -> bool: return True # Lie about being unlocked try: - fcntl.flock(self.journal_dir_lockfile, fcntl.LOCK_UN) # type: ignore + fcntl.flock(self.journal_dir_lockfile, fcntl.LOCK_UN) except Exception as e: logger.info(f"Exception: Couldn't unlock journal directory \"{self.journal_dir}\": {e!r}") @@ -183,9 +185,9 @@ def release_lock(self) -> bool: else: unlocked = True - # Close the file whether the unlocking succeeded. + # Close the file whether or not the unlocking succeeded. if hasattr(self, 'journal_dir_lockfile'): - self.journal_dir_lockfile.close() # type: ignore + self.journal_dir_lockfile.close() # Doing this makes it impossible for tests to ensure the file # is removed as a part of cleanup. So don't. diff --git a/myNotebook.py b/myNotebook.py index 499e01597a..6fa3577463 100644 --- a/myNotebook.py +++ b/myNotebook.py @@ -10,10 +10,11 @@ Entire file may be imported by plugins. """ +from __future__ import annotations + import sys import tkinter as tk from tkinter import ttk -from typing import Optional # Can't do this with styles on OSX - http://www.tkdocs.com/tutorial/styles.html#whydifficult if sys.platform == 'darwin': @@ -29,7 +30,7 @@ class Notebook(ttk.Notebook): """Custom ttk.Notebook class to fix some display issues.""" - def __init__(self, master: Optional[ttk.Frame] = None, **kw): + def __init__(self, master: ttk.Frame | None = None, **kw): ttk.Notebook.__init__(self, master, **kw) style = ttk.Style() @@ -58,7 +59,7 @@ def __init__(self, master: Optional[ttk.Frame] = None, **kw): class Frame(sys.platform == 'darwin' and tk.Frame or ttk.Frame): # type: ignore """Custom t(t)k.Frame class to fix some display issues.""" - def __init__(self, master: Optional[ttk.Notebook] = None, **kw): + def __init__(self, master: ttk.Notebook | None = None, **kw): if sys.platform == 'darwin': kw['background'] = kw.pop('background', PAGEBG) tk.Frame.__init__(self, master, **kw) @@ -75,11 +76,11 @@ def __init__(self, master: Optional[ttk.Notebook] = None, **kw): class Label(tk.Label): """Custom tk.Label class to fix some display issues.""" - def __init__(self, master: Optional[ttk.Frame] = None, **kw): + def __init__(self, master: ttk.Frame | None = None, **kw): # This format chosen over `sys.platform in (...)` as mypy and friends dont understand that if sys.platform in ('darwin', 'win32'): - kw['foreground'] = kw.pop('foreground', PAGEFG) # type: ignore - kw['background'] = kw.pop('background', PAGEBG) # type: ignore + kw['foreground'] = kw.pop('foreground', PAGEFG) + kw['background'] = kw.pop('background', PAGEBG) else: kw['foreground'] = kw.pop('foreground', ttk.Style().lookup('TLabel', 'foreground')) kw['background'] = kw.pop('background', ttk.Style().lookup('TLabel', 'background')) @@ -89,7 +90,7 @@ def __init__(self, master: Optional[ttk.Frame] = None, **kw): class Entry(sys.platform == 'darwin' and tk.Entry or ttk.Entry): # type: ignore """Custom t(t)k.Entry class to fix some display issues.""" - def __init__(self, master: Optional[ttk.Frame] = None, **kw): + def __init__(self, master: ttk.Frame | None = None, **kw): if sys.platform == 'darwin': kw['highlightbackground'] = kw.pop('highlightbackground', PAGEBG) tk.Entry.__init__(self, master, **kw) @@ -100,7 +101,7 @@ def __init__(self, master: Optional[ttk.Frame] = None, **kw): class Button(sys.platform == 'darwin' and tk.Button or ttk.Button): # type: ignore """Custom t(t)k.Button class to fix some display issues.""" - def __init__(self, master: Optional[ttk.Frame] = None, **kw): + def __init__(self, master: ttk.Frame | None = None, **kw): if sys.platform == 'darwin': kw['highlightbackground'] = kw.pop('highlightbackground', PAGEBG) tk.Button.__init__(self, master, **kw) @@ -113,7 +114,7 @@ def __init__(self, master: Optional[ttk.Frame] = None, **kw): class ColoredButton(sys.platform == 'darwin' and tk.Label or tk.Button): # type: ignore """Custom t(t)k.ColoredButton class to fix some display issues.""" - def __init__(self, master: Optional[ttk.Frame] = None, **kw): + def __init__(self, master: ttk.Frame | None = None, **kw): if sys.platform == 'darwin': # Can't set Button background on OSX, so use a Label instead kw['relief'] = kw.pop('relief', tk.RAISED) @@ -131,7 +132,7 @@ def _press(self, event): class Checkbutton(sys.platform == 'darwin' and tk.Checkbutton or ttk.Checkbutton): # type: ignore """Custom t(t)k.Checkbutton class to fix some display issues.""" - def __init__(self, master: Optional[ttk.Frame] = None, **kw): + def __init__(self, master: ttk.Frame | None = None, **kw): if sys.platform == 'darwin': kw['foreground'] = kw.pop('foreground', PAGEFG) kw['background'] = kw.pop('background', PAGEBG) @@ -145,7 +146,7 @@ def __init__(self, master: Optional[ttk.Frame] = None, **kw): class Radiobutton(sys.platform == 'darwin' and tk.Radiobutton or ttk.Radiobutton): # type: ignore """Custom t(t)k.Radiobutton class to fix some display issues.""" - def __init__(self, master: Optional[ttk.Frame] = None, **kw): + def __init__(self, master: ttk.Frame | None = None, **kw): if sys.platform == 'darwin': kw['foreground'] = kw.pop('foreground', PAGEFG) kw['background'] = kw.pop('background', PAGEBG) diff --git a/theme.py b/theme.py index 30215e4459..bfb76c55d2 100644 --- a/theme.py +++ b/theme.py @@ -8,13 +8,16 @@ Because of various ttk limitations this app is an unholy mix of Tk and ttk widgets. So can't use ttk's theme support. So have to change colors manually. """ +from __future__ import annotations + import os import sys import tkinter as tk from os.path import join from tkinter import font as tk_font from tkinter import ttk -from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Set, Tuple, Union +from typing import TYPE_CHECKING, Callable + from config import config from EDMCLogging import get_main_logger from ttkHyperlinkLabel import HyperlinkLabel @@ -133,16 +136,16 @@ class _Theme: THEME_TRANSPARENT = 2 def __init__(self) -> None: - self.active: Optional[int] = None # Starts out with no theme - self.minwidth: Optional[int] = None - self.widgets: Dict[Union[tk.Widget, tk.BitmapImage], Set] = {} - self.widgets_pair: List = [] - self.defaults: Dict = {} - self.current: Dict = {} - self.default_ui_scale: Optional[float] = None # None == not yet known - self.startup_ui_scale: Optional[int] = None - - def register(self, widget: Union[tk.Widget, tk.BitmapImage]) -> None: # noqa: CCR001, C901 + self.active: int | None = None # Starts out with no theme + self.minwidth: int | None = None + self.widgets: dict[tk.Widget | tk.BitmapImage, set] = {} + self.widgets_pair: list = [] + self.defaults: dict = {} + self.current: dict = {} + self.default_ui_scale: float | None = None # None == not yet known + self.startup_ui_scale: int | None = None + + def register(self, widget: tk.Widget | tk.BitmapImage) -> None: # noqa: CCR001, C901 # Note widget and children for later application of a theme. Note if # the widget has explicit fg or bg attributes. assert isinstance(widget, (tk.BitmapImage, tk.Widget)), widget @@ -206,17 +209,17 @@ def register(self, widget: Union[tk.Widget, tk.BitmapImage]) -> None: # noqa: C for child in widget.winfo_children(): self.register(child) - def register_alternate(self, pair: Tuple, gridopts: Dict) -> None: + def register_alternate(self, pair: tuple, gridopts: dict) -> None: self.widgets_pair.append((pair, gridopts)) def button_bind( - self, widget: tk.Widget, command: Callable, image: Optional[tk.BitmapImage] = None + self, widget: tk.Widget, command: Callable, image: tk.BitmapImage | None = None ) -> None: widget.bind('', command) widget.bind('', lambda e: self._enter(e, image)) widget.bind('', lambda e: self._leave(e, image)) - def _enter(self, event: tk.Event, image: Optional[tk.BitmapImage]) -> None: + def _enter(self, event: tk.Event, image: tk.BitmapImage | None) -> None: widget = event.widget if widget and widget['state'] != tk.DISABLED: try: @@ -233,7 +236,7 @@ def _enter(self, event: tk.Event, image: Optional[tk.BitmapImage]) -> None: except Exception: logger.exception(f'Failure configuring image: {image=}') - def _leave(self, event: tk.Event, image: Optional[tk.BitmapImage]) -> None: + def _leave(self, event: tk.Event, image: tk.BitmapImage | None) -> None: widget = event.widget if widget and widget['state'] != tk.DISABLED: try: @@ -312,11 +315,11 @@ def update(self, widget: tk.Widget) -> None: self._update_widget(child) # Apply current theme to a single widget - def _update_widget(self, widget: Union[tk.Widget, tk.BitmapImage]) -> None: # noqa: CCR001, C901 + def _update_widget(self, widget: tk.Widget | tk.BitmapImage) -> None: # noqa: CCR001, C901 if widget not in self.widgets: if isinstance(widget, tk.Widget): w_class = widget.winfo_class() - w_keys: List[str] = widget.keys() + w_keys: list[str] = widget.keys() else: # There is no tk.BitmapImage.winfo_class() @@ -327,7 +330,7 @@ def _update_widget(self, widget: Union[tk.Widget, tk.BitmapImage]) -> None: # n assert_str = f'{w_class} {widget} "{"text" in w_keys and widget["text"]}"' raise AssertionError(assert_str) - attribs: Set = self.widgets.get(widget, set()) + attribs: set = self.widgets.get(widget, set()) try: if isinstance(widget, tk.BitmapImage): @@ -421,7 +424,6 @@ def apply(self, root: tk.Tk) -> None: # noqa: CCR001, C901 if self.active == theme: return # Don't need to mess with the window manager - self.active = theme if sys.platform == 'darwin': diff --git a/timeout_session.py b/timeout_session.py index 611b6aa365..55fc4dc0e7 100644 --- a/timeout_session.py +++ b/timeout_session.py @@ -5,8 +5,9 @@ Licensed under the GNU General Public License. See LICENSE file. """ -from typing import Optional, Any -from requests import PreparedRequest, Response, Session +from __future__ import annotations + +from requests import Session, Response from requests.adapters import HTTPAdapter from config import user_agent @@ -23,17 +24,15 @@ def __init__(self, timeout: int, *args, **kwargs): super().__init__(*args, **kwargs) - def send(self, request: PreparedRequest, *args, **kwargs: Any) -> Response: + def send(self, *args, **kwargs) -> Response: """Send, but with a timeout always set.""" if kwargs["timeout"] is None: kwargs["timeout"] = self.default_timeout - return super().send(request, *args, **kwargs) + return super().send(*args, **kwargs) -def new_session( - timeout: int = REQUEST_TIMEOUT, session: Optional[Session] = None -) -> Session: +def new_session(timeout: int = REQUEST_TIMEOUT, session: Session | None = None) -> Session: """ Create a new requests.Session and override the default HTTPAdapter with a TimeoutAdapter. @@ -41,9 +40,11 @@ def new_session( :param session: the Session object to attach the Adapter to, defaults to a new session :return: The created Session """ - with Session() as session: - session.headers['User-Agent'] = user_agent - adapter = TimeoutAdapter(timeout) - session.mount("http://", adapter) - session.mount("https://", adapter) - return session + session = session or Session() + session.headers.setdefault("User-Agent", user_agent) + + adapter = TimeoutAdapter(timeout) + for prefix in ("http://", "https://"): + session.mount(prefix, adapter) + + return session diff --git a/update.py b/update.py index f3e6c74ccd..f88daa38cd 100644 --- a/update.py +++ b/update.py @@ -5,14 +5,15 @@ Licensed under the GNU General Public License. See LICENSE file. """ +from __future__ import annotations + import os import sys import threading from os.path import dirname, join from traceback import print_exc -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING from xml.etree import ElementTree - import requests import semantic_version from config import appname, appversion_nobuild, config, update_feed @@ -21,6 +22,7 @@ if TYPE_CHECKING: import tkinter as tk + logger = get_main_logger() @@ -68,16 +70,16 @@ def use_internal(self) -> bool: return False - def __init__(self, tkroot: Optional['tk.Tk'] = None, provider: str = 'internal'): + def __init__(self, tkroot: tk.Tk | None = None, provider: str = 'internal'): """ Initialise an Updater instance. :param tkroot: reference to the root window of the GUI :param provider: 'internal' or other string if not """ - self.root: Optional['tk.Tk'] = tkroot + self.root: tk.Tk | None = tkroot self.provider: str = provider - self.thread: Optional[threading.Thread] = None + self.thread: threading.Thread | None = None if self.use_internal(): return @@ -86,7 +88,7 @@ def __init__(self, tkroot: Optional['tk.Tk'] = None, provider: str = 'internal') import ctypes try: - self.updater: Optional[ctypes.CDLL] = ctypes.cdll.WinSparkle + self.updater: ctypes.CDLL | None = ctypes.cdll.WinSparkle # Set the appcast URL self.updater.win_sparkle_set_appcast_url(update_feed.encode()) @@ -155,7 +157,7 @@ def check_for_updates(self) -> None: elif sys.platform == 'darwin' and self.updater: self.updater.checkForUpdates_(None) - def check_appcast(self) -> Optional[EDMCVersion]: + def check_appcast(self) -> EDMCVersion | None: """ Manually (no Sparkle or WinSparkle) check the update_feed appcast file.