From 30b9ce7891f3be806bf4d08cb4462e35efe42019 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Sun, 5 Apr 2026 16:32:02 -0700 Subject: [PATCH 01/10] esim: harden AtClient with retry loops, reconnect, and lazy serial - AtClient: lazy serial open (deferred to first query), store device/baud for reconnect, add _reconnect_serial for recovery from serial exceptions - open_isdr: extract _open_isdr_once, wrap in 10-attempt retry loop with lte.sh modem reset at attempt 3 for stuck SIM (CME ERROR 13) - send_apdu: add retry loop with max_retries=3, auto-reopen channel on failure with backoff sleep - query: catch SerialException and reconnect before retry - close: wrap CCHC in try/except to handle already-closed channels - Update DEFAULT_DEVICE to /dev/modem_at0, enable DEBUG by default - base.py: add process_notifications() default, is_comma_profile uses provider field instead of ICCID prefix - esim.py: remove post-mutation modem reboot (now handled by LPA internally) Co-Authored-By: Claude Opus 4.6 (1M context) --- system/hardware/base.py | 6 +- system/hardware/esim.py | 9 --- system/hardware/tici/lpa.py | 111 +++++++++++++++++++++++++++--------- 3 files changed, 88 insertions(+), 38 deletions(-) diff --git a/system/hardware/base.py b/system/hardware/base.py index 1a19f908c62971..2f17115bd61407 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -91,8 +91,12 @@ def nickname_profile(self, iccid: str, nickname: str) -> None: def switch_profile(self, iccid: str) -> None: pass + def process_notifications(self) -> None: + pass + def is_comma_profile(self, iccid: str) -> bool: - return any(iccid.startswith(prefix) for prefix in ('8985235',)) + profiles = self.list_profiles() + return any(p.iccid == iccid and p.provider == 'Webbing' for p in profiles) class HardwareBase(ABC): @staticmethod diff --git a/system/hardware/esim.py b/system/hardware/esim.py index 9b7d4f9ec0b086..40600d26b565f5 100755 --- a/system/hardware/esim.py +++ b/system/hardware/esim.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import argparse -import time from openpilot.system.hardware import HARDWARE @@ -13,16 +12,13 @@ parser.add_argument('--nickname', nargs=2, metavar=('iccid', 'name'), help='update the nickname for a profile') args = parser.parse_args() - mutated = False lpa = HARDWARE.get_sim_lpa() if args.switch: lpa.switch_profile(args.switch) - mutated = True elif args.delete: confirm = input('are you sure you want to delete this profile? (y/N) ') if confirm == 'y': lpa.delete_profile(args.delete) - mutated = True else: print('cancelled') exit(0) @@ -33,11 +29,6 @@ else: parser.print_help() - if mutated: - HARDWARE.reboot_modem() - # eUICC needs a small delay post-reboot before querying profiles - time.sleep(.5) - profiles = lpa.list_profiles() print(f'\n{len(profiles)} profile{"s" if len(profiles) > 1 else ""}:') for p in profiles: diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py index 2e7e6a0ba97366..8f9cb8c6474f4f 100644 --- a/system/hardware/tici/lpa.py +++ b/system/hardware/tici/lpa.py @@ -5,14 +5,17 @@ import math import os import serial +import subprocess import sys +import termios +import time from collections.abc import Generator from openpilot.system.hardware.base import LPABase, Profile -DEFAULT_DEVICE = "/dev/ttyUSB2" +DEFAULT_DEVICE = "/dev/modem_at0" DEFAULT_BAUD = 9600 DEFAULT_TIMEOUT = 5.0 # https://euicc-manual.osmocom.org/docs/lpa/applet-id/ @@ -20,7 +23,7 @@ MM = "org.freedesktop.ModemManager1" MM_MODEM = MM + ".Modem" ES10X_MSS = 120 -DEBUG = os.environ.get("DEBUG") == "1" +DEBUG = True # TLV Tags TAG_ICCID = 0x5A @@ -39,18 +42,22 @@ class AtClient: def __init__(self, device: str, baud: int, timeout: float, debug: bool) -> None: self.debug = debug self.channel: str | None = None + self._device = device + self._baud = baud self._timeout = timeout self._serial: serial.Serial | None = None - try: - self._serial = serial.Serial(device, baudrate=baud, timeout=timeout) - self._serial.reset_input_buffer() - except (serial.SerialException, PermissionError, OSError): - pass + self._use_dbus = not os.path.exists(device) + if self.debug: + transport = "DBUS" if self._use_dbus else "serial" + print(f"AtClient: using {transport} transport", file=sys.stderr) def close(self) -> None: try: if self.channel: - self.query(f"AT+CCHC={self.channel}") + try: + self.query(f"AT+CCHC={self.channel}") + except (RuntimeError, TimeoutError): + pass self.channel = None finally: if self._serial: @@ -78,6 +85,15 @@ def _expect(self) -> list[str]: raise RuntimeError(f"AT command failed: {line}") lines.append(line) + def _reconnect_serial(self) -> None: + self.channel = None + try: + if self._serial: + self._serial.close() + except Exception: + pass + self._serial = serial.Serial(self._device, baudrate=self._baud, timeout=self._timeout) + def _get_modem(self): import dbus bus = dbus.SystemBus() @@ -100,35 +116,74 @@ def _dbus_query(self, cmd: str) -> list[str]: return lines def query(self, cmd: str) -> list[str]: - if self._serial: + if self._use_dbus: + return self._dbus_query(cmd) + if not self._serial: + self._serial = serial.Serial(self._device, baudrate=self._baud, timeout=self._timeout) + try: + self._send(cmd) + return self._expect() + except serial.SerialException: + self._reconnect_serial() self._send(cmd) return self._expect() - return self._dbus_query(cmd) - def open_isdr(self) -> None: - # close any stale logical channel from a previous crashed session - try: - self.query("AT+CCHC=1") - except RuntimeError: - pass + def _open_isdr_once(self) -> None: + if self.channel: + try: + self.query(f"AT+CCHC={self.channel}") + except RuntimeError: + pass + self.channel = None + # drain any unsolicited responses before opening + if self._serial and not self._use_dbus: + try: + self._serial.reset_input_buffer() + except (OSError, serial.SerialException, termios.error): + self._reconnect_serial() for line in self.query(f'AT+CCHO="{ISDR_AID}"'): if line.startswith("+CCHO:") and (ch := line.split(":", 1)[1].strip()): self.channel = ch return raise RuntimeError("Failed to open ISD-R application") - def send_apdu(self, apdu: bytes) -> tuple[bytes, int, int]: - if not self.channel: - raise RuntimeError("Logical channel is not open") - hex_payload = apdu.hex().upper() - for line in self.query(f'AT+CGLA={self.channel},{len(hex_payload)},"{hex_payload}"'): - if line.startswith("+CGLA:"): - parts = line.split(":", 1)[1].split(",", 1) - if len(parts) == 2: - data = bytes.fromhex(parts[1].strip().strip('"')) - if len(data) >= 2: - return data[:-2], data[-2], data[-1] - raise RuntimeError("Missing +CGLA response") + def open_isdr(self) -> None: + for attempt in range(10): + try: + self._open_isdr_once() + return + except (RuntimeError, TimeoutError, termios.error) as e: + if self.debug: + print(f"open_isdr failed, trying again", file=sys.stderr) + if attempt == 3: + # SIM may be stuck (CME ERROR 13) — reset modem via lte.sh + subprocess.run(['/usr/comma/lte/lte.sh', 'start'], capture_output=True) + time.sleep(5) + self._reconnect_serial() + else: + time.sleep(2.0) + raise RuntimeError("Failed to open ISD-R after retries") + + def send_apdu(self, apdu: bytes, max_retries: int = 3) -> tuple[bytes, int, int]: + for attempt in range(max_retries): + try: + if not self.channel: + self.open_isdr() + hex_payload = apdu.hex().upper() + for line in self.query(f'AT+CGLA={self.channel},{len(hex_payload)},"{hex_payload}"'): + if line.startswith("+CGLA:"): + parts = line.split(":", 1)[1].split(",", 1) + if len(parts) == 2: + data = bytes.fromhex(parts[1].strip().strip('"')) + if len(data) >= 2: + return data[:-2], data[-2], data[-1] + raise RuntimeError("Missing +CGLA response") + except (RuntimeError, ValueError): + self.channel = None + if attempt == max_retries - 1: + raise + time.sleep(1 + attempt) + raise RuntimeError("send_apdu failed") # --- TLV utilities --- From b0c9a74c22b86cb7e90a83b8c29438a0de7cf17e Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Sun, 5 Apr 2026 22:38:10 -0700 Subject: [PATCH 02/10] esim: restore DEBUG to env-based toggle Co-Authored-By: Claude Opus 4.6 (1M context) --- system/hardware/tici/lpa.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py index 8f9cb8c6474f4f..bf00053d4354b9 100644 --- a/system/hardware/tici/lpa.py +++ b/system/hardware/tici/lpa.py @@ -23,7 +23,7 @@ MM = "org.freedesktop.ModemManager1" MM_MODEM = MM + ".Modem" ES10X_MSS = 120 -DEBUG = True +DEBUG = os.environ.get("DEBUG") == "1" # TLV Tags TAG_ICCID = 0x5A From fb87a27d06945d49d7b6c3aed0465421ddec876b Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Sun, 5 Apr 2026 22:43:23 -0700 Subject: [PATCH 03/10] esim: fix lint errors, remove debug log from AtClient init Co-Authored-By: Claude Opus 4.6 (1M context) --- system/hardware/tici/lpa.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py index bf00053d4354b9..aeca3b297da52b 100644 --- a/system/hardware/tici/lpa.py +++ b/system/hardware/tici/lpa.py @@ -47,9 +47,6 @@ def __init__(self, device: str, baud: int, timeout: float, debug: bool) -> None: self._timeout = timeout self._serial: serial.Serial | None = None self._use_dbus = not os.path.exists(device) - if self.debug: - transport = "DBUS" if self._use_dbus else "serial" - print(f"AtClient: using {transport} transport", file=sys.stderr) def close(self) -> None: try: @@ -152,9 +149,9 @@ def open_isdr(self) -> None: try: self._open_isdr_once() return - except (RuntimeError, TimeoutError, termios.error) as e: + except (RuntimeError, TimeoutError, termios.error): if self.debug: - print(f"open_isdr failed, trying again", file=sys.stderr) + print("open_isdr failed, trying again", file=sys.stderr) if attempt == 3: # SIM may be stuck (CME ERROR 13) — reset modem via lte.sh subprocess.run(['/usr/comma/lte/lte.sh', 'start'], capture_output=True) From 421e89a1758447eaaf00003ab25fa6d5833ea0e3 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Sun, 5 Apr 2026 22:46:16 -0700 Subject: [PATCH 04/10] esim: use cloudlog for operational logging, keep print only for AT debug - Replace debug print in open_isdr retry with cloudlog.warning - Remove debug param from AtClient, use module-level DEBUG for AT output - AT wire-level prints (SER >>/<<, DBUS >>/<<) stay as print() gated by DEBUG Co-Authored-By: Claude Opus 4.6 (1M context) --- system/hardware/tici/lpa.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py index aeca3b297da52b..c1e52c92a2e066 100644 --- a/system/hardware/tici/lpa.py +++ b/system/hardware/tici/lpa.py @@ -12,6 +12,7 @@ from collections.abc import Generator +from openpilot.common.swaglog import cloudlog from openpilot.system.hardware.base import LPABase, Profile @@ -39,8 +40,7 @@ def b64e(data: bytes) -> str: class AtClient: - def __init__(self, device: str, baud: int, timeout: float, debug: bool) -> None: - self.debug = debug + def __init__(self, device: str, baud: int, timeout: float) -> None: self.channel: str | None = None self._device = device self._baud = baud @@ -61,7 +61,7 @@ def close(self) -> None: self._serial.close() def _send(self, cmd: str) -> None: - if self.debug: + if DEBUG: print(f"SER >> {cmd}", file=sys.stderr) self._serial.write((cmd + "\r").encode("ascii")) @@ -74,7 +74,7 @@ def _expect(self) -> list[str]: line = raw.decode(errors="ignore").strip() if not line: continue - if self.debug: + if DEBUG: print(f"SER << {line}", file=sys.stderr) if line == "OK": return lines @@ -100,14 +100,14 @@ def _get_modem(self): return bus.get_object(MM, modem_path) def _dbus_query(self, cmd: str) -> list[str]: - if self.debug: + if DEBUG: print(f"DBUS >> {cmd}", file=sys.stderr) try: result = str(self._get_modem().Command(cmd, math.ceil(self._timeout), dbus_interface=MM_MODEM, timeout=self._timeout)) except Exception as e: raise RuntimeError(f"AT command failed: {e}") from e lines = [line.strip() for line in result.splitlines() if line.strip()] - if self.debug: + if DEBUG: for line in lines: print(f"DBUS << {line}", file=sys.stderr) return lines @@ -150,8 +150,7 @@ def open_isdr(self) -> None: self._open_isdr_once() return except (RuntimeError, TimeoutError, termios.error): - if self.debug: - print("open_isdr failed, trying again", file=sys.stderr) + cloudlog.warning("open_isdr attempt %d failed, retrying", attempt + 1) if attempt == 3: # SIM may be stuck (CME ERROR 13) — reset modem via lte.sh subprocess.run(['/usr/comma/lte/lte.sh', 'start'], capture_output=True) @@ -302,7 +301,7 @@ def __new__(cls): def __init__(self): if hasattr(self, '_client'): return - self._client = AtClient(DEFAULT_DEVICE, DEFAULT_BAUD, DEFAULT_TIMEOUT, debug=DEBUG) + self._client = AtClient(DEFAULT_DEVICE, DEFAULT_BAUD, DEFAULT_TIMEOUT) self._client.open_isdr() atexit.register(self._client.close) From 2055fe22ab74c0ca32a9debc07b114282ee20cde Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Mon, 6 Apr 2026 06:49:24 -0700 Subject: [PATCH 05/10] =?UTF-8?q?esim:=20fix=20circular=20import=20?= =?UTF-8?q?=E2=80=94=20lazy-import=20cloudlog=20in=20open=5Fisdr?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) --- system/hardware/tici/lpa.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py index c1e52c92a2e066..9d68b7dca6db37 100644 --- a/system/hardware/tici/lpa.py +++ b/system/hardware/tici/lpa.py @@ -12,7 +12,6 @@ from collections.abc import Generator -from openpilot.common.swaglog import cloudlog from openpilot.system.hardware.base import LPABase, Profile @@ -150,6 +149,7 @@ def open_isdr(self) -> None: self._open_isdr_once() return except (RuntimeError, TimeoutError, termios.error): + from openpilot.common.swaglog import cloudlog cloudlog.warning("open_isdr attempt %d failed, retrying", attempt + 1) if attempt == 3: # SIM may be stuck (CME ERROR 13) — reset modem via lte.sh From 6ef6ed0af9e1ac405e7f3c64bdc97321088f42c8 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Mon, 6 Apr 2026 07:27:59 -0700 Subject: [PATCH 06/10] esim: remove open_isdr from TiciLPA init Channel is opened on demand in send_apdu when needed. Co-Authored-By: Claude Opus 4.6 (1M context) --- system/hardware/tici/lpa.py | 1 - 1 file changed, 1 deletion(-) diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py index 9d68b7dca6db37..ebc4babc233bcc 100644 --- a/system/hardware/tici/lpa.py +++ b/system/hardware/tici/lpa.py @@ -302,7 +302,6 @@ def __init__(self): if hasattr(self, '_client'): return self._client = AtClient(DEFAULT_DEVICE, DEFAULT_BAUD, DEFAULT_TIMEOUT) - self._client.open_isdr() atexit.register(self._client.close) def list_profiles(self) -> list[Profile]: From 7002a85dc83dfb8a62bb1734aff9be5961eddba0 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Mon, 6 Apr 2026 07:29:23 -0700 Subject: [PATCH 07/10] esim: unify serial connection into _ensure_serial Single method for opening the serial port, used by both query (lazy open) and _reconnect_serial (close + reopen). Co-Authored-By: Claude Opus 4.6 (1M context) --- system/hardware/tici/lpa.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py index ebc4babc233bcc..f577b416ca7f47 100644 --- a/system/hardware/tici/lpa.py +++ b/system/hardware/tici/lpa.py @@ -81,6 +81,10 @@ def _expect(self) -> list[str]: raise RuntimeError(f"AT command failed: {line}") lines.append(line) + def _ensure_serial(self) -> None: + if self._serial is None: + self._serial = serial.Serial(self._device, baudrate=self._baud, timeout=self._timeout) + def _reconnect_serial(self) -> None: self.channel = None try: @@ -88,7 +92,8 @@ def _reconnect_serial(self) -> None: self._serial.close() except Exception: pass - self._serial = serial.Serial(self._device, baudrate=self._baud, timeout=self._timeout) + self._serial = None + self._ensure_serial() def _get_modem(self): import dbus @@ -114,8 +119,7 @@ def _dbus_query(self, cmd: str) -> list[str]: def query(self, cmd: str) -> list[str]: if self._use_dbus: return self._dbus_query(cmd) - if not self._serial: - self._serial = serial.Serial(self._device, baudrate=self._baud, timeout=self._timeout) + self._ensure_serial() try: self._send(cmd) return self._expect() From 7f1018f1af31040f2fff6466c87d6e0b39bb5047 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Mon, 6 Apr 2026 07:30:45 -0700 Subject: [PATCH 08/10] esim: unify _ensure_serial and _reconnect_serial Single method with reconnect flag: _ensure_serial() for lazy open, _ensure_serial(reconnect=True) to close and reopen. Co-Authored-By: Claude Opus 4.6 (1M context) --- system/hardware/tici/lpa.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py index f577b416ca7f47..5cd2d0cebf634d 100644 --- a/system/hardware/tici/lpa.py +++ b/system/hardware/tici/lpa.py @@ -81,20 +81,18 @@ def _expect(self) -> list[str]: raise RuntimeError(f"AT command failed: {line}") lines.append(line) - def _ensure_serial(self) -> None: + def _ensure_serial(self, reconnect: bool = False) -> None: + if reconnect: + self.channel = None + try: + if self._serial: + self._serial.close() + except Exception: + pass + self._serial = None if self._serial is None: self._serial = serial.Serial(self._device, baudrate=self._baud, timeout=self._timeout) - def _reconnect_serial(self) -> None: - self.channel = None - try: - if self._serial: - self._serial.close() - except Exception: - pass - self._serial = None - self._ensure_serial() - def _get_modem(self): import dbus bus = dbus.SystemBus() @@ -124,7 +122,7 @@ def query(self, cmd: str) -> list[str]: self._send(cmd) return self._expect() except serial.SerialException: - self._reconnect_serial() + self._ensure_serial(reconnect=True) self._send(cmd) return self._expect() @@ -140,7 +138,7 @@ def _open_isdr_once(self) -> None: try: self._serial.reset_input_buffer() except (OSError, serial.SerialException, termios.error): - self._reconnect_serial() + self._ensure_serial(reconnect=True) for line in self.query(f'AT+CCHO="{ISDR_AID}"'): if line.startswith("+CCHO:") and (ch := line.split(":", 1)[1].strip()): self.channel = ch @@ -159,7 +157,7 @@ def open_isdr(self) -> None: # SIM may be stuck (CME ERROR 13) — reset modem via lte.sh subprocess.run(['/usr/comma/lte/lte.sh', 'start'], capture_output=True) time.sleep(5) - self._reconnect_serial() + self._ensure_serial(reconnect=True) else: time.sleep(2.0) raise RuntimeError("Failed to open ISD-R after retries") From a7e21f4c6c00390c92ddf8df08834665fe1c9448 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Mon, 6 Apr 2026 07:33:47 -0700 Subject: [PATCH 09/10] esim: move base.py changes to separate PR Co-Authored-By: Claude Opus 4.6 (1M context) --- system/hardware/base.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/system/hardware/base.py b/system/hardware/base.py index 2f17115bd61407..1a19f908c62971 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -91,12 +91,8 @@ def nickname_profile(self, iccid: str, nickname: str) -> None: def switch_profile(self, iccid: str) -> None: pass - def process_notifications(self) -> None: - pass - def is_comma_profile(self, iccid: str) -> bool: - profiles = self.list_profiles() - return any(p.iccid == iccid and p.provider == 'Webbing' for p in profiles) + return any(iccid.startswith(prefix) for prefix in ('8985235',)) class HardwareBase(ABC): @staticmethod From 2f39a0d43995697f59b82d05a3bf8600f2fc2e5f Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Mon, 6 Apr 2026 21:10:38 -0700 Subject: [PATCH 10/10] esim: remove unnecessary sleeps from AtClient retry loops lte.sh already blocks until the modem is online, so no post-reset sleep needed. open_isdr and send_apdu retry loops recover without delays. Co-Authored-By: Claude Opus 4.6 (1M context) --- system/hardware/tici/lpa.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py index 5cd2d0cebf634d..93cbca2479a545 100644 --- a/system/hardware/tici/lpa.py +++ b/system/hardware/tici/lpa.py @@ -23,6 +23,10 @@ MM = "org.freedesktop.ModemManager1" MM_MODEM = MM + ".Modem" ES10X_MSS = 120 +OPEN_ISDR_RETRIES = 10 +OPEN_ISDR_RETRY_DELAY_S = 0.25 +OPEN_ISDR_RESET_ATTEMPT = 5 +SEND_APDU_RETRIES = 3 DEBUG = os.environ.get("DEBUG") == "1" # TLV Tags @@ -146,24 +150,20 @@ def _open_isdr_once(self) -> None: raise RuntimeError("Failed to open ISD-R application") def open_isdr(self) -> None: - for attempt in range(10): + for attempt in range(OPEN_ISDR_RETRIES): try: self._open_isdr_once() return - except (RuntimeError, TimeoutError, termios.error): - from openpilot.common.swaglog import cloudlog - cloudlog.warning("open_isdr attempt %d failed, retrying", attempt + 1) - if attempt == 3: - # SIM may be stuck (CME ERROR 13) — reset modem via lte.sh + except (RuntimeError, TimeoutError, termios.error, serial.SerialException): + time.sleep(OPEN_ISDR_RETRY_DELAY_S) + if attempt == OPEN_ISDR_RESET_ATTEMPT: + # reset modem via lte.sh subprocess.run(['/usr/comma/lte/lte.sh', 'start'], capture_output=True) - time.sleep(5) - self._ensure_serial(reconnect=True) - else: - time.sleep(2.0) + self._serial = None # serial port will be re-opened on next attempt raise RuntimeError("Failed to open ISD-R after retries") - def send_apdu(self, apdu: bytes, max_retries: int = 3) -> tuple[bytes, int, int]: - for attempt in range(max_retries): + def send_apdu(self, apdu: bytes) -> tuple[bytes, int, int]: + for attempt in range(SEND_APDU_RETRIES): try: if not self.channel: self.open_isdr() @@ -178,9 +178,8 @@ def send_apdu(self, apdu: bytes, max_retries: int = 3) -> tuple[bytes, int, int] raise RuntimeError("Missing +CGLA response") except (RuntimeError, ValueError): self.channel = None - if attempt == max_retries - 1: + if attempt == SEND_APDU_RETRIES - 1: raise - time.sleep(1 + attempt) raise RuntimeError("send_apdu failed")