From cb2a18025c929b5842c790e3a20b2841c3e73592 Mon Sep 17 00:00:00 2001 From: David Sangrey Date: Thu, 30 Nov 2023 21:30:18 -0500 Subject: [PATCH 1/3] [2051] First Pass Remaining Files --- companion.py | 182 ++++++++++++++++++++++----------------------- hotkey/__init__.py | 7 +- hotkey/darwin.py | 14 ++-- hotkey/linux.py | 3 +- hotkey/windows.py | 25 +++---- monitor.py | 124 +++++++++++++++--------------- plug.py | 126 ++++++++++++++++--------------- stats.py | 20 +++-- 8 files changed, 253 insertions(+), 248 deletions(-) diff --git a/companion.py b/companion.py index 603222316..4f729479c 100644 --- a/companion.py +++ b/companion.py @@ -1,10 +1,15 @@ """ -Handle use of Frontier's Companion API (CAPI) service. +companion.py - Handle use of Frontier's Companion API (CAPI) service. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. Deals with initiating authentication for, and use of, CAPI. Some associated code is in protocol.py which creates and handles the edmc:// protocol used for the callback. """ +from __future__ import annotations import base64 import collections @@ -21,13 +26,10 @@ import tkinter as tk import urllib.parse import webbrowser -from builtins import object, range, str from email.utils import parsedate from queue import Queue -from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, OrderedDict, TypeVar, Union - +from typing import TYPE_CHECKING, Any, Mapping, OrderedDict, TypeVar import requests - import config as conf_module import killswitch import protocol @@ -43,7 +45,7 @@ def _(x): return x UserDict = collections.UserDict[str, Any] # indicate to our type checkers what this generic class holds normally else: - UserDict = collections.UserDict # type: ignore # Otherwise simply use the actual class + UserDict = collections.UserDict # Otherwise simply use the actual class capi_query_cooldown = 60 # Minimum time between (sets of) CAPI queries @@ -59,7 +61,7 @@ def _(x): return x SERVER_LEGACY = 'https://legacy-companion.orerve.net' SERVER_BETA = 'https://pts-companion.orerve.net' -commodity_map: Dict = {} +commodity_map: dict = {} class CAPIData(UserDict): @@ -67,10 +69,10 @@ class CAPIData(UserDict): def __init__( self, - data: Union[str, Dict[str, Any], 'CAPIData', None] = None, - source_host: Optional[str] = None, - source_endpoint: Optional[str] = None, - request_cmdr: Optional[str] = None + data: str | dict[str, Any] | 'CAPIData' | None = None, + source_host: str | None = None, + source_endpoint: str | None = None, + request_cmdr: str | None = None ) -> None: if data is None: super().__init__() @@ -102,13 +104,13 @@ def check_modules_ships(self) -> None: This has side-effects of fixing `data` to be as expected in terms of types of those elements. """ - modules: Dict[str, Any] = self.data['lastStarport'].get('modules') + modules: dict[str, Any] = self.data['lastStarport'].get('modules') if modules is None or not isinstance(modules, dict): if modules is None: logger.debug('modules was None. FC or Damaged Station?') elif isinstance(modules, list): - if len(modules) == 0: + if not modules: logger.debug('modules is empty list. Damaged Station?') else: @@ -120,13 +122,13 @@ def check_modules_ships(self) -> None: # Set a safe value self.data['lastStarport']['modules'] = modules = {} - ships: Dict[str, Any] = self.data['lastStarport'].get('ships') + ships: dict[str, Any] = self.data['lastStarport'].get('ships') if ships is None or not isinstance(ships, dict): if ships is None: logger.debug('ships was None') else: - logger.error(f'ships was neither None nor a Dict! type: {type(ships)}, content: {ships}') + logger.error(f'ships was neither None nor a dict! type: {type(ships)}, content: {ships}') # Set a safe value self.data['lastStarport']['ships'] = {'shipyard_list': {}, 'unavailable_list': []} @@ -152,7 +154,7 @@ def __init__(self, raw_data: str, query_time: datetime.datetime): class CAPIDataRaw: """The last obtained raw CAPI response for each endpoint.""" - raw_data: Dict[str, CAPIDataRawEndpoint] = {} + raw_data: dict[str, CAPIDataRawEndpoint] = {} def record_endpoint( self, endpoint: str, @@ -176,14 +178,14 @@ def __str__(self): def __iter__(self): """Make this iterable on its raw_data dict.""" - yield from self.raw_data + yield from self.raw_data.keys() def __getitem__(self, item): """Make the raw_data dict's items get'able.""" - return self.raw_data.__getitem__(item) + return self.raw_data[item] -def listify(thing: Union[List, Dict]) -> List: +def listify(thing: list | dict) -> list: """ Convert actual JSON array or int-indexed dict into a Python list. @@ -196,11 +198,11 @@ def listify(thing: Union[List, Dict]) -> List: if thing is None: return [] # data is not present - elif isinstance(thing, list): + if isinstance(thing, list): return list(thing) # array is not sparse - elif isinstance(thing, dict): - retval: List[Any] = [] + if isinstance(thing, dict): + retval: list[Any] = [] for k, v in thing.items(): idx = int(k) @@ -211,9 +213,7 @@ def listify(thing: Union[List, Dict]) -> List: retval[idx] = v return retval - - else: - raise ValueError(f"expected an array or sparse array, got {thing!r}") + raise ValueError(f"expected an array or sparse array, got {thing!r}") class ServerError(Exception): @@ -297,7 +297,7 @@ def __init__(self, *args) -> None: self.args = (_('Error: Wrong Cmdr'),) -class Auth(object): +class Auth: """Handles authentication with the Frontier CAPI service via oAuth2.""" # Currently the "Elite Dangerous Market Connector (EDCD/Athanasius)" one in @@ -313,15 +313,15 @@ def __init__(self, cmdr: str) -> None: self.cmdr: str = cmdr self.requests_session = requests.Session() self.requests_session.headers['User-Agent'] = user_agent - self.verifier: Union[bytes, None] = None - self.state: Union[str, None] = None + self.verifier: bytes | None = None + self.state: str | None = None def __del__(self) -> None: """Ensure our Session is closed if we're being deleted.""" if self.requests_session: self.requests_session.close() - def refresh(self) -> Optional[str]: + def refresh(self) -> str | None: """ Attempt use of Refresh Token to get a valid Access Token. @@ -347,7 +347,7 @@ def refresh(self) -> Optional[str]: logger.debug(f'idx = {idx}') tokens = config.get_list('fdev_apikeys', default=[]) - tokens = tokens + [''] * (len(cmdrs) - len(tokens)) + tokens += [''] * (len(cmdrs) - len(tokens)) if tokens[idx]: logger.debug('We have a refresh token for that idx') data = { @@ -358,7 +358,7 @@ def refresh(self) -> Optional[str]: logger.debug('Attempting refresh with Frontier...') try: - r: Optional[requests.Response] = None + r: requests.Response | None = None r = self.requests_session.post( FRONTIER_AUTH_SERVER + self.FRONTIER_AUTH_PATH_TOKEN, data=data, @@ -372,11 +372,10 @@ def refresh(self) -> Optional[str]: return data.get('access_token') - else: - logger.error(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"") - self.dump(r) + logger.error(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"") + self.dump(r) - except (ValueError, requests.RequestException, ) as e: + except (ValueError, requests.RequestException) as e: logger.exception(f"Frontier CAPI Auth: Can't refresh token for \"{self.cmdr}\"\n{e!r}") if r is not None: self.dump(r) @@ -490,7 +489,7 @@ def authorize(self, payload: str) -> str: # noqa: CCR001 cmdrs = config.get_list('cmdrs', default=[]) idx = cmdrs.index(self.cmdr) tokens = config.get_list('fdev_apikeys', default=[]) - tokens = tokens + [''] * (len(cmdrs) - len(tokens)) + tokens += [''] * (len(cmdrs) - len(tokens)) tokens[idx] = data_token.get('refresh_token', '') config.set('fdev_apikeys', tokens) config.save() # Save settings now for use by command-line app @@ -518,9 +517,9 @@ def authorize(self, payload: str) -> str: # noqa: CCR001 raise CredentialsError(f'{_("Error")}: {error!r}') @staticmethod - def invalidate(cmdr: Optional[str]) -> None: + def invalidate(cmdr: str | None) -> None: """Invalidate Refresh Token for specified Commander.""" - to_set: Optional[list] = None + to_set: list | None = None if cmdr is None: logger.info('Frontier CAPI Auth: Invalidating ALL tokens!') cmdrs = config.get_list('cmdrs', default=[]) @@ -531,7 +530,7 @@ def invalidate(cmdr: Optional[str]) -> None: cmdrs = config.get_list('cmdrs', default=[]) idx = cmdrs.index(cmdr) to_set = config.get_list('fdev_apikeys', default=[]) - to_set = to_set + [''] * (len(cmdrs) - len(to_set)) # type: ignore + to_set += [''] * (len(cmdrs) - len(to_set)) to_set[idx] = '' if to_set is None: @@ -560,7 +559,7 @@ class EDMCCAPIReturn: """Base class for Request, Failure or Response.""" def __init__( - self, query_time: int, tk_response_event: Optional[str] = None, + self, query_time: int, tk_response_event: str | None = None, play_sound: bool = False, auto_update: bool = False ): self.tk_response_event = tk_response_event # Name of tk event to generate when response queued. @@ -577,7 +576,7 @@ class EDMCCAPIRequest(EDMCCAPIReturn): def __init__( self, capi_host: str, endpoint: str, query_time: int, - tk_response_event: Optional[str] = None, + tk_response_event: str | None = None, play_sound: bool = False, auto_update: bool = False ): super().__init__( @@ -612,7 +611,7 @@ def __init__( self.exception: Exception = exception # Exception that recipient should raise. -class Session(object): +class Session: """Methods for handling Frontier Auth and CAPI queries.""" STATE_INIT, STATE_AUTH, STATE_OK = list(range(3)) @@ -628,11 +627,11 @@ class Session(object): def __init__(self) -> None: self.state = Session.STATE_INIT - self.credentials: Optional[Dict[str, Any]] = None + self.credentials: dict[str, Any] | None = None self.requests_session = requests.Session() - self.auth: Optional[Auth] = None + self.auth: Auth | None = None self.retrying = False # Avoid infinite loop when successful auth / unsuccessful query - self.tk_master: Optional[tk.Tk] = None + self.tk_master: tk.Tk | None = None self.capi_raw_data = CAPIDataRaw() # Cache of raw replies from CAPI service # Queue that holds requests for CAPI queries, the items should always @@ -642,7 +641,7 @@ def __init__(self) -> None: # queries back to the requesting code (technically anything checking # this queue, but it should be either EDMarketConnector.AppWindow or # EDMC.py). Items may be EDMCCAPIResponse or EDMCCAPIFailedRequest. - self.capi_response_queue: Queue[Union[EDMCCAPIResponse, EDMCCAPIFailedRequest]] = Queue() + self.capi_response_queue: Queue[EDMCCAPIResponse | EDMCCAPIFailedRequest] = Queue() logger.debug('Starting CAPI queries thread...') self.capi_query_thread = threading.Thread( target=self.capi_query_worker, @@ -667,7 +666,7 @@ def start_frontier_auth(self, access_token: str) -> None: self.state = Session.STATE_OK - def login(self, cmdr: Optional[str] = None, is_beta: Optional[bool] = None) -> bool: + def login(self, cmdr: str | None = None, is_beta: bool | None = None) -> bool: """ Attempt oAuth2 login. @@ -694,7 +693,7 @@ def login(self, cmdr: Optional[str] = None, is_beta: Optional[bool] = None) -> b logger.error('self.credentials is None') raise CredentialsError('Missing credentials') # Shouldn't happen - elif self.state == Session.STATE_OK: + if self.state == Session.STATE_OK: logger.debug('already logged in (state == STATE_OK)') return True # already logged in @@ -704,10 +703,9 @@ def login(self, cmdr: Optional[str] = None, is_beta: Optional[bool] = None) -> b logger.debug(f'already logged in (is_beta = {is_beta})') return True # already logged in - else: - logger.debug('changed account or retrying login during auth') - self.reinit_session() - self.credentials = credentials + logger.debug('changed account or retrying login during auth') + self.reinit_session() + self.credentials = credentials self.state = Session.STATE_INIT self.auth = Auth(self.credentials['cmdr']) @@ -719,11 +717,10 @@ def login(self, cmdr: Optional[str] = None, is_beta: Optional[bool] = None) -> b self.start_frontier_auth(access_token) return True - else: - logger.debug('We do NOT have an access_token') - self.state = Session.STATE_AUTH - return False - # Wait for callback + logger.debug('We do NOT have an access_token') + self.state = Session.STATE_AUTH + return False + # Wait for callback # Callback from protocol handler def auth_callback(self) -> None: @@ -745,7 +742,7 @@ def auth_callback(self) -> None: self.auth = None raise # Bad thing happened if getattr(sys, 'frozen', False): - tk.messagebox.showinfo(title="Authentication Successful", + tk.messagebox.showinfo(title="Authentication Successful", # type: ignore message="Authentication with cAPI Successful.\n" "You may now close the Frontier login tab if it is still open.") @@ -812,11 +809,11 @@ def capi_single_query( raise ServerConnectionError(f'Pretending CAPI down: {capi_endpoint}') if conf_module.capi_debug_access_token is not None: - self.requests_session.headers['Authorization'] = f'Bearer {conf_module.capi_debug_access_token}' # type: ignore # noqa: E501 + self.requests_session.headers['Authorization'] = f'Bearer {conf_module.capi_debug_access_token}' # This is one-shot conf_module.capi_debug_access_token = None - r = self.requests_session.get(capi_host + capi_endpoint, timeout=timeout) # type: ignore + r = self.requests_session.get(capi_host + capi_endpoint, timeout=timeout) logger.trace_if('capi.worker', '... got result...') r.raise_for_status() # Typically 403 "Forbidden" on token expiry @@ -835,21 +832,7 @@ def capi_single_query( raise ServerConnectionError(f'Unable to connect to endpoint: {capi_endpoint}') from e except requests.HTTPError as e: # In response to raise_for_status() - logger.exception(f'Frontier CAPI Auth: GET {capi_endpoint}') - self.dump(r) - - if r.status_code == 401: # CAPI doesn't think we're Auth'd - # TODO: This needs to try a REFRESH, not a full re-auth - # No need for translation, we'll go straight into trying new Auth - # and thus any message would be overwritten. - raise CredentialsRequireRefresh('Frontier CAPI said "unauthorized"') from e - - if r.status_code == 418: # "I'm a teapot" - used to signal maintenance - # LANG: Frontier CAPI returned 418, meaning down for maintenance - raise ServerError(_("Frontier CAPI down for maintenance")) from e - - logger.exception('Frontier CAPI: Misc. Error') - raise ServerError('Frontier CAPI: Misc. Error') from e + handle_http_error(e.response, capi_endpoint) # type: ignore # Handle various HTTP errors except ValueError as e: logger.exception(f'decoding CAPI response content:\n{r.content.decode(encoding="utf-8")}\n') @@ -870,6 +853,28 @@ def capi_single_query( return capi_data + def handle_http_error(response: requests.Response, endpoint: str): + """ + Handle different types of HTTP errors raised during CAPI requests. + + :param response: The HTTP response object. + :param endpoint: The CAPI endpoint that was queried. + :raises: Various exceptions based on the error scenarios. + """ + logger.exception(f'Frontier CAPI Auth: GET {endpoint}') + self.dump(response) + + if response.status_code == 401: + # CAPI doesn't think we're Auth'd + raise CredentialsRequireRefresh('Frontier CAPI said "unauthorized"') + + if response.status_code == 418: + # "I'm a teapot" - used to signal maintenance + raise ServerError(_("Frontier CAPI down for maintenance")) + + logger.exception('Frontier CAPI: Misc. Error') + raise ServerError('Frontier CAPI: Misc. Error') + def capi_station_queries( # noqa: CCR001 capi_host: str, timeout: int = capi_default_requests_timeout ) -> CAPIData: @@ -939,9 +944,8 @@ def capi_station_queries( # noqa: CCR001 logger.warning(f"{last_starport_id!r} != {int(market_data['id'])!r}") raise ServerLagging() - else: - market_data['name'] = last_starport_name - station_data['lastStarport'].update(market_data) + market_data['name'] = last_starport_name + station_data['lastStarport'].update(market_data) if services.get('outfitting') or services.get('shipyard'): shipyard_data = capi_single_query(capi_host, self.FRONTIER_CAPI_PATH_SHIPYARD, timeout=timeout) @@ -953,9 +957,8 @@ def capi_station_queries( # noqa: CCR001 logger.warning(f"{last_starport_id!r} != {int(shipyard_data['id'])!r}") raise ServerLagging() - else: - shipyard_data['name'] = last_starport_name - station_data['lastStarport'].update(shipyard_data) + shipyard_data['name'] = last_starport_name + station_data['lastStarport'].update(shipyard_data) # WORKAROUND END return station_data @@ -1024,7 +1027,7 @@ def capi_query_close_worker(self) -> None: ) def station( - self, query_time: int, tk_response_event: Optional[str] = None, + self, query_time: int, tk_response_event: str | None = None, play_sound: bool = False, auto_update: bool = False ) -> None: """ @@ -1178,11 +1181,9 @@ def capi_host_for_galaxy(self) -> str: logger.debug(f"Using {SERVER_LIVE} because monitor.is_live_galaxy() was True") return SERVER_LIVE - else: - logger.debug(f"Using {SERVER_LEGACY} because monitor.is_live_galaxy() was False") - return SERVER_LEGACY + logger.debug(f"Using {SERVER_LEGACY} because monitor.is_live_galaxy() was False") + return SERVER_LEGACY - return '' ###################################################################### @@ -1300,11 +1301,11 @@ def filter_ship(d: CAPIData) -> CAPIData: V = TypeVar('V') -def index_possibly_sparse_list(data: Union[Mapping[str, V], List[V]], key: int) -> V: +def index_possibly_sparse_list(data: Mapping[str, V] | list[V], key: int) -> V: """ Index into a "list" that may or may not be sparseified into a dict. - :param data: List or Dict to index + :param data: list or dict to index :param key: Key to use to index :raises ValueError: When data is of an unexpected type :return: The value at the key @@ -1320,11 +1321,10 @@ def index_possibly_sparse_list(data: Union[Mapping[str, V], List[V]], key: int) if isinstance(data, list): return data[key] - elif isinstance(data, (dict, OrderedDict)): + if isinstance(data, (dict, OrderedDict)): return data[str(key)] - else: - raise ValueError(f'Unexpected data type {type(data)}') + raise ValueError(f'Unexpected data type {type(data)}') ###################################################################### diff --git a/hotkey/__init__.py b/hotkey/__init__.py index 5162621cf..52f8d6842 100644 --- a/hotkey/__init__.py +++ b/hotkey/__init__.py @@ -79,16 +79,15 @@ def get_hotkeymgr() -> AbstractHotkeyMgr: from hotkey.darwin import MacHotkeyMgr return MacHotkeyMgr() - elif sys.platform == 'win32': + if sys.platform == 'win32': from hotkey.windows import WindowsHotkeyMgr return WindowsHotkeyMgr() - elif sys.platform == 'linux': + if sys.platform == 'linux': from hotkey.linux import LinuxHotKeyMgr return LinuxHotKeyMgr() - else: - raise ValueError(f'Unknown platform: {sys.platform}') + raise ValueError(f'Unknown platform: {sys.platform}') # singleton diff --git a/hotkey/darwin.py b/hotkey/darwin.py index cbf9d2609..63f9d1326 100644 --- a/hotkey/darwin.py +++ b/hotkey/darwin.py @@ -1,8 +1,10 @@ """darwin/macOS implementation of hotkey.AbstractHotkeyMgr.""" +from __future__ import annotations + import pathlib import sys import tkinter as tk -from typing import Callable, Optional, Tuple, Union +from typing import Callable assert sys.platform == 'darwin' import objc @@ -107,7 +109,7 @@ def tkProcessKeyEvent(self, cls, the_event): # noqa: N802 # suppress the event by not chaining the old function return the_event - elif the_event.type() in (NSKeyDown, NSKeyUp): + if the_event.type() in (NSKeyDown, NSKeyUp): c = the_event.charactersIgnoringModifiers() self.acquire_key = (c and ord(c[0]) or 0) | \ (the_event.modifierFlags() & NSDeviceIndependentModifierFlagsMask) @@ -192,7 +194,7 @@ def _acquire_poll(self) -> None: self.acquire_state = MacHotkeyMgr.ACQUIRE_ACTIVE self.root.after(50, self._acquire_poll) - def fromevent(self, event) -> Optional[Union[bool, Tuple]]: + def fromevent(self, event) -> bool | tuple | None: """ Return configuration (keycode, modifiers) or None=clear or False=retain previous. @@ -209,17 +211,17 @@ def fromevent(self, event) -> Optional[Union[bool, Tuple]]: return False # BkSp, Del, Clear = clear hotkey - elif keycode in [0x7f, ord(NSDeleteFunctionKey), ord(NSClearLineFunctionKey)]: + if keycode in [0x7f, ord(NSDeleteFunctionKey), ord(NSClearLineFunctionKey)]: self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE return None # don't allow keys needed for typing in System Map - elif keycode in [0x13, 0x20, 0x2d] or 0x61 <= keycode <= 0x7a: + if keycode in [0x13, 0x20, 0x2d] or 0x61 <= keycode <= 0x7a: NSBeep() self.acquire_state = MacHotkeyMgr.ACQUIRE_INACTIVE return None - return (keycode, modifiers) + return keycode, modifiers def display(self, keycode, modifiers) -> str: """ diff --git a/hotkey/linux.py b/hotkey/linux.py index 927c4d26a..11d95a627 100644 --- a/hotkey/linux.py +++ b/hotkey/linux.py @@ -1,6 +1,7 @@ """Linux implementation of hotkey.AbstractHotkeyMgr.""" -import sys +from __future__ import annotations +import sys from EDMCLogging import get_main_logger from hotkey import AbstractHotkeyMgr diff --git a/hotkey/windows.py b/hotkey/windows.py index 8a1c7acd7..f21a24b24 100644 --- a/hotkey/windows.py +++ b/hotkey/windows.py @@ -1,4 +1,6 @@ """Windows implementation of hotkey.AbstractHotkeyMgr.""" +from __future__ import annotations + import atexit import ctypes import pathlib @@ -7,8 +9,6 @@ import tkinter as tk import winsound from ctypes.wintypes import DWORD, HWND, LONG, LPWSTR, MSG, ULONG, WORD -from typing import Optional, Tuple, Union - from config import config from EDMCLogging import get_main_logger from hotkey import AbstractHotkeyMgr @@ -266,7 +266,7 @@ def acquire_stop(self) -> None: """Stop acquiring hotkey state.""" pass - def fromevent(self, event) -> Optional[Union[bool, Tuple]]: # noqa: CCR001 + def fromevent(self, event) -> bool | tuple | None: # noqa: CCR001 """ Return configuration (keycode, modifiers) or None=clear or False=retain previous. @@ -285,33 +285,32 @@ def fromevent(self, event) -> Optional[Union[bool, Tuple]]: # noqa: CCR001 keycode = event.keycode if keycode in [VK_SHIFT, VK_CONTROL, VK_MENU, VK_LWIN, VK_RWIN]: - return (0, modifiers) + return 0, modifiers if not modifiers: if keycode == VK_ESCAPE: # Esc = retain previous return False - elif keycode in [VK_BACK, VK_DELETE, VK_CLEAR, VK_OEM_CLEAR]: # BkSp, Del, Clear = clear hotkey + if keycode in [VK_BACK, VK_DELETE, VK_CLEAR, VK_OEM_CLEAR]: # BkSp, Del, Clear = clear hotkey return None - elif ( + if ( keycode in [VK_RETURN, VK_SPACE, VK_OEM_MINUS] or ord('A') <= keycode <= ord('Z') ): # don't allow keys needed for typing in System Map winsound.MessageBeep() return None - elif (keycode in [VK_NUMLOCK, VK_SCROLL, VK_PROCESSKEY] - or VK_CAPITAL <= keycode <= VK_MODECHANGE): # ignore unmodified mode switch keys - return (0, modifiers) + if (keycode in [VK_NUMLOCK, VK_SCROLL, VK_PROCESSKEY] + or VK_CAPITAL <= keycode <= VK_MODECHANGE): # ignore unmodified mode switch keys + return 0, modifiers # See if the keycode is usable and available if RegisterHotKey(None, 2, modifiers | MOD_NOREPEAT, keycode): UnregisterHotKey(None, 2) - return (keycode, modifiers) + return keycode, modifiers - else: - winsound.MessageBeep() - return None + winsound.MessageBeep() + return None def display(self, keycode, modifiers) -> str: """ diff --git a/monitor.py b/monitor.py index bbbfc190f..17a6fa165 100644 --- a/monitor.py +++ b/monitor.py @@ -1,7 +1,11 @@ -"""Monitor for new Journal files and contents of latest.""" -# v [sic] -# spell-checker: words onfoot unforseen relog fsdjump suitloadoutid slotid suitid loadoutid fauto Intimidator -# spell-checker: words joinacrew quitacrew sellshiponrebuy newbal navroute npccrewpaidwage sauto +""" +monitor.py - Monitor for new Journal files and contents of latest. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" +from __future__ import annotations import json import pathlib @@ -14,19 +18,16 @@ from os import SEEK_END, SEEK_SET, listdir from os.path import basename, expanduser, getctime, isdir, join from time import gmtime, localtime, mktime, sleep, strftime, strptime, time -from typing import TYPE_CHECKING, Any, BinaryIO, MutableMapping, Tuple - -if TYPE_CHECKING: - import tkinter - +from typing import TYPE_CHECKING, Any, BinaryIO, MutableMapping import semantic_version - import util_ships from config import config from edmc_data import edmc_suit_shortnames, edmc_suit_symbol_localised from EDMCLogging import get_main_logger -# spell-checker: words navroute +if TYPE_CHECKING: + import tkinter + logger = get_main_logger() STARTUP = 'journal.startup' @@ -76,11 +77,10 @@ def _(x: str) -> str: # Journal handler -class EDLogs(FileSystemEventHandler): # type: ignore # See below +class EDLogs(FileSystemEventHandler): """Monitoring of Journal files.""" # Magic with FileSystemEventHandler can confuse type checkers when they do not have access to every import - _POLL = 1 # Polling is cheap, so do it often _RE_CANONICALISE = re.compile(r'\$(.+)_name;') _RE_CATEGORY = re.compile(r'\$MICRORESOURCE_CATEGORY_(.+);') @@ -207,7 +207,7 @@ def start(self, root: 'tkinter.Tk') -> bool: # noqa: CCR001 :return: bool - False if we couldn't access/find latest Journal file. """ logger.debug('Begin...') - self.root = root # type: ignore + self.root = root journal_dir = config.get_str('journaldir') if journal_dir == '' or journal_dir is None: @@ -515,8 +515,6 @@ def worker(self) -> None: # noqa: C901, CCR001 else: self.game_was_running = self.game_running() - logger.debug('Done.') - def synthesize_startup_event(self) -> dict[str, Any]: """ Synthesize a 'StartUp' event to notify plugins of initial state. @@ -570,7 +568,7 @@ def parse_entry(self, line: bytes) -> MutableMapping[str, Any]: # noqa: C901, C try: # Preserve property order because why not? entry: MutableMapping[str, Any] = json.loads(line, object_pairs_hook=OrderedDict) - entry['timestamp'] # we expect this to exist # TODO: replace with assert? or an if key in check + assert 'timestamp' in entry, "Timestamp does not exist in the entry" self.__navroute_retry() @@ -933,7 +931,7 @@ def parse_entry(self, line: bytes) -> MutableMapping[str, Any]: # noqa: C901, C ############################################################### if 'StarPos' in entry: # Plugins need this as well, so copy in state - self.state['StarPos'] = tuple(entry['StarPos']) # type: ignore + self.state['StarPos'] = tuple(entry['StarPos']) else: logger.warning(f"'{event_type}' event without 'StarPos' !!!:\n{entry}\n") @@ -1109,7 +1107,7 @@ def parse_entry(self, line: bytes) -> MutableMapping[str, Any]: # noqa: C901, C while attempts < shiplocker_max_attempts: attempts += 1 try: - with open(shiplocker_filename, 'rb') as h: # type: ignore + with open(shiplocker_filename, 'rb') as h: entry = json.load(h, object_pairs_hook=OrderedDict) self.state['ShipLockerJSON'] = entry break @@ -1551,7 +1549,7 @@ def parse_entry(self, line: bytes) -> MutableMapping[str, Any]: # noqa: C901, C entry = json.load(mf) except json.JSONDecodeError: - logger.exception('Failed decoding ModulesInfo.json', exc_info=True) + logger.exception('Failed decoding ModulesInfo.json') else: self.state['ModuleInfo'] = entry @@ -1812,7 +1810,7 @@ def parse_entry(self, line: bytes) -> MutableMapping[str, Any]: # noqa: C901, C self.state['Credits'] -= entry.get('Price', 0) elif event_type == 'carrierbanktransfer': - if (newbal := entry.get('PlayerBalance')): + if newbal := entry.get('PlayerBalance'): self.state['Credits'] = newbal elif event_type == 'carrierdecommission': @@ -1911,7 +1909,7 @@ def suit_sane_name(self, name: str) -> str: return name - def suitloadout_store_from_event(self, entry) -> Tuple[int, int]: + def suitloadout_store_from_event(self, entry) -> tuple[int, int]: """ Store Suit and SuitLoadout data from a journal event. @@ -1990,64 +1988,64 @@ def suit_and_loadout_setcurrent(self, suitid: int, suitloadout_slotid: int) -> b # TODO: *This* will need refactoring and a proper validation infrastructure # designed for this in the future. This is a bandaid for a known issue. - def event_valid_engineerprogress(self, entry) -> bool: # noqa: CCR001 C901 + def event_valid_engineerprogress(self, entry) -> bool: # noqa: CCR001 """ Check an `EngineerProgress` Journal event for validity. :param entry: Journal event dict :return: True if passes validation, else False. """ - # The event should have at least one of these - if 'Engineers' not in entry and 'Progress' not in entry: + engineers_present = 'Engineers' in entry + progress_present = 'Progress' in entry + + if not (engineers_present or progress_present): logger.warning(f"EngineerProgress has neither 'Engineers' nor 'Progress': {entry=}") return False - # But not both of them - if 'Engineers' in entry and 'Progress' in entry: + if engineers_present and progress_present: logger.warning(f"EngineerProgress has BOTH 'Engineers' and 'Progress': {entry=}") return False - if 'Engineers' in entry: + if engineers_present: + engineers = entry['Engineers'] # 'Engineers' version should have a list as value - if not isinstance(entry['Engineers'], list): + if not isinstance(engineers, list): logger.warning(f"EngineerProgress 'Engineers' is not a list: {entry=}") return False # It should have at least one entry? This might still be valid ? - if len(entry['Engineers']) < 1: + if len(engineers) < 1: logger.warning(f"EngineerProgress 'Engineers' list is empty ?: {entry=}") # TODO: As this might be valid, we might want to only log return False # And that list should have all of these keys - for e in entry['Engineers']: - for f in ('Engineer', 'EngineerID', 'Rank', 'Progress', 'RankProgress'): - if f not in e: - # For some Progress there's no Rank/RankProgress yet - if f in ('Rank', 'RankProgress'): - if (progress := e.get('Progress', None)) is not None: - if progress in ('Invited', 'Known'): - continue - - logger.warning(f"Engineer entry without '{f}' key: {e=} in {entry=}") - return False - - if 'Progress' in entry: + # For some Progress there's no Rank/RankProgress yet + required_keys = ('Engineer', 'EngineerID', 'Rank', 'Progress', 'RankProgress') + for e in engineers: + missing_keys = [key for key in required_keys if key not in e] + if any(key in ('Rank', 'RankProgress') and e.get('Progress') in ('Invited', 'Known') for key in + missing_keys): + continue + + if missing_keys: + logger.warning(f"Engineer entry without '{missing_keys[0]}' key: {e=} in {entry=}") + return False + + if progress_present: # Progress is only a single Engineer, so it's not an array # { "timestamp":"2021-05-24T17:57:52Z", # "event":"EngineerProgress", # "Engineer":"Felicity Farseer", # "EngineerID":300100, # "Progress":"Invited" } - for f in ('Engineer', 'EngineerID', 'Rank', 'Progress', 'RankProgress'): - if f not in entry: - # For some Progress there's no Rank/RankProgress yet - if f in ('Rank', 'RankProgress'): - if (progress := entry.get('Progress', None)) is not None: - if progress in ('Invited', 'Known'): - continue - - logger.warning(f"Progress event without '{f}' key: {entry=}") + # For some Progress there's no Rank/RankProgress yet + required_keys = ('Engineer', 'EngineerID', 'Rank', 'Progress', 'RankProgress') + missing_keys = [key for key in required_keys if key not in entry] + if any(key in ('Rank', 'RankProgress') and entry.get('Progress') in ('Invited', 'Known') for key in + missing_keys): + if missing_keys: + logger.warning(f"Progress event without '{missing_keys[0]}' key: {entry=}") return False return True @@ -2152,7 +2150,7 @@ def game_running(self) -> bool: # noqa: CCR001 return True elif sys.platform == 'win32': - def WindowTitle(h): # noqa: N802 # type: ignore + def WindowTitle(h): # noqa: N802 if h: length = GetWindowTextLength(h) + 1 buf = ctypes.create_unicode_buffer(length) @@ -2261,18 +2259,18 @@ def export_ship(self, filename=None) -> None: # noqa: C901, CCR001 return ship = util_ships.ship_file_name(self.state['ShipName'], self.state['ShipType']) - regexp = re.compile(re.escape(ship) + r'\.\d{4}\-\d\d\-\d\dT\d\d\.\d\d\.\d\d\.txt') - oldfiles = sorted((x for x in listdir(config.get_str('outdir')) if regexp.match(x))) # type: ignore + regexp = re.compile(re.escape(ship) + r'\.\d{4}-\d\d-\d\dT\d\d\.\d\d\.\d\d\.txt') + oldfiles = sorted((x for x in listdir(config.get_str('outdir')) if regexp.match(x))) if oldfiles: try: - with open(join(config.get_str('outdir'), oldfiles[-1]), 'r', encoding='utf-8') as h: # type: ignore + with open(join(config.get_str('outdir'), oldfiles[-1]), encoding='utf-8') as h: if h.read() == string: return # same as last time - don't write except UnicodeError: logger.exception("UnicodeError reading old ship loadout with utf-8 encoding, trying without...") try: - with open(join(config.get_str('outdir'), oldfiles[-1]), 'r') as h: # type: ignore + with open(join(config.get_str('outdir'), oldfiles[-1])) as h: if h.read() == string: return # same as last time - don't write @@ -2291,9 +2289,7 @@ def export_ship(self, filename=None) -> None: # noqa: C901, CCR001 # Write ts = strftime('%Y-%m-%dT%H.%M.%S', localtime(time())) - filename = join( # type: ignore - config.get_str('outdir'), f'{ship}.{ts}.txt' - ) + filename = join(config.get_str('outdir'), f'{ship}.{ts}.txt') try: with open(filename, 'wt', encoding='utf-8') as h: @@ -2380,7 +2376,7 @@ def _parse_navroute_file(self) -> dict[str, Any] | None: try: - with open(join(self.currentdir, 'NavRoute.json'), 'r') as f: + with open(join(self.currentdir, 'NavRoute.json')) as f: raw = f.read() except Exception as e: @@ -2391,7 +2387,7 @@ def _parse_navroute_file(self) -> dict[str, Any] | None: data = json.loads(raw) except json.JSONDecodeError: - logger.exception('Failed to decode NavRoute.json', exc_info=True) + logger.exception('Failed to decode NavRoute.json') return None if 'timestamp' not in data: # quick sanity check @@ -2406,7 +2402,7 @@ def _parse_fcmaterials_file(self) -> dict[str, Any] | None: try: - with open(join(self.currentdir, 'FCMaterials.json'), 'r') as f: + with open(join(self.currentdir, 'FCMaterials.json')) as f: raw = f.read() except Exception as e: @@ -2417,7 +2413,7 @@ def _parse_fcmaterials_file(self) -> dict[str, Any] | None: data = json.loads(raw) except json.JSONDecodeError: - logger.exception('Failed to decode FCMaterials.json', exc_info=True) + logger.exception('Failed to decode FCMaterials.json') return None if 'timestamp' not in data: # quick sanity check diff --git a/plug.py b/plug.py index fa7807f2f..de67205b1 100644 --- a/plug.py +++ b/plug.py @@ -1,4 +1,12 @@ -"""Plugin API.""" +""" +plug.py - Plugin API. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" +from __future__ import annotations + import copy import importlib import logging @@ -6,9 +14,8 @@ import os import sys import tkinter as tk -from builtins import object, str from tkinter import ttk -from typing import Any, Callable, List, Mapping, MutableMapping, Optional +from typing import Any, Mapping, MutableMapping import companion import myNotebook as nb # noqa: N813 @@ -26,7 +33,7 @@ class LastError: """Holds the last plugin error.""" - msg: Optional[str] + msg: str | None root: tk.Tk def __init__(self) -> None: @@ -36,10 +43,10 @@ def __init__(self) -> None: last_error = LastError() -class Plugin(object): +class Plugin: """An EDMC plugin.""" - def __init__(self, name: str, loadfile: Optional[str], plugin_logger: Optional[logging.Logger]): + def __init__(self, name: str, loadfile: str | None, plugin_logger: logging.Logger | None): """ Load a single plugin. @@ -49,9 +56,9 @@ def __init__(self, name: str, loadfile: Optional[str], plugin_logger: Optional[l :raises Exception: Typically ImportError or OSError """ self.name: str = name # Display name. - self.folder: Optional[str] = name # basename of plugin folder. None for internal plugins. + self.folder: str | None = name # basename of plugin folder. None for internal plugins. self.module = None # None for disabled plugins. - self.logger: Optional[logging.Logger] = plugin_logger + self.logger: logging.Logger | None = plugin_logger if loadfile: logger.info(f'loading plugin "{name.replace(".", "_")}" from "{loadfile}"') @@ -64,7 +71,7 @@ def __init__(self, name: str, loadfile: Optional[str], plugin_logger: Optional[l ).load_module() if getattr(module, 'plugin_start3', None): newname = module.plugin_start3(os.path.dirname(loadfile)) - self.name = newname and str(newname) or name + self.name = str(newname) if newname else self.name self.module = module elif getattr(module, 'plugin_start', None): logger.warning(f'plugin {name} needs migrating\n') @@ -77,7 +84,7 @@ def __init__(self, name: str, loadfile: Optional[str], plugin_logger: Optional[l else: logger.info(f'plugin {name} disabled') - def _get_func(self, funcname: str) -> Optional[Callable]: + def _get_func(self, funcname: str): # Removing Unhelpful Type Hint """ Get a function from a plugin. @@ -86,7 +93,7 @@ def _get_func(self, funcname: str) -> Optional[Callable]: """ return getattr(self.module, funcname, None) - def get_app(self, parent: tk.Frame) -> Optional[tk.Frame]: + def get_app(self, parent: tk.Frame) -> tk.Frame | None: """ If the plugin provides mainwindow content create and return it. @@ -100,7 +107,7 @@ def get_app(self, parent: tk.Frame) -> Optional[tk.Frame]: if appitem is None: return None - elif isinstance(appitem, tuple): + if isinstance(appitem, tuple): if ( len(appitem) != 2 or not isinstance(appitem[0], tk.Widget) @@ -118,7 +125,7 @@ def get_app(self, parent: tk.Frame) -> Optional[tk.Frame]: return None - def get_prefs(self, parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> Optional[tk.Frame]: + def get_prefs(self, parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame | None: """ If the plugin provides a prefs frame, create and return it. @@ -132,41 +139,50 @@ def get_prefs(self, parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> Op if plugin_prefs: try: frame = plugin_prefs(parent, cmdr, is_beta) - if not isinstance(frame, nb.Frame): - raise AssertionError - return frame + if isinstance(frame, nb.Frame): + return frame + raise AssertionError except Exception: logger.exception(f'Failed for Plugin "{self.name}"') return None -def load_plugins(master: tk.Tk) -> None: # noqa: CCR001 +def load_plugins(master: tk.Tk) -> None: """Find and load all plugins.""" last_error.root = master + internal = _load_internal_plugins() + PLUGINS.extend(sorted(internal, key=lambda p: operator.attrgetter('name')(p).lower())) + + # Add plugin folder to load path so packages can be loaded from plugin folder + sys.path.append(config.plugin_dir) + + found = _load_found_plugins() + PLUGINS.extend(sorted(found, key=lambda p: operator.attrgetter('name')(p).lower())) + + +def _load_internal_plugins(): internal = [] for name in sorted(os.listdir(config.internal_plugin_dir_path)): - if name.endswith('.py') and not name[0] in ['.', '_']: + if name.endswith('.py') and name[0] not in ['.', '_']: try: plugin = Plugin(name[:-3], os.path.join(config.internal_plugin_dir_path, name), logger) - plugin.folder = None # Suppress listing in Plugins prefs tab + plugin.folder = None internal.append(plugin) except Exception: logger.exception(f'Failure loading internal Plugin "{name}"') - PLUGINS.extend(sorted(internal, key=lambda p: operator.attrgetter('name')(p).lower())) + return internal - # Add plugin folder to load path so packages can be loaded from plugin folder - sys.path.append(config.plugin_dir) +def _load_found_plugins(): found = [] # Load any plugins that are also packages first, but note it's *still* # 100% relying on there being a `load.py`, as only that will be loaded. # The intent here is to e.g. have EDMC-Overlay load before any plugins # that depend on it. - for name in sorted( - os.listdir(config.plugin_dir_path), - key=lambda n: (not os.path.isfile(os.path.join(config.plugin_dir_path, n, '__init__.py')), n.lower()) - ): + + for name in sorted(os.listdir(config.plugin_dir_path), key=lambda n: ( + not os.path.isfile(os.path.join(config.plugin_dir_path, n, '__init__.py')), n.lower())): if not os.path.isdir(os.path.join(config.plugin_dir_path, name)) or name[0] in ['.', '_']: pass elif name.endswith('.disabled'): @@ -177,19 +193,17 @@ def load_plugins(master: tk.Tk) -> None: # noqa: CCR001 # Add plugin's folder to load path in case plugin has internal package dependencies sys.path.append(os.path.join(config.plugin_dir_path, name)) - # Create a logger for this 'found' plugin. Must be before the - # load.py is loaded. import EDMCLogging - + # Create a logger for this 'found' plugin. Must be before the load.py is loaded. plugin_logger = EDMCLogging.get_plugin_logger(name) found.append(Plugin(name, os.path.join(config.plugin_dir_path, name, 'load.py'), plugin_logger)) except Exception: logger.exception(f'Failure loading found Plugin "{name}"') pass - PLUGINS.extend(sorted(found, key=lambda p: operator.attrgetter('name')(p).lower())) + return found -def provides(fn_name: str) -> List[str]: +def provides(fn_name: str) -> list[str]: """ Find plugins that provide a function. @@ -202,7 +216,7 @@ def provides(fn_name: str) -> List[str]: def invoke( plugin_name: str, fallback: str | None, fn_name: str, *args: Any -) -> Optional[str]: +) -> str | None: """ Invoke a function on a named plugin. @@ -228,7 +242,7 @@ def invoke( return None -def notify_stop() -> Optional[str]: +def notify_stop() -> str | None: """ Notify each plugin that the program is closing. @@ -251,6 +265,16 @@ def notify_stop() -> Optional[str]: return error +def _notify_prefs_plugins(fn_name: str, cmdr: str | None, is_beta: bool) -> None: + for plugin in PLUGINS: + prefs_callback = plugin._get_func(fn_name) + if prefs_callback: + try: + prefs_callback(cmdr, is_beta) + except Exception: + logger.exception(f'Plugin "{plugin.name}" failed') + + def notify_prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: """ Notify plugins that the Cmdr was changed while the settings dialog is open. @@ -259,13 +283,7 @@ def notify_prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: :param cmdr: current Cmdr name (or None). :param is_beta: whether the player is in a Beta universe. """ - for plugin in PLUGINS: - prefs_cmdr_changed = plugin._get_func('prefs_cmdr_changed') - if prefs_cmdr_changed: - try: - prefs_cmdr_changed(cmdr, is_beta) - except Exception: - logger.exception(f'Plugin "{plugin.name}" failed') + _notify_prefs_plugins("prefs_cmdr_changed", cmdr, is_beta) def notify_prefs_changed(cmdr: str | None, is_beta: bool) -> None: @@ -278,20 +296,14 @@ def notify_prefs_changed(cmdr: str | None, is_beta: bool) -> None: :param cmdr: current Cmdr name (or None). :param is_beta: whether the player is in a Beta universe. """ - for plugin in PLUGINS: - prefs_changed = plugin._get_func('prefs_changed') - if prefs_changed: - try: - prefs_changed(cmdr, is_beta) - except Exception: - logger.exception(f'Plugin "{plugin.name}" failed') + _notify_prefs_plugins("prefs_changed", cmdr, is_beta) def notify_journal_entry( cmdr: str, is_beta: bool, system: str | None, station: str | None, entry: MutableMapping[str, Any], state: Mapping[str, Any] -) -> Optional[str]: +) -> str | None: """ Send a journal entry to each plugin. @@ -303,7 +315,7 @@ def notify_journal_entry( :param is_beta: whether the player is in a Beta universe. :returns: Error message from the first plugin that returns one (if any) """ - if entry['event'] in ('Location'): + if entry['event'] in 'Location': logger.trace_if('journal.locations', 'Notifying plugins of "Location" event') error = None @@ -323,7 +335,7 @@ def notify_journal_entry_cqc( cmdr: str, is_beta: bool, entry: MutableMapping[str, Any], state: Mapping[str, Any] -) -> Optional[str]: +) -> str | None: """ Send an in-CQC journal entry to each plugin. @@ -348,10 +360,7 @@ def notify_journal_entry_cqc( return error -def notify_dashboard_entry( - cmdr: str, is_beta: bool, - entry: MutableMapping[str, Any], -) -> Optional[str]: +def notify_dashboard_entry(cmdr: str, is_beta: bool, entry: MutableMapping[str, Any],) -> str | None: """ Send a status entry to each plugin. @@ -373,10 +382,7 @@ def notify_dashboard_entry( return error -def notify_capidata( - data: companion.CAPIData, - is_beta: bool -) -> Optional[str]: +def notify_capidata(data: companion.CAPIData, is_beta: bool) -> str | None: """ Send the latest EDMC data from the FD servers to each plugin. @@ -404,9 +410,7 @@ def notify_capidata( return error -def notify_capi_fleetcarrierdata( - data: companion.CAPIData -) -> str | None: +def notify_capi_fleetcarrierdata(data: companion.CAPIData) -> str | None: """ Send the latest CAPI Fleetcarrier data from the FD servers to each plugin. diff --git a/stats.py b/stats.py index d690dce64..041e47f36 100644 --- a/stats.py +++ b/stats.py @@ -1,12 +1,16 @@ -"""CMDR Status information.""" +""" +stats.py - CMDR Status Information. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" import csv import json import sys -import tkinter import tkinter as tk from tkinter import ttk from typing import TYPE_CHECKING, Any, AnyStr, Callable, NamedTuple, Sequence, cast - import companion import EDMCLogging import myNotebook as nb # noqa: N813 @@ -488,11 +492,11 @@ def addpagerow( :param align: The alignment of the data, defaults to tk.W """ row = -1 # To silence unbound warnings - for i in range(len(content)): - # label = HyperlinkLabel(parent, text=content[i], popup_copy=True) - label = nb.Label(parent, text=content[i]) + for i, col_content in enumerate(content): + # label = HyperlinkLabel(parent, text=col_content, popup_copy=True) + label = nb.Label(parent, text=col_content) if with_copy: - label.bind('', self.copy_callback(label, content[i])) + label.bind('', self.copy_callback(label, col_content)) if i == 0: label.grid(padx=10, sticky=tk.W) @@ -512,7 +516,7 @@ def credits(self, value: int) -> str: @staticmethod def copy_callback(label: tk.Label, text_to_copy: str) -> Callable[..., None]: """Copy data in Label to clipboard.""" - def do_copy(event: tkinter.Event) -> None: + def do_copy(event: tk.Event) -> None: label.clipboard_clear() label.clipboard_append(text_to_copy) old_bg = label['bg'] From 170b86b5dc0c17cb1ec1fbc7f0423b29732a6c18 Mon Sep 17 00:00:00 2001 From: David Sangrey Date: Thu, 30 Nov 2023 21:45:04 -0500 Subject: [PATCH 2/3] [2051] Remove More Old Types --- build.py | 9 ++++----- docs/examples/click_counter/load.py | 9 ++++----- hotkey/__init__.py | 5 +++-- outfitting.py | 1 - plugins/eddn.py | 9 ++++----- prefs.py | 5 +++-- 6 files changed, 18 insertions(+), 20 deletions(-) diff --git a/build.py b/build.py index 7d182641f..d6cecb7dd 100644 --- a/build.py +++ b/build.py @@ -9,7 +9,6 @@ import shutil import sys import pathlib -from typing import List, Tuple from string import Template from os.path import join, isdir import py2exe @@ -57,8 +56,8 @@ def system_check(dist_dir: str) -> str: def generate_data_files( - app_name: str, gitversion_file: str, plugins: List[str] -) -> List[Tuple[str, List[str]]]: + app_name: str, gitversion_file: str, plugins: list[str] +) -> list[tuple[str, list[str]]]: """Create the required datafiles to build.""" l10n_dir = "L10n" fdevids_dir = "FDevIDs" @@ -106,7 +105,7 @@ def build() -> None: gitversion_filename: str = system_check(dist_dir) # Constants - plugins: List[str] = [ + plugins: list[str] = [ "plugins/coriolis.py", "plugins/eddn.py", "plugins/edsm.py", @@ -141,7 +140,7 @@ def build() -> None: } # Function to generate DATA_FILES list - data_files: List[Tuple[str, List[str]]] = generate_data_files( + data_files: list[tuple[str, list[str]]] = generate_data_files( appname, gitversion_filename, plugins ) diff --git a/docs/examples/click_counter/load.py b/docs/examples/click_counter/load.py index 0c65fc29a..ad90a084a 100644 --- a/docs/examples/click_counter/load.py +++ b/docs/examples/click_counter/load.py @@ -3,11 +3,10 @@ It adds a single button to the EDMC interface that displays the number of times it has been clicked. """ +from __future__ import annotations import logging import tkinter as tk -from typing import Optional - import myNotebook as nb # noqa: N813 from config import appname, config @@ -48,7 +47,7 @@ def on_unload(self) -> None: """ self.on_preferences_closed("", False) # Save our prefs - def setup_preferences(self, parent: nb.Notebook, cmdr: str, is_beta: bool) -> Optional[tk.Frame]: + def setup_preferences(self, parent: nb.Notebook, cmdr: str, is_beta: bool) -> tk.Frame | None: """ setup_preferences is called by plugin_prefs below. @@ -127,7 +126,7 @@ def plugin_stop() -> None: return cc.on_unload() -def plugin_prefs(parent: nb.Notebook, cmdr: str, is_beta: bool) -> Optional[tk.Frame]: +def plugin_prefs(parent: nb.Notebook, cmdr: str, is_beta: bool) -> tk.Frame | None: """ Handle preferences tab for the plugin. @@ -145,7 +144,7 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: return cc.on_preferences_closed(cmdr, is_beta) -def plugin_app(parent: tk.Frame) -> Optional[tk.Frame]: +def plugin_app(parent: tk.Frame) -> tk.Frame | None: """ Set up the UI of the plugin. diff --git a/hotkey/__init__.py b/hotkey/__init__.py index 52f8d6842..e75158075 100644 --- a/hotkey/__init__.py +++ b/hotkey/__init__.py @@ -1,10 +1,11 @@ """Handle keyboard input for manual update triggering.""" +from __future__ import annotations + # -*- coding: utf-8 -*- import abc import sys from abc import abstractmethod -from typing import Optional, Tuple, Union class AbstractHotkeyMgr(abc.ABC): @@ -31,7 +32,7 @@ def acquire_stop(self) -> None: pass @abstractmethod - def fromevent(self, event) -> Optional[Union[bool, Tuple]]: + def fromevent(self, event) -> bool | tuple | None: """ Return configuration (keycode, modifiers) or None=clear or False=retain previous. diff --git a/outfitting.py b/outfitting.py index 99d841f1b..07e8ae762 100644 --- a/outfitting.py +++ b/outfitting.py @@ -10,7 +10,6 @@ import json from collections import OrderedDict from typing import OrderedDict as OrderedDictT - from config import config from edmc_data import ( outfitting_armour_map as armour_map, diff --git a/plugins/eddn.py b/plugins/eddn.py index 0ab06a839..687b39b7f 100644 --- a/plugins/eddn.py +++ b/plugins/eddn.py @@ -41,7 +41,6 @@ MutableMapping, ) from typing import OrderedDict as OrderedDictT -from typing import Tuple, Union import requests import companion import edmc_data @@ -100,8 +99,8 @@ def __init__(self): # Avoid duplicates self.marketId: str | None = None self.commodities: list[OrderedDictT[str, Any]] | None = None - self.outfitting: Tuple[bool, list[str]] | None = None - self.shipyard: Tuple[bool, list[Mapping[str, Any]]] | None = None + self.outfitting: tuple[bool, list[str]] | None = None + self.shipyard: tuple[bool, list[Mapping[str, Any]]] | None = None self.fcmaterials_marketid: int = 0 self.fcmaterials: list[OrderedDictT[str, Any]] | None = None self.fcmaterials_capi_marketid: int = 0 @@ -714,7 +713,7 @@ def export_commodities(self, data: CAPIData, is_beta: bool) -> None: # noqa: CC # Send any FCMaterials.json-equivalent 'orders' as well self.export_capi_fcmaterials(data, is_beta, horizons) - def safe_modules_and_ships(self, data: Mapping[str, Any]) -> Tuple[dict, dict]: + def safe_modules_and_ships(self, data: Mapping[str, Any]) -> tuple[dict, dict]: """ Produce a sanity-checked version of ships and modules from CAPI data. @@ -1091,7 +1090,7 @@ def entry_augment_system_data( entry: MutableMapping[str, Any], system_name: str, system_coordinates: list - ) -> Union[str, MutableMapping[str, Any]]: + ) -> str | MutableMapping[str, Any]: """ Augment a journal entry with necessary system data. diff --git a/prefs.py b/prefs.py index e1ab6ea2b..6e423acc0 100644 --- a/prefs.py +++ b/prefs.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- """EDMC preferences library.""" +from __future__ import annotations import contextlib import logging @@ -13,7 +14,7 @@ from tkinter import colorchooser as tkColorChooser # type: ignore # noqa: N812 from tkinter import ttk from types import TracebackType -from typing import TYPE_CHECKING, Any, Callable, Optional, Type, Union +from typing import TYPE_CHECKING, Any, Callable, Optional, Type import myNotebook as nb # noqa: N813 import plug @@ -276,7 +277,7 @@ def __init__(self, parent: tk.Tk, callback: Optional[Callable]): self.resizable(tk.FALSE, tk.FALSE) - self.cmdr: Union[str, bool, None] = False # Note if Cmdr changes in the Journal + self.cmdr: str | bool | None = False # Note if Cmdr changes in the Journal self.is_beta: bool = False # Note if Beta status changes in the Journal self.cmdrchanged_alarm: Optional[str] = None # This stores an ID that can be used to cancel a scheduled call From f30fb56f6e1767d05d235f6e1077dd91eddce5e0 Mon Sep 17 00:00:00 2001 From: David Sangrey Date: Thu, 30 Nov 2023 22:05:59 -0500 Subject: [PATCH 3/3] [2051] Re-add missing context It's not a refactor if I don't accidentally nuke a lang comment --- companion.py | 4 ++++ plug.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/companion.py b/companion.py index 4f729479c..3384af364 100644 --- a/companion.py +++ b/companion.py @@ -865,11 +865,15 @@ def handle_http_error(response: requests.Response, endpoint: str): self.dump(response) if response.status_code == 401: + # TODO: This needs to try a REFRESH, not a full re-auth + # No need for translation, we'll go straight into trying new Auth + # and thus any message would be overwritten. # CAPI doesn't think we're Auth'd raise CredentialsRequireRefresh('Frontier CAPI said "unauthorized"') if response.status_code == 418: # "I'm a teapot" - used to signal maintenance + # LANG: Frontier CAPI returned 418, meaning down for maintenance raise ServerError(_("Frontier CAPI down for maintenance")) logger.exception('Frontier CAPI: Misc. Error') diff --git a/plug.py b/plug.py index de67205b1..b343677bd 100644 --- a/plug.py +++ b/plug.py @@ -84,7 +84,7 @@ def __init__(self, name: str, loadfile: str | None, plugin_logger: logging.Logge else: logger.info(f'plugin {name} disabled') - def _get_func(self, funcname: str): # Removing Unhelpful Type Hint + def _get_func(self, funcname: str): """ Get a function from a plugin.