From ef8ba7b11f2c485cb3b64d1b3fa3fa72b9573938 Mon Sep 17 00:00:00 2001 From: David Sangrey Date: Sat, 6 Jun 2026 14:24:44 -0400 Subject: [PATCH 1/3] Hand Over OS to Pathlib where possible --- EDMC.py | 5 +- EDMCLogging.py | 3 +- EDMCSystemProfiler.py | 13 ++--- EDMarketConnector.py | 6 +-- build.py | 34 ++++++------ collate.py | 8 +-- config/__init__.py | 2 +- dashboard.py | 1 + edshipyard.py | 7 ++- journal_lock.py | 7 ++- l10n.py | 13 ++--- loadout.py | 3 +- monitor.py | 53 ++++++++++--------- plug.py | 5 +- plugin_browser.py | 21 ++++---- prefs.py | 16 ++---- tests/journal_lock.py/test_journal_lock.py | 60 ++++++---------------- util/text.py | 2 + 18 files changed, 112 insertions(+), 147 deletions(-) diff --git a/EDMC.py b/EDMC.py index 3314e8c581..793abffa35 100755 --- a/EDMC.py +++ b/EDMC.py @@ -13,6 +13,7 @@ import os import queue import sys +from pathlib import Path from time import sleep, time from typing import TYPE_CHECKING, Any from common_utils import log_locale, SERVER_RETRY @@ -226,14 +227,14 @@ def main() -> None: # noqa: C901, CCR001 # system, chances are its the current locale, and not utf-8. Otherwise if it was copied, its probably # utf8. Either way, try the system FIRST because reading something like cp1251 in UTF-8 results in garbage # but the reverse results in an exception. - json_file = os.path.abspath(args.j) + json_file = str(Path(args.j).resolve()) try: with open(json_file) as file_handle: data = json.load(file_handle) except UnicodeDecodeError: with open(json_file, encoding='utf-8') as file_handle: data = json.load(file_handle) - config.set('querytime', int(os.path.getmtime(args.j))) + config.set('querytime', int(Path(args.j).stat().st_mtime)) else: # Get state from latest Journal file diff --git a/EDMCLogging.py b/EDMCLogging.py index 3683babaa9..997a064466 100644 --- a/EDMCLogging.py +++ b/EDMCLogging.py @@ -184,7 +184,6 @@ class InterceptHandler(logging.Handler): def emit(self, record: logging.LogRecord) -> None: """Intercept standard logging and send to Loguru with context.""" - level: str | int try: level = loguru_logger.level(record.levelname).name @@ -367,9 +366,11 @@ def get_plugin_logger(plugin_name: str, loglevel: int = _default_loglevel) -> Lo class EDMCContextFilter(logging.Filter): """ Legacy compatibility shim. + EDMC now intercepts standard logging globally via Loguru, making manual context filtering obsolete. """ + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) warnings.warn( diff --git a/EDMCSystemProfiler.py b/EDMCSystemProfiler.py index c4883cec96..17c8d39782 100644 --- a/EDMCSystemProfiler.py +++ b/EDMCSystemProfiler.py @@ -11,7 +11,7 @@ import webbrowser import platform import sys -from os import chdir, environ, path +from os import chdir, environ import pathlib import logging from journal_lock import JournalLock @@ -19,10 +19,11 @@ if getattr(sys, "frozen", False): # Under py2exe sys.path[0] is the executable name if sys.platform == "win32": - chdir(path.dirname(sys.path[0])) + exe_dir = pathlib.Path(sys.path[0]).parent + chdir(exe_dir) # Allow executable to be invoked from any cwd - environ["TCL_LIBRARY"] = path.join(path.dirname(sys.path[0]), "lib", "tcl") - environ["TK_LIBRARY"] = path.join(path.dirname(sys.path[0]), "lib", "tk") + environ["TCL_LIBRARY"] = str(exe_dir / "lib" / "tcl") + environ["TK_LIBRARY"] = str(exe_dir / "lib" / "tk") else: # We still want to *try* to have CWD be where the main script is, even if @@ -115,12 +116,12 @@ def main(active_config: config.Config) -> None: root.withdraw() # Hide the window initially to calculate the dimensions try: icon_image = tk.PhotoImage( - file=path.join(active_config.respath_path, "io.edcd.EDMarketConnector.png") + file=pathlib.Path(active_config.respath_path) / "io.edcd.EDMarketConnector.png" ) root.iconphoto(True, icon_image) except tk.TclError: - root.iconbitmap(path.join(active_config.respath_path, "EDMarketConnector.ico")) + root.iconbitmap(pathlib.Path(active_config.respath_path) / "EDMarketConnector.ico") sys_report = get_sys_report(active_config) diff --git a/EDMarketConnector.py b/EDMarketConnector.py index cb56daeca8..ede8b80f76 100644 --- a/EDMarketConnector.py +++ b/EDMarketConnector.py @@ -20,7 +20,7 @@ import sys import threading import webbrowser -from os import chdir, environ +from os import environ from time import localtime, strftime, time from typing import TYPE_CHECKING, Any, Literal, MutableMapping from constants import applongname, appname, protocolhandler_redirect @@ -39,7 +39,7 @@ else: # We still want to *try* to have CWD be where the main script is, even if # not frozen. - chdir(pathlib.Path(__file__).parent) + os.chdir(pathlib.Path(__file__).parent) # config will now cause an appname logger to be set up, so we need the # console redirect before this @@ -412,7 +412,7 @@ def already_running_popup(): sys.stdout.truncate() else: # Potential Git Repo? - if os.path.exists(".git"): + if pathlib.Path(".git").exists(): try: git_branch = subprocess.check_output( ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], diff --git a/build.py b/build.py index da670dc90b..a5ba1f2928 100644 --- a/build.py +++ b/build.py @@ -6,8 +6,8 @@ See LICENSE file. """ +from collections.abc import Sequence import datetime -import os import shutil import sys import pathlib @@ -73,9 +73,9 @@ def system_check(dist_dir: pathlib.Path) -> str: def generate_data_files( app_name: str, gitversion_file: str, plugins: list[str] -) -> list[tuple[object, object]]: +) -> Sequence[tuple[object, object]]: """Create the required datafiles to build.""" - l10n_dir = "L10n" + l10n_dir = pathlib.Path("L10n") fdevids_dir = pathlib.Path("FDevIDs") license_dir = pathlib.Path("docs/Licenses") data_files = [ @@ -97,26 +97,22 @@ def generate_data_files( ], ), ( - l10n_dir, - [ - pathlib.Path(l10n_dir) / x - for x in os.listdir(l10n_dir) - if x.endswith(".strings") - ], + str(l10n_dir), + [str(x) for x in l10n_dir.iterdir() if x.suffix == ".strings"], ), ( - fdevids_dir, + str(fdevids_dir), [ - pathlib.Path(fdevids_dir / "commodity.csv"), - pathlib.Path(fdevids_dir / "rare_commodity.csv"), + str(fdevids_dir / "commodity.csv"), + str(fdevids_dir / "rare_commodity.csv"), ], ), ("plugins", plugins), ] # Add all files recursively from license directories - for root, dirs, files in os.walk(license_dir): - file_list = [os.path.join(root, f) for f in files] - dest_dir = os.path.join(license_dir, os.path.relpath(root, license_dir)) + for root, dirs, files in license_dir.walk(): + file_list = [str(root / f) for f in files] + dest_dir = str(license_dir / root.relative_to(license_dir)) data_files.append((dest_dir, file_list)) return data_files @@ -131,9 +127,9 @@ def _scan_dist_for_modules( # noqa: C901, CCR001 def add(name: str, origin: str): modules.setdefault(name, set()).add(origin) - for root, dirs, files in os.walk(dist_dir): + for root, dirs, files in dist_dir.walk(): for name in files: - path = pathlib.Path(root) / name + path = root / name # root is a Path object, name is a string # Loose files if path.suffix in (".py", ".pyc"): @@ -153,12 +149,12 @@ def add(name: str, origin: str): add(top, str(rel)) # Zips - if path.suffix == ".zip": + elif path.suffix == ".zip": try: with zipfile.ZipFile(path) as z: for info in z.infolist(): if info.filename.endswith((".py", ".pyc")): - parts = info.filename.split("/") # type: ignore + parts = tuple(info.filename.split("/")) if not parts: continue diff --git a/collate.py b/collate.py index 07830770c7..fc51705daf 100755 --- a/collate.py +++ b/collate.py @@ -14,7 +14,6 @@ import csv import json -import os import pathlib import sys from traceback import print_exc @@ -33,11 +32,8 @@ def __make_backup(file_name: pathlib.Path, suffix: str = '.bak') -> None: :param suffix: The suffix to use for backup files (default '.bak') """ backup_name = file_name.parent / (file_name.name + suffix) - - if pathlib.Path.is_file(backup_name): - os.unlink(backup_name) - - os.rename(file_name, backup_name) + backup_name.unlink(missing_ok=True) + file_name.rename(backup_name) def addcommodities(data) -> None: # noqa: CCR001 diff --git a/config/__init__.py b/config/__init__.py index 841eaae70b..afbdf11944 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -91,7 +91,7 @@ def git_shorthash_from_head() -> str | None: :return: str | None: None if we couldn't determine the short hash. """ - if IS_FROZEN or not os.path.exists(".git"): + if IS_FROZEN or not pathlib.Path(".git").exists(): return None try: diff --git a/dashboard.py b/dashboard.py index 17d8aa65b8..fd38189a35 100644 --- a/dashboard.py +++ b/dashboard.py @@ -163,6 +163,7 @@ def process(self, logfile: str | None = None) -> None: def poll(self, first_time: bool = False) -> None: """ Legacy compatibility shim for backwards compatibility with plugins. + Status.json is now handled entirely via filesystem event observers. """ warnings.warn( diff --git a/edshipyard.py b/edshipyard.py index 04b56ba923..cc982f85bb 100644 --- a/edshipyard.py +++ b/edshipyard.py @@ -2,7 +2,6 @@ from __future__ import annotations import json -import os import pathlib import re import time @@ -199,7 +198,11 @@ def class_rating(module: __Module) -> str: # Look for last ship of this type ship = util_ships.ship_file_name(data['ship'].get('shipName'), data['ship']['name']) regexp = re.compile(re.escape(ship) + r'\.\d{4}-\d\d-\d\dT\d\d\.\d\d\.\d\d\.txt') - oldfiles = sorted([x for x in os.listdir(config.get_str('outdir')) if regexp.match(x)]) + out_dir = pathlib.Path(config.get_str('outdir')) + oldfiles = sorted( + [x for x in out_dir.iterdir() if regexp.match(x.name)], + key=lambda p: p.name # Sort based on the filename string + ) if oldfiles: with (pathlib.Path(config.get_str('outdir')) / oldfiles[-1]).open() as h: if h.read() == string: diff --git a/journal_lock.py b/journal_lock.py index 604cf85062..433bf3b277 100644 --- a/journal_lock.py +++ b/journal_lock.py @@ -7,7 +7,6 @@ """ from __future__ import annotations -import os import pathlib import sys import tkinter as tk @@ -125,10 +124,10 @@ def release_lock(self) -> bool: self.locked = False # Physically remove the lockfile from disk on a clean exit - if self.journal_dir_lockfile_name and self.journal_dir_lockfile_name.exists(): + if self.journal_dir_lockfile_name: try: - os.remove(self.journal_dir_lockfile_name) - except Exception: + self.journal_dir_lockfile_name.unlink(missing_ok=True) + except OSError: pass # Prevent crashing if a file hook holds it open briefly during shutdown return True diff --git a/l10n.py b/l10n.py index 6018022966..2278061edb 100755 --- a/l10n.py +++ b/l10n.py @@ -15,7 +15,6 @@ import re import sys from contextlib import suppress -from os import listdir, sep from typing import TextIO, cast from collections.abc import Iterable import pathlib @@ -106,7 +105,7 @@ def install(self, lang: str | None = None) -> None: # noqa: CCR001 return self.translations = {None: self.contents(cast(str, lang))} - for plugin in listdir(config.plugin_dir_path): + for plugin in (x.name for x in pathlib.Path(config.plugin_dir_path).iterdir()): plugin_path = config.plugin_dir_path / plugin / LOCALISATION_DIR if pathlib.Path.is_dir(plugin_path): try: @@ -162,9 +161,11 @@ def translate(self, x: str, context: str | None = None, lang: str | None = None) plugin_path: pathlib.Path | None = None if context: - # TODO: There is probably a better way to go about this now. - plugin_name = context[len(config.plugin_dir)+1:].split(sep)[0] - plugin_path = config.plugin_dir_path / plugin_name / LOCALISATION_DIR + context_path = pathlib.Path(context) + # .is_relative_to() prevents crashes if context isn't actually in the plugin dir + if context_path.is_relative_to(config.plugin_dir_path): + plugin_name = context_path.relative_to(config.plugin_dir_path).parts[0] + plugin_path = config.plugin_dir_path / plugin_name / LOCALISATION_DIR if lang: contents: dict[str, str] = self.contents(lang=lang, plugin_path=plugin_path) @@ -192,7 +193,7 @@ def available(self) -> set[str]: """Return a list of available language codes.""" path = self.respath() - available = {x[:-len('.strings')] for x in listdir(path) if x.endswith('.strings')} + available = {x.stem for x in pathlib.Path(path).iterdir() if x.suffix == '.strings'} return available diff --git a/loadout.py b/loadout.py index 2690133dbb..9d72029de9 100644 --- a/loadout.py +++ b/loadout.py @@ -10,7 +10,6 @@ import json import re import time -from os import listdir from pathlib import Path import companion import util_ships @@ -43,7 +42,7 @@ def export(data: companion.CAPIData, requested_filename: str | None = None) -> N # Look for last ship of this type ship = util_ships.ship_file_name(data['ship'].get('shipName'), data['ship']['name']) regexp = re.compile(re.escape(ship) + r'\.\d\d\d\d-\d\d-\d\dT\d\d\.\d\d\.\d\d\.txt') - oldfiles = sorted([x for x in listdir(config.get_str('outdir')) if regexp.match(x)]) + oldfiles = sorted([x.name for x in Path(config.get_str('outdir')).iterdir() if regexp.match(x.name)]) if oldfiles: with open(Path(config.get_str('outdir')) / Path(oldfiles[-1])) as h: if h.read() == string: diff --git a/monitor.py b/monitor.py index 08640e19cb..8e2ff650ee 100644 --- a/monitor.py +++ b/monitor.py @@ -15,8 +15,7 @@ import threading from datetime import datetime from collections import defaultdict -from os import SEEK_END, SEEK_SET, listdir -from os.path import basename, expanduser, getctime, isdir, join +from os import SEEK_END, SEEK_SET from time import gmtime, localtime, mktime, sleep, strftime, strptime, time from typing import TYPE_CHECKING, Any, BinaryIO from collections.abc import MutableMapping @@ -203,13 +202,15 @@ def start(self, root: tkinter.Tk) -> bool: # noqa: CCR001 if journal_dir == '' or journal_dir is None: journal_dir = config.default_journal_dir - logdir = expanduser(journal_dir) + logdir_path = pathlib.Path(journal_dir).expanduser() - if not logdir or not isdir(logdir): - logger.error(f'Journal Directory is invalid: "{logdir}"') + if not logdir_path or not logdir_path.is_dir(): + logger.error(f'Journal Directory is invalid: "{logdir_path}"') self.stop() return False + logdir = str(logdir_path.resolve()) + if self.currentdir and self.currentdir != logdir: logger.debug(f'Journal Directory changed? Was "{self.currentdir}", now "{logdir}"') self.stop() @@ -273,14 +274,16 @@ def journal_newest_filename(self, journals_dir) -> str | None: if journals_dir is None: return None - journal_files = (x for x in listdir(journals_dir) if self._RE_LOGFILE.search(x)) - if journal_files: - # Odyssey Update 11 has, e.g. Journal.2022-03-15T152503.01.log - # Horizons Update 11 equivalent: Journal.220315152335.01.log - # So we can no longer use a naive sort. - journals_dir_path = pathlib.Path(journals_dir) - journal_files = (journals_dir_path / pathlib.Path(x) for x in journal_files) - return str(max(journal_files, key=getctime)) + journals_path = pathlib.Path(journals_dir) + try: + journal_files = [x for x in journals_path.iterdir() if self._RE_LOGFILE.search(x.name)] + if journal_files: + # Odyssey Update 11 has, e.g. Journal.2022-03-15T152503.01.log + # Horizons Update 11 equivalent: Journal.220315152335.01.log + # So we can no longer use a naive sort. + return str(max(journal_files, key=lambda x: x.stat().st_ctime)) + except Exception: + logger.exception('Failed to find latest logfile') return None @@ -350,8 +353,8 @@ def running(self) -> bool: def on_created(self, event: FileSystemEvent) -> None: """Watchdog callback when, e.g. client (re)started.""" - if not event.is_directory and self._RE_LOGFILE.search(str(basename(event.src_path))): - + src_path = event.src_path.decode() if isinstance(event.src_path, bytes) else event.src_path + if not event.is_directory and self._RE_LOGFILE.search(pathlib.Path(src_path).name): self.logfile = event.src_path # type: ignore def worker(self) -> None: # noqa: C901, CCR001 @@ -1111,8 +1114,8 @@ def parse_entry(self, line: bytes) -> MutableMapping[str, Any]: # noqa: C901, C elif event_type == 'cargo' and entry.get('Vessel') == 'Ship': self.state['Cargo'] = defaultdict(int) # From 3.3 full Cargo event (after the first one) is written to a separate file - if 'Inventory' not in entry: - with open(join(self.currentdir, 'Cargo.json'), 'rb') as h: # type: ignore + if 'Inventory' not in entry and self.currentdir: + with pathlib.Path(self.currentdir, 'Cargo.json').open('rb') as h: entry = json.load(h) self.state['CargoJSON'] = entry @@ -1583,8 +1586,8 @@ def parse_entry(self, line: bytes) -> MutableMapping[str, Any]: # noqa: C901, C if fcmaterials := self.__fcmaterials_retry(): entry = fcmaterials - elif event_type == 'moduleinfo': - with open(join(self.currentdir, 'ModulesInfo.json'), 'rb') as mf: # type: ignore + elif event_type == 'moduleinfo' and self.currentdir: + with pathlib.Path(self.currentdir, 'ModulesInfo.json').open('rb') as mf: try: entry = json.load(mf) @@ -2308,17 +2311,17 @@ def export_ship(self, filename=None) -> None: # noqa: C901, CCR001 ship = util_ships.ship_file_name(self.state['ShipName'], self.state['ShipType']) regexp = re.compile(re.escape(ship) + r'\.\d{4}-\d\d-\d\dT\d\d\.\d\d\.\d\d\.txt') - oldfiles = sorted(x for x in listdir(config.get_str('outdir')) if regexp.match(x)) + oldfiles = sorted(x.name for x in pathlib.Path(config.get_str('outdir')).iterdir() if regexp.match(x.name)) if oldfiles: try: - with open(join(config.get_str('outdir'), oldfiles[-1]), encoding='utf-8') as h: + with (pathlib.Path(config.get_str('outdir')) / oldfiles[-1]).open(encoding='utf-8') as h: if h.read() == string: return # same as last time - don't write except UnicodeError: logger.exception("UnicodeError reading old ship loadout with utf-8 encoding, trying without...") try: - with open(join(config.get_str('outdir'), oldfiles[-1])) as h: + with (pathlib.Path(config.get_str('outdir')) / oldfiles[-1]).open(encoding='utf-8') as h: if h.read() == string: return # same as last time - don't write @@ -2337,7 +2340,7 @@ def export_ship(self, filename=None) -> None: # noqa: C901, CCR001 # Write ts = strftime('%Y-%m-%dT%H.%M.%S', localtime(time())) - filename = join(config.get_str('outdir'), f'{ship}.{ts}.txt') + filename = str(pathlib.Path(config.get_str('outdir')) / f'{ship}.{ts}.txt') try: with open(filename, 'w', encoding='utf-8') as h: @@ -2424,7 +2427,7 @@ def _parse_navroute_file(self) -> dict[str, Any] | None: try: - with open(join(self.currentdir, 'NavRoute.json')) as f: + with (pathlib.Path(self.currentdir) / 'NavRoute.json').open() as f: raw = f.read() except Exception as e: @@ -2450,7 +2453,7 @@ def _parse_fcmaterials_file(self) -> dict[str, Any] | None: try: - with open(join(self.currentdir, 'FCMaterials.json')) as f: + with (pathlib.Path(self.currentdir) / 'FCMaterials.json').open() as f: raw = f.read() except Exception as e: diff --git a/plug.py b/plug.py index ae9600c4b6..5f73484656 100644 --- a/plug.py +++ b/plug.py @@ -11,7 +11,6 @@ import importlib.util import logging import operator -import os import sys from dataclasses import dataclass, field import tkinter as tk @@ -204,7 +203,9 @@ def load_plugins(master: tk.Tk) -> None: def _load_internal_plugins(): internal = [] - for name in sorted(os.listdir(config.internal_plugin_dir_path)): + plugin_dir = Path(config.internal_plugin_dir_path) + for plugin_path in sorted(plugin_dir.iterdir(), key=lambda p: p.name): + name = plugin_path.name if name.endswith('.py') and name[0] not in ('.', '_'): try: plugin_name = name[:-3] diff --git a/plugin_browser.py b/plugin_browser.py index cfa3e122aa..acaafc6f35 100644 --- a/plugin_browser.py +++ b/plugin_browser.py @@ -7,11 +7,11 @@ """ from __future__ import annotations -import os import json import sys import webbrowser import tkinter as tk +import pathlib from tkinter import ttk import tkinter.font as tkfont from PIL import Image, ImageTk @@ -545,10 +545,8 @@ def _build_plugin_links(self, plugin: dict, start_row: int) -> None: def _update_plugin_icon(self, plugin_id: str, plugin: dict) -> None: icon_url = plugin.get("pluginIcon") icon_key = plugin_id if icon_url else "__fallback__" - url = icon_url or os.path.join( - os.path.dirname(sys.argv[0]), - "io.edcd.EDMarketConnector.png", - ) + script_dir = pathlib.Path(sys.argv[0]).parent + url = icon_url or str(script_dir / "io.edcd.EDMarketConnector.png") self.__load_plugin_icon(icon_key, url) def __clear_plugin_details(self) -> None: @@ -583,13 +581,12 @@ def __load_plugin_icon(self, plugin_id: str, url: str) -> None: try: # Open PIL image - pil_image = ( - Image.open(url).convert("RGBA") - if os.path.isfile(url) - else Image.open(BytesIO(requests.get(url, timeout=5).content)).convert( - "RGBA" - ) - ) + url_path = pathlib.Path(url) + if url_path.is_file(): + pil_image = Image.open(url_path).convert("RGBA") + else: + response = requests.get(url, timeout=5) + pil_image = Image.open(BytesIO(response.content)).convert("RGBA") # Resize while maintaining aspect ratio pil_image.thumbnail((64, 64), Image.Resampling.LANCZOS) diff --git a/prefs.py b/prefs.py index 747b982c98..c1ef2f445e 100644 --- a/prefs.py +++ b/prefs.py @@ -3,7 +3,6 @@ import logging import time -from os.path import join, normpath from pathlib import Path import subprocess import sys @@ -171,7 +170,7 @@ def get_localized_path(path: str, config_home: str) -> str: start_idx = len(config_home.split('\\')) if path.lower().startswith(config_home.lower()) else 0 # Split path into components - components = normpath(path).split('\\') + components = Path(path).parts display_components = [] # Process each path component @@ -1340,10 +1339,8 @@ def apply(self) -> None: # noqa: CCR001 _val = 'SEMICOLON' config.set('mkt_export_type', _val) - config.set( - 'outdir', - join(config.home_path, self.outdir.get()[2:]) if self.outdir.get().startswith('~') else self.outdir.get() - ) + out_path = Path(self.outdir.get()) + config.set('outdir', str(out_path.expanduser())) logdir = self.logdir.get() if config.default_journal_dir_path and logdir.lower() == config.default_journal_dir.lower(): @@ -1386,11 +1383,8 @@ def apply(self) -> None: # noqa: CCR001 config.set('dark_highlight', self.theme_colors[1]) theme.apply(self.parent) if self.plugdir.get() != config.get_str('plugin_dir'): - config.set( - 'plugin_dir', - join(config.home_path, self.plugdir.get()[2:]) if self.plugdir.get().startswith( - '~') else self.plugdir.get() - ) + plug_path = Path(self.plugdir.get()) + config.set('plugin_dir', str(plug_path.expanduser())) self.req_restart = True # Notify diff --git a/tests/journal_lock.py/test_journal_lock.py b/tests/journal_lock.py/test_journal_lock.py index aac1e49b97..c4202e7ece 100644 --- a/tests/journal_lock.py/test_journal_lock.py +++ b/tests/journal_lock.py/test_journal_lock.py @@ -5,7 +5,6 @@ from __future__ import annotations import multiprocessing as mp -import os import pathlib from collections.abc import Generator import pytest @@ -43,11 +42,7 @@ def other_process_lock(continue_q: mp.Queue, exit_q: mp.Queue, lockfile: pathlib except Exception as e: print(f"sub-process: Failed execution handle context: {e!r}") finally: - try: - if lock_path.exists(): - os.unlink(lock_path) - except Exception: - pass + lock_path.unlink(missing_ok=True) def _obtain_lock_with_filelock(lockfile_path: pathlib.Path) -> bool: @@ -58,9 +53,7 @@ def _obtain_lock_with_filelock(lockfile_path: pathlib.Path) -> bool: lock.acquire(timeout=0) lock.release() return True - except Timeout: - return False - except Exception: + except (Timeout, Exception): return False @@ -115,11 +108,8 @@ def get_str(key: str, *, default: str | None = None) -> str: # Tests against JournalLock.__init__() def test_journal_lock_init(self, mock_journaldir: pathlib.Path): """Test JournalLock instantiation.""" - tmpdir = str(mock_journaldir) - jlock = JournalLock() - # Check members are properly initialised. - assert jlock.journal_dir == tmpdir + assert jlock.journal_dir == str(mock_journaldir) assert jlock.journal_dir_path is not None assert jlock.journal_dir_lockfile_name is None @@ -136,12 +126,8 @@ def test_path_from_journaldir_with_none(self): def test_path_from_journaldir_with_tmpdir(self, mock_journaldir: pathlib.Path): """Test JournalLock.set_path_from_journaldir() with tmpdir.""" - tmpdir = mock_journaldir - jlock = JournalLock() - - # Check that an actual journaldir is handled correctly. - jlock.journal_dir = str(tmpdir) + jlock.journal_dir = str(mock_journaldir) jlock.set_path_from_journaldir() assert isinstance(jlock.journal_dir_path, pathlib.Path) @@ -154,7 +140,6 @@ def test_obtain_lock_with_none(self): # Check that 'None' is handled correctly. jlock.journal_dir = None jlock.set_path_from_journaldir() - assert jlock.journal_dir_path is None locked = jlock.obtain_lock() assert locked == JournalLockResult.JOURNALDIR_IS_NONE @@ -169,8 +154,8 @@ def test_obtain_lock_with_tmpdir(self, mock_journaldir: pathlib.Path): # Cleanup, to avoid side-effect on other tests assert jlock.release_lock() - if jlock.journal_dir_lockfile_name and jlock.journal_dir_lockfile_name.exists(): - os.unlink(str(jlock.journal_dir_lockfile_name)) + if jlock.journal_dir_lockfile_name: + jlock.journal_dir_lockfile_name.unlink(missing_ok=True) def test_obtain_lock_already_locked(self, mock_journaldir: pathlib.Path): """Test JournalLock.obtain_lock() when already locked by another process.""" @@ -191,8 +176,7 @@ def test_obtain_lock_already_locked(self, mock_journaldir: pathlib.Path): print("Attempt actual lock test...") # Now attempt to lock with to-test code jlock = JournalLock() - second_attempt = jlock.obtain_lock() - assert second_attempt == JournalLockResult.ALREADY_LOCKED + assert jlock.obtain_lock() == JournalLockResult.ALREADY_LOCKED finally: print("Telling sub-process to quit...") exit_q.put("quit") @@ -215,10 +199,7 @@ def test_release_lock(self, mock_journaldir: pathlib.Path): # Check it actually IS unlocked using the filelock backend engine lock_file_path = mock_journaldir / "edmc-journal-lock.txt" assert _obtain_lock_with_filelock(lock_file_path) - - # Cleanup, to avoid side-effect on other tests - if jlock.journal_dir_lockfile_name and jlock.journal_dir_lockfile_name.exists(): - os.unlink(str(jlock.journal_dir_lockfile_name)) + jlock.journal_dir_lockfile_name.unlink(missing_ok=True) def test_release_lock_not_locked(self, mock_journaldir: pathlib.Path): """Test JournalLock.release_lock() when not locked.""" @@ -259,21 +240,15 @@ def test_update_lock(self, mock_journaldir_changing: pathlib.Path): jlock.obtain_lock() assert jlock.locked - # Now store the 'current' journaldir for reference and attempt - # to update to a new one. - old_journaldir = jlock.journal_dir - old_journaldir_lockfile_name = jlock.journal_dir_lockfile_name + old_lockfile = jlock.journal_dir_lockfile_name jlock.update_lock(None) # type: ignore - assert jlock.journal_dir != old_journaldir assert jlock.locked # Cleanup, to avoid side-effect on other tests assert jlock.release_lock() - if jlock.journal_dir_lockfile_name and jlock.journal_dir_lockfile_name.exists(): - os.unlink(str(jlock.journal_dir_lockfile_name)) - # And the old_journaldir's lockfile too - if old_journaldir_lockfile_name and old_journaldir_lockfile_name.exists(): - os.unlink(str(old_journaldir_lockfile_name)) + jlock.journal_dir_lockfile_name.unlink(missing_ok=True) + if old_lockfile: + old_lockfile.unlink(missing_ok=True) def test_update_lock_same(self, mock_journaldir: pathlib.Path): """ @@ -282,16 +257,11 @@ def test_update_lock_same(self, mock_journaldir: pathlib.Path): # First actually obtain the lock, and check it worked jlock = JournalLock() assert jlock.obtain_lock() == JournalLockResult.LOCKED - assert jlock.locked - # Now store the 'current' journaldir for reference and attempt - # to update to a new one. - old_journaldir = jlock.journal_dir + old_dir = jlock.journal_dir jlock.update_lock(None) # type: ignore - assert jlock.journal_dir == old_journaldir - assert jlock.locked + assert jlock.journal_dir == old_dir # Cleanup, to avoid side-effect on other tests assert jlock.release_lock() - if jlock.journal_dir_lockfile_name and jlock.journal_dir_lockfile_name.exists(): - os.unlink(str(jlock.journal_dir_lockfile_name)) + jlock.journal_dir_lockfile_name.unlink(missing_ok=True) diff --git a/util/text.py b/util/text.py index 7c940cf7e1..5646d722c1 100644 --- a/util/text.py +++ b/util/text.py @@ -1,5 +1,6 @@ """ text.py - Dealing with Text and Bytes. + Copyright (c) EDCD, All Rights Reserved Licensed under the GNU General Public License v2 or later. See LICENSE file. @@ -16,6 +17,7 @@ def gzip(data: str | bytes, max_size: int = 512, encoding='utf-8') -> tuple[bytes, bool]: """ Compress the given data if the max size is greater than specified. + The default was chosen somewhat arbitrarily, see eddn.py for some more careful work towards keeping the data almost always compressed :param data: The data to compress From f65e82ceec8cf84b9bf00987dfb865a2d0bfda54 Mon Sep 17 00:00:00 2001 From: David Sangrey Date: Sat, 6 Jun 2026 15:11:53 -0400 Subject: [PATCH 2/3] [#2602] Allow Typing in StringVars --- prefs.py | 63 ++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 54 insertions(+), 9 deletions(-) diff --git a/prefs.py b/prefs.py index c1ef2f445e..c5c9b43dba 100644 --- a/prefs.py +++ b/prefs.py @@ -3,6 +3,7 @@ import logging import time +import os from pathlib import Path import subprocess import sys @@ -374,13 +375,17 @@ def __setup_output_tab(self, root_notebook: ttk.Notebook) -> None: self.out_auto_button.grid(columnspan=2, padx=self.BUTTONX, pady=self.PADY, sticky=tk.W, row=next(row)) self.outdir = tk.StringVar() - self.outdir.set(str(config.get_str('outdir'))) + outdir_saved = str(config.get_str('outdir') or '') + self.outdir.set(self.resolve_path_str(outdir_saved)) + self.attach_path_resolver(self.outdir) + # LANG: Settings > Output - Label for "where files are located" - self.outdir_label = nb.Label(output_frame, text=tr.tl('File location')+':') # Section heading in settings + self.outdir_label = nb.Label(output_frame, text=tr.tl('File location') + ':') # Section heading in settings # Type ignored due to incorrect type annotation. a 2 tuple does padding for each side self.outdir_label.grid(padx=self.PADX, pady=self.PADY, sticky=tk.W, row=next(row)) # type: ignore - self.outdir_entry = ttk.Entry(output_frame, takefocus=False) + # Linked to self.outdir via textvariable and set takefocus=True to allow direct typing + self.outdir_entry = ttk.Entry(output_frame, textvariable=self.outdir, takefocus=True) self.outdir_entry.grid(columnspan=2, padx=self.PADX, pady=self.BOXY, sticky=tk.EW, row=next(row)) text = tr.tl('Browse...') # LANG: NOT-macOS Settings - files location selection button @@ -415,14 +420,17 @@ def __setup_config_tab(self, notebook: ttk.Notebook) -> None: # noqa: CCR001 if logdir is None or logdir == '': logdir = default - self.logdir.set(logdir) - self.logdir_entry = ttk.Entry(config_frame, takefocus=False) + self.logdir.set(self.resolve_path_str(logdir)) + self.attach_path_resolver(self.logdir) + + # takefocus=True allows for typing + self.logdir_entry = ttk.Entry(config_frame, textvariable=self.logdir, takefocus=True) # Location of the Journal files nb.Label( config_frame, # LANG: Settings > Configuration - Label for Journal files location - text=tr.tl('E:D journal file location')+':' + text=tr.tl('E:D journal file location') + ':' ).grid(columnspan=4, padx=self.PADX, pady=self.PADY, sticky=tk.W, row=next(row)) self.logdir_entry.grid(columnspan=4, padx=self.PADX, pady=self.BOXY, sticky=tk.EW, row=next(row)) @@ -1160,9 +1168,6 @@ def tabchanged(self, event: tk.Event) -> None: def outvarchanged(self, event: tk.Event | None = None) -> None: """Handle Output tab variable changes.""" - self.displaypath(self.outdir, self.outdir_entry) - self.displaypath(self.logdir, self.logdir_entry) - self.out_label['state'] = tk.NORMAL self.out_csv_button['state'] = tk.NORMAL self.out_td_button['state'] = tk.NORMAL @@ -1412,3 +1417,43 @@ def _destroy(self) -> None: self.parent.wm_attributes('-topmost', config.get_bool('always_ontop')) self.destroy() + + def attach_path_resolver(self, string_var: tk.StringVar) -> None: + """Establish Path resolution for Tkinter StringVars. + + Args: + string_var (tk.StringVar): The Tkinter string variable to monitor and resolve. + """ + trace_id = [""] + + def on_path_change(*args: Any) -> None: + """Internal trace callback.""" + if trace_id[0]: + string_var.trace_remove("write", trace_id[0]) + current_val: str = string_var.get() + resolved_val: str = self.resolve_path_str(current_val) + if current_val != resolved_val: + string_var.set(resolved_val) + trace_id[0] = string_var.trace_add("write", on_path_change) + + trace_id[0] = string_var.trace_add("write", on_path_change) + + @staticmethod + def resolve_path_str(path_str: str) -> str: + """Expand ENV vars and Linux shortcuts (~) to abs paths. + + Args: + path_str (str): The input by the user or configuration file. + + Returns: + str: A resolved filesystem path. + """ + if not path_str: + return '' + try: + if '%' in path_str or '$' in path_str: + for key, value in os.environ.items(): + path_str = path_str.replace(f'%{key}%', value).replace(f'${key}', value) + return str(Path(path_str).expanduser().resolve()) + except Exception: + return path_str From decd9e3c6a12cd65fcc0fe006c6e9667739254b4 Mon Sep 17 00:00:00 2001 From: David Sangrey Date: Sat, 6 Jun 2026 15:53:48 -0400 Subject: [PATCH 3/3] Remove oldstyle union typings --- companion.py | 4 ++-- edshipyard.py | 3 +-- killswitch.py | 4 ++-- plugins/inara.py | 4 ++-- prefs.py | 15 +++++---------- 5 files changed, 12 insertions(+), 18 deletions(-) diff --git a/companion.py b/companion.py index 77196297af..e51ee2756c 100644 --- a/companion.py +++ b/companion.py @@ -30,7 +30,7 @@ from enum import StrEnum from pathlib import Path from queue import Queue -from typing import TYPE_CHECKING, Any, TypeVar, Union, Iterator +from typing import TYPE_CHECKING, Any, TypeVar, Iterator from collections.abc import Mapping from dataclasses import dataclass from requests.adapters import HTTPAdapter @@ -73,7 +73,7 @@ class CAPIData(UserDict): def __init__( self, - data: Union[str, dict[str, Any], 'CAPIData', None] = None, + data: "str | dict[str, Any] | 'CAPIData' | None" = None, source_host: str | None = None, source_endpoint: str | None = None, request_cmdr: str | None = None diff --git a/edshipyard.py b/edshipyard.py index cc982f85bb..fc88ed9aaa 100644 --- a/edshipyard.py +++ b/edshipyard.py @@ -6,7 +6,6 @@ import re import time from collections import defaultdict -from typing import Union from update import check_for_datafile_updates import outfitting import util_ships @@ -17,7 +16,7 @@ logger = get_main_logger() -__Module = dict[str, Union[str, list[str]]] # Have to keep old-style here for compatibility +__Module = dict[str, str | list[str]] # Map API ship names to ED Shipyard names ship_map = ship_name_map.copy() diff --git a/killswitch.py b/killswitch.py index af50a8c19d..af730db055 100644 --- a/killswitch.py +++ b/killswitch.py @@ -12,7 +12,7 @@ from copy import deepcopy from typing import ( TYPE_CHECKING, Any, NamedTuple, - TypedDict, TypeVar, cast, Union + TypedDict, TypeVar, cast ) from collections.abc import Callable, Mapping, MutableMapping, MutableSequence, Sequence import requests @@ -26,7 +26,7 @@ OLD_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches.json' DEFAULT_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches_v2.json' CURRENT_KILLSWITCH_VERSION = 2 -UPDATABLE_DATA = Union[Mapping, Sequence] # Have to keep old-style +UPDATABLE_DATA = Mapping | Sequence _current_version: semantic_version.Version = config.appversion_nobuild() T = TypeVar('T', bound=UPDATABLE_DATA) diff --git a/plugins/inara.py b/plugins/inara.py index cdd316fecb..5b37bc4bda 100644 --- a/plugins/inara.py +++ b/plugins/inara.py @@ -31,7 +31,7 @@ from operator import itemgetter from threading import Lock, Thread from tkinter import ttk -from typing import Any, Deque, NamedTuple, cast, Union +from typing import Any, Deque, NamedTuple, cast from collections.abc import Callable, Mapping, Sequence import requests import edmc_data @@ -67,7 +67,7 @@ class Credentials(NamedTuple): api_key: str -EVENT_DATA = Union[Mapping[str, Any], Sequence[Mapping[str, Any]]] +EVENT_DATA = Mapping[str, Any] | Sequence[Mapping[str, Any]] @dataclass diff --git a/prefs.py b/prefs.py index c5c9b43dba..431eeb5afc 100644 --- a/prefs.py +++ b/prefs.py @@ -1419,15 +1419,17 @@ def _destroy(self) -> None: self.destroy() def attach_path_resolver(self, string_var: tk.StringVar) -> None: - """Establish Path resolution for Tkinter StringVars. + """ + Establish Path resolution for Tkinter StringVars. + ___ Args: string_var (tk.StringVar): The Tkinter string variable to monitor and resolve. """ trace_id = [""] def on_path_change(*args: Any) -> None: - """Internal trace callback.""" + """Handle the internal trace callback.""" if trace_id[0]: string_var.trace_remove("write", trace_id[0]) current_val: str = string_var.get() @@ -1440,14 +1442,7 @@ def on_path_change(*args: Any) -> None: @staticmethod def resolve_path_str(path_str: str) -> str: - """Expand ENV vars and Linux shortcuts (~) to abs paths. - - Args: - path_str (str): The input by the user or configuration file. - - Returns: - str: A resolved filesystem path. - """ + """Expand ENV vars and Linux shortcuts (~) to abs paths.""" if not path_str: return '' try: