Skip to content

Commit

Permalink
Sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Rixxan committed Nov 17, 2023
1 parent a8525b0 commit e787555
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 73 deletions.
19 changes: 10 additions & 9 deletions debug_webserver.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""Simple HTTP listener to be used with debugging various EDMC sends."""
from __future__ import annotations

import gzip
import json
import pathlib
import tempfile
import threading
import zlib
from http import server
from typing import Any, Callable, Literal, Tuple, Union, Dict
from typing import Any, Callable, Literal
from urllib.parse import parse_qs

from config import appname
from EDMCLogging import get_main_logger

Expand All @@ -21,9 +24,6 @@
class LoggingHandler(server.BaseHTTPRequestHandler):
"""HTTP Handler implementation that logs to EDMCs logger and writes data to files on disk."""

def __init__(self, request, client_address: Tuple[str, int], server) -> None:
super().__init__(request, client_address, server)

def log_message(self, format: str, *args: Any) -> None:
"""Override default handler logger with EDMC logger."""
logger.info(format % args)
Expand All @@ -44,7 +44,7 @@ def do_POST(self) -> None: # noqa: N802 # I cant change it
elif len(target_path) == 1 and target_path[0] == '/':
target_path = 'WEB_ROOT'

response: Union[Callable[[str], str], str, None] = DEFAULT_RESPONSES.get(target_path)
response: Callable[[str], str] | str | None = DEFAULT_RESPONSES.get(target_path)
if callable(response):
response = response(to_save)

Expand All @@ -71,11 +71,11 @@ def do_POST(self) -> None: # noqa: N802 # I cant change it
logger.warning(f'DATA FOLLOWS\n{data}')
return

with output_lock, target_file.open('a') as f:
f.write(to_save + "\n\n")
with output_lock, target_file.open('a') as file:
file.write(to_save + "\n\n")

@staticmethod
def get_printable(data: bytes, compression: Union[Literal['deflate'], Literal['gzip'], str, None] = None) -> str:
def get_printable(data: bytes, compression: Literal['deflate'] | Literal['gzip'] | str | None = None) -> str:
"""
Convert an incoming data stream into a string.
Expand All @@ -84,6 +84,7 @@ def get_printable(data: bytes, compression: Union[Literal['deflate'], Literal['g
:raises ValueError: If compression is unknown
:return: printable strings
"""
ret: bytes = b''
if compression is None:
ret = data

Expand Down Expand Up @@ -130,7 +131,7 @@ def generate_inara_response(raw_data: str) -> str:
return json.dumps(out)


def extract_edsm_data(data: str) -> Dict[str, Any]:
def extract_edsm_data(data: str) -> dict[str, Any]:
"""Extract relevant data from edsm data."""
res = parse_qs(data)
return {name: data[0] for name, data in res.items()}
Expand Down
30 changes: 16 additions & 14 deletions journal_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations

import pathlib
import sys
import tkinter as tk
from enum import Enum
from os import getpid as os_getpid
from tkinter import ttk
from typing import TYPE_CHECKING, Callable, Optional
from typing import TYPE_CHECKING, Callable

from config import config
from EDMCLogging import get_main_logger

Expand All @@ -37,10 +40,10 @@ class JournalLock:

def __init__(self) -> None:
"""Initialise where the journal directory and lock file are."""
self.journal_dir: Optional[str] = config.get_str('journaldir') or config.default_journal_dir
self.journal_dir_path: Optional[pathlib.Path] = None
self.journal_dir: str | None = config.get_str('journaldir') or config.default_journal_dir
self.journal_dir_path: pathlib.Path | None = None
self.set_path_from_journaldir()
self.journal_dir_lockfile_name: Optional[pathlib.Path] = None
self.journal_dir_lockfile_name: pathlib.Path | None = None
# We never test truthiness of this, so let it be defined when first assigned. Avoids type hint issues.
# self.journal_dir_lockfile: Optional[IO] = None
self.locked = False
Expand All @@ -61,7 +64,6 @@ def open_journal_dir_lockfile(self) -> bool:
"""Open journal_dir lockfile ready for locking."""
self.journal_dir_lockfile_name = self.journal_dir_path / 'edmc-journal-lock.txt' # type: ignore
logger.trace_if('journal-lock', f'journal_dir_lockfile_name = {self.journal_dir_lockfile_name!r}')
self.journal_dir_lockfile = None # Initialize with None
try:
self.journal_dir_lockfile = open(self.journal_dir_lockfile_name, mode='w+', encoding='utf-8')

Expand Down Expand Up @@ -105,7 +107,7 @@ def _obtain_lock(self) -> JournalLockResult:
import msvcrt

try:
msvcrt.locking(self.journal_dir_lockfile.fileno(), msvcrt.LK_NBLCK, 4096) # type: ignore
msvcrt.locking(self.journal_dir_lockfile.fileno(), msvcrt.LK_NBLCK, 4096)

except Exception as e:
logger.info(f"Exception: Couldn't lock journal directory \"{self.journal_dir}\""
Expand All @@ -123,15 +125,15 @@ def _obtain_lock(self) -> JournalLockResult:
return JournalLockResult.LOCKED

try:
fcntl.flock(self.journal_dir_lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) # type: ignore
fcntl.flock(self.journal_dir_lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)

except Exception as e:
logger.info(f"Exception: Couldn't lock journal directory \"{self.journal_dir}\", "
f"assuming another process running: {e!r}")
return JournalLockResult.ALREADY_LOCKED

self.journal_dir_lockfile.write(f"Path: {self.journal_dir}\nPID: {os_getpid()}\n") # type: ignore
self.journal_dir_lockfile.flush() # type: ignore
self.journal_dir_lockfile.write(f"Path: {self.journal_dir}\nPID: {os_getpid()}\n")
self.journal_dir_lockfile.flush()

logger.trace_if('journal-lock', 'Done')
self.locked = True
Expand All @@ -156,8 +158,8 @@ def release_lock(self) -> bool:
try:
# Need to seek to the start first, as lock range is relative to
# current position
self.journal_dir_lockfile.seek(0) # type: ignore
msvcrt.locking(self.journal_dir_lockfile.fileno(), msvcrt.LK_UNLCK, 4096) # type: ignore
self.journal_dir_lockfile.seek(0)
msvcrt.locking(self.journal_dir_lockfile.fileno(), msvcrt.LK_UNLCK, 4096)

except Exception as e:
logger.info(f"Exception: Couldn't unlock journal directory \"{self.journal_dir}\": {e!r}")
Expand All @@ -175,17 +177,17 @@ def release_lock(self) -> bool:
return True # Lie about being unlocked

try:
fcntl.flock(self.journal_dir_lockfile, fcntl.LOCK_UN) # type: ignore
fcntl.flock(self.journal_dir_lockfile, fcntl.LOCK_UN)

except Exception as e:
logger.info(f"Exception: Couldn't unlock journal directory \"{self.journal_dir}\": {e!r}")

else:
unlocked = True

# Close the file whether the unlocking succeeded.
# Close the file whether or not the unlocking succeeded.
if hasattr(self, 'journal_dir_lockfile'):
self.journal_dir_lockfile.close() # type: ignore
self.journal_dir_lockfile.close()

# Doing this makes it impossible for tests to ensure the file
# is removed as a part of cleanup. So don't.
Expand Down
23 changes: 12 additions & 11 deletions myNotebook.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
Entire file may be imported by plugins.
"""
from __future__ import annotations

import sys
import tkinter as tk
from tkinter import ttk
from typing import Optional

# Can't do this with styles on OSX - http://www.tkdocs.com/tutorial/styles.html#whydifficult
if sys.platform == 'darwin':
Expand All @@ -29,7 +30,7 @@
class Notebook(ttk.Notebook):
"""Custom ttk.Notebook class to fix some display issues."""

def __init__(self, master: Optional[ttk.Frame] = None, **kw):
def __init__(self, master: ttk.Frame | None = None, **kw):

ttk.Notebook.__init__(self, master, **kw)
style = ttk.Style()
Expand Down Expand Up @@ -58,7 +59,7 @@ def __init__(self, master: Optional[ttk.Frame] = None, **kw):
class Frame(sys.platform == 'darwin' and tk.Frame or ttk.Frame): # type: ignore
"""Custom t(t)k.Frame class to fix some display issues."""

def __init__(self, master: Optional[ttk.Notebook] = None, **kw):
def __init__(self, master: ttk.Notebook | None = None, **kw):
if sys.platform == 'darwin':
kw['background'] = kw.pop('background', PAGEBG)
tk.Frame.__init__(self, master, **kw)
Expand All @@ -75,11 +76,11 @@ def __init__(self, master: Optional[ttk.Notebook] = None, **kw):
class Label(tk.Label):
"""Custom tk.Label class to fix some display issues."""

def __init__(self, master: Optional[ttk.Frame] = None, **kw):
def __init__(self, master: ttk.Frame | None = None, **kw):
# This format chosen over `sys.platform in (...)` as mypy and friends dont understand that
if sys.platform in ('darwin', 'win32'):
kw['foreground'] = kw.pop('foreground', PAGEFG) # type: ignore
kw['background'] = kw.pop('background', PAGEBG) # type: ignore
kw['foreground'] = kw.pop('foreground', PAGEFG)
kw['background'] = kw.pop('background', PAGEBG)
else:
kw['foreground'] = kw.pop('foreground', ttk.Style().lookup('TLabel', 'foreground'))
kw['background'] = kw.pop('background', ttk.Style().lookup('TLabel', 'background'))
Expand All @@ -89,7 +90,7 @@ def __init__(self, master: Optional[ttk.Frame] = None, **kw):
class Entry(sys.platform == 'darwin' and tk.Entry or ttk.Entry): # type: ignore
"""Custom t(t)k.Entry class to fix some display issues."""

def __init__(self, master: Optional[ttk.Frame] = None, **kw):
def __init__(self, master: ttk.Frame | None = None, **kw):
if sys.platform == 'darwin':
kw['highlightbackground'] = kw.pop('highlightbackground', PAGEBG)
tk.Entry.__init__(self, master, **kw)
Expand All @@ -100,7 +101,7 @@ def __init__(self, master: Optional[ttk.Frame] = None, **kw):
class Button(sys.platform == 'darwin' and tk.Button or ttk.Button): # type: ignore
"""Custom t(t)k.Button class to fix some display issues."""

def __init__(self, master: Optional[ttk.Frame] = None, **kw):
def __init__(self, master: ttk.Frame | None = None, **kw):
if sys.platform == 'darwin':
kw['highlightbackground'] = kw.pop('highlightbackground', PAGEBG)
tk.Button.__init__(self, master, **kw)
Expand All @@ -113,7 +114,7 @@ def __init__(self, master: Optional[ttk.Frame] = None, **kw):
class ColoredButton(sys.platform == 'darwin' and tk.Label or tk.Button): # type: ignore
"""Custom t(t)k.ColoredButton class to fix some display issues."""

def __init__(self, master: Optional[ttk.Frame] = None, **kw):
def __init__(self, master: ttk.Frame | None = None, **kw):
if sys.platform == 'darwin':
# Can't set Button background on OSX, so use a Label instead
kw['relief'] = kw.pop('relief', tk.RAISED)
Expand All @@ -131,7 +132,7 @@ def _press(self, event):
class Checkbutton(sys.platform == 'darwin' and tk.Checkbutton or ttk.Checkbutton): # type: ignore
"""Custom t(t)k.Checkbutton class to fix some display issues."""

def __init__(self, master: Optional[ttk.Frame] = None, **kw):
def __init__(self, master: ttk.Frame | None = None, **kw):
if sys.platform == 'darwin':
kw['foreground'] = kw.pop('foreground', PAGEFG)
kw['background'] = kw.pop('background', PAGEBG)
Expand All @@ -145,7 +146,7 @@ def __init__(self, master: Optional[ttk.Frame] = None, **kw):
class Radiobutton(sys.platform == 'darwin' and tk.Radiobutton or ttk.Radiobutton): # type: ignore
"""Custom t(t)k.Radiobutton class to fix some display issues."""

def __init__(self, master: Optional[ttk.Frame] = None, **kw):
def __init__(self, master: ttk.Frame | None = None, **kw):
if sys.platform == 'darwin':
kw['foreground'] = kw.pop('foreground', PAGEFG)
kw['background'] = kw.pop('background', PAGEBG)
Expand Down
40 changes: 21 additions & 19 deletions theme.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
Because of various ttk limitations this app is an unholy mix of Tk and ttk widgets.
So can't use ttk's theme support. So have to change colors manually.
"""
from __future__ import annotations

import os
import sys
import tkinter as tk
from os.path import join
from tkinter import font as tk_font
from tkinter import ttk
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import TYPE_CHECKING, Callable

from config import config
from EDMCLogging import get_main_logger
from ttkHyperlinkLabel import HyperlinkLabel
Expand Down Expand Up @@ -133,16 +136,16 @@ class _Theme:
THEME_TRANSPARENT = 2

def __init__(self) -> None:
self.active: Optional[int] = None # Starts out with no theme
self.minwidth: Optional[int] = None
self.widgets: Dict[Union[tk.Widget, tk.BitmapImage], Set] = {}
self.widgets_pair: List = []
self.defaults: Dict = {}
self.current: Dict = {}
self.default_ui_scale: Optional[float] = None # None == not yet known
self.startup_ui_scale: Optional[int] = None

def register(self, widget: Union[tk.Widget, tk.BitmapImage]) -> None: # noqa: CCR001, C901
self.active: int | None = None # Starts out with no theme
self.minwidth: int | None = None
self.widgets: dict[tk.Widget | tk.BitmapImage, set] = {}
self.widgets_pair: list = []
self.defaults: dict = {}
self.current: dict = {}
self.default_ui_scale: float | None = None # None == not yet known
self.startup_ui_scale: int | None = None

def register(self, widget: tk.Widget | tk.BitmapImage) -> None: # noqa: CCR001, C901
# Note widget and children for later application of a theme. Note if
# the widget has explicit fg or bg attributes.
assert isinstance(widget, (tk.BitmapImage, tk.Widget)), widget
Expand Down Expand Up @@ -206,17 +209,17 @@ def register(self, widget: Union[tk.Widget, tk.BitmapImage]) -> None: # noqa: C
for child in widget.winfo_children():
self.register(child)

def register_alternate(self, pair: Tuple, gridopts: Dict) -> None:
def register_alternate(self, pair: tuple, gridopts: dict) -> None:
self.widgets_pair.append((pair, gridopts))

def button_bind(
self, widget: tk.Widget, command: Callable, image: Optional[tk.BitmapImage] = None
self, widget: tk.Widget, command: Callable, image: tk.BitmapImage | None = None
) -> None:
widget.bind('<Button-1>', command)
widget.bind('<Enter>', lambda e: self._enter(e, image))
widget.bind('<Leave>', lambda e: self._leave(e, image))

def _enter(self, event: tk.Event, image: Optional[tk.BitmapImage]) -> None:
def _enter(self, event: tk.Event, image: tk.BitmapImage | None) -> None:
widget = event.widget
if widget and widget['state'] != tk.DISABLED:
try:
Expand All @@ -233,7 +236,7 @@ def _enter(self, event: tk.Event, image: Optional[tk.BitmapImage]) -> None:
except Exception:
logger.exception(f'Failure configuring image: {image=}')

def _leave(self, event: tk.Event, image: Optional[tk.BitmapImage]) -> None:
def _leave(self, event: tk.Event, image: tk.BitmapImage | None) -> None:
widget = event.widget
if widget and widget['state'] != tk.DISABLED:
try:
Expand Down Expand Up @@ -312,11 +315,11 @@ def update(self, widget: tk.Widget) -> None:
self._update_widget(child)

# Apply current theme to a single widget
def _update_widget(self, widget: Union[tk.Widget, tk.BitmapImage]) -> None: # noqa: CCR001, C901
def _update_widget(self, widget: tk.Widget | tk.BitmapImage) -> None: # noqa: CCR001, C901
if widget not in self.widgets:
if isinstance(widget, tk.Widget):
w_class = widget.winfo_class()
w_keys: List[str] = widget.keys()
w_keys: list[str] = widget.keys()

else:
# There is no tk.BitmapImage.winfo_class()
Expand All @@ -327,7 +330,7 @@ def _update_widget(self, widget: Union[tk.Widget, tk.BitmapImage]) -> None: # n
assert_str = f'{w_class} {widget} "{"text" in w_keys and widget["text"]}"'
raise AssertionError(assert_str)

attribs: Set = self.widgets.get(widget, set())
attribs: set = self.widgets.get(widget, set())

try:
if isinstance(widget, tk.BitmapImage):
Expand Down Expand Up @@ -421,7 +424,6 @@ def apply(self, root: tk.Tk) -> None: # noqa: CCR001, C901

if self.active == theme:
return # Don't need to mess with the window manager

self.active = theme

if sys.platform == 'darwin':
Expand Down
Loading

0 comments on commit e787555

Please sign in to comment.